refactor: unify model base and update docs roadmap

This commit is contained in:
newbieQQ 2026-03-29 20:01:40 +08:00
parent ecf6242fe7
commit 9f241757c6
5 changed files with 343 additions and 158 deletions

View File

@ -1,81 +1,96 @@
import torch
import torch.nn as nn
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from Qtorch.Models.Qnn import Qnn
class QCNN(Qnn):
def __init__(self, X_train, y_train, X_test, y_test, labels=None, dropout_rate=0.3):
super(QCNN, self).__init__()
def __init__(
self,
data,
labels=None,
conv_channels=(16, 32),
kernel_size=3,
hidden_size=128,
dropout_rate=0.3,
test_size=0.2,
random_state=None,
batch_size=64,
learning_rate=0.00001,
weight_decay=1e-5,
lr_scheduler_patience=10,
early_stop_patience=100,
early_stop_threshold=0.99,
):
super(QCNN, self).__init__(
data=data,
labels=labels,
test_size=test_size,
random_state=random_state,
batch_size=batch_size,
learning_rate=learning_rate,
weight_decay=weight_decay,
lr_scheduler_patience=lr_scheduler_patience,
early_stop_patience=early_stop_patience,
early_stop_threshold=early_stop_threshold,
)
self.LABEL_ENCODER = LabelEncoder()
self.conv_channels = tuple(conv_channels)
self.kernel_size = kernel_size
self.hidden_size = hidden_size
self.dropout_rate = dropout_rate
self.X_train, self.y_train, self.X_test, self.y_test = X_train, y_train, X_test, y_test
self.labels = labels
self.feature_extractor = nn.Sequential()
self.classifier = nn.Sequential()
input_size = X_train.shape[1] # 输入的长度
num_classes = len(set(y_train)) # 分类数
# 构造 1D CNN 网络结构
self.build_model(input_shape=self.X_train.shape[1:], num_classes=self.num_classes)
self._model_built = True
# 网络层:卷积层 + 池化层 + 全连接层
self.layers = nn.ModuleList()
self.layers.append(nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3)) # 卷积层
self.layers.append(nn.MaxPool1d(kernel_size=2)) # 池化层
self.layers.append(nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3)) # 卷积层
self.layers.append(nn.MaxPool1d(kernel_size=2)) # 池化层
def _transform_features(self, features):
# 1D CNN 输入格式: [batch, channel=1, length]
return torch.tensor(features, dtype=torch.float32).unsqueeze(1)
# 计算展平后的大小
conv_output_size = self._get_conv_output_size(input_size) # 卷积后的输出大小
print(f"Conv output size: {conv_output_size}") # 打印卷积后的输出大小
self.layers.append(nn.Linear(conv_output_size, 128)) # 全连接层
self.layers.append(nn.Linear(128, num_classes)) # 输出层
def build_model(self, input_shape, num_classes):
if len(self.conv_channels) == 0:
raise ValueError("'conv_channels' must contain at least one channel size.")
input_length = int(np.prod(input_shape))
conv_layers = []
in_channels = 1
for out_channels in self.conv_channels:
conv_layers.append(nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=self.kernel_size))
conv_layers.append(nn.ReLU())
conv_layers.append(nn.MaxPool1d(kernel_size=2))
in_channels = out_channels
self.feature_extractor = nn.Sequential(*conv_layers)
conv_output_size = self._get_conv_output_size(input_length)
self.classifier = nn.Sequential(
nn.Linear(conv_output_size, self.hidden_size),
nn.ReLU(),
nn.Dropout(self.dropout_rate),
nn.Linear(self.hidden_size, num_classes),
)
self.__init_weights()
def _get_conv_output_size(self, input_size):
# 计算卷积后的输出尺寸
x = torch.randn(1, 1, input_size) # 创建一个假的输入张量
for layer in self.layers:
x = layer(x) # 通过每一层
return int(x.numel()) # 返回展平后的输出大小
def _get_conv_output_size(self, input_length):
x = torch.randn(1, 1, input_length)
x = self.feature_extractor(x)
return int(x.numel())
def forward(self, x):
# 通过卷积和池化层
for layer in self.layers[:-2]: # 除去最后两个 Linear 层
x = layer(x)
# 展平卷积后的输出
x = x.view(x.size(0), -1) # 这样 x 会变成 (batch_size, conv_output_size)
# 通过全连接层
x = self.layers[-2](x)
x = self.layers[-1](x)
x = self.feature_extractor(x)
x = x.view(x.size(0), -1)
x = self.classifier(x)
return x
def __init_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv1d) or isinstance(m, nn.Linear):
if isinstance(m, (nn.Conv1d, nn.Linear)):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
m.bias.data.fill_(0.01)
def _prepare_data(self):
# 将data转换为tensor形式, unsqueeze可以创建多一维度的
X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32).unsqueeze(1)
self.y_train = self.LABEL_ENCODER.fit_transform(self.y_train)
y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)
X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32).unsqueeze(1)
self.y_test = self.LABEL_ENCODER.transform(self.y_test)
y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
return train_loader, test_loader
nn.init.zeros_(m.bias)

View File

@ -10,34 +10,63 @@ class Qmlp(Qnn):
labels=None,
dropout_rate=0.3,
test_size = 0.2,
random_state=None
random_state=None,
batch_size=64,
learning_rate=0.00001,
weight_decay=1e-5,
lr_scheduler_patience=10,
early_stop_patience=100,
early_stop_threshold=0.99,
):
super(Qmlp, self).__init__(data=data, labels=labels, test_size=test_size, random_state=random_state)
super(Qmlp, self).__init__(
data=data,
labels=labels,
test_size=test_size,
random_state=random_state,
batch_size=batch_size,
learning_rate=learning_rate,
weight_decay=weight_decay,
lr_scheduler_patience=lr_scheduler_patience,
early_stop_patience=early_stop_patience,
early_stop_threshold=early_stop_threshold,
)
input_size = self.X_train.shape[1]
num_classes = len(labels) if labels is not None else int(np.max(self.y_train)) + 1
self.hidden_layers = hidden_layers
self.dropout_rate = dropout_rate
self.layers = nn.ModuleList()
# 构造 MLP 网络结构
self.build_model(input_shape=self.X_train.shape[1:], num_classes=self.num_classes)
self._model_built = True
def build_model(self, input_shape, num_classes):
if not self.hidden_layers:
raise ValueError("'hidden_layers' must contain at least one layer size.")
input_size = int(np.prod(input_shape))
self.layers = nn.ModuleList()
# 连接输入层和第一个隐藏层
self.layers.append(nn.Linear(input_size, hidden_layers[0]))
self.layers.append(nn.BatchNorm1d(hidden_layers[0]))
self.layers.append(nn.Linear(input_size, self.hidden_layers[0]))
self.layers.append(nn.BatchNorm1d(self.hidden_layers[0]))
self.layers.append(nn.ReLU())
self.layers.append(nn.Dropout(dropout_rate))
self.layers.append(nn.Dropout(self.dropout_rate))
# 创建隐藏层
for i in range(1, len(hidden_layers)):
self.layers.append(nn.Linear(hidden_layers[i-1], hidden_layers[i]))
self.layers.append(nn.BatchNorm1d(hidden_layers[i]))
self.layers.append(nn.ReLU())
self.layers.append(nn.Dropout(dropout_rate))
for i in range(1, len(self.hidden_layers)):
self.layers.append(nn.Linear(self.hidden_layers[i-1], self.hidden_layers[i]))
self.layers.append(nn.BatchNorm1d(self.hidden_layers[i]))
self.layers.append(nn.ReLU())
self.layers.append(nn.Dropout(self.dropout_rate))
# 创建输出层
self.layers.append(nn.Linear(hidden_layers[-1], num_classes))
self.layers.append(nn.Linear(self.hidden_layers[-1], num_classes))
self.__init_weights()
def forward(self, x):
x = x.view(x.size(0), -1)
for layer in self.layers:
x = layer(x)
x = layer(x)
return x
def __init_weights(self):

View File

@ -10,51 +10,85 @@ from Qfunctions.divSet import divSet as DS
class Qnn(nn.Module):
def __init__(self, data, labels, test_size = 0.2, random_state=None):
def __init__(
self,
data,
labels,
test_size=0.2,
random_state=None,
batch_size=64,
learning_rate=0.00001,
weight_decay=1e-5,
lr_scheduler_patience=10,
early_stop_patience=100,
early_stop_threshold=0.99,
):
super(Qnn, self).__init__()
# 使用gpu进行加速, 没有gpu的话使用CPU
self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 训练配置,子类共享
self.batch_size = batch_size
self.learning_rate = learning_rate
self.weight_decay = weight_decay
self.lr_scheduler_patience = lr_scheduler_patience
self.early_stop_patience = early_stop_patience
self.early_stop_threshold = early_stop_threshold
# 划分测试集和训练集
self.X_train, self.X_test, self.y_train, self.y_test, self.LABEL_ENCODER = DS(
data=data, labels=labels, test_size=test_size, random_state=random_state
data=data, labels=labels, test_size=test_size, random_state=random_state
)
self.labels = labels
# 存储过程数据的文件
self.epoch_data = {
self.num_classes = len(labels) if labels is not None else int(np.max(self.y_train)) + 1
# 网络状态
self._model_built = False
# 存储过程数据
self.epoch_data = self._new_epoch_data()
# PCA 图片数据存储
self.pca_2d, self.pca_3d = None, None
self.cm, self.cmn = None, None
def _new_epoch_data(self):
return {
'epoch': [],
'train_loss': [],
'train_accuracy': [],
'test_accuracy': [],
'precision': [],
'recall': [],
'recall': [],
'f1_score': []
}
# PCA 图片数据存储
self.pca_2d, self.pca_3d = None, None
self.cm, self.cmn = None, None
def build_model(self, input_shape, num_classes):
# 子类必须实现具体网络结构
raise NotImplementedError("Subclasses must implement build_model(input_shape, num_classes)")
def _transform_features(self, features):
# 默认输入格式: [batch, feature_dim]
return torch.tensor(features, dtype=torch.float32)
def _prepare_data(self):
# 将data转换为tensor形式
X_train_tensor = torch.tensor(self.X_train, dtype=torch.float32)
# 将data转换为tensor形式(子类可覆写 _transform_features
X_train_tensor = self._transform_features(self.X_train)
y_train_tensor = torch.tensor(self.y_train, dtype=torch.long)
X_test_tensor = torch.tensor(self.X_test, dtype=torch.float32)
X_test_tensor = self._transform_features(self.X_test)
y_test_tensor = torch.tensor(self.y_test, dtype=torch.long)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
return train_loader, test_loader
@ -63,14 +97,15 @@ class Qnn(nn.Module):
model = self.to(self.DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.00001, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
optimizer = torch.optim.Adam(model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer,
mode='min',
factor=0.1,
patience=self.lr_scheduler_patience,
)
best_test_accuracy = 0
patience = 100
counter = 0
# 99% 的准确率阈值
accuracy_threshold = 0.99
for epoch in range(epochs_times):
@ -140,10 +175,10 @@ class Qnn(nn.Module):
counter = 0
else:
counter += 1
if counter >= patience and best_test_accuracy >= accuracy_threshold:
print(f"Early stopping at epoch {epoch+1}")
break
if counter >= self.early_stop_patience and best_test_accuracy >= self.early_stop_threshold:
print(f"Early stopping at epoch {epoch+1}")
break
# cmn为归一化矩阵
# Keep matrix dimensions stable even when some classes do not appear in this split.
@ -155,6 +190,13 @@ class Qnn(nn.Module):
return
def fit(self, epoch_times = 100):
if not self._model_built:
self.build_model(input_shape=self.X_train.shape[1:], num_classes=self.num_classes)
self._model_built = True
# 每次训练前清空过程指标,避免重复累计
self.epoch_data = self._new_epoch_data()
train_loader, test_loader = self._prepare_data()
self._train_model(train_loader, test_loader, epochs_times=epoch_times)
return
@ -178,10 +220,12 @@ class Qnn(nn.Module):
# 外部获取混淆矩阵的接口
def get_cm(self):
return pd.DataFrame(self.cm, columns=self.labels, index=self.labels)
label_names = self.labels if self.labels is not None else list(range(self.num_classes))
return pd.DataFrame(self.cm, columns=label_names, index=label_names)
def get_cmn(self):
return pd.DataFrame(self.cmn, columns=self.labels, index=self.labels)
label_names = self.labels if self.labels is not None else list(range(self.num_classes))
return pd.DataFrame(self.cmn, columns=label_names, index=label_names)
# 外部获取迭代数据的接口
def get_epoch_data(self):

194
README.md
View File

@ -1,29 +1,6 @@
# Deeplearning 使用说明
## 1. 项目约定
### 1.1 输入数据格式
每一类数据支持 `xls/xlsx/csv`。读取时默认取偶数列(索引 1,3,5...)作为特征,奇数列内容可忽略。
示意:
| 任意值 | 特征值 | 任意值 | 特征值 |
|---|---|---|---|
| arbitrary value | value | arbitrary value | value |
### 1.2 目录约定
训练数据放在 `Static/`,输出结果放在 `Result/`
推荐目录:
```text
.
├─ Static/
│ └─ 20241009MaterialDiv/
└─ Result/
```
## 2. Conda 环境迁移
## 1. Conda 环境迁移
环境文件在 `conda_env/`
@ -31,7 +8,7 @@
- `conda_env/environment.lock.txt`:精确锁定(同系统/同架构优先)
- `conda_env/env.yml`:历史文件
### 2.1 创建环境
### 1.1 创建环境
```bash
# 方式1推荐通用创建
@ -47,7 +24,7 @@ python -V
python -c "import torch; print(torch.__version__)"
```
### 2.2 同名环境已存在时
### 1.2 同名环境已存在时
```bash
# 方式A保留旧环境改名创建
@ -66,13 +43,86 @@ conda env create -f conda_env/environment.portable.yml
conda activate Deeplearning
```
### 2.3 重新导出环境
### 1.3 重新导出环境
```bash
conda env export -n Deeplearning --no-builds > conda_env/environment.portable.yml
conda list -n Deeplearning --explicit > conda_env/environment.lock.txt
```
### 1.4 主要依赖包
训练与数据处理核心依赖:
- Python 3.12
- pytorch / torchvision / torchaudio
- pandas / numpy / scipy
- scikit-learn
- matplotlib / seaborn
- openpyxl / xlrd用于 xls/xlsx 读写)
说明:项目仓库已提供完整环境文件,优先用 `conda_env/environment.portable.yml``conda_env/environment.lock.txt` 创建环境。
### 1.5 依赖安装方式Conda / pip
方式 A推荐Conda 一步到位):
```bash
conda env create -f conda_env/environment.portable.yml
conda activate Deeplearning
```
方式 BConda 最小安装,适合自定义环境):
```bash
conda create -n Deeplearning python=3.12 -y
conda activate Deeplearning
# GPU 机器CUDA 12.4
conda install -y pytorch torchvision torchaudio pytorch-cuda=12.4 -c pytorch -c nvidia
# CPU 机器(无 CUDA
# conda install -y pytorch torchvision torchaudio cpuonly -c pytorch
conda install -y pandas numpy scipy scikit-learn matplotlib seaborn openpyxl xlrd
```
方式 Cpip 安装,适合已有虚拟环境):
```bash
pip install torch torchvision torchaudio
pip install pandas numpy scipy scikit-learn matplotlib seaborn openpyxl xlrd
```
可选开发工具:
```bash
pip install black autopep8 basedpyright
```
## 2. 项目约定
### 2.1 输入数据格式
每一类数据支持 `xls/xlsx/csv`。读取时默认取偶数列(索引 1,3,5...)作为特征,奇数列内容可忽略。
示意:
| 任意值 | 特征值 | 任意值 | 特征值 |
|---|---|---|---|
| arbitrary value | value | arbitrary value | value |
### 2.2 目录约定
训练数据放在 `Static/`,输出结果放在 `Result/`
推荐目录:
```text
.
├─ Static/
│ └─ 20241009MaterialDiv/
└─ Result/
```
## 3. 快速开始
### 3.1 准备数据
@ -137,44 +187,60 @@ PLA <-> PLA.xlsx 或 PLA/
Wood <-> Wood.xlsx 或 Wood/
```
### 3.3 训练示例
### 3.3 通用:数据导入
```python
from Qtorch.Models.Qmlp import Qmlp
from Qfunctions.divSet import divSet
from Qfunctions.loadData import load_data
from Qfunctions.saveToXlsx import save_to_xlsx
projet_name = '20241009MaterialDiv'
label_names = ['Acrlic', 'Ecoflex', 'PDMS', 'PLA', 'Wood']
# 自动识别数据模式
# 支持 .xls 、.xlsx 、.csv 三种格式(可混合使用)
# - folder/label.xlsx 或 folder/label.xls 或 folder/label.csv => 单文件模式
# - folder/label/*.(xlsx|xls|csv) => 多子特征模式
# 自动识别数据模式(支持 xls/xlsx/csv
data = load_data(projet_name, label_names)
```
# 划分训练/测试集
X_train, X_test, y_train, y_test, encoder = divSet(
### 3.4 模型调用
#### 3.4.1 MLP
```python
from Qtorch.Models.Qmlp import Qmlp
model = Qmlp(
data=data,
labels=label_names,
test_size=0.3
hidden_layers=[128, 256, 128],
test_size=0.3,
dropout_rate=0,
)
# 构建模型
model = Qmlp(
X_train=X_train,
X_test=X_test,
y_train=y_train,
y_test=y_test,
hidden_layers=[128],
dropout_rate=0
)
# 训练与导出结果
pca_2d, pca_3d = model.get_PCA()
model.fit(300)
```
#### 3.4.2 1D CNN
```python
from Qtorch.Models.Qcnn import QCNN
model = QCNN(
data=data,
labels=label_names,
conv_channels=(16, 32),
kernel_size=3,
hidden_size=128,
test_size=0.3,
dropout_rate=0,
)
model.fit(300)
```
### 3.5 通用:结果获取与图表导出
```python
from Qfunctions.saveToXlsx import save_to_xlsx
pca_2d, pca_3d = model.get_PCA()
cm = model.get_cm()
cmn = model.get_cmn()
epoch_data = model.get_epoch_data()
@ -196,7 +262,9 @@ save_to_xlsx(project_name=projet_name, file_name='acc_and_loss', data=epoch_data
自动识别规则:
- 若每个 `label` 都对应 `folder/label/*.(xlsx|xls|csv)`,识别为多子特征模式。
- 若每个 `label` 都对应 `folder/label.(xlsx|xls|csv)`,识别为单文件模式。- 超出支持范围的文件格式(仅支持 xls/xlsx/csv)会报错。- 若两种都成立(同名文件和同名子目录同时存在),会报错并提示只保留一种目录结构。
- 若每个 `label` 都对应 `folder/label.(xlsx|xls|csv)`,识别为单文件模式。
- 超出支持范围的文件格式(仅支持 xls/xlsx/csv会报错。
- 若两种都成立(同名文件和同名子目录同时存在),会报错并提示只保留一种目录结构。
- 若两种都不成立,会报错并提示检查目录结构或 `label_names`
读取路径规则:
@ -211,3 +279,27 @@ save_to_xlsx(project_name=projet_name, file_name='acc_and_loss', data=epoch_data
- `label_names` 与文件/文件夹是否同名
- 文件后缀是否为 `.xls`、`.xlsx` 或 `.csv`(其他格式将报错)
## 6. TODO后续计划
### 阶段一:基础稳定与可维护性
- [ ] 固化模型基类契约:`build_model`、输入变换钩子、统一训练配置。
- [ ] 封装 `QDL` 包结构(`qdl.data / qdl.models / qdl.export / qdl.api`)。
- [ ] 增加兼容层:保留旧导入路径,逐步迁移到新包路径。
- [ ] 增加最小测试集:`load_data`、`Qmlp.fit(1)`、`QCNN.fit(1)`、导出函数。
### 阶段二:高维与复合模型能力
- [ ] 将 batch 结构升级为字典(`x/y/lengths/mask/meta`),支持复合输入。
- [ ] 在训练框架中加入钩子:`prepare_batch`、`forward_step`、`compute_loss`。
- [ ] 引入编码器层Encoder抽象按策略扩展MLP/CNN/LSTM/GNN/Transformer不按维度硬扩类。
- [ ] 支持可变长时间序列(`collate_fn + lengths + mask`),作为后续复合模型基础。
- [ ] 增加多分支融合模板(时序分支 + 静态分支),预留多任务损失组合。
### 阶段三:对外能力与发布
- [ ] 统一对外入口:提供高层 API例如 `train_mlp`、`train_cnn1d`、`train_hybrid`)。
- [ ] 在模型文档中预留扩展位:`3.4.3 LSTM`、`3.4.4 GNN`、`3.4.5 Hybrid`。
- [ ] 完成打包配置(`pyproject.toml`)与本地可编辑安装说明。
- [ ] 发布前回归:在 Conda 与 pip 环境分别跑通最小端到端流程。

View File

@ -19,7 +19,12 @@ def main():
dropout_rate=0
)
# model = QCNN(
# X_train=X_train, X_test=X_test, y_train=y_train, y_test= y_test,
# data=data,
# labels=label_names,
# conv_channels=(16, 32),
# kernel_size=3,
# hidden_size=128,
# test_size=0.3,
# dropout_rate=0
# )