Deeplearning/Qtorch/Models/Qnn.py

234 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
from torch.utils.data import DataLoader, TensorDataset
from Qfunctions.divSet import divSet as DS
class Qnn(nn.Module):
    """Base class for classification networks with a built-in training loop.

    The constructor splits the data into train and test sets through ``DS``
    and stores the training configuration shared by subclasses. Subclasses
    supply the architecture by implementing ``build_model``; the training loop
    invokes the module itself on each batch, so a ``forward`` implementation
    is expected as well.

    After ``fit`` has run, per-epoch metrics are available through
    ``get_epoch_data`` and the confusion matrices of the last evaluated epoch
    through ``get_cm`` / ``get_cmn``.
    """

    def __init__(
        self,
        data,
        labels,
        test_size=0.2,
        random_state=None,
        batch_size=64,
        learning_rate=0.00001,
        weight_decay=1e-5,
        lr_scheduler_patience=10,
        early_stop_patience=100,
        early_stop_threshold=0.99,
    ):
        """Store the training configuration and split the data.

        Args:
            data: Raw samples, forwarded unchanged to ``DS``.
            labels: Class names forwarded to ``DS``; when not ``None`` its
                length defines the number of classes and its entries label
                the confusion-matrix axes.
            test_size: Forwarded to ``DS``.
            random_state: Forwarded to ``DS``.
            batch_size: Batch size of both data loaders.
            learning_rate: Adam learning rate.
            weight_decay: Adam weight decay.
            lr_scheduler_patience: ``patience`` of ``ReduceLROnPlateau``.
            early_stop_patience: Number of epochs without a test-accuracy
                improvement required before early stopping may trigger.
            early_stop_threshold: Best test accuracy that must have been
                reached before early stopping may trigger.
        """
        super().__init__()
        # Use the GPU when one is available, otherwise fall back to the CPU.
        self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Training configuration, shared with subclasses.
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.lr_scheduler_patience = lr_scheduler_patience
        self.early_stop_patience = early_stop_patience
        self.early_stop_threshold = early_stop_threshold

        # Split into training and test sets.
        self.X_train, self.X_test, self.y_train, self.y_test, self.LABEL_ENCODER = DS(
            data=data, labels=labels, test_size=test_size, random_state=random_state
        )
        self.labels = labels
        if labels is not None:
            self.num_classes = len(labels)
        else:
            # Infer the class count from BOTH splits so that a class that only
            # shows up in the test split still gets a confusion-matrix row.
            # Assumes targets are integer-encoded starting at 0 — TODO confirm
            # against DS.
            seen_targets = np.concatenate([np.ravel(self.y_train), np.ravel(self.y_test)])
            self.num_classes = int(seen_targets.max()) + 1

        # Whether build_model() has already been invoked by fit().
        self._model_built = False
        # Per-epoch training history.
        self.epoch_data = self._new_epoch_data()
        # Placeholders for PCA data (not written anywhere in this class).
        self.pca_2d, self.pca_3d = None, None
        # Raw and row-normalised confusion matrices, filled in by training.
        self.cm, self.cmn = None, None

    def _new_epoch_data(self):
        """Return a fresh, empty per-epoch metrics history."""
        return {
            'epoch': [],
            'train_loss': [],
            'train_accuracy': [],
            'test_accuracy': [],
            'precision': [],
            'recall': [],
            'f1_score': []
        }

    def build_model(self, input_shape, num_classes):
        """Create the network layers; must be implemented by subclasses.

        Args:
            input_shape: Shape of one sample (``X_train.shape[1:]``).
            num_classes: Number of output classes.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement build_model(input_shape, num_classes)")

    def _transform_features(self, features):
        """Convert features to a float32 tensor; subclasses may override.

        The default keeps the incoming layout, documented by the original
        author as ``[batch, feature_dim]``.
        """
        return torch.tensor(features, dtype=torch.float32)

    def _prepare_data(self):
        """Wrap the train and test splits in ``DataLoader`` objects.

        Returns:
            ``(train_loader, test_loader)``; only the training loader shuffles.
        """
        train_dataset = TensorDataset(
            self._transform_features(self.X_train),
            torch.tensor(self.y_train, dtype=torch.long),
        )
        test_dataset = TensorDataset(
            self._transform_features(self.X_test),
            torch.tensor(self.y_test, dtype=torch.long),
        )
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
        return train_loader, test_loader

    def _run_train_epoch(self, model, train_loader, criterion, optimizer):
        """Run one optimisation pass over ``train_loader``.

        Returns:
            ``(mean_batch_loss, accuracy)`` for the epoch.
        """
        model.train()
        running_loss = 0.0
        correct = 0
        seen = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(self.DEVICE), targets.to(self.DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            correct += (outputs.argmax(dim=1) == targets).sum().item()
            seen += targets.size(0)
        return running_loss / len(train_loader), correct / seen

    def _evaluate(self, model, test_loader):
        """Predict every batch of ``test_loader`` without tracking gradients.

        Returns:
            ``(accuracy, true_labels, predicted_labels)`` where both label
            collections are plain Python lists of ints.
        """
        model.eval()
        true_labels = []
        predicted_labels = []
        with torch.no_grad():
            for inputs, targets in test_loader:
                outputs = model(inputs.to(self.DEVICE))
                predicted_labels.extend(outputs.argmax(dim=1).cpu().tolist())
                true_labels.extend(targets.tolist())
        correct = sum(p == t for p, t in zip(predicted_labels, true_labels))
        return correct / len(true_labels), true_labels, predicted_labels

    def _train_model(self, train_loader, test_loader, epochs_times=100):
        """Train with Adam, recording metrics after every epoch.

        The learning rate is reduced by a factor of 0.1 when the training loss
        plateaus. Training stops early once the test accuracy has not improved
        for ``early_stop_patience`` epochs AND the best test accuracy has
        reached ``early_stop_threshold``. The confusion matrices stored in
        ``self.cm`` / ``self.cmn`` come from the last evaluated epoch.

        Args:
            train_loader: Loader over the training split.
            test_loader: Loader over the test split.
            epochs_times: Maximum number of epochs; must be at least 1.

        Raises:
            ValueError: If ``epochs_times`` is smaller than 1 (there would be
                no predictions to build the confusion matrices from).
        """
        if epochs_times < 1:
            raise ValueError(f"epochs_times must be >= 1, got {epochs_times}")

        model = self.to(self.DEVICE)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(
            model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay
        )
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='min',
            factor=0.1,
            patience=self.lr_scheduler_patience,
        )

        best_test_accuracy = 0
        stale_epochs = 0
        eval_labels, eval_predicted = [], []

        for epoch in range(epochs_times):
            train_loss, train_accuracy = self._run_train_epoch(
                model, train_loader, criterion, optimizer
            )
            test_accuracy, eval_labels, eval_predicted = self._evaluate(model, test_loader)

            f1 = f1_score(eval_labels, eval_predicted, average='macro', zero_division=0)
            precision = precision_score(eval_labels, eval_predicted, average='macro', zero_division=0)
            recall = recall_score(eval_labels, eval_predicted, average='macro', zero_division=0)

            # Only report every tenth epoch to keep the console readable.
            if (epoch + 1) % 10 == 0:
                print('===============================================')
                print(f'Epoch [{epoch + 1} / {epochs_times}]:')
                print(f'Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%, Loss: {train_loss:.4f}')
                print(f'Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score:{f1:.4f}, ')
                print('===============================================')

            self.epoch_data['epoch'].append(epoch + 1)
            self.epoch_data['train_loss'].append(train_loss)
            self.epoch_data['train_accuracy'].append(train_accuracy)
            self.epoch_data['test_accuracy'].append(test_accuracy)
            self.epoch_data['precision'].append(precision)
            self.epoch_data['recall'].append(recall)
            self.epoch_data['f1_score'].append(f1)

            scheduler.step(train_loss)

            if test_accuracy > best_test_accuracy:
                best_test_accuracy = test_accuracy
                stale_epochs = 0
            else:
                stale_epochs += 1

            if stale_epochs >= self.early_stop_patience and best_test_accuracy >= self.early_stop_threshold:
                print(f"Early stopping at epoch {epoch+1}")
                break

        # Always span every class so the matrix shape matches the axes used by
        # get_cm()/get_cmn(), even when some classes are missing from the
        # evaluated split. cmn is the row-normalised matrix.
        cm_labels = np.arange(self.num_classes)
        self.cm = confusion_matrix(eval_labels, eval_predicted, labels=cm_labels)
        self.cmn = confusion_matrix(eval_labels, eval_predicted, labels=cm_labels, normalize='true')
        print(self.cm)

    def fit(self, epoch_times = 100):
        """Build the network on first use, then train it.

        Args:
            epoch_times: Maximum number of training epochs.
        """
        if not self._model_built:
            self.build_model(input_shape=self.X_train.shape[1:], num_classes=self.num_classes)
            self._model_built = True

        # Reset the history before each run so repeated calls do not accumulate.
        self.epoch_data = self._new_epoch_data()
        train_loader, test_loader = self._prepare_data()
        self._train_model(train_loader, test_loader, epochs_times=epoch_times)

    def get_PCA(self):
        """Project the training features onto 2 and 3 principal components.

        Returns:
            ``(df_pca2d, df_pca3d)``: data frames with columns ``PC1``..``PCn``
            plus a ``labels`` column holding the training targets.
        """
        # PCA needs a 2-D matrix; flatten each sample so multi-dimensional
        # inputs also work. Already 2-D data is left unchanged by the reshape.
        features = np.asarray(self.X_train)
        features = features.reshape(len(features), -1)
        # Attach labels by position rather than by any index they may carry.
        targets = np.asarray(self.y_train)

        frames = []
        for n_components in (2, 3):
            columns = [f'PC{i}' for i in range(1, n_components + 1)]
            projected = PCA(n_components=n_components).fit_transform(features)
            frame = pd.DataFrame(data=projected, columns=columns)
            frame['labels'] = targets
            frames.append(frame)

        df_pca2d, df_pca3d = frames
        return df_pca2d, df_pca3d

    def _matrix_frame(self, matrix):
        """Wrap a confusion matrix in a data frame labelled with class names."""
        label_names = self.labels if self.labels is not None else list(range(self.num_classes))
        return pd.DataFrame(matrix, columns=label_names, index=label_names)

    def get_cm(self):
        """Return the raw confusion matrix of the last evaluated epoch."""
        return self._matrix_frame(self.cm)

    def get_cmn(self):
        """Return the row-normalised confusion matrix of the last evaluated epoch."""
        return self._matrix_frame(self.cmn)

    def get_epoch_data(self):
        """Return the per-epoch metrics history as a data frame."""
        return pd.DataFrame(self.epoch_data)