114 lines
4.4 KiB
Python
114 lines
4.4 KiB
Python
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
from tqdm import tqdm
|
|
from .QSVM import QSVM
|
|
from sklearn.metrics import confusion_matrix
|
|
|
|
class Qsvm_brf(QSVM):
|
|
def __init__(self, data, labels=None, test_size=0.2, random_state=None,
|
|
gamma=1.0, C=100, batch_size=64, learning_rate=0.01):
|
|
super(Qsvm_brf, self).__init__(data, labels, test_size, random_state)
|
|
self.to(self.device)
|
|
self.gamma = gamma
|
|
self.C = C
|
|
self.n_features = data.shape[1] - 1
|
|
self.support_vectors = torch.cat([batch[0] for batch in self.train_loader]).to(self.device)
|
|
self.alpha = nn.Parameter(torch.zeros(self.support_vectors.shape[0])).to(self.device)
|
|
self.b = nn.Parameter(torch.zeros(1)).to(self.device)
|
|
self.batch_size = batch_size
|
|
self.learning_rate = learning_rate
|
|
print(self.b, self.alpha)
|
|
print(list(self.parameters()))
|
|
|
|
def train_model(self, epochs):
|
|
self.to(self.device)
|
|
self.optimizer = optim.SGD(self.parameters(), lr=self.learning_rate)
|
|
|
|
for epoch in range(epochs):
|
|
self.train()
|
|
total_loss = 0
|
|
correct = 0
|
|
total = 0
|
|
|
|
progress_bar = tqdm(self.train_loader, desc=f'Epoch {epoch+1}/{epochs}')
|
|
for batch_X, batch_y in progress_bar:
|
|
batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
|
|
|
|
self.optimizer.zero_grad()
|
|
|
|
outputs = self(batch_X)
|
|
loss = self.hinge_loss(outputs, batch_y) + self.C * self.regularization()
|
|
|
|
loss.backward()
|
|
self.optimizer.step()
|
|
|
|
total_loss += loss.item()
|
|
predicted = torch.sign(outputs)
|
|
correct += (predicted == batch_y).sum().item()
|
|
total += batch_y.size(0)
|
|
|
|
progress_bar.set_postfix({
|
|
'Loss': total_loss / (progress_bar.n + 1),
|
|
'Acc': 100. * correct / total
|
|
})
|
|
|
|
train_accuracy = correct / total
|
|
test_accuracy = self.evaluate()
|
|
|
|
self.result['acc_and_loss']['epoch'].append(epoch + 1)
|
|
self.result['acc_and_loss']['loss'].append(total_loss / len(self.train_loader))
|
|
self.result['acc_and_loss']['train_accuracy'].append(train_accuracy)
|
|
self.result['acc_and_loss']['test_accuracy'].append(test_accuracy)
|
|
|
|
print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(self.train_loader):.4f}, '
|
|
f'Train Acc: {train_accuracy:.4f}, Test Acc: {test_accuracy:.4f}')
|
|
|
|
# 计算最终的混淆矩阵
|
|
self.result['confusion_matrix'] = self.compute_confusion_matrix()
|
|
|
|
def compute_confusion_matrix(self):
|
|
self.eval()
|
|
all_predictions = []
|
|
all_labels = []
|
|
|
|
with torch.no_grad():
|
|
for batch_X, batch_y in self.test_loader:
|
|
batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
|
|
outputs = self(batch_X)
|
|
predicted = torch.sign(outputs)
|
|
|
|
all_predictions.extend(predicted.cpu().numpy())
|
|
all_labels.extend(batch_y.cpu().numpy())
|
|
|
|
return confusion_matrix(all_labels, all_predictions)
|
|
|
|
def evaluate(self):
|
|
self.eval()
|
|
correct = 0
|
|
total = 0
|
|
with torch.no_grad():
|
|
for batch_X, batch_y in self.test_loader:
|
|
batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
|
|
outputs = self(batch_X)
|
|
predicted = torch.sign(outputs)
|
|
correct += (predicted == batch_y).sum().item()
|
|
total += batch_y.size(0)
|
|
return correct / total
|
|
|
|
def rbf_kernel(self, X, Y):
|
|
X_norm = (X**2).sum(1).view(-1, 1)
|
|
Y_norm = (Y**2).sum(1).view(1, -1)
|
|
dist = X_norm + Y_norm - 2.0 * torch.mm(X, Y.t())
|
|
return torch.exp(-self.gamma * dist)
|
|
|
|
def forward(self, X):
|
|
X = X.to(self.device)
|
|
K = self.rbf_kernel(X, self.support_vectors)
|
|
return torch.mm(K, self.alpha.unsqueeze(1)).squeeze() + self.b
|
|
|
|
def hinge_loss(self, outputs, targets):
|
|
return torch.mean(torch.clamp(1 - outputs * targets, min=0))
|
|
|
|
def regularization(self):
|
|
return 0.5 * (self.alpha ** 2).sum() |