import torch import torch.nn as nn import numpy as np import pandas as pd from sklearn.decomposition import PCA from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score from torch.utils.data import DataLoader, TensorDataset from Qfunctions.divSet import divSet as DS class Qnn(nn.Module): def __init__( self, data, labels, test_size=0.2, random_state=None, batch_size=64, learning_rate=0.00001, weight_decay=1e-5, lr_scheduler_patience=10, early_stop_patience=100, early_stop_threshold=0.99, ): super(Qnn, self).__init__() # 使用gpu进行加速, 没有gpu的话使用CPU self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 训练配置,子类共享 self.batch_size = batch_size self.learning_rate = learning_rate self.weight_decay = weight_decay self.lr_scheduler_patience = lr_scheduler_patience self.early_stop_patience = early_stop_patience self.early_stop_threshold = early_stop_threshold # 划分测试集和训练集 self.X_train, self.X_test, self.y_train, self.y_test, self.LABEL_ENCODER = DS( data=data, labels=labels, test_size=test_size, random_state=random_state ) self.labels = labels self.num_classes = len(labels) if labels is not None else int(np.max(self.y_train)) + 1 # 网络状态 self._model_built = False # 存储过程数据 self.epoch_data = self._new_epoch_data() # PCA 图片数据存储 self.pca_2d, self.pca_3d = None, None self.cm, self.cmn = None, None def _new_epoch_data(self): return { 'epoch': [], 'train_loss': [], 'train_accuracy': [], 'test_accuracy': [], 'precision': [], 'recall': [], 'f1_score': [] } def build_model(self, input_shape, num_classes): # 子类必须实现具体网络结构 raise NotImplementedError("Subclasses must implement build_model(input_shape, num_classes)") def _transform_features(self, features): # 默认输入格式: [batch, feature_dim] return torch.tensor(features, dtype=torch.float32) def _prepare_data(self): # 将data转换为tensor形式(子类可覆写 _transform_features) X_train_tensor = self._transform_features(self.X_train) y_train_tensor = torch.tensor(self.y_train, dtype=torch.long) X_test_tensor = self._transform_features(self.X_test) y_test_tensor = torch.tensor(self.y_test, dtype=torch.long) train_dataset = TensorDataset(X_train_tensor, y_train_tensor) test_dataset = TensorDataset(X_test_tensor, y_test_tensor) train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False) return train_loader, test_loader def _train_model(self, train_loader, test_loader, epochs_times=100): model = self.to(self.DEVICE) criterion = nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode='min', factor=0.1, patience=self.lr_scheduler_patience, ) best_test_accuracy = 0 counter = 0 for epoch in range(epochs_times): model.train() running_loss = 0.0 correct_train = 0 total_train = 0 for inputs, labels in train_loader: inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() running_loss += loss.item() _, predicted = torch.max(outputs.data, 1) total_train += labels.size(0) correct_train += (predicted == labels).sum().item() train_accuracy = correct_train / total_train train_loss = running_loss / len(train_loader) model.eval() correct_test = 0 total_test = 0 all_labels = [] all_predicted = [] all_prob = [] with torch.no_grad(): for inputs, labels in test_loader: inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE) outputs = model(inputs) prob = torch.nn.functional.softmax(outputs, dim=1) _, predicted = torch.max(outputs.data, 1) total_test += labels.size(0) correct_test += (predicted == labels).sum().item() all_labels.extend(labels.cpu().numpy()) all_predicted.extend(predicted.cpu().numpy()) all_prob.extend(prob.cpu().numpy()) test_accuracy = correct_test / total_test f1 = f1_score(all_labels, all_predicted, average='macro', zero_division=0) precision = precision_score(all_labels, all_predicted, average='macro', zero_division=0) recall = recall_score(all_labels, all_predicted, average='macro', zero_division=0) if (epoch + 1) % 10 == 0: print('===============================================') print(f'Epoch [{epoch + 1} / {epochs_times}]:') print(f'Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%, Loss: {train_loss:.4f}') print(f'Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score:{f1:.4f}, ') print('===============================================') self.epoch_data['epoch'].append(epoch+1) self.epoch_data['train_loss'].append(train_loss) self.epoch_data['train_accuracy'].append(train_accuracy) self.epoch_data['test_accuracy'].append(test_accuracy) self.epoch_data['precision'].append(precision) self.epoch_data['recall'].append(recall) self.epoch_data['f1_score'].append(f1) scheduler.step(train_loss) if test_accuracy > best_test_accuracy: best_test_accuracy = test_accuracy counter = 0 else: counter += 1 if counter >= self.early_stop_patience and best_test_accuracy >= self.early_stop_threshold: print(f"Early stopping at epoch {epoch+1}") break # cmn为归一化矩阵 # Keep matrix dimensions stable even when some classes do not appear in this split. cm_labels = np.arange(len(self.labels)) if self.labels is not None else None self.cm = confusion_matrix(all_labels, all_predicted, labels=cm_labels) self.cmn = confusion_matrix(all_labels, all_predicted, labels=cm_labels, normalize='true') print(self.cm) return def fit(self, epoch_times = 100): if not self._model_built: self.build_model(input_shape=self.X_train.shape[1:], num_classes=self.num_classes) self._model_built = True # 每次训练前清空过程指标,避免重复累计 self.epoch_data = self._new_epoch_data() train_loader, test_loader = self._prepare_data() self._train_model(train_loader, test_loader, epochs_times=epoch_times) return # 外部获取PCA图像数据的接口 def get_PCA(self): # PCA 2D 图像 pca_2d = PCA(n_components=2) # 保留两个主成分 principalComponents = pca_2d.fit_transform(self.X_train) df_pca2d =pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2']) df_pca2d['labels'] = self.y_train # PCA 3D 图像 pca_3d = PCA(n_components=3) # 保留三个主成分 principalComponents = pca_3d.fit_transform(self.X_train) df_pca3d = pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2', 'PC3']) df_pca3d['labels'] = self.y_train return df_pca2d, df_pca3d # 外部获取混淆矩阵的接口 def get_cm(self): label_names = self.labels if self.labels is not None else list(range(self.num_classes)) return pd.DataFrame(self.cm, columns=label_names, index=label_names) def get_cmn(self): label_names = self.labels if self.labels is not None else list(range(self.num_classes)) return pd.DataFrame(self.cmn, columns=label_names, index=label_names) # 外部获取迭代数据的接口 def get_epoch_data(self): return pd.DataFrame(self.epoch_data)