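"""A small PyTorch MLP classifier (Qmlp) with batch normalization, dropout,
Xavier initialization, ReduceLROnPlateau scheduling, early stopping, and
per-epoch metric tracking; labels are integer-encoded with scikit-learn."""
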
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import pandas as pd

# Shared encoder so train and test labels map to the same integer ids.
LABEL_ENCODER = LabelEncoder()
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Qmlp(nn.Module):

    def __init__(self, X_train, y_train, X_test, y_test,
                 hidden_layers,
                 labels=None,
                 dropout_rate=0.3):
        super().__init__()

        self.X_train, self.y_train, self.X_test, self.y_test = X_train, y_train, X_test, y_test
        self.labels = labels

        # Per-epoch metrics, kept per instance (a class-level dict would be
        # shared and would accumulate entries across instances).
        self.epoch_data = {
            'epoch': [],
            'train_loss': [],
            'train_accuracy': [],
            'test_accuracy': []
        }

        input_size = X_train.shape[1]
        num_classes = len(set(y_train))
        print(f'input_size={input_size}, num_classes={num_classes}')

        self.layers = nn.ModuleList()

        # Input layer to first hidden layer
        self.layers.append(nn.Linear(input_size, hidden_layers[0]))
        self.layers.append(nn.BatchNorm1d(hidden_layers[0]))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.Dropout(dropout_rate))

        # Remaining hidden layers
        for i in range(1, len(hidden_layers)):
            self.layers.append(nn.Linear(hidden_layers[i-1], hidden_layers[i]))
            self.layers.append(nn.BatchNorm1d(hidden_layers[i]))
            self.layers.append(nn.ReLU())
            self.layers.append(nn.Dropout(dropout_rate))

        # Output layer emits raw logits; CrossEntropyLoss applies log-softmax.
        self.layers.append(nn.Linear(hidden_layers[-1], num_classes))
        self.__init_weights()

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

    def __prepare_data(self):
        # Encode string labels to integers: fit on train, reuse on test.
        X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32)
        self.y_train = LABEL_ENCODER.fit_transform(self.y_train)
        y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)

        X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32)
        self.y_test = LABEL_ENCODER.transform(self.y_test)
        y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)

        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

        return train_loader, test_loader

    def __train_model(self, train_loader, test_loader, epochs_times=100):
        model = self.to(DEVICE)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
        best_test_accuracy = 0
        patience = 100
        counter = 0
        accuracy_threshold = 0.99  # 99% accuracy threshold for early stopping

        for epoch in range(epochs_times):
            model.train()
            running_loss = 0.0
            correct_train = 0
            total_train = 0

            for inputs, labels in train_loader:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total_train += labels.size(0)
                correct_train += (predicted == labels).sum().item()

            train_accuracy = correct_train / total_train
            train_loss = running_loss / len(train_loader)

            model.eval()
            correct_test = 0
            total_test = 0
            all_labels = []
            all_predicted = []
            all_prob = []
            with torch.no_grad():
                for inputs, labels in test_loader:
                    inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                    outputs = model(inputs)
                    prob = torch.nn.functional.softmax(outputs, dim=1)
                    _, predicted = torch.max(outputs, 1)
                    total_test += labels.size(0)
                    correct_test += (predicted == labels).sum().item()
                    all_labels.extend(labels.cpu().numpy())
                    all_predicted.extend(predicted.cpu().numpy())
                    all_prob.extend(prob.cpu().numpy())

            test_accuracy = correct_test / total_test
            print(f'Epoch [{epoch+1}/{epochs_times}], Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy * 100:.2f}%, Test Accuracy: {test_accuracy*100:.2f}%')

            self.epoch_data['epoch'].append(epoch+1)
            self.epoch_data['train_loss'].append(train_loss)
            self.epoch_data['train_accuracy'].append(train_accuracy)
            self.epoch_data['test_accuracy'].append(test_accuracy)

            scheduler.step(train_loss)

            # Early stopping: stop only once test accuracy has not improved
            # for `patience` epochs *and* the best value has already reached
            # the accuracy threshold.
            if test_accuracy > best_test_accuracy:
                best_test_accuracy = test_accuracy
                counter = 0
            else:
                counter += 1

            if counter >= patience and best_test_accuracy >= accuracy_threshold:
                print(f"Early stopping at epoch {epoch+1}")
                break

        # Row-normalized confusion matrix from the last evaluation pass.
        self.cm = confusion_matrix(all_labels, all_predicted, normalize='true')
        print(self.cm)

    def get_cm(self):
        # `self.labels`, when given, is expected to follow LabelEncoder's
        # sorted class order so rows/columns line up with the encoded ids.
        return pd.DataFrame(self.cm, columns=self.labels, index=self.labels)

    def get_epoch_data(self):
        return pd.DataFrame(self.epoch_data)

    def fit(self, epoch_times=100):
        train_loader, test_loader = self.__prepare_data()
        self.__train_model(train_loader, test_loader, epochs_times=epoch_times)

    def __init_weights(self):
        # Xavier-uniform init for all linear layers, zero bias.
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
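

# A minimal usage sketch, assuming numpy feature arrays and string class
# labels; the synthetic data and the 8-feature/2-class setup below are
# illustrative assumptions, not requirements of the class.
if __name__ == '__main__':
    import numpy as np

    rng = np.random.default_rng(0)
    X = rng.normal(size=(500, 8)).astype('float32')
    # Two roughly separable classes derived from the first two features.
    y = np.where(X[:, 0] + X[:, 1] > 0, 'pos', 'neg')

    X_train, X_test = X[:400], X[400:]
    y_train, y_test = y[:400], y[400:]

    # `labels` follows LabelEncoder's sorted class order ('neg' < 'pos')
    # so the confusion-matrix axes line up with the encoded ids.
    model = Qmlp(X_train, y_train, X_test, y_test,
                 hidden_layers=[32, 16],
                 labels=['neg', 'pos'])
    model.fit(epoch_times=5)

    print(model.get_epoch_data())  # per-epoch loss/accuracy as a DataFrame
    print(model.get_cm())          # row-normalized confusion matrix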