import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix
from tqdm import tqdm

from .QSVM import QSVM


class Qsvm_brf(QSVM):
    """RBF-kernel SVM trained with SGD on a hinge loss.

    Every sample yielded by ``self.train_loader`` is kept as a support
    vector, and one dual coefficient (``alpha``) per support vector plus a
    bias ``b`` are learned.  The decision function is
    ``f(x) = sum_i alpha_i * K(x, sv_i) + b`` and predictions are
    ``sign(f(x))``, so targets are expected to be encoded as -1 / +1
    (NOTE(review): the label encoding is produced by the ``QSVM`` base
    class, which is not visible here -- confirm).

    Note that ``C`` multiplies the L2 penalty on ``alpha`` (not the hinge
    term), so a larger ``C`` means *stronger* regularization.

    ``self.device``, ``self.train_loader``, ``self.test_loader`` and
    ``self.result`` are expected to be provided by the base class.
    """

    def __init__(self, data, labels=None, test_size=0.2, random_state=None,
                 gamma=1.0, C=100, batch_size=64, learning_rate=0.01):
        """Build the model and allocate its trainable coefficients.

        Args:
            data: Data set forwarded to ``QSVM.__init__``; must expose
                ``shape[1]``.
            labels: Optional labels forwarded to ``QSVM.__init__``.
            test_size: Train/test split argument forwarded to the base class.
            random_state: Seed argument forwarded to the base class.
            gamma: RBF kernel width, ``K = exp(-gamma * ||x - y||^2)``.
            C: Weight of the L2 penalty on ``alpha`` in the training loss.
            batch_size: Stored on the instance; not used in this class.
            learning_rate: Step size for the SGD optimizer.
        """
        super().__init__(data, labels, test_size, random_state)
        self.to(self.device)
        self.gamma = gamma
        self.C = C
        self.n_features = data.shape[1] - 1
        self.support_vectors = torch.cat(
            [batch[0] for batch in self.train_loader]
        ).to(self.device)

        # Match the coefficients' dtype to the data so torch.mm in forward()
        # does not fail on e.g. float64 inputs.
        if self.support_vectors.is_floating_point():
            param_dtype = self.support_vectors.dtype
        else:
            param_dtype = torch.get_default_dtype()

        # Create the tensors on the target device *before* wrapping them in
        # nn.Parameter.  Calling ``.to(device)`` on a Parameter returns a
        # plain, non-leaf tensor whenever a copy is made (e.g. CPU -> CUDA),
        # which is then not registered with the module and never optimized.
        n_support = self.support_vectors.shape[0]
        self.alpha = nn.Parameter(
            torch.zeros(n_support, dtype=param_dtype, device=self.device)
        )
        self.b = nn.Parameter(
            torch.zeros(1, dtype=param_dtype, device=self.device)
        )
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        # Debug output kept so existing console logs are unchanged.
        print(self.b, self.alpha)
        print(list(self.parameters()))

    def train_model(self, epochs):
        """Train for ``epochs`` passes over ``self.train_loader``.

        After every epoch the mean loss and the train/test accuracies are
        appended to ``self.result['acc_and_loss']``; once training is done
        the test-set confusion matrix is stored in
        ``self.result['confusion_matrix']``.

        Args:
            epochs: Number of passes over the training loader.
        """
        self.to(self.device)
        self.optimizer = optim.SGD(self.parameters(), lr=self.learning_rate)
        n_batches = len(self.train_loader)

        for epoch in range(epochs):
            self.train()
            total_loss = 0.0
            correct = 0
            total = 0
            progress_bar = tqdm(self.train_loader,
                                desc=f'Epoch {epoch+1}/{epochs}')
            # Count batches ourselves: tqdm only refreshes ``progress_bar.n``
            # at display intervals, so it is not a reliable divisor for the
            # running mean loss.
            for step, (batch_X, batch_y) in enumerate(progress_bar, start=1):
                batch_X = batch_X.to(self.device)
                batch_y = batch_y.to(self.device)

                self.optimizer.zero_grad()
                outputs = self(batch_X)
                loss = (self.hinge_loss(outputs, batch_y)
                        + self.C * self.regularization())
                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()
                predicted = torch.sign(outputs)
                correct += (predicted == batch_y).sum().item()
                total += batch_y.size(0)
                progress_bar.set_postfix({
                    'Loss': total_loss / step,
                    'Acc': 100. * correct / total
                })

            train_accuracy = correct / total
            test_accuracy = self.evaluate()
            mean_loss = total_loss / n_batches

            history = self.result['acc_and_loss']
            history['epoch'].append(epoch + 1)
            history['loss'].append(mean_loss)
            history['train_accuracy'].append(train_accuracy)
            history['test_accuracy'].append(test_accuracy)
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {mean_loss:.4f}, '
                  f'Train Acc: {train_accuracy:.4f}, Test Acc: {test_accuracy:.4f}')

        # Compute the final confusion matrix.
        self.result['confusion_matrix'] = self.compute_confusion_matrix()

    def compute_confusion_matrix(self):
        """Return the sklearn confusion matrix over ``self.test_loader``.

        Predictions are ``sign(forward(x))``; rows are true labels and
        columns are predicted labels, as defined by
        ``sklearn.metrics.confusion_matrix``.
        """
        self.eval()
        all_predictions = []
        all_labels = []
        with torch.no_grad():
            for batch_X, batch_y in self.test_loader:
                batch_X = batch_X.to(self.device)
                batch_y = batch_y.to(self.device)
                predicted = torch.sign(self(batch_X))
                all_predictions.extend(predicted.cpu().numpy())
                all_labels.extend(batch_y.cpu().numpy())
        return confusion_matrix(all_labels, all_predictions)

    def evaluate(self):
        """Return the accuracy (fraction in [0, 1]) on ``self.test_loader``.

        Returns 0.0 when the test loader yields no samples.
        """
        self.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for batch_X, batch_y in self.test_loader:
                batch_X = batch_X.to(self.device)
                batch_y = batch_y.to(self.device)
                predicted = torch.sign(self(batch_X))
                correct += (predicted == batch_y).sum().item()
                total += batch_y.size(0)
        if total == 0:
            # Empty test split: avoid a ZeroDivisionError.
            return 0.0
        return correct / total

    def rbf_kernel(self, X, Y):
        """Return the RBF Gram matrix ``exp(-gamma * ||x_i - y_j||^2)``.

        Args:
            X: 2-D tensor with one sample per row.
            Y: 2-D tensor with one sample per row and the same number of
                columns as ``X``.

        Returns:
            Tensor of shape ``(X.shape[0], Y.shape[0])``.
        """
        X_norm = (X ** 2).sum(1).view(-1, 1)
        Y_norm = (Y ** 2).sum(1).view(1, -1)
        # ||x - y||^2 = ||x||^2 + ||y||^2 - 2 x.y ; round-off can push this
        # slightly below zero, which would yield kernel values > 1.
        dist = (X_norm + Y_norm - 2.0 * torch.mm(X, Y.t())).clamp(min=0)
        return torch.exp(-self.gamma * dist)

    def forward(self, X):
        """Return the decision values for ``X`` as a 1-D tensor.

        Args:
            X: 2-D tensor with one sample per row.
        """
        X = X.to(self.device)
        K = self.rbf_kernel(X, self.support_vectors)
        # squeeze(1) drops only the column axis, so a single-sample batch
        # still yields a 1-D result without relying on broadcasting with b.
        return torch.mm(K, self.alpha.unsqueeze(1)).squeeze(1) + self.b

    def hinge_loss(self, outputs, targets):
        """Return the mean hinge loss ``mean(max(0, 1 - outputs * targets))``."""
        return torch.mean(torch.clamp(1 - outputs * targets, min=0))

    def regularization(self):
        """Return the L2 penalty ``0.5 * sum(alpha ** 2)``."""
        return 0.5 * (self.alpha ** 2).sum()