删除没用代码

This commit is contained in:
newbie 2024-11-28 19:51:17 +08:00
parent 0dd4b05977
commit 101ba89c34
5 changed files with 190 additions and 182 deletions

View File

@ -4,5 +4,4 @@ def save_to_xlsx(project_name, file_name, data):
os.makedirs(f'Result/{project_name}', exist_ok=True) os.makedirs(f'Result/{project_name}', exist_ok=True)
data.to_excel(f'Result/{project_name}/{file_name}.xlsx', index=True) data.to_excel(f'Result/{project_name}/{file_name}.xlsx', index=True)
print("Save successed to " + f'Result/{project_name}/{file_name}.xlsx') print("Save successed to " + f'Result/{project_name}/{file_name}.xlsx')
return return

32
Qtorch/Models/Qcnn.py Normal file
View File

@ -0,0 +1,32 @@
import torch
import torch.nn as nn
import torch.optim as optim
class Simple1DCNN(nn.Module):
    """Small 1-D convolutional classifier with two conv/pool stages.

    Each stage applies Conv1d (kernel 3, stride 1, padding 1), ReLU and
    MaxPool1d (kernel 2, stride 2). The pooled feature map is flattened and
    passed through two fully connected layers that produce ``num_classes``
    raw scores (no softmax is applied here).

    Args:
        input_size: Length of the input signal along its last dimension.
            The first linear layer is sized from this value, so inputs
            passed to ``forward`` are expected to have this length.
        num_classes: Number of output scores per sample.
    """

    def __init__(self, input_size, num_classes):
        super().__init__()
        # Kept on the instance: forward() needs the flattened width, and the
        # original code read self.input_size without ever assigning it.
        self.input_size = input_size
        # The convolutions preserve length (kernel 3, padding 1, stride 1);
        # each pooling halves it with floor, so two poolings leave
        # input_size // 4 positions for each of the 64 channels.
        self.flat_features = 64 * (input_size // 4)

        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(self.flat_features, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class scores for ``x``.

        Args:
            x: Tensor with one channel and ``input_size`` positions
                (conv1 is built with ``in_channels=1``).

        Returns:
            Tensor whose last dimension is ``num_classes``.
        """
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        # Flatten the (channels, length) feature map into one vector per sample.
        x = x.view(-1, self.flat_features)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x
# Build an example model instance.
input_size = 100  # assumed signal length (n = 100)
num_classes = 10  # assumed number of classes
model = Simple1DCNN(input_size=input_size, num_classes=num_classes)
# Loss function and optimizer used for training.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
# Training and evaluation code is similar to the earlier version and is omitted here.

View File

@ -1,23 +1,12 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset from Qtorch.Models.Qnn import Qnn
from sklearn.preprocessing import LabelEncoder from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix from sklearn.metrics import confusion_matrix
import pandas as pd import pandas as pd
LABEL_ENCODER = LabelEncoder()
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class Qmlp(nn.Module): class Qmlp(Qnn):
epoch_data = {
'epoch': [],
'train_loss': [],
'train_accuracy': [],
'test_accuracy': []
}
labels = None
def __init__(self, X_train, y_train, X_test, y_test, def __init__(self, X_train, y_train, X_test, y_test,
hidden_layers, hidden_layers,
@ -26,15 +15,14 @@ class Qmlp(nn.Module):
): ):
super(Qmlp, self).__init__() super(Qmlp, self).__init__()
self.LABEL_ENCODER = LabelEncoder()
self.X_train, self.y_train, self.X_test, self.y_test = X_train, y_train, X_test, y_test self.X_train, self.y_train, self.X_test, self.y_test = X_train, y_train, X_test, y_test
self.labels = labels self.labels = labels
input_size = X_train.shape[1] input_size = X_train.shape[1]
# input_size = 5
print(input_size)
num_classes = len(set(y_train)) num_classes = len(set(y_train))
self.layers = nn.ModuleList() self.layers = nn.ModuleList()
# Input layer to first hidden layer # Input layer to first hidden layer
@ -59,112 +47,6 @@ class Qmlp(nn.Module):
x = layer(x) x = layer(x)
return x return x
def __prepare_data(self):
# Step 2: Prepare the data
X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32)
self.y_train = LABEL_ENCODER.fit_transform(self.y_train)
y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)
X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32)
self.y_test = LABEL_ENCODER.transform(self.y_test)
y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
return train_loader, test_loader
def __train_model(self, train_loader, test_loader, epochs_times=100):
model = self.to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
best_test_accuracy = 0
patience = 100
counter = 0
accuracy_threshold = 0.99 # 99% 的准确率阈值
for epoch in range(epochs_times):
model.train()
running_loss = 0.0
correct_train = 0
total_train = 0
for inputs, labels in train_loader:
inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total_train += labels.size(0)
correct_train += (predicted == labels).sum().item()
train_accuracy = correct_train / total_train
train_loss = running_loss / len(train_loader)
model.eval()
correct_test = 0
total_test = 0
all_labels = []
all_predicted = []
all_prob = []
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
outputs = model(inputs)
prob = torch.nn.functional.softmax(outputs, dim=1)
_, predicted = torch.max(outputs.data, 1)
total_test += labels.size(0)
correct_test += (predicted == labels).sum().item()
all_labels.extend(labels.cpu().numpy())
all_predicted.extend(predicted.cpu().numpy())
all_prob.extend(prob.cpu().numpy())
test_accuracy = correct_test / total_test
print(f'Epoch [{epoch+1}/{epochs_times}], Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%')
self.epoch_data['epoch'].append(epoch+1)
self.epoch_data['train_loss'].append(train_loss)
self.epoch_data['train_accuracy'].append(train_accuracy)
self.epoch_data['test_accuracy'].append(test_accuracy)
scheduler.step(train_loss)
if test_accuracy > best_test_accuracy:
best_test_accuracy = test_accuracy
counter = 0
else:
counter += 1
if counter >= patience and best_test_accuracy >= accuracy_threshold:
print(f"Early stopping at epoch {epoch+1}")
break
self.cm = confusion_matrix(all_labels, all_predicted, normalize='true')
print(self.cm)
return
def get_cm(self):
return pd.DataFrame(self.cm, columns=self.labels, index=self.labels)
def get_epoch_data(self):
return pd.DataFrame(self.epoch_data)
def fit(self, epoch_times = 100):
train_loader, test_loader = self.__prepare_data()
self.__train_model(train_loader, test_loader, epochs_times=epoch_times)
return
def __init_weights(self): def __init_weights(self):
for m in self.modules(): for m in self.modules():
if isinstance(m, nn.Linear): if isinstance(m, nn.Linear):

View File

@ -1,47 +1,158 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
import pandas as pd import pandas as pd
from abc import ABC, abstractmethod from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from sklearn.metrics import confusion_matrix as cm
from torch.utils.data import DataLoader, TensorDataset from torch.utils.data import DataLoader, TensorDataset
from Qfunctions.divSet import divSet as ds
from Qfunctions.saveToxlsx import save_to_xlsx as stx # from Qfunctions.divSet import divSet as ds
# from Qfunctions.saveToxlsx import save_to_xlsx as stx
class Qnn(nn.Module, ABC): class Qnn(nn.Module):
def __init__(self):
def __init__(self, labels=None):
super(Qnn, self).__init__() super(Qnn, self).__init__()
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 保存原始label 混淆矩阵使用 self.X_train, self.y_train, self.X_test, self.y_test = None, None, None, None
self.original_labels = labels
# 定义结果 self.labels = None
self.result = {
'acc_and_loss' : { self.LABEL_ENCODER = None
'epoch' : [],
'loss': [], self.epoch_data = {
'epoch': [],
'train_loss': [],
'train_accuracy': [], 'train_accuracy': [],
'test_accuracy': [], 'test_accuracy': []
},
'confusion_matrix': None,
} }
def accuracy(self, output, target): self.pca_2d, self.pca_3d = None, None
pass
# 定义损失函数
def hinge_loss(self, output, target):
pass
def confusion_matrix(self, test_outputs): def __prepare_data(self):
predicted = torch.argmax(test_outputs, dim=1)
true_label = torch.argmax(self.y_test, dim=1) # 将data转换为tensor形式
return cm(predicted.cpu(), true_label.cpu()) X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32)
self.y_train = self.LABEL_ENCODER.fit_transform(self.y_train)
y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)
X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32)
self.y_test = self.LABEL_ENCODER.transform(self.y_test)
y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
return train_loader, test_loader
def __train_model(self, train_loader, test_loader, epochs_times=100):
model = self.to(self.DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
best_test_accuracy = 0
patience = 100
counter = 0
accuracy_threshold = 0.99 # 99% 的准确率阈值
for epoch in range(epochs_times):
model.train()
running_loss = 0.0
correct_train = 0
total_train = 0
for inputs, labels in train_loader:
inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total_train += labels.size(0)
correct_train += (predicted == labels).sum().item()
train_accuracy = correct_train / total_train
train_loss = running_loss / len(train_loader)
model.eval()
correct_test = 0
total_test = 0
all_labels = []
all_predicted = []
all_prob = []
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(self.DEVICE), labels.to(self.DEVICE)
outputs = model(inputs)
prob = torch.nn.functional.softmax(outputs, dim=1)
_, predicted = torch.max(outputs.data, 1)
total_test += labels.size(0)
correct_test += (predicted == labels).sum().item()
all_labels.extend(labels.cpu().numpy())
all_predicted.extend(predicted.cpu().numpy())
all_prob.extend(prob.cpu().numpy())
test_accuracy = correct_test / total_test
print(f'Epoch [{epoch+1}/{epochs_times}], Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%')
self.epoch_data['epoch'].append(epoch+1)
self.epoch_data['train_loss'].append(train_loss)
self.epoch_data['train_accuracy'].append(train_accuracy)
self.epoch_data['test_accuracy'].append(test_accuracy)
scheduler.step(train_loss)
if test_accuracy > best_test_accuracy:
best_test_accuracy = test_accuracy
counter = 0
else:
counter += 1
if counter >= patience and best_test_accuracy >= accuracy_threshold:
print(f"Early stopping at epoch {epoch+1}")
break
self.cm = confusion_matrix(all_labels, all_predicted, normalize='true')
print(self.cm)
return
def fit(self, epoch_times = 100):
train_loader, test_loader = self.__prepare_data()
self.__train_model(train_loader, test_loader, epochs_times=epoch_times)
return
def get_PCA(self):
# PCA 2D 图像
pca_2d = PCA(n_components=2) # 保留两个主成分
principalComponents = pca_2d.fit_transform(self.X_train)
df_pca2d =pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2'])
df_pca2d['labels'] = self.y_train
# PCA 3D 图像
pca_3d = PCA(n_components=3)
principalComponents = pca_3d.fit_transform(self.X_train)
df_pca3d = pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2', 'PC3'])
df_pca3d['labels'] = self.y_train
return df_pca2d, df_pca3d
def get_cm(self):
return pd.DataFrame(self.cm, columns=self.labels, index=self.labels)
def get_epoch_data(self):
return pd.DataFrame(self.epoch_data)
def fit(self, epochs = 100):
self.train_model(epochs)

38
main.py
View File

@ -1,50 +1,34 @@
# frofrom Qtorch.Functions import dLoader
from Qtorch.Models.Qmlp import Qmlp from Qtorch.Models.Qmlp import Qmlp
from Qfunctions.divSet import divSet from Qfunctions.divSet import divSet
from Qfunctions.loaData import load_data as dLoader from Qfunctions.loaData import load_data
from sklearn.decomposition import PCA from Qfunctions.saveToxlsx import save_to_xlsx as save_to_xlsx
def main(): def main():
projet_name = '20241112Numbers' # 输入元数据文件夹名称 projet_name = '20241112Numbers' # 输入元数据文件夹名称
label_names =['1', '2', '3', '4', '5', '6', '7' ,'8', '9'] # 请在[]内输入每一个分类的名称 label_names =['1', '2', '3', '4', '5', '6', '7' ,'8', '9'] # 请在[]内输入每一个分类的名称
data = dLoader(projet_name, label_names, isDir=False, fileClass='xls') data = load_data(projet_name, label_names, isDir=False, fileClass='xls')
X_train, X_test, y_train, y_test, encoder = divSet( X_train, X_test, y_train, y_test, encoder = divSet(
data=data, labels=label_names, test_size= 0.3 data=data, labels=label_names, test_size= 0.3
) )
import pandas as pd
pca = PCA(n_components=2) # 保留两个主成分
principalComponents = pca.fit_transform(X_train)
df_pca2d = pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2'])
df_pca2d['labels'] = y_train
pca = PCA(n_components=3) # 保留三个主成分
principalComponents = pca.fit_transform(X_train)
df_pca3d = pd.DataFrame(data=principalComponents, columns=['PC1', 'PC2', 'PC3'])
df_pca3d['labels'] = y_train
# 保存为xlsx文件
import os
folder = os.path.join("./Result", projet_name)
if not os.path.exists(folder):
os.makedirs(folder)
df_pca2d.to_excel(os.path.join(folder, 'pca_2d_points_with_labels.xlsx'), index=False)
df_pca3d.to_excel(os.path.join(folder, 'pca_3d_points_with_labels.xlsx'), index=False)
model = Qmlp( model = Qmlp(
X_train=X_train, X_test=X_test, y_train=y_train, y_test= y_test, X_train=X_train, X_test=X_test, y_train=y_train, y_test= y_test,
hidden_layers=[128, 128], hidden_layers=[128, 128],
dropout_rate=0 dropout_rate=0
) )
pca_2d, pca_3d = model.get_PCA()
model.fit(300) model.fit(300)
cm = model.get_cm() cm = model.get_cm()
epoch_data = model.get_epoch_data() epoch_data = model.get_epoch_data()
from Qfunctions.saveToxlsx import save_to_xlsx as stx save_to_xlsx(project_name=projet_name, file_name="pca_2d", data=pca_2d)
stx(project_name=projet_name, file_name="cm", data=cm ) save_to_xlsx(project_name=projet_name, file_name="pca_3d", data=pca_3d)
stx(project_name=projet_name, file_name="acc_and_loss", data=epoch_data) save_to_xlsx(project_name=projet_name, file_name="cm", data=cm )
save_to_xlsx(project_name=projet_name, file_name="acc_and_loss", data=epoch_data)
print("Done") print("Done")