From a7e95141d21ad35ac198667fe38ba614da3fe285 Mon Sep 17 00:00:00 2001 From: newbieQQ Date: Thu, 14 May 2026 10:27:32 +0800 Subject: [PATCH] feat: add data visualization script for comprehensive analysis - Introduced `visualize.py` to generate various visualizations including: - Class distribution bar chart - Feature distribution histograms (overlaid by class) - Feature box plots (for top N features) - PCA reduced scatter plot with confidence ellipses - t-SNE reduced scatter plot - Heatmaps for class means and standard deviations - Global feature correlation heatmap - Overview of global feature distributions - Implemented data loading functions to handle multiple file formats and structures. - Added command-line interface for flexible usage with options for feature limits and PCA/t-SNE toggles. Co-authored-by: Copilot --- Qfunctions/saveToXlsx.py | 2 +- Scripts/check_data.py | 631 +++++++++++++++++++++++++++++++++++++ Scripts/visualize.py | 657 +++++++++++++++++++++++++++++++++++++++ main.py | 8 +- 4 files changed, 1293 insertions(+), 5 deletions(-) create mode 100644 Scripts/check_data.py create mode 100644 Scripts/visualize.py diff --git a/Qfunctions/saveToXlsx.py b/Qfunctions/saveToXlsx.py index 52f2e96..9283d42 100644 --- a/Qfunctions/saveToXlsx.py +++ b/Qfunctions/saveToXlsx.py @@ -143,7 +143,7 @@ def draw_and_save_cm(file_path): df_cm = pd.read_excel(file_path) labels = df_cm.columns[1:].tolist() - cm = df_cm.values[:, 1:] + cm = df_cm.iloc[:, 1:].to_numpy(dtype=float) fig, axs = plt.subplots(1, 2, figsize=(12, 6)) diff --git a/Scripts/check_data.py b/Scripts/check_data.py new file mode 100644 index 0000000..5fcb7f6 --- /dev/null +++ b/Scripts/check_data.py @@ -0,0 +1,631 @@ +#!/usr/bin/env python3 +""" +数据质量检查脚本 +=================== +对 Static/ 下的数据目录执行完整性、统计、平衡性与离群值检查, +生成详细报告输出到终端,并可保存为文本文件。 + +用法: + python Scripts/check_data.py --folder 20260319Numbers --labels 0 1 2 3 4 5 6 7 8 9 + python Scripts/check_data.py --folder "20260408 grap" --labels 1 2 3 4 5 6 
7 8 9 --output report.txt + python Scripts/check_data.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9 + +要求: + 在项目根目录 (Deeplearning/) 下运行,或通过 --root 指定项目根目录。 +""" + +import os +import sys +import argparse +import unicodedata +from pathlib import Path + +import numpy as np +import pandas as pd + + +# ============================================================ +# 工具函数(与 loadData.py 中逻辑保持一致) +# ============================================================ + +DEFAULT_FILE_CLASSES = ("xlsx", "xls", "csv") + + +def _has_supported_extension(filename: str, file_classes=DEFAULT_FILE_CLASSES) -> bool: + ext = os.path.splitext(filename)[1].lower().lstrip(".") + return ext in file_classes + + +def _read_data_file(file_path: str) -> pd.DataFrame: + ext = os.path.splitext(file_path)[1].lower() + if ext == ".csv": + return pd.read_csv(file_path) + if ext in (".xls", ".xlsx"): + return pd.read_excel(file_path) + raise ValueError( + f"Unsupported file format: {ext}. Only .xls, .xlsx, and .csv are supported. 
" + f"File: {file_path}" + ) + + +def _strip_zero_width(s: str) -> str: + if not isinstance(s, str): + return s + return s.translate( + {0x200B: None, 0x200C: None, 0x200D: None, 0xFEFF: None} + ) + + +def _canonicalize_name(name: str) -> str: + name = unicodedata.normalize("NFKC", name) + name = _strip_zero_width(name) + return name + + +def _normalize_for_compare(name: str) -> str: + n = _canonicalize_name(name) + n = n.replace("_", " ") + n = " ".join(n.split()) + return n.lower() + + +def _find_matching_file(folder: str, expected_name: str): + expected = _canonicalize_name(expected_name) + try: + entries = os.listdir(folder) + except FileNotFoundError: + return None + for f in entries: + if _canonicalize_name(f) == expected: + return f + expected_lower = expected.lower() + for f in entries: + if _canonicalize_name(f).lower() == expected_lower: + return f + expected_relaxed = _normalize_for_compare(expected_name) + for f in entries: + if _normalize_for_compare(f) == expected_relaxed: + return f + return None + + +def _find_matching_file_by_label(folder: str, label_name, file_classes): + for ext in file_classes: + expected_name = f"{label_name}.{ext}" + match = _find_matching_file(folder, expected_name) + if match is not None: + return match + return None + + +# ============================================================ +# 报告生成工具 +# ============================================================ + +class ReportBuffer: + """收集报告行,并同时输出到 stdout 和文件。""" + + def __init__(self, output_path=None): + self.lines: list[str] = [] + self.output_path = output_path + + def add(self, text: str = ""): + print(text) + self.lines.append(text) + + def save(self): + if self.output_path: + with open(self.output_path, "w", encoding="utf-8") as f: + f.write("\n".join(self.lines) + "\n") + print(f"\n报告已保存到: {self.output_path}") + + +# ============================================================ +# 核心检查逻辑 +# ============================================================ + +def 
_extract_features(df: pd.DataFrame, source: str) -> pd.DataFrame: + """ + 按项目约定提取偶数列作为特征(保持 int 列名以对齐)。 + 返回特征 DataFrame(列名 0, 2, 4, ...)。 + """ + # 偶数列索引: 1, 3, 5, ... + even_cols = [c for i, c in enumerate(df.columns) if i % 2 == 1] + if not even_cols: + raise ValueError(f"没有找到偶数列(特征列)。请检查文件: {source}") + features = df[even_cols].copy() + + # 尝试转为数值 + for c in features.columns: + features[c] = pd.to_numeric(features[c], errors="coerce") + + return features + + +def check_tabular_project(root: str, folder: str, labels: list[str], rp: ReportBuffer): + """完整检查流程""" + data_dir = os.path.join(root, "Static", folder) + if not os.path.isdir(data_dir): + rp.add(f"[ERROR] 目录不存在: {data_dir}") + rp.add("请确认 --folder 参数正确。") + return + + rp.add("=" * 64) + rp.add(" Deeplearning 数据质量检查报告") + rp.add("=" * 64) + rp.add(f" 数据目录 : {data_dir}") + rp.add(f" 标签数量 : {len(labels)}") + rp.add(f" 标签列表 : {labels}") + rp.add() + + # ---- 第一步:检测数据模式 ---- + has_all_subfolders = True + for lbl in labels: + sub = os.path.join(data_dir, str(lbl)) + if not (os.path.isdir(sub) and any(_has_supported_extension(f) for f in os.listdir(sub))): + has_all_subfolders = False + break + + has_all_files = True + for lbl in labels: + if _find_matching_file_by_label(data_dir, lbl, DEFAULT_FILE_CLASSES) is None: + has_all_files = False + break + + if has_all_files and not has_all_subfolders: + mode = "single_file" + elif has_all_subfolders and not has_all_files: + mode = "multi_folder" + else: + rp.add("[ERROR] 无法自动检测数据模式,或两种模式同时存在。") + rp.add(f" has_all_files : {has_all_files}") + rp.add(f" has_all_subfolders: {has_all_subfolders}") + rp.add("请确保每个 label 对应唯一的文件或唯一的子目录。") + return + + if mode == "single_file": + _check_single_file_mode(data_dir, labels, rp) + else: + _check_multi_folder_mode(data_dir, labels, rp) + + rp.add() + rp.add("=" * 64) + rp.add(" 检查完成。") + rp.add("=" * 64) + + +def _check_single_file_mode(data_dir: str, labels: list[str], rp: ReportBuffer): + rp.add() + rp.add("── 数据模式: 单文件模式 ──") 
+ rp.add() + + # 1. 定位实际文件名 + file_map: dict[str, str] = {} + missing: list[str] = [] + for lbl in labels: + match = _find_matching_file_by_label(data_dir, lbl, DEFAULT_FILE_CLASSES) + if match: + file_map[lbl] = match + else: + missing.append(lbl) + + if missing: + rp.add(f"[WARN] 以下标签找不到对应文件: {missing}") + rp.add(f"当前目录内容: {sorted(os.listdir(data_dir))}") + if not file_map: + return + labels = [l for l in labels if l in file_map] + + # 2. 逐类读取 + all_features = [] # list of (label, pd.DataFrame) + per_class_info: dict[str, dict] = {} + col_counts: dict[str, int] = {} + + for lbl in labels: + fname = file_map[lbl] + file_path = os.path.join(data_dir, fname) + info: dict[str, object] = {"label": lbl, "file": fname, "warnings": []} + + try: + raw = _read_data_file(file_path) + except Exception as e: + info["error"] = str(e) + per_class_info[lbl] = info + rp.add(f"[ERROR] 读取文件失败: {file_path} — {e}") + continue + + info["raw_rows"] = raw.shape[0] + info["raw_cols"] = raw.shape[1] + + # NaN 在原始文件中 + total_nan = raw.isna().sum().sum() + if total_nan > 0: + info["warnings"].append(f"原始文件含 {total_nan} 个 NaN 单元格") + + try: + features = _extract_features(raw, fname) + except ValueError as e: + info["error"] = str(e) + per_class_info[lbl] = info + rp.add(f"[ERROR] 特征提取失败: {file_path} — {e}") + continue + + # 丢弃含 NaN 的行(同 loadData 的 dropna 逻辑)后统计 + clean = features.dropna() + info["feature_cols"] = features.shape[1] + info["samples_after_dropna"] = clean.shape[0] + info["dropped_nan_rows"] = features.shape[0] - clean.shape[0] + info["values"] = clean.values + + col_counts[lbl] = features.shape[1] + + if clean.shape[0] == 0: + info["warnings"].append("去除 NaN 后无有效样本") + + per_class_info[lbl] = info + if clean.shape[0] > 0: + all_features.append((lbl, clean)) + + # 列数一致性 + if len(set(col_counts.values())) > 1: + rp.add() + rp.add("[WARN] 各标签的特征列数不一致!") + for lbl, cc in col_counts.items(): + rp.add(f" {lbl}: {cc} 列") + rp.add("这会导致 load_data 时补零逻辑产生差异。") + else: + rp.add(f"[OK] 
所有标签特征列数一致: {next(iter(col_counts.values()), 0)} 列") + + # 样本数统计 + rp.add() + rp.add("── 各类别样本数 ──") + sample_counts: dict[str, int] = {} + for lbl in labels: + info = per_class_info.get(lbl, {}) + if "error" in info: + rp.add(f" [{lbl}] 加载失败: {info['error']}") + continue + n = info.get("samples_after_dropna", 0) + sample_counts[lbl] = n + warnings = info.get("warnings", []) + wflag = f" ⚠ {'; '.join(warnings)}" if warnings else "" + rp.add(f" [{lbl}] {n} 行 (文件: {info.get('file','?')}, " + f"原始 {info.get('raw_rows','?')} 行, " + f"丢弃 NaN 行 {info.get('dropped_nan_rows',0)}){wflag}") + + # 平衡性分析 + _analyze_balance(sample_counts, rp) + + # 统计 + 离群值 + _analyze_statistics(all_features, rp) + _analyze_outliers(all_features, rp) + + +def _check_multi_folder_mode(data_dir: str, labels: list[str], rp: ReportBuffer): + rp.add() + rp.add("── 数据模式: 多子特征模式 ──") + rp.add() + + all_features = [] + per_class_info: dict[str, dict] = {} + col_counts: dict[str, int] = {} + + for lbl in labels: + sub = os.path.join(data_dir, str(lbl)) + if not os.path.isdir(sub): + per_class_info[lbl] = {"error": f"子目录不存在: {sub}"} + rp.add(f"[ERROR] {lbl}: 子目录不存在") + continue + + files = sorted( + [f for f in os.listdir(sub) if _has_supported_extension(f)] + ) + if not files: + per_class_info[lbl] = {"error": f"子目录下无支持的文件: {sub}"} + rp.add(f"[ERROR] {lbl}: 子目录下无 .xlsx/.xls/.csv 文件") + continue + + class_frame_list = [] + single_file_cols = set() + total_raw = 0 + total_dropped = 0 + failed_files = [] + + for fname in files: + file_path = os.path.join(sub, fname) + try: + raw = _read_data_file(file_path) + except Exception as e: + failed_files.append(f" {fname}: {e}") + continue + + total_raw += raw.shape[0] + try: + features = _extract_features(raw, f"{lbl}/{fname}") + except ValueError as e: + failed_files.append(f" {fname}: {e}") + continue + + single_file_cols.add(features.shape[1]) + clean = features.dropna() + total_dropped += features.shape[0] - clean.shape[0] + if clean.shape[0] > 0: + 
class_frame_list.append(clean) + + info: dict[str, object] = { + "label": lbl, + "num_files": len(files), + "raw_rows_total": total_raw, + "dropped_nan_rows": total_dropped, + "warnings": [], + } + + if failed_files: + info["warnings"].append(f"{len(failed_files)} 个文件加载失败") + for ff in failed_files: + rp.add(f" [WARN] {ff}") + + if len(single_file_cols) > 1: + info["warnings"].append( + f"子文件间列数不一致: {sorted(single_file_cols)}" + ) + col_counts[lbl] = max(single_file_cols) + elif single_file_cols: + col_counts[lbl] = single_file_cols.pop() + else: + col_counts[lbl] = 0 + + if class_frame_list: + combined = pd.concat(class_frame_list, ignore_index=True) + info["samples_after_dropna"] = combined.shape[0] + info["feature_cols"] = combined.shape[1] + info["values"] = combined.values + all_features.append((lbl, combined)) + else: + info["samples_after_dropna"] = 0 + info["warnings"].append("无有效样本") + + per_class_info[lbl] = info + + # 列数一致性 + non_zero = {l: c for l, c in col_counts.items() if c > 0} + if non_zero and len(set(non_zero.values())) > 1: + rp.add() + rp.add("[WARN] 各标签的特征列数不一致(将使用零填充对齐):") + for lbl, cc in col_counts.items(): + rp.add(f" {lbl}: {cc} 列") + elif non_zero: + rp.add(f"[OK] 所有标签特征列数一致: {next(iter(non_zero.values()))} 列") + + # 样本数统计 + rp.add() + rp.add("── 各类别样本数 ──") + sample_counts: dict[str, int] = {} + for lbl in labels: + info = per_class_info.get(lbl, {}) + if "error" in info: + rp.add(f" [{lbl}] 加载失败: {info['error']}") + continue + n = info.get("samples_after_dropna", 0) + sample_counts[lbl] = n + wflag = "" + if info.get("warnings"): + wflag = f" ⚠ {'; '.join(info['warnings'])}" + rp.add(f" [{lbl}] {n} 行 " + f"(来自 {info.get('num_files','?')} 个文件, " + f"原始 {info.get('raw_rows_total','?')} 行, " + f"丢弃 NaN 行 {info.get('dropped_nan_rows',0)}){wflag}") + + _analyze_balance(sample_counts, rp) + _analyze_statistics(all_features, rp) + _analyze_outliers(all_features, rp) + + +# ============================================================ +# 分析子模块 
# ============================================================

def _analyze_balance(counts: dict[str, int], rp: "ReportBuffer"):
    """Report class-balance metrics and an estimated train/test split.

    Args:
        counts: Mapping of class label to its number of usable samples.
        rp: Report sink; only its ``add(text="")`` method is used.

    The report contains the total/mean/min/max sample counts, the max/min
    imbalance ratio, the coefficient of variation, a verdict based on the
    ratio (>5 severe, >3 moderate) and a per-class 80/20 split estimate.
    Analysis stops early when ``counts`` is empty or any class has 0 samples.
    """
    rp.add()
    rp.add("── 类别平衡性分析 ──")
    if not counts:
        rp.add("无有效样本,跳过。")
        return

    values = list(counts.values())
    total = sum(values)
    n_classes = len(values)
    avg = total / n_classes  # counts is non-empty here, so n_classes >= 1
    min_count = min(values)
    max_count = max(values)

    rp.add(f" 总样本数 : {total}")
    rp.add(f" 类别数 : {n_classes}")
    rp.add(f" 平均每类 : {avg:.1f}")
    rp.add(f" 最少样本类 : {min(counts, key=counts.get)} ({min_count})")
    rp.add(f" 最多样本类 : {max(counts, key=counts.get)} ({max_count})")

    if min_count == 0:
        rp.add(" [ERROR] 存在样本数为 0 的类别,训练将无法进行!")
        return

    # min_count >= 1 from here on, so neither division below can hit zero.
    ratio = max_count / min_count
    rp.add(f" 不平衡比例 : {ratio:.2f}:1 (max/min)")

    std_val = float(np.std(values))
    cv = std_val / avg
    rp.add(f" 变异系数(CV): {cv:.4f}")

    if ratio > 5:
        rp.add(" [WARN] 类别严重不平衡 (>5:1),建议进行数据增强或使用类别权重。")
    elif ratio > 3:
        rp.add(" [WARN] 类别较不平衡 (>3:1),可考虑采样策略。")
    else:
        rp.add(" [OK] 类别基本平衡。")

    # Estimated train/test split per class.
    rp.add()
    rp.add(" ── 训练/测试划分预估 (test_size=0.2, stratify) ──")
    for lbl, cnt in sorted(counts.items()):
        # At least one sample is always reserved for the test side.
        test_n = max(1, int(cnt * 0.2))
        train_n = cnt - test_n
        rp.add(f" [{lbl}] 训练 {train_n} / 测试 {test_n} (总计 {cnt})")
        if train_n < 1:
            # A single-sample class leaves nothing to train on; say so
            # instead of silently printing "训练 0".
            rp.add(f" [WARN] 类别 {lbl} 样本过少,划分后训练集为空。")


def _analyze_statistics(
    all_features: list[tuple[str, pd.DataFrame]],
    rp: "ReportBuffer",
):
    """Report global, per-feature and per-class descriptive statistics.

    Args:
        all_features: ``(label, features)`` pairs, one per class.
        rp: Report sink; only its ``add(text="")`` method is used.

    Frames with fewer columns than the widest one are right-padded with
    zeros before being stacked, so the global figures include that padding.
    NaN cells are ignored by every statistic and their count is reported.
    They appear when the multi-folder loader concatenates files whose column
    labels differ, because ``pd.concat`` aligns on labels.
    """
    rp.add()
    rp.add("── 特征统计信息 ──")
    if not all_features:
        rp.add("无有效数据,跳过。")
        return

    per_class = [(lbl, df.to_numpy(dtype=float)) for lbl, df in all_features]

    # Global matrix: zero-pad every class to the same number of columns.
    max_cols = max(val.shape[1] for _, val in per_class)
    padded_arrays = []
    for _, val in per_class:
        if val.shape[1] < max_cols:
            pad = np.zeros((val.shape[0], max_cols - val.shape[1]), dtype=val.dtype)
            val = np.hstack([val, pad])
        padded_arrays.append(val)
    all_values = np.vstack(padded_arrays)

    rp.add(f" 全局特征维度: {all_values.shape} (样本数 × 特征数, 零填充对齐到 {max_cols} 列)")

    nan_cells = int(np.isnan(all_values).sum())
    if nan_cells:
        # Plain np.mean/np.std/... would turn every figure below into "nan".
        rp.add(f" [WARN] 特征矩阵含 {nan_cells} 个 NaN 单元格,统计时已忽略。")

    rp.add(f" 全局均值 : {np.nanmean(all_values):.6f}")
    rp.add(f" 全局标准差 : {np.nanstd(all_values):.6f}")
    rp.add(f" 全局最小值 : {np.nanmin(all_values):.6f}")
    rp.add(f" 全局最大值 : {np.nanmax(all_values):.6f}")
    rp.add(f" 全局中位数 : {np.nanmedian(all_values):.6f}")

    # Per-feature statistics, limited to the first 12 columns.
    rp.add()
    n_cols = min(all_values.shape[1], 12)
    rp.add(f" ── 前 {n_cols} 个特征维度的分布 (均值 ± 标准差) ──")
    for j in range(n_cols):
        col = all_values[:, j]
        rp.add(
            f" feature{j+1}: "
            f"μ={np.nanmean(col):.4f} σ={np.nanstd(col):.4f} "
            f"[{np.nanmin(col):.4f}, {np.nanmax(col):.4f}] "
            f"med={np.nanmedian(col):.4f}"
        )

    # Per-class summary, computed on the unpadded values of each class.
    rp.add()
    rp.add(" ── 各类别特征统计 ──")
    for lbl, val in per_class:
        rp.add(
            f" [{lbl}] "
            f"μ={np.nanmean(val):.4f} σ={np.nanstd(val):.4f} "
            f"范围 [{np.nanmin(val):.4f}, {np.nanmax(val):.4f}]"
        )


def _analyze_outliers(
    all_features: list[tuple[str, pd.DataFrame]],
    rp: "ReportBuffer",
):
    """Report the share of outlier samples per class using the IQR rule.

    Args:
        all_features: ``(label, features)`` pairs, one per class.
        rp: Report sink; only its ``add(text="")`` method is used.

    A sample is an outlier when any of its features lies outside
    ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``, with quartiles computed per column
    within the sample's own class. NaN cells are skipped when computing the
    quartiles and never mark a sample as an outlier.
    """
    rp.add()
    rp.add("── 离群值检测 (基于 IQR) ──")
    if not all_features:
        rp.add("无有效数据,跳过。")
        return

    total_outlier_samples = 0
    total_samples = 0

    for lbl, df in all_features:
        val = df.to_numpy(dtype=float)
        n_rows = val.shape[0]
        total_samples += n_rows

        # NaN-aware quartiles: with np.percentile a single NaN would make
        # the fences NaN and the class would report 0 outliers.
        q1, q3 = np.nanpercentile(val, [25, 75], axis=0)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr

        # Comparisons against NaN are False, so NaN cells are not flagged.
        outlier_mask = np.any((val < lower) | (val > upper), axis=1)
        n_outliers = int(np.sum(outlier_mask))
        total_outlier_samples += n_outliers

        pct = n_outliers / n_rows * 100 if n_rows > 0 else 0
        status = "[OK]" if pct < 10 else "[WARN]" if pct < 25 else "[ERROR]"
        rp.add(f" [{lbl}] 离群样本: {n_outliers}/{n_rows} ({pct:.1f}%) {status}")

    overall_pct = total_outlier_samples / total_samples * 100 if total_samples > 0 else 0
    rp.add()
    rp.add(f" 整体离群比例: {total_outlier_samples}/{total_samples} ({overall_pct:.1f}%)")

    if overall_pct > 20:
        rp.add(" [WARN] 超过 20% 数据为离群值,请确认数据清洗是否正确。")
    elif overall_pct > 10:
        rp.add(" [INFO] 离群值比例偏高,训练时可能影响收敛。")
    else:
        rp.add(" [OK] 离群值比例正常。")


# ============================================================
+# 命令行入口 +# ============================================================ + +def main(): + parser = argparse.ArgumentParser( + description="Deeplearning 数据质量检查脚本", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例: + python Scripts/check_data.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9 + python Scripts/check_data.py -f "20260408 grap" -l 1 2 3 4 5 6 7 8 9 -o report.txt + python Scripts/check_data.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9 -r /path/to/project +""", + ) + parser.add_argument( + "-f", "--folder", + required=True, + help="Static/ 下的数据目录名(例如 20260319Numbers)", + ) + parser.add_argument( + "-l", "--labels", + nargs="+", + required=True, + help="类别标签列表,空格分隔(例如 0 1 2 3 4 或 A B C)", + ) + parser.add_argument( + "-o", "--output", + default=None, + help="将报告保存到指定文件路径", + ) + parser.add_argument( + "-r", "--root", + default=None, + help="项目根目录(默认为脚本的上级目录,即 Deeplearning/)", + ) + + args = parser.parse_args() + + # 确定项目根目录 + if args.root: + root = args.root + else: + script_dir = Path(__file__).resolve().parent + root = str(script_dir.parent) + + root = os.path.abspath(root) + + if not os.path.isdir(root): + print(f"[ERROR] 项目根目录不存在: {root}") + sys.exit(1) + + rp = ReportBuffer(output_path=args.output) + check_tabular_project( + root=root, + folder=args.folder, + labels=args.labels, + rp=rp, + ) + rp.save() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/Scripts/visualize.py b/Scripts/visualize.py new file mode 100644 index 0000000..8a1816f --- /dev/null +++ b/Scripts/visualize.py @@ -0,0 +1,657 @@ +#!/usr/bin/env python3 +""" +数据可视化脚本 +=============== +加载 Static/ 下的数据目录,生成多种可视化图表: + - 类别分布柱状图 + - 特征分布直方图(各类叠加) + - 特征箱线图(前 N 个特征) + - PCA 降维散点图 + 置信椭圆 + - t-SNE 降维散点图 + - 各类别均值/标准差对比热力图 + - 全局特征相关性热力图 + - 全局特征分布概览 + +用法: + python Scripts/visualize.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9 + python Scripts/visualize.py -f "20260408 grap" -l 1 2 3 4 5 6 7 8 9 --max-features 20 + python 
Scripts/visualize.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9 --no-tsne + +输出目录: Visualizations// +""" + +import os +import sys +import argparse +import unicodedata +from pathlib import Path +import warnings + +import numpy as np +import pandas as pd +import matplotlib +matplotlib.use("Agg") +import matplotlib.pyplot as plt +import seaborn as sns + +warnings.filterwarnings("ignore", category=FutureWarning) +warnings.filterwarnings("ignore", category=UserWarning) + +# ============================================================ +# 数据加载工具(与 check_data.py 保持一致) +# ============================================================ + +DEFAULT_FILE_CLASSES = ("xlsx", "xls", "csv") + + +def _has_supported_extension(filename: str, file_classes=DEFAULT_FILE_CLASSES) -> bool: + ext = os.path.splitext(filename)[1].lower().lstrip(".") + return ext in file_classes + + +def _read_data_file(file_path: str) -> pd.DataFrame: + ext = os.path.splitext(file_path)[1].lower() + if ext == ".csv": + return pd.read_csv(file_path) + if ext in (".xls", ".xlsx"): + return pd.read_excel(file_path) + raise ValueError(f"Unsupported file format: {ext}: {file_path}") + + +def _strip_zero_width(s: str) -> str: + if not isinstance(s, str): + return s + return s.translate({0x200B: None, 0x200C: None, 0x200D: None, 0xFEFF: None}) + + +def _canonicalize_name(name: str) -> str: + name = unicodedata.normalize("NFKC", name) + return _strip_zero_width(name) + + +def _normalize_for_compare(name: str) -> str: + n = _canonicalize_name(name) + n = n.replace("_", " ") + n = " ".join(n.split()) + return n.lower() + + +def _find_matching_file(folder: str, expected_name: str): + expected = _canonicalize_name(expected_name) + try: + entries = os.listdir(folder) + except FileNotFoundError: + return None + for f in entries: + if _canonicalize_name(f) == expected: + return f + expected_lower = expected.lower() + for f in entries: + if _canonicalize_name(f).lower() == expected_lower: + return f + expected_relaxed = 
_normalize_for_compare(expected_name) + for f in entries: + if _normalize_for_compare(f) == expected_relaxed: + return f + return None + + +def _find_matching_file_by_label(folder: str, label_name, file_classes): + for ext in file_classes: + expected_name = f"{label_name}.{ext}" + match = _find_matching_file(folder, expected_name) + if match is not None: + return match + return None + + +def _extract_features(df: pd.DataFrame, source: str) -> pd.DataFrame: + even_cols = [c for i, c in enumerate(df.columns) if i % 2 == 1] + if not even_cols: + raise ValueError(f"没有找到偶数列(特征列)。请检查文件: {source}") + features = df[even_cols].copy() + for c in features.columns: + features[c] = pd.to_numeric(features[c], errors="coerce") + return features + + +# ============================================================ +# 数据加载 +# ============================================================ + +def load_all_data(root: str, folder: str, labels: list[str]): + data_dir = os.path.join(root, "Static", folder) + if not os.path.isdir(data_dir): + print(f"[ERROR] 目录不存在: {data_dir}") + sys.exit(1) + + has_all_files = all( + _find_matching_file_by_label(data_dir, lbl, DEFAULT_FILE_CLASSES) is not None + for lbl in labels + ) + has_all_subfolders = all( + os.path.isdir(os.path.join(data_dir, str(lbl))) + and any(_has_supported_extension(f) for f in os.listdir(os.path.join(data_dir, str(lbl)))) + for lbl in labels + ) + + if has_all_files and not has_all_subfolders: + return _load_single_file_mode(data_dir, labels) + elif has_all_subfolders and not has_all_files: + return _load_multi_folder_mode(data_dir, labels) + else: + print("[WARN] 数据模式不明确,尝试单文件模式...") + return _load_single_file_mode(data_dir, labels) + + +def _load_single_file_mode(data_dir: str, labels: list[str]): + all_features = [] + col_counts = {} + label_names = [] + + for lbl in labels: + fname = _find_matching_file_by_label(data_dir, lbl, DEFAULT_FILE_CLASSES) + if fname is None: + print(f"[WARN] 标签 {lbl} 找不到文件,跳过") + continue + 
file_path = os.path.join(data_dir, fname) + try: + raw = _read_data_file(file_path) + except Exception as e: + print(f"[ERROR] 读取 {file_path} 失败: {e}") + continue + try: + features = _extract_features(raw, fname) + except ValueError as e: + print(f"[ERROR] {e}") + continue + + clean = features.dropna() + if clean.shape[0] == 0: + print(f"[WARN] 标签 {lbl} 去除 NaN 后无样本,跳过") + continue + + col_counts[lbl] = clean.shape[1] + all_features.append((lbl, clean)) + label_names.append(lbl) + + return _build_arrays(all_features, label_names, col_counts) + + +def _load_multi_folder_mode(data_dir: str, labels: list[str]): + all_features = [] + col_counts = {} + label_names = [] + + for lbl in labels: + sub = os.path.join(data_dir, str(lbl)) + if not os.path.isdir(sub): + print(f"[WARN] 标签 {lbl} 子目录不存在,跳过") + continue + + files = sorted([f for f in os.listdir(sub) if _has_supported_extension(f)]) + if not files: + print(f"[WARN] 标签 {lbl} 子目录无文件,跳过") + continue + + frames = [] + max_cols_in_class = 0 + for fname in files: + file_path = os.path.join(sub, fname) + try: + raw = _read_data_file(file_path) + except Exception as e: + print(f"[WARN] 读取 {file_path} 失败: {e}") + continue + try: + feat = _extract_features(raw, f"{lbl}/{fname}") + except ValueError as e: + print(f"[WARN] {e}") + continue + clean = feat.dropna() + if clean.shape[0] > 0: + frames.append(clean) + max_cols_in_class = max(max_cols_in_class, clean.shape[1]) + + if not frames: + print(f"[WARN] 标签 {lbl} 无有效样本,跳过") + continue + + padded_frames = [] + for f in frames: + if f.shape[1] < max_cols_in_class: + pad = np.zeros((f.shape[0], max_cols_in_class - f.shape[1])) + padded = pd.DataFrame( + np.hstack([f.values, pad]), + columns=list(f.columns) + [f"_pad_{i}" for i in range(max_cols_in_class - f.shape[1])], + ) + padded_frames.append(padded) + else: + padded_frames.append(f) + + combined = pd.concat(padded_frames, ignore_index=True) + col_counts[lbl] = combined.shape[1] + all_features.append((lbl, combined)) + 
label_names.append(lbl) + + return _build_arrays(all_features, label_names, col_counts) + + +def _build_arrays(all_features, label_names, col_counts): + if not all_features: + print("[ERROR] 没有加载到任何有效数据") + sys.exit(1) + + max_cols = max(c for c in col_counts.values()) + + X_list = [] + y_list = [] + for idx, (lbl, df) in enumerate(all_features): + val = df.values + if val.shape[1] < max_cols: + pad = np.zeros((val.shape[0], max_cols - val.shape[1]), dtype=val.dtype) + val = np.hstack([val, pad]) + X_list.append(val) + y_list.append(np.full(val.shape[0], idx, dtype=int)) + + X = np.vstack(X_list) + y = np.concatenate(y_list) + + return X, y, label_names, all_features, col_counts + + +# ============================================================ +# 可视化函数 +# ============================================================ + +TAB10 = plt.cm.tab10.colors + + +def _ensure_dir(path: str): + os.makedirs(path, exist_ok=True) + + +def plot_class_distribution(y, label_names, out_dir: str): + fig, ax = plt.subplots(figsize=(max(8, len(label_names) * 0.6), 5)) + counts = [int(np.sum(y == i)) for i in range(len(label_names))] + colors = [TAB10[i % 10] for i in range(len(label_names))] + bars = ax.bar(label_names, counts, color=colors, edgecolor="white", linewidth=0.8) + for bar, cnt in zip(bars, counts): + ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + max(counts) * 0.01, + str(cnt), ha="center", va="bottom", fontsize=9) + ax.set_xlabel("类别") + ax.set_ylabel("样本数") + ax.set_title(f"类别分布 (总计 {sum(counts)} 样本, {len(label_names)} 类)") + fig.tight_layout() + path = os.path.join(out_dir, "01_class_distribution.png") + fig.savefig(path, dpi=150) + plt.close(fig) + print(f"[OK] {path}") + + +def plot_feature_histograms(all_features, out_dir: str, max_features: int = 12): + n_features = min(max_features, all_features[0][1].shape[1]) + n_cols = 4 + n_rows = (n_features + n_cols - 1) // n_cols + + fig, axes = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 3 * n_rows)) + 
axes = axes.flatten() if n_rows * n_cols > 1 else [axes] + + colors = [TAB10[i % 10] for i in range(len(all_features))] + + for j in range(n_features): + ax = axes[j] + for idx, (lbl, df) in enumerate(all_features): + if j < df.shape[1]: + col = df.iloc[:, j].values + ax.hist(col, bins=40, density=True, alpha=0.4, color=colors[idx], + label=f"类 {lbl}") + ax.set_title(f"Feature {j+1}") + ax.set_xlabel("值") + ax.set_ylabel("密度") + if n_features <= 8: + ax.legend(fontsize=7, loc="upper right") + + for j in range(n_features, len(axes)): + axes[j].set_visible(False) + + if n_features > 8: + handles = [plt.Rectangle((0, 0), 1, 1, color=colors[i], alpha=0.4) + for i in range(len(all_features))] + fig.legend(handles, [lbl for lbl, _ in all_features], + loc="lower center", ncol=min(10, len(all_features)), fontsize=7) + + fig.suptitle(f"特征分布直方图(各类叠加,前 {n_features} 维)", fontsize=13, y=1.01) + fig.tight_layout() + path = os.path.join(out_dir, "02_feature_histograms.png") + fig.savefig(path, dpi=150, bbox_inches="tight") + plt.close(fig) + print(f"[OK] {path}") + + +def plot_feature_boxplots(all_features, out_dir: str, max_features: int = 20): + n_features = min(max_features, all_features[0][1].shape[1]) + n_cols = 4 + n_rows = (n_features + n_cols - 1) // n_cols + + fig, axes = plt.subplots(n_rows, n_cols, figsize=(4.5 * n_cols, 3.5 * n_rows)) + axes = axes.flatten() if n_rows * n_cols > 1 else [axes] + + for j in range(n_features): + ax = axes[j] + data_list = [] + positions = [] + labels_for_box = [] + for idx, (lbl, df) in enumerate(all_features): + if j < df.shape[1]: + data_list.append(df.iloc[:, j].values) + positions.append(idx + 1) + labels_for_box.append(str(lbl)) + + bp = ax.boxplot(data_list, positions=positions, labels=labels_for_box, + patch_artist=True, widths=0.6, showfliers=True, + flierprops={"marker": ".", "markersize": 2, "alpha": 0.3}) + for patch, idx in zip(bp["boxes"], range(len(data_list))): + patch.set_facecolor(TAB10[idx % 10]) + patch.set_alpha(0.6) 
def _plot_confidence_ellipse(ax, mean, cov, color, alpha=0.2, n_std=1.0):
    """Draw the ``n_std``-sigma covariance ellipse of a 2-D point cloud.

    Args:
        ax: Matplotlib axes that receive the ellipse patch.
        mean: Centre of the ellipse as an ``(x, y)`` pair.
        cov: 2x2 covariance matrix of the points.
        color: Colour used for both the face and the edge.
        alpha: Opacity of the patch.
        n_std: Number of standard deviations covered along each axis.
    """
    from matplotlib.patches import Ellipse

    eigvals, eigvecs = np.linalg.eigh(cov)
    # eigh returns eigenvalues in ascending order; put the major axis first.
    order = eigvals.argsort()[::-1]
    # Round-off on a near-singular covariance can yield tiny negative
    # eigenvalues; clip them so the square root below stays finite.
    eigvals = np.clip(eigvals[order], 0.0, None)
    eigvecs = eigvecs[:, order]
    angle = np.degrees(np.arctan2(eigvecs[1, 0], eigvecs[0, 0]))
    width, height = 2 * n_std * np.sqrt(eigvals)
    ellipse = Ellipse(xy=mean, width=width, height=height, angle=angle,
                      facecolor=color, alpha=alpha, edgecolor=color,
                      linewidth=0.8)
    ax.add_patch(ellipse)


def plot_pca(X, y, label_names, out_dir: str):
    """Save a PCA scatter/centroid figure and an explained-variance chart.

    Writes ``04_pca.png`` (scatter of the first two components plus class
    centroids with 1-sigma ellipses) and ``04_pca_variance.png`` (individual
    and cumulative explained-variance ratio) into ``out_dir``.

    Args:
        X: 2-D array, one row per sample and one column per feature.
        y: 1-D array of class indices; ``y == i`` selects ``label_names[i]``.
        label_names: Class labels, indexed by the values stored in ``y``.
        out_dir: Existing directory that receives the PNG files.
    """
    from sklearn.decomposition import PCA

    n_samples, n_dims = X.shape
    if min(n_samples, n_dims) < 2:
        # A 2-component projection is impossible; skip instead of aborting
        # the remaining charts.
        print(f"[WARN] PCA: 样本数 ({n_samples}) 或特征维度 ({n_dims}) 不足 2, 已跳过")
        return

    # Fit up to 30 components so the explained-variance chart is meaningful;
    # only the first two are used for the scatter plots.
    n_components = min(30, n_samples, n_dims)
    pca = PCA(n_components=n_components, random_state=42)
    X_pca = pca.fit_transform(X)[:, :2]
    ratios = pca.explained_variance_ratio_
    pc1_label = f"PC1 ({ratios[0]*100:.1f}%)"
    pc2_label = f"PC2 ({ratios[1]*100:.1f}%)"

    fig, axes = plt.subplots(1, 2, figsize=(14, 6))
    colors = [TAB10[i % 10] for i in range(len(label_names))]

    # Left panel: every sample, coloured by class.
    ax = axes[0]
    for i, lbl in enumerate(label_names):
        mask = y == i
        ax.scatter(X_pca[mask, 0], X_pca[mask, 1], c=[colors[i]],
                   label=f"类 {lbl}", alpha=0.5, s=3, edgecolors="none")
    ax.set_xlabel(pc1_label)
    ax.set_ylabel(pc2_label)
    ax.set_title("PCA 降维散点图")
    ax.legend(fontsize=7, markerscale=3, loc="best")

    # Right panel: class centroids with 1-sigma covariance ellipses.
    ax2 = axes[1]
    for i, lbl in enumerate(label_names):
        class_points = X_pca[y == i]
        if class_points.shape[0] == 0:
            # No samples for this class: its centroid would be NaN.
            continue
        mean = class_points.mean(axis=0)
        ax2.scatter(mean[0], mean[1], c=[colors[i]], s=80, marker="X",
                    edgecolors="black", linewidths=0.8, zorder=5)
        ax2.annotate(str(lbl), (mean[0], mean[1]), fontsize=8, ha="center",
                     va="bottom", fontweight="bold", xytext=(0, 4),
                     textcoords="offset points")
        if class_points.shape[0] > 2:
            cov = np.cov(class_points.T)
            _plot_confidence_ellipse(ax2, mean, cov, color=colors[i],
                                     alpha=0.25)

    ax2.set_xlabel(pc1_label)
    ax2.set_ylabel(pc2_label)
    ax2.set_title("PCA 质心 + 1σ 置信椭圆")

    fig.suptitle("PCA 降维分析", fontsize=14)
    fig.tight_layout()
    path = os.path.join(out_dir, "04_pca.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f"[OK] {path}")

    # Explained-variance ratio per component and its running total.
    fig2, ax = plt.subplots(figsize=(8, 4))
    n = len(ratios)
    positions = range(1, n + 1)
    ax.bar(positions, ratios, alpha=0.6, color="steelblue", label="个体")
    ax.plot(positions, np.cumsum(ratios), "ro-", markersize=4, label="累计")
    ax.set_xlabel("主成分")
    ax.set_ylabel("方差解释率")
    ax.set_title("PCA 方差解释率")
    ax.legend()
    fig2.tight_layout()
    path2 = os.path.join(out_dir, "04_pca_variance.png")
    fig2.savefig(path2, dpi=150)
    plt.close(fig2)
    print(f"[OK] {path2}")


def _stratified_sample_indices(y, n_classes, max_samples, seed=42):
    """Return row indices holding at most an equal share of each class."""
    # Guarantee at least one sample per class even when max_samples is
    # smaller than the number of classes.
    per_class = max(1, max_samples // max(1, n_classes))
    # One generator for all classes, so the draws for different classes are
    # not the same pseudo-random pattern.
    rng = np.random.RandomState(seed)
    picked = []
    for cls in range(n_classes):
        members = np.flatnonzero(y == cls)
        if len(members) > per_class:
            members = rng.choice(members, per_class, replace=False)
        picked.append(members)
    if not picked:
        return np.empty(0, dtype=int)
    return np.concatenate(picked)


def _tsne_perplexity(n_samples):
    """Pick a perplexity in [5, 50] that stays below ``n_samples``."""
    # scikit-learn rejects perplexity >= n_samples, hence the last bound.
    return min(50, max(5, n_samples // 3), n_samples - 1)


def plot_tsne(X, y, label_names, out_dir: str, max_samples: int = 5000):
    """Save a 2-D t-SNE scatter plot as ``05_tsne.png`` in ``out_dir``.

    When ``X`` has more than ``max_samples`` rows, a per-class subsample is
    embedded instead of the full data set.

    Args:
        X: 2-D array, one row per sample and one column per feature.
        y: 1-D array of class indices; ``y == i`` selects ``label_names[i]``.
        label_names: Class labels, indexed by the values stored in ``y``.
        out_dir: Existing directory that receives the PNG file.
        max_samples: Row count above which the data is subsampled.
    """
    from sklearn.manifold import TSNE

    if X.shape[0] > max_samples:
        print(f"[INFO] t-SNE: 样本过多 ({X.shape[0]}), 分层抽样至 {max_samples}")
        indices = _stratified_sample_indices(y, len(label_names), max_samples)
        X_sub = X[indices]
        y_sub = y[indices]
    else:
        X_sub = X
        y_sub = y

    if X_sub.shape[0] < 2:
        # No valid perplexity exists for fewer than two points.
        print(f"[WARN] t-SNE: 样本数不足 ({X_sub.shape[0]}), 已跳过")
        return

    print("[INFO] 正在计算 t-SNE(可能需要一些时间)...")
    perplexity = _tsne_perplexity(X_sub.shape[0])
    # The iteration count is left at the library default (1000): the keyword
    # was renamed from n_iter to max_iter across scikit-learn releases, so
    # passing either name explicitly breaks on some versions.
    tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity,
                verbose=0)
    X_tsne = tsne.fit_transform(X_sub)

    colors = [TAB10[i % 10] for i in range(len(label_names))]

    fig, ax = plt.subplots(figsize=(10, 8))
    for i, lbl in enumerate(label_names):
        mask = y_sub == i
        ax.scatter(X_tsne[mask, 0], X_tsne[mask, 1], c=[colors[i]],
                   label=f"类 {lbl}", alpha=0.5, s=3, edgecolors="none")
    ax.set_xlabel("t-SNE 1")
    ax.set_ylabel("t-SNE 2")
    ax.set_title(f"t-SNE 降维散点图 (n={X_sub.shape[0]}, perplexity={perplexity})")
    ax.legend(fontsize=7, markerscale=3, loc="best")
    fig.tight_layout()
    path = os.path.join(out_dir, "05_tsne.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f"[OK] {path}")


def plot_class_mean_std_heatmap(all_features, label_names, out_dir: str, max_features: int = 30):
    """Save side-by-side heatmaps of per-class feature means and std-devs.

    Writes ``06_class_mean_std_heatmap.png`` into ``out_dir``.

    Args:
        all_features: Sequence of ``(label, DataFrame)`` pairs, one per
            class, in the same order as ``label_names``.
        label_names: Class labels used as row tick labels.
        out_dir: Existing directory that receives the PNG file.
        max_features: Upper bound on the number of leading columns shown.
    """
    n_features = min(max_features, all_features[0][1].shape[1])
    n_classes = len(label_names)

    # NaN marks a column a class does not have; seaborn leaves such cells
    # blank, so they cannot be mistaken for a genuine zero mean / std.
    mean_matrix = np.full((n_classes, n_features), np.nan)
    std_matrix = np.full((n_classes, n_features), np.nan)

    for row, (_, df) in enumerate(all_features):
        n_cols = min(n_features, df.shape[1])
        values = df.iloc[:, :n_cols].to_numpy(dtype=float)
        mean_matrix[row, :n_cols] = values.mean(axis=0)
        std_matrix[row, :n_cols] = values.std(axis=0)

    fig, axes = plt.subplots(
        1, 2, figsize=(max(10, n_features * 0.35), max(5, n_classes * 0.5)))

    # Tick labels and annotations are dropped once the grid gets too dense.
    tick_labels = ([f"F{i+1}" for i in range(n_features)]
                   if n_features <= 30 else False)
    annotate = n_features <= 20
    fmt = ".3f" if annotate else ""

    sns.heatmap(mean_matrix, ax=axes[0], cmap="RdBu_r", center=0,
                xticklabels=tick_labels, yticklabels=label_names,
                annot=annotate, fmt=fmt, linewidths=0.5,
                cbar_kws={"label": "均值", "shrink": 0.8})
    axes[0].set_title("各类别特征均值")
    axes[0].set_xlabel("特征维度")

    sns.heatmap(std_matrix, ax=axes[1], cmap="YlOrRd",
                xticklabels=tick_labels, yticklabels=label_names,
                annot=annotate, fmt=fmt, linewidths=0.5,
                cbar_kws={"label": "标准差", "shrink": 0.8})
    axes[1].set_title("各类别特征标准差")
    axes[1].set_xlabel("特征维度")

    fig.suptitle("各类别特征统计对比", fontsize=13)
    fig.tight_layout()
    path = os.path.join(out_dir, "06_class_mean_std_heatmap.png")
    fig.savefig(path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    print(f"[OK] {path}")


def plot_correlation_heatmap(X, out_dir: str, max_features: int = 30):
    """Save a Pearson correlation heatmap of the leading feature columns.

    Writes ``07_correlation_heatmap.png`` into ``out_dir``.

    Args:
        X: 2-D array, one row per sample and one column per feature.
        out_dir: Existing directory that receives the PNG file.
        max_features: Upper bound on the number of leading columns used.
    """
    n_features = min(max_features, X.shape[1])
    X_sub = X[:, :n_features]

    # Constant columns have zero variance and give NaN correlations; silence
    # the resulting divide warnings (the heatmap leaves NaN cells blank).
    with np.errstate(invalid="ignore", divide="ignore"):
        # With one feature corrcoef returns a 0-d value; the heatmap needs 2-D.
        corr = np.atleast_2d(np.corrcoef(X_sub.T))

    tick_labels = ([f"F{i+1}" for i in range(n_features)]
                   if n_features <= 30 else False)

    fig, ax = plt.subplots(
        figsize=(max(10, n_features * 0.5), max(8, n_features * 0.45)))
    sns.heatmap(corr, ax=ax, cmap="RdBu_r", center=0, vmin=-1, vmax=1,
                xticklabels=tick_labels, yticklabels=tick_labels,
                linewidths=0.1, cbar_kws={"label": "Pearson r", "shrink": 0.8})
    ax.set_title(f"特征相关性矩阵 (前 {n_features} 维)")
    fig.tight_layout()
    path = os.path.join(out_dir, "07_correlation_heatmap.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f"[OK] {path}")


def plot_global_distribution(X, out_dir: str):
    """Save an overview of all feature values as ``08_global_distribution.png``.

    The left panel is a histogram of every value in ``X`` with mean and
    median markers; the right panel shows mean +/- standard deviation for
    up to the first 50 feature columns.

    Args:
        X: 2-D array, one row per sample and one column per feature.
        out_dir: Existing directory that receives the PNG file.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    all_vals = X.ravel()
    overall_mean = np.mean(all_vals)
    overall_median = np.median(all_vals)
    axes[0].hist(all_vals, bins=100, color="steelblue", alpha=0.7,
                 edgecolor="white", linewidth=0.3)
    axes[0].axvline(overall_mean, color="red", linestyle="--",
                    label=f"均值={overall_mean:.4f}")
    axes[0].axvline(overall_median, color="orange", linestyle="--",
                    label=f"中位数={overall_median:.4f}")
    axes[0].set_xlabel("特征值")
    axes[0].set_ylabel("频数")
    axes[0].set_title("全局特征值分布")
    axes[0].legend()

    n_features = min(X.shape[1], 50)
    means = np.mean(X, axis=0)[:n_features]
    stds = np.std(X, axis=0)[:n_features]
    axes[1].errorbar(range(1, n_features + 1), means, yerr=stds,
                     fmt="o", markersize=3, capsize=2, color="steelblue",
                     alpha=0.7)
    axes[1].set_xlabel("特征维度")
    axes[1].set_ylabel("均值 ± 标准差")
    axes[1].set_title(f"各维度均值与标准差 (前 {n_features} 维)")

    fig.suptitle("全局特征概览", fontsize=13)
    fig.tight_layout()
    path = os.path.join(out_dir, "08_global_distribution.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f"[OK] {path}")


# ============================================================
# Command-line entry point
# ============================================================

def main():
    """Parse the command line, load the data set and write every chart.

    Charts are written to ``<root>/Visualizations/<folder>``; PCA and t-SNE
    can be skipped with ``--no-pca`` / ``--no-tsne``.
    """
    parser = argparse.ArgumentParser(
        description="Deeplearning 数据可视化脚本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python Scripts/visualize.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9
  python Scripts/visualize.py -f "20260408 grap" -l 1 2 3 4 5 6 7 8 9
  python Scripts/visualize.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9 --no-tsne --max-features 15
""",
    )
    parser.add_argument("-f", "--folder", required=True,
                        help="Static/ 下的数据目录名")
    parser.add_argument("-l", "--labels", nargs="+", required=True,
                        help="类别标签列表,空格分隔")
    parser.add_argument("-r", "--root", default=None,
                        help="项目根目录(默认为 Deeplearning/)")
    parser.add_argument("--max-features", type=int, default=20,
                        help="可视化中显示的最大特征维度数 (默认 20)")
    parser.add_argument("--no-tsne", action="store_true",
                        help="跳过 t-SNE 计算")
    parser.add_argument("--no-pca", action="store_true",
                        help="跳过 PCA 计算")
    parser.add_argument("--tsne-max-samples", type=int, default=5000,
                        help="t-SNE 最大抽样数 (默认 5000)")

    args = parser.parse_args()

    # Default root is the parent of the directory containing this script.
    if args.root:
        root = args.root
    else:
        root = str(Path(__file__).resolve().parent.parent)
    root = os.path.abspath(root)

    print(f"加载数据: {root}/Static/{args.folder}")
    X, y, label_names, all_features, col_counts = load_all_data(
        root, args.folder, args.labels)
    print(f"  样本数: {X.shape[0]}, 特征维度: {X.shape[1]}, 类别数: {len(label_names)}")
    # enumerate yields the class index directly; list.index() would rescan
    # the list for every label and return the wrong index for duplicates.
    for idx, lbl in enumerate(label_names):
        cnt = int(np.sum(y == idx))
        print(f"    类 {lbl}: {cnt} 样本, {col_counts.get(lbl, X.shape[1])} 列")

    out_dir = os.path.join(root, "Visualizations", args.folder)
    _ensure_dir(out_dir)
    print(f"\n输出目录: {out_dir}\n")

    # Chart text is Chinese and DejaVu Sans has no CJK glyphs, so prefer
    # common CJK-capable fonts and keep DejaVu Sans as the last fallback.
    plt.rcParams["font.family"] = "sans-serif"
    plt.rcParams["font.sans-serif"] = [
        "Noto Sans CJK SC",
        "Microsoft YaHei",
        "SimHei",
        "PingFang SC",
        "WenQuanYi Micro Hei",
        "Arial Unicode MS",
        "DejaVu Sans",
    ]
    # Some CJK fonts lack U+2212; fall back to the ASCII hyphen-minus.
    plt.rcParams["axes.unicode_minus"] = False

    n_show = min(args.max_features, X.shape[1])

    print("生成可视化图表...\n")
    plot_class_distribution(y, label_names, out_dir)
    plot_feature_histograms(all_features, out_dir, max_features=n_show)
    plot_feature_boxplots(all_features, out_dir, max_features=n_show)
    if not args.no_pca:
        plot_pca(X, y, label_names, out_dir)
    if not args.no_tsne:
        plot_tsne(X, y, label_names, out_dir,
                  max_samples=args.tsne_max_samples)
    plot_class_mean_std_heatmap(all_features, label_names, out_dir,
                                max_features=n_show)
    plot_correlation_heatmap(X, out_dir, max_features=min(30, X.shape[1]))
    plot_global_distribution(X, out_dir)

    print(f"\n全部图表已保存到: {out_dir}")


if __name__ == "__main__":
    main()