# Deeplearning/Qtorch/Models/Qnn.py
#
# 159 lines
# 5.1 KiB
# Python

import torch
import torch.nn as nn
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from torch.utils.data import DataLoader, TensorDataset
# from Qfunctions.divSet import divSet as ds
# from Qfunctions.saveToxlsx import save_to_xlsx as stx
class Qnn(nn.Module):
    """Base module bundling data preparation, training and reporting.

    The class defines no layers and no ``forward``; it only provides the
    training loop and result accessors.
    NOTE(review): presumably subclasses define the network and assign
    ``X_train``/``y_train``/``X_test``/``y_test``, ``labels`` and
    ``LABEL_ENCODER`` before calling :meth:`fit` -- confirm against callers.
    """

    def __init__(self):
        super().__init__()
        self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.X_train, self.y_train, self.X_test, self.y_test = None, None, None, None
        self.labels = None
        self.LABEL_ENCODER = None
        # Per-epoch metrics appended by the training loop.
        self.epoch_data = {
            'epoch': [],
            'train_loss': [],
            'train_accuracy': [],
            'test_accuracy': [],
        }
        self.pca_2d, self.pca_3d = None, None
        # Confusion matrix of the last evaluated epoch; stays None until
        # training has run so that get_cm() can report a clear error.
        self.cm = None

    def __prepare_data(self):
        """Encode the labels and wrap both splits in ``DataLoader`` objects.

        ``self.y_train`` and ``self.y_test`` are replaced in place by their
        encoded versions (the encoder is fitted on the training labels).

        Returns:
            A ``(train_loader, test_loader)`` tuple with batch size 64; only
            the training loader shuffles.

        Raises:
            RuntimeError: if a data split or ``LABEL_ENCODER`` is still unset.
            ValueError: if either split contains no samples.
        """
        splits = (self.X_train, self.y_train, self.X_test, self.y_test)
        if any(part is None for part in splits):
            raise RuntimeError(
                "X_train, y_train, X_test and y_test must be set before calling fit()"
            )
        if self.LABEL_ENCODER is None:
            raise RuntimeError("LABEL_ENCODER must be set before calling fit()")

        # Convert the data to tensors; unsqueeze(1) inserts a new axis at
        # position 1 of the feature tensors.
        X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32).unsqueeze(1)
        self.y_train = self.LABEL_ENCODER.fit_transform(self.y_train)
        y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)
        X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32).unsqueeze(1)
        self.y_test = self.LABEL_ENCODER.transform(self.y_test)
        y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)

        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        # The loss and accuracy averages divide by the number of batches and
        # samples, so an empty split cannot be trained or evaluated.
        if len(train_dataset) == 0 or len(test_dataset) == 0:
            raise ValueError("training and test sets must each contain at least one sample")

        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
        return train_loader, test_loader

    def __train_epoch(self, model, loader, criterion, optimizer):
        """Run one optimisation pass over ``loader``.

        Returns:
            ``(mean batch loss, accuracy)`` measured during the pass.
        """
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in loader:
            inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            # The predicted class is the index of the largest output.
            predicted = outputs.detach().argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        return running_loss / len(loader), correct / total

    def __evaluate(self, model, loader):
        """Evaluate ``model`` on ``loader`` without tracking gradients.

        Returns:
            ``(accuracy, true_labels, predicted_labels)`` where the two label
            sequences are flat lists covering every sample in ``loader``.
        """
        model.eval()
        correct = 0
        total = 0
        all_labels = []
        all_predicted = []
        with torch.no_grad():
            for inputs, labels in loader:
                inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE)
                predicted = model(inputs).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                all_labels.extend(labels.cpu().numpy())
                all_predicted.extend(predicted.cpu().numpy())
        return correct / total, all_labels, all_predicted

    def __train_model(self, train_loader, test_loader, epochs_times=100,
                      patience=100, accuracy_threshold=0.99):
        """Train with Adam and cross-entropy loss, logging metrics per epoch.

        After training, ``self.cm`` holds the confusion matrix of the last
        evaluated epoch, normalised over the true labels, and it is printed.

        Args:
            train_loader: batches used for optimisation.
            test_loader: batches used for the per-epoch evaluation.
            epochs_times: maximum number of epochs (at least 1).
            patience: number of consecutive epochs without a new best test
                accuracy after which training may stop early.
            accuracy_threshold: early stopping only triggers once the best
                test accuracy has reached this value (0.99 means 99%).

        Raises:
            ValueError: if ``epochs_times`` is smaller than 1.
        """
        if epochs_times < 1:
            raise ValueError("epochs_times must be at least 1")

        model = self.to(self.DEVICE)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.1, patience=10
        )
        best_test_accuracy = 0
        counter = 0

        for epoch in range(epochs_times):
            train_loss, train_accuracy = self.__train_epoch(
                model, train_loader, criterion, optimizer
            )
            test_accuracy, all_labels, all_predicted = self.__evaluate(model, test_loader)

            print(f'Epoch [{epoch+1}/{epochs_times}], Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%')
            self.epoch_data['epoch'].append(epoch+1)
            self.epoch_data['train_loss'].append(train_loss)
            self.epoch_data['train_accuracy'].append(train_accuracy)
            self.epoch_data['test_accuracy'].append(test_accuracy)

            # The learning-rate schedule follows the training loss.
            scheduler.step(train_loss)

            if test_accuracy > best_test_accuracy:
                best_test_accuracy = test_accuracy
                counter = 0
            else:
                counter += 1
            # Stop only when accuracy has stalled AND is already good enough.
            if counter >= patience and best_test_accuracy >= accuracy_threshold:
                print(f"Early stopping at epoch {epoch+1}")
                break

        self.cm = confusion_matrix(all_labels, all_predicted, normalize='true')
        print(self.cm)
        return

    def fit(self, epoch_times=100, *, patience=100, accuracy_threshold=0.99):
        """Prepare the stored data and train the model.

        Args:
            epoch_times: maximum number of training epochs (at least 1).
            patience: epochs without test-accuracy improvement tolerated
                before early stopping is considered.
            accuracy_threshold: best test accuracy required before early
                stopping may trigger.

        Raises:
            RuntimeError: if the data splits or ``LABEL_ENCODER`` are unset.
            ValueError: if a split is empty or ``epoch_times`` is below 1.
        """
        train_loader, test_loader = self.__prepare_data()
        self.__train_model(
            train_loader,
            test_loader,
            epochs_times=epoch_times,
            patience=patience,
            accuracy_threshold=accuracy_threshold,
        )
        return

    def get_PCA(self):
        """Project ``self.X_train`` onto its first principal components.

        Returns:
            ``(df_pca2d, df_pca3d)``: DataFrames with columns ``PC1``..``PC2``
            and ``PC1``..``PC3`` respectively, each with an extra ``labels``
            column filled from ``self.y_train``.
        """
        # 2-D projection: keep two principal components.
        pca_2d = PCA(n_components=2)
        components_2d = pca_2d.fit_transform(self.X_train)
        df_pca2d = pd.DataFrame(data=components_2d, columns=['PC1', 'PC2'])
        df_pca2d['labels'] = self.y_train

        # 3-D projection: keep three principal components.
        pca_3d = PCA(n_components=3)
        components_3d = pca_3d.fit_transform(self.X_train)
        df_pca3d = pd.DataFrame(data=components_3d, columns=['PC1', 'PC2', 'PC3'])
        df_pca3d['labels'] = self.y_train
        return df_pca2d, df_pca3d

    def get_cm(self):
        """Return the stored confusion matrix as a DataFrame.

        Rows and columns are both labelled with ``self.labels``.

        Raises:
            RuntimeError: if no confusion matrix has been computed yet.
        """
        if self.cm is None:
            raise RuntimeError("confusion matrix is not available; call fit() first")
        return pd.DataFrame(self.cm, columns=self.labels, index=self.labels)

    def get_epoch_data(self):
        """Return the recorded per-epoch metrics as a DataFrame."""
        return pd.DataFrame(self.epoch_data)