import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import pandas as pd

LABEL_ENCODER = LabelEncoder()
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class Qmlp(nn.Module):
    """Multi-layer perceptron classifier with per-epoch metric tracking."""

    def __init__(self, X_train, y_train, X_test, y_test, hidden_layers,
                 labels=None, dropout_rate=0.3):
        super().__init__()
        self.X_train, self.y_train, self.X_test, self.y_test = X_train, y_train, X_test, y_test
        self.labels = labels
        # Per-epoch training history; kept as instance state so separate
        # models do not share one dictionary.
        self.epoch_data = {
            'epoch': [],
            'train_loss': [],
            'train_accuracy': [],
            'test_accuracy': []
        }

        input_size = X_train.shape[1]
        num_classes = len(set(y_train))

        self.layers = nn.ModuleList()
        # Input layer to first hidden layer
        self.layers.append(nn.Linear(input_size, hidden_layers[0]))
        self.layers.append(nn.BatchNorm1d(hidden_layers[0]))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.Dropout(dropout_rate))
        # Remaining hidden layers
        for i in range(1, len(hidden_layers)):
            self.layers.append(nn.Linear(hidden_layers[i - 1], hidden_layers[i]))
            self.layers.append(nn.BatchNorm1d(hidden_layers[i]))
            self.layers.append(nn.ReLU())
            self.layers.append(nn.Dropout(dropout_rate))
        # Output layer
        self.layers.append(nn.Linear(hidden_layers[-1], num_classes))

        self.__init_weights()

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

    def __prepare_data(self):
        # Encode string labels to integers and wrap the arrays in DataLoaders.
        X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32)
        self.y_train = LABEL_ENCODER.fit_transform(self.y_train)
        y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)

        X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32)
        self.y_test = LABEL_ENCODER.transform(self.y_test)
        y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)

        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
        return train_loader, test_loader

    def __train_model(self, train_loader, test_loader, epochs_times=100):
        model = self.to(DEVICE)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min',
                                                               factor=0.1, patience=10)

        best_test_accuracy = 0
        patience = 100
        counter = 0
        accuracy_threshold = 0.99  # 99% accuracy threshold

        for epoch in range(epochs_times):
            # --- Training pass ---
            model.train()
            running_loss = 0.0
            correct_train = 0
            total_train = 0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total_train += labels.size(0)
                correct_train += (predicted == labels).sum().item()

            train_accuracy = correct_train / total_train
            train_loss = running_loss / len(train_loader)

            # --- Evaluation pass ---
            model.eval()
            correct_test = 0
            total_test = 0
            all_labels = []
            all_predicted = []
            all_prob = []
            with torch.no_grad():
                for inputs, labels in test_loader:
                    inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                    outputs = model(inputs)
                    prob = torch.nn.functional.softmax(outputs, dim=1)
                    _, predicted = torch.max(outputs.data, 1)
                    total_test += labels.size(0)
                    correct_test += (predicted == labels).sum().item()
                    all_labels.extend(labels.cpu().numpy())
                    all_predicted.extend(predicted.cpu().numpy())
                    all_prob.extend(prob.cpu().numpy())

            test_accuracy = correct_test / total_test
            print(f'Epoch [{epoch + 1}/{epochs_times}], Loss: {train_loss:.4f}, '
                  f'Train Accuracy: {train_accuracy * 100:.2f}%, '
                  f'Test Accuracy: {test_accuracy * 100:.2f}%')

            self.epoch_data['epoch'].append(epoch + 1)
            self.epoch_data['train_loss'].append(train_loss)
            self.epoch_data['train_accuracy'].append(train_accuracy)
            self.epoch_data['test_accuracy'].append(test_accuracy)

            scheduler.step(train_loss)

            # Early stopping: only triggers once the best test accuracy has reached
            # the threshold and has not improved for `patience` consecutive epochs.
            if test_accuracy > best_test_accuracy:
                best_test_accuracy = test_accuracy
                counter = 0
            else:
                counter += 1
            if counter >= patience and best_test_accuracy >= accuracy_threshold:
                print(f"Early stopping at epoch {epoch + 1}")
                break

        # Confusion matrix from the last evaluated epoch, normalized per true class.
        self.cm = confusion_matrix(all_labels, all_predicted, normalize='true')
        print(self.cm)

    def get_cm(self):
        return pd.DataFrame(self.cm, columns=self.labels, index=self.labels)

    def get_epoch_data(self):
        return pd.DataFrame(self.epoch_data)

    def fit(self, epoch_times=100):
        train_loader, test_loader = self.__prepare_data()
        self.__train_model(train_loader, test_loader, epochs_times=epoch_times)

    def __init_weights(self):
        # Xavier initialization for all linear layers, zero biases.
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.zeros_(m.bias)
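

# Example usage (illustrative sketch, not part of the original module): trains Qmlp
# on randomly generated data to show the intended call sequence. The feature count,
# class names, hidden-layer sizes, and epoch count below are arbitrary assumptions
# chosen only to demonstrate the API.
if __name__ == '__main__':
    import numpy as np

    rng = np.random.default_rng(0)
    class_names = ['classA', 'classB', 'classC']  # hypothetical label names
    X_train = rng.normal(size=(512, 10)).astype('float32')
    y_train = rng.choice(class_names, size=512)
    X_test = rng.normal(size=(128, 10)).astype('float32')
    y_test = rng.choice(class_names, size=128)

    model = Qmlp(X_train, y_train, X_test, y_test,
                 hidden_layers=[64, 32],
                 labels=class_names)
    model.fit(epoch_times=5)

    print(model.get_epoch_data())  # per-epoch loss/accuracy as a DataFrame
    print(model.get_cm())          # normalized confusion matrix as a DataFrame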