# -*- coding: utf-8 -*-
import torch
|
||
import torch.nn as nn
|
||
import numpy as np
|
||
import pandas as pd
|
||
from sklearn.decomposition import PCA
|
||
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
|
||
from torch.utils.data import DataLoader, TensorDataset
|
||
|
||
from Qfunctions.divSet import divSet as DS
|
||
|
||
|
||
class Qnn(nn.Module):
    """Base neural-network classifier with shared training/evaluation plumbing.

    Subclasses implement ``build_model(input_shape, num_classes)`` to define
    the actual layers; this base class handles the train/test split, tensor
    conversion, the training loop (Adam + ReduceLROnPlateau + early stopping),
    per-epoch metric tracking, and reporting helpers (confusion matrices and
    PCA projections of the training data).
    """

    def __init__(
        self,
        data,
        labels,
        test_size=0.2,
        random_state=None,
        batch_size=64,
        learning_rate=0.00001,
        weight_decay=1e-5,
        lr_scheduler_patience=10,
        early_stop_patience=100,
        early_stop_threshold=0.99,
    ):
        """Store the training configuration and split the data.

        Args:
            data: feature matrix handed to the DS splitter.
            labels: list of class names, or None to infer the class count
                from the encoded training targets.
            test_size: fraction of samples held out for testing.
            random_state: seed forwarded to the splitter.
            batch_size: mini-batch size for both loaders.
            learning_rate: Adam learning rate.
            weight_decay: Adam L2 penalty.
            lr_scheduler_patience: epochs without train-loss improvement
                before the LR is reduced.
            early_stop_patience: epochs without test-accuracy improvement
                before stopping is considered.
            early_stop_threshold: minimum best test accuracy required before
                early stopping is allowed to trigger.
        """
        super().__init__()

        # Use GPU acceleration when available, otherwise fall back to the CPU.
        self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Training configuration, shared with subclasses.
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.lr_scheduler_patience = lr_scheduler_patience
        self.early_stop_patience = early_stop_patience
        self.early_stop_threshold = early_stop_threshold

        # Split into train and test sets; DS presumably also returns the
        # fitted label encoder — TODO confirm against Qfunctions.divSet.
        self.X_train, self.X_test, self.y_train, self.y_test, self.LABEL_ENCODER = DS(
            data=data, labels=labels, test_size=test_size, random_state=random_state
        )

        self.labels = labels
        # Class count: length of the label-name list when given, otherwise
        # inferred from the largest encoded target in the training split.
        self.num_classes = len(labels) if labels is not None else int(np.max(self.y_train)) + 1

        # Network state: build_model() runs lazily on the first fit().
        self._model_built = False

        # Per-epoch metric history.
        self.epoch_data = self._new_epoch_data()

        # Cached PCA projection data (filled by get_PCA).
        self.pca_2d, self.pca_3d = None, None

        # Confusion matrix (raw counts) and its row-normalized variant.
        self.cm, self.cmn = None, None

    def _new_epoch_data(self):
        """Return a fresh, empty container for per-epoch metrics."""
        return {
            'epoch': [],
            'train_loss': [],
            'train_accuracy': [],
            'test_accuracy': [],
            'precision': [],
            'recall': [],
            'f1_score': []
        }

    def build_model(self, input_shape, num_classes):
        """Define the concrete network structure — subclasses must override."""
        raise NotImplementedError("Subclasses must implement build_model(input_shape, num_classes)")

    def _transform_features(self, features):
        """Convert raw features to a float32 tensor.

        Default input format: [batch, feature_dim]; subclasses may override
        to reshape for their architecture.
        """
        return torch.tensor(features, dtype=torch.float32)

    def _prepare_data(self):
        """Wrap the train/test splits in shuffled/ordered DataLoaders."""
        # Convert data to tensors (subclasses may override _transform_features).
        X_train_tensor = self._transform_features(self.X_train)
        y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)

        X_test_tensor = self._transform_features(self.X_test)
        y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)

        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

        return train_loader, test_loader

    def _train_model(self, train_loader, test_loader, epochs_times=100):
        """Run the training loop, recording metrics after every epoch.

        The LR scheduler tracks the training loss; early stopping fires only
        once the test accuracy has stalled for ``early_stop_patience`` epochs
        AND the best accuracy has reached ``early_stop_threshold``. On exit,
        confusion matrices for the final evaluation pass are stored in
        ``self.cm`` / ``self.cmn``.
        """
        model = self.to(self.DEVICE)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='min',
            factor=0.1,
            patience=self.lr_scheduler_patience,
        )
        best_test_accuracy = 0
        counter = 0

        # Defined up-front so the confusion-matrix step below cannot raise a
        # NameError when the epoch loop body never runs (epochs_times < 1).
        all_labels = []
        all_predicted = []

        for epoch in range(epochs_times):

            # ---- training pass ----
            model.train()
            running_loss = 0.0
            correct_train = 0
            total_train = 0

            for inputs, labels in train_loader:
                inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total_train += labels.size(0)
                correct_train += (predicted == labels).sum().item()
            train_accuracy = correct_train / total_train
            train_loss = running_loss / len(train_loader)

            # ---- evaluation pass ----
            model.eval()
            correct_test = 0
            total_test = 0
            all_labels = []
            all_predicted = []
            all_prob = []
            with torch.no_grad():
                for inputs, labels in test_loader:
                    inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE)
                    outputs = model(inputs)
                    prob = torch.nn.functional.softmax(outputs, dim=1)
                    _, predicted = torch.max(outputs.data, 1)
                    total_test += labels.size(0)
                    correct_test += (predicted == labels).sum().item()
                    all_labels.extend(labels.cpu().numpy())
                    all_predicted.extend(predicted.cpu().numpy())
                    all_prob.extend(prob.cpu().numpy())

            test_accuracy = correct_test / total_test
            f1 = f1_score(all_labels, all_predicted, average='macro', zero_division=0)
            precision = precision_score(all_labels, all_predicted, average='macro', zero_division=0)
            recall = recall_score(all_labels, all_predicted, average='macro', zero_division=0)

            # Progress report every 10 epochs.
            if (epoch + 1) % 10 == 0:
                print('===============================================')
                print(f'Epoch [{epoch + 1} / {epochs_times}]:')
                print(f'Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%, Loss: {train_loss:.4f}')
                print(f'Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score:{f1:.4f}, ')
                print('===============================================')

            self.epoch_data['epoch'].append(epoch+1)
            self.epoch_data['train_loss'].append(train_loss)
            self.epoch_data['train_accuracy'].append(train_accuracy)
            self.epoch_data['test_accuracy'].append(test_accuracy)
            self.epoch_data['precision'].append(precision)
            self.epoch_data['recall'].append(recall)
            self.epoch_data['f1_score'].append(f1)

            # Scheduler is driven by the training loss, not test accuracy.
            scheduler.step(train_loss)

            if test_accuracy > best_test_accuracy:
                best_test_accuracy = test_accuracy
                counter = 0
            else:
                counter += 1

            # Stop only when accuracy has both stalled AND reached the bar.
            if counter >= self.early_stop_patience and best_test_accuracy >= self.early_stop_threshold:
                print(f"Early stopping at epoch {epoch+1}")
                break

        # cmn is the row-normalized matrix.
        # Keep matrix dimensions stable even when some classes do not appear in this split.
        cm_labels = np.arange(len(self.labels)) if self.labels is not None else None
        self.cm = confusion_matrix(all_labels, all_predicted, labels=cm_labels)
        self.cmn = confusion_matrix(all_labels, all_predicted, labels=cm_labels, normalize='true')

        print(self.cm)
        return

    def fit(self, epoch_times = 100):
        """Build the model on first use, then train for ``epoch_times`` epochs."""
        if not self._model_built:
            self.build_model(input_shape=self.X_train.shape[1:], num_classes=self.num_classes)
            self._model_built = True

        # Clear the metric history before each run so repeated fits
        # don't accumulate stale epochs.
        self.epoch_data = self._new_epoch_data()

        train_loader, test_loader = self._prepare_data()
        self._train_model(train_loader, test_loader, epochs_times=epoch_times)
        return

    # External interface for obtaining PCA plot data.
    def get_PCA(self):
        """Return (2-D, 3-D) PCA projections of the training data.

        Each DataFrame carries the principal components plus a 'labels'
        column with the training targets. Results are also cached on
        ``self.pca_2d`` / ``self.pca_3d`` (the attributes declared for this
        purpose in __init__).
        """
        # PCA 2-D plot data: keep the first two principal components.
        pca_2d = PCA(n_components=2)
        principalComponents = pca_2d.fit_transform(self.X_train)
        df_pca2d = pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2'])
        df_pca2d['labels'] = self.y_train

        # PCA 3-D plot data: keep the first three principal components.
        pca_3d = PCA(n_components=3)
        principalComponents = pca_3d.fit_transform(self.X_train)
        df_pca3d = pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2', 'PC3'])
        df_pca3d['labels'] = self.y_train

        # Cache so the attributes initialized in __init__ actually get filled.
        self.pca_2d, self.pca_3d = df_pca2d, df_pca3d
        return df_pca2d, df_pca3d

    # External interface for obtaining the confusion matrix.
    def get_cm(self):
        """Return the raw confusion matrix as a labeled DataFrame."""
        label_names = self.labels if self.labels is not None else list(range(self.num_classes))
        return pd.DataFrame(self.cm, columns=label_names, index=label_names)

    def get_cmn(self):
        """Return the row-normalized confusion matrix as a labeled DataFrame."""
        label_names = self.labels if self.labels is not None else list(range(self.num_classes))
        return pd.DataFrame(self.cmn, columns=label_names, index=label_names)

    # External interface for obtaining per-epoch training data.
    def get_epoch_data(self):
        """Return the per-epoch metric history as a DataFrame."""
        return pd.DataFrame(self.epoch_data)