class Simple1DCNN(nn.Module):
    """Two-stage 1-D convolutional classifier for single-channel sequences.

    Each stage is ``Conv1d(kernel_size=3, padding=1) -> ReLU -> MaxPool1d(2)``,
    so the sequence length is floor-halved twice.  The resulting feature map is
    flattened and passed through two fully connected layers that emit
    ``num_classes`` raw (un-normalised) scores per sample.

    Args:
        input_size: Length of every input sequence.  Must be at least 4 so
            that at least one position survives the two pooling steps.
        num_classes: Number of scores produced for each sample.

    Raises:
        ValueError: If ``input_size`` is smaller than 4.
    """

    def __init__(self, input_size: int, num_classes: int):
        super(Simple1DCNN, self).__init__()
        if input_size < 4:
            raise ValueError(
                f"input_size must be at least 4 (two 2x poolings), got {input_size}"
            )

        # forward() needs these to validate and flatten its input, so they
        # have to live on the instance rather than only in this scope.
        self.input_size = input_size
        # Two floor-halvings of the length are equivalent to input_size // 4.
        self.flat_features = 64 * (input_size // 4)

        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(self.flat_features, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Compute class scores for ``x``.

        Args:
            x: Tensor whose last dimension equals ``input_size`` and whose
                channel dimension is 1, e.g. ``(batch, 1, input_size)``.

        Returns:
            Tensor of shape ``(batch, num_classes)``.

        Raises:
            ValueError: If the last dimension of ``x`` is not ``input_size``.
        """
        if x.shape[-1] != self.input_size:
            # Without this check view(-1, ...) could silently fold a wrong
            # sequence length into the batch dimension.
            raise ValueError(
                f"expected sequences of length {self.input_size}, got {x.shape[-1]}"
            )

        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, self.flat_features)  # flatten the feature map
        x = self.relu(self.fc1(x))
        return self.fc2(x)
+input_size = 100 # 假设n=100 +num_classes = 10 # 假设有10个类别 +model = Simple1DCNN(input_size, num_classes) + +# 定义损失函数和优化器 +criterion = nn.CrossEntropyLoss() +optimizer = optim.Adam(model.parameters(), lr=0.001) + +# 训练和评估模型的代码与之前类似,这里不再赘述。 \ No newline at end of file diff --git a/Qtorch/Models/Qmlp.py b/Qtorch/Models/Qmlp.py index 2ada663..1bfccc5 100644 --- a/Qtorch/Models/Qmlp.py +++ b/Qtorch/Models/Qmlp.py @@ -1,23 +1,12 @@ import torch import torch.nn as nn -from torch.utils.data import DataLoader, TensorDataset +from Qtorch.Models.Qnn import Qnn from sklearn.preprocessing import LabelEncoder from sklearn.metrics import confusion_matrix import pandas as pd -LABEL_ENCODER = LabelEncoder() -DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu') -class Qmlp(nn.Module): - - epoch_data = { - 'epoch': [], - 'train_loss': [], - 'train_accuracy': [], - 'test_accuracy': [] - } - - labels = None +class Qmlp(Qnn): def __init__(self, X_train, y_train, X_test, y_test, hidden_layers, @@ -26,15 +15,14 @@ class Qmlp(nn.Module): ): super(Qmlp, self).__init__() + self.LABEL_ENCODER = LabelEncoder() + self.X_train, self.y_train, self.X_test, self.y_test = X_train, y_train, X_test, y_test self.labels = labels input_size = X_train.shape[1] - # input_size = 5 - print(input_size) num_classes = len(set(y_train)) - self.layers = nn.ModuleList() # Input layer to first hidden layer @@ -58,112 +46,6 @@ class Qmlp(nn.Module): for layer in self.layers: x = layer(x) return x - - def __prepare_data(self): - # Step 2: Prepare the data - X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32) - self.y_train = LABEL_ENCODER.fit_transform(self.y_train) - y_train_tensor = torch.tensor(self.y_train, dtype=torch.long) - - X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32) - self.y_test = LABEL_ENCODER.transform(self.y_test) - y_test_tensor = torch.tensor(self.y_test, dtype=torch.long) - - train_dataset = TensorDataset(X_train_tensor, y_train_tensor) - test_dataset = 
TensorDataset(X_test_tensor, y_test_tensor) - - train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True) - test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False) - - return train_loader, test_loader - - def __train_model(self, train_loader, test_loader, epochs_times=100): - - model = self.to(DEVICE) - - criterion = nn.CrossEntropyLoss() - optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5) - scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10) - best_test_accuracy = 0 - patience = 100 - counter = 0 - accuracy_threshold = 0.99 # 99% 的准确率阈值 - - for epoch in range(epochs_times): - - model.train() - running_loss = 0.0 - correct_train = 0 - total_train = 0 - - for inputs, labels in train_loader: - inputs, labels = inputs.to(DEVICE), labels.to(DEVICE) - - optimizer.zero_grad() - outputs = model(inputs) - loss = criterion(outputs, labels) - loss.backward() - optimizer.step() - - running_loss += loss.item() - _, predicted = torch.max(outputs.data, 1) - total_train += labels.size(0) - correct_train += (predicted == labels).sum().item() - train_accuracy = correct_train / total_train - train_loss = running_loss / len(train_loader) - - model.eval() - correct_test = 0 - total_test = 0 - all_labels = [] - all_predicted = [] - all_prob = [] - with torch.no_grad(): - for inputs, labels in test_loader: - inputs, labels = inputs.to(DEVICE), labels.to(DEVICE) - outputs = model(inputs) - prob = torch.nn.functional.softmax(outputs, dim=1) - _, predicted = torch.max(outputs.data, 1) - total_test += labels.size(0) - correct_test += (predicted == labels).sum().item() - all_labels.extend(labels.cpu().numpy()) - all_predicted.extend(predicted.cpu().numpy()) - all_prob.extend(prob.cpu().numpy()) - - test_accuracy = correct_test / total_test - print(f'Epoch [{epoch+1}/{epochs_times}], Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: 
class Qnn(nn.Module):
    """Base class providing a shared train/evaluate loop for classifiers.

    ``__init__`` only creates placeholders.  A subclass is expected to assign
    ``X_train``, ``y_train``, ``X_test``, ``y_test``, ``labels`` and
    ``LABEL_ENCODER`` and to implement ``forward`` before ``fit`` is called.
    """

    def __init__(self):
        super(Qnn, self).__init__()

        self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.X_train, self.y_train, self.X_test, self.y_test = None, None, None, None

        # Names used for the rows/columns of the confusion-matrix frame.
        self.labels = None

        # Object with fit_transform()/transform(); supplied by the subclass.
        self.LABEL_ENCODER = None

        # Per-epoch metrics, appended to by every call to fit().
        self.epoch_data = {
            'epoch': [],
            'train_loss': [],
            'train_accuracy': [],
            'test_accuracy': []
        }

        # Confusion matrix of the last evaluated epoch; set by fit().
        self.cm = None

        self.pca_2d, self.pca_3d = None, None

    def __prepare_data(self):
        """Encode the labels and wrap both splits in ``DataLoader`` objects.

        ``self.y_train`` and ``self.y_test`` are replaced by their encoded
        versions as a side effect.

        Returns:
            ``(train_loader, test_loader)``; batch size 64, only the training
            loader is shuffled.

        Raises:
            RuntimeError: If no ``LABEL_ENCODER`` has been assigned.
        """
        if self.LABEL_ENCODER is None:
            raise RuntimeError("LABEL_ENCODER must be set before calling fit()")

        # Convert the data to tensors.
        X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32)
        self.y_train = self.LABEL_ENCODER.fit_transform(self.y_train)
        y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)

        X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32)
        self.y_test = self.LABEL_ENCODER.transform(self.y_test)
        y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)

        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

        return train_loader, test_loader

    def _train_one_epoch(self, model, train_loader, criterion, optimizer):
        """Run one optimisation pass; return ``(mean batch loss, accuracy)``."""
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(self.DEVICE), targets.to(self.DEVICE)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        return running_loss / len(train_loader), correct / total

    def _evaluate(self, model, test_loader):
        """Score ``model`` on ``test_loader`` without tracking gradients.

        Returns:
            ``(accuracy, true_labels, predicted_labels)`` where the two label
            sequences are flat lists covering every sample of the loader.
        """
        model.eval()
        correct = 0
        total = 0
        all_labels = []
        all_predicted = []

        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs, targets = inputs.to(self.DEVICE), targets.to(self.DEVICE)
                predicted = model(inputs).argmax(dim=1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()
                all_labels.extend(targets.cpu().numpy())
                all_predicted.extend(predicted.cpu().numpy())

        return correct / total, all_labels, all_predicted

    def __train_model(self, train_loader, test_loader, epochs_times=100):
        """Train for up to ``epochs_times`` epochs and record the metrics.

        After every epoch the metrics are printed and appended to
        ``self.epoch_data``.  When the loop ends, the row-normalised confusion
        matrix of the last evaluated epoch is stored in ``self.cm``.

        Raises:
            ValueError: If ``epochs_times`` is smaller than 1.
        """
        if epochs_times < 1:
            raise ValueError(f"epochs_times must be at least 1, got {epochs_times}")

        model = self.to(self.DEVICE)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
        best_test_accuracy = 0
        patience = 100
        counter = 0
        accuracy_threshold = 0.99  # 99% accuracy threshold

        for epoch in range(epochs_times):
            train_loss, train_accuracy = self._train_one_epoch(
                model, train_loader, criterion, optimizer
            )
            test_accuracy, all_labels, all_predicted = self._evaluate(model, test_loader)

            print(f'Epoch [{epoch+1}/{epochs_times}], Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%')

            self.epoch_data['epoch'].append(epoch+1)
            self.epoch_data['train_loss'].append(train_loss)
            self.epoch_data['train_accuracy'].append(train_accuracy)
            self.epoch_data['test_accuracy'].append(test_accuracy)

            # The learning rate is reduced when the training loss plateaus.
            scheduler.step(train_loss)

            if test_accuracy > best_test_accuracy:
                best_test_accuracy = test_accuracy
                counter = 0
            else:
                counter += 1

            # Stop only when the test accuracy has stalled for `patience`
            # epochs AND the best accuracy already reached the threshold.
            if counter >= patience and best_test_accuracy >= accuracy_threshold:
                print(f"Early stopping at epoch {epoch+1}")
                break

        self.cm = confusion_matrix(all_labels, all_predicted, normalize='true')
        print(self.cm)

    def fit(self, epoch_times = 100):
        """Prepare the data loaders and train for up to ``epoch_times`` epochs."""
        train_loader, test_loader = self.__prepare_data()
        self.__train_model(train_loader, test_loader, epochs_times=epoch_times)

    def get_PCA(self):
        """Project ``X_train`` onto its first two and first three principal components.

        Returns:
            ``(df_pca2d, df_pca3d)``: frames with columns ``PC1..PCn`` plus a
            ``labels`` column holding the current value of ``self.y_train``.
        """
        frames = []
        for n_components in (2, 3):
            columns = [f'PC{i}' for i in range(1, n_components + 1)]
            components = PCA(n_components=n_components).fit_transform(self.X_train)
            frame = pd.DataFrame(data=components, columns=columns)
            frame['labels'] = self.y_train
            frames.append(frame)

        df_pca2d, df_pca3d = frames
        return df_pca2d, df_pca3d

    def get_cm(self):
        """Return the stored confusion matrix as a frame indexed by ``self.labels``.

        Raises:
            RuntimeError: If no confusion matrix has been computed yet.
        """
        if self.cm is None:
            raise RuntimeError("No confusion matrix available; call fit() first")
        return pd.DataFrame(self.cm, columns=self.labels, index=self.labels)

    def get_epoch_data(self):
        """Return the recorded per-epoch metrics as a ``DataFrame``."""
        return pd.DataFrame(self.epoch_data)
df_pca3d['labels'] = y_train - - # 保存为xlsx文件 - import os - folder = os.path.join("./Result", projet_name) - if not os.path.exists(folder): - os.makedirs(folder) - df_pca2d.to_excel(os.path.join(folder, 'pca_2d_points_with_labels.xlsx'), index=False) - df_pca3d.to_excel(os.path.join(folder, 'pca_3d_points_with_labels.xlsx'), index=False) - - model = Qmlp( X_train=X_train, X_test=X_test, y_train=y_train, y_test= y_test, hidden_layers=[128, 128], dropout_rate=0 ) + + pca_2d, pca_3d = model.get_PCA() + + model.fit(300) cm = model.get_cm() epoch_data = model.get_epoch_data() - from Qfunctions.saveToxlsx import save_to_xlsx as stx - stx(project_name=projet_name, file_name="cm", data=cm ) - stx(project_name=projet_name, file_name="acc_and_loss", data=epoch_data) + save_to_xlsx(project_name=projet_name, file_name="pca_2d", data=pca_2d) + save_to_xlsx(project_name=projet_name, file_name="pca_3d", data=pca_3d) + save_to_xlsx(project_name=projet_name, file_name="cm", data=cm ) + save_to_xlsx(project_name=projet_name, file_name="acc_and_loss", data=epoch_data) print("Done")