Deeplearning/Scripts/check_data.py

631 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
数据质量检查脚本
===================
对 Static/ 下的数据目录执行完整性、统计、平衡性与离群值检查,
生成详细报告输出到终端,并可保存为文本文件。
用法:
python Scripts/check_data.py --folder 20260319Numbers --labels 0 1 2 3 4 5 6 7 8 9
python Scripts/check_data.py --folder "20260408 grap" --labels 1 2 3 4 5 6 7 8 9 --output report.txt
python Scripts/check_data.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9
要求:
在项目根目录 (Deeplearning/) 下运行,或通过 --root 指定项目根目录。
"""
import os
import sys
import argparse
import unicodedata
from pathlib import Path
import numpy as np
import pandas as pd
# ============================================================
# 工具函数(与 loadData.py 中逻辑保持一致)
# ============================================================
DEFAULT_FILE_CLASSES = ("xlsx", "xls", "csv")


def _has_supported_extension(filename: str, file_classes=DEFAULT_FILE_CLASSES) -> bool:
    """Tell whether *filename* carries one of the accepted data-file extensions.

    The extension is taken from the last dot onwards, lower-cased, and compared
    without its leading dot against *file_classes*.
    """
    _, dotted_ext = os.path.splitext(filename)
    bare_ext = dotted_ext.lower().lstrip(".")
    return bare_ext in file_classes
def _read_data_file(file_path: str) -> pd.DataFrame:
    """Load a tabular file into a DataFrame, picking the reader from the extension.

    Args:
        file_path: Path to a .csv, .xls or .xlsx file (extension is case-insensitive).

    Returns:
        The frame produced by ``pd.read_csv`` or ``pd.read_excel`` with default options.

    Raises:
        ValueError: If the extension is none of the supported ones.
    """
    suffix = os.path.splitext(file_path)[1].lower()
    if suffix == ".csv":
        return pd.read_csv(file_path)
    if suffix not in (".xls", ".xlsx"):
        raise ValueError(
            f"Unsupported file format: {suffix}. Only .xls, .xlsx, and .csv are supported. "
            f"File: {file_path}"
        )
    return pd.read_excel(file_path)
def _strip_zero_width(s: str) -> str:
    """Drop zero-width characters (U+200B, U+200C, U+200D, U+FEFF) from *s*.

    Anything that is not a ``str`` is handed back unchanged.
    """
    if isinstance(s, str):
        # Mapping a code point to None makes str.translate delete it.
        return s.translate(dict.fromkeys((0x200B, 0x200C, 0x200D, 0xFEFF)))
    return s
def _canonicalize_name(name: str) -> str:
    """Return *name* NFKC-normalized and with zero-width characters removed."""
    return _strip_zero_width(unicodedata.normalize("NFKC", name))
def _normalize_for_compare(name: str) -> str:
    """Build a relaxed comparison key for a file name.

    The name is canonicalized, underscores are turned into spaces, runs of
    whitespace collapse to a single space (leading/trailing whitespace is
    dropped), and the result is lower-cased.
    """
    spaced = _canonicalize_name(name).replace("_", " ")
    return " ".join(spaced.split()).lower()
def _find_matching_file(folder: str, expected_name: str):
    """Find the directory entry of *folder* that best matches *expected_name*.

    Three increasingly tolerant passes are made over the directory listing;
    the first entry matching in the earliest successful pass wins:

    1. equal after canonicalization,
    2. equal after canonicalization, ignoring case,
    3. equal under the relaxed comparison key (underscores/whitespace/case).

    Returns:
        The entry name as listed on disk, or ``None`` when nothing matches or
        *folder* does not exist.
    """
    canonical = _canonicalize_name(expected_name)
    try:
        entries = os.listdir(folder)
    except FileNotFoundError:
        return None

    # (key function, target key) for each pass, strictest first.
    passes = (
        (_canonicalize_name, canonical),
        (lambda entry: _canonicalize_name(entry).lower(), canonical.lower()),
        (_normalize_for_compare, _normalize_for_compare(expected_name)),
    )
    for make_key, target in passes:
        hit = next((entry for entry in entries if make_key(entry) == target), None)
        if hit is not None:
            return hit
    return None
def _find_matching_file_by_label(folder: str, label_name, file_classes):
    """Locate the file in *folder* named ``<label_name>.<ext>`` for some allowed extension.

    Extensions are tried in the order given by *file_classes*; the first
    match is returned, or ``None`` when no extension yields one.
    """
    candidates = (
        _find_matching_file(folder, f"{label_name}.{ext}") for ext in file_classes
    )
    # The generator is lazy, so the search stops at the first hit.
    return next((hit for hit in candidates if hit is not None), None)
# ============================================================
# 报告生成工具
# ============================================================
class ReportBuffer:
    """Collect report lines, echoing each one to stdout, and optionally save them to a file."""

    def __init__(self, output_path=None):
        # Destination file for save(); a falsy value disables saving.
        self.output_path = output_path
        # Every line passed to add(), in order.
        self.lines: list[str] = []

    def add(self, text: str = ""):
        """Print *text* and remember it for a later save()."""
        print(text)
        self.lines.append(text)

    def save(self):
        """Write all collected lines (newline-terminated, UTF-8) to ``output_path``.

        Does nothing when no output path was configured.
        """
        if not self.output_path:
            return
        content = "\n".join(self.lines) + "\n"
        with open(self.output_path, "w", encoding="utf-8") as handle:
            handle.write(content)
        print(f"\n报告已保存到: {self.output_path}")
# ============================================================
# 核心检查逻辑
# ============================================================
def _extract_features(df: pd.DataFrame, source: str) -> pd.DataFrame:
    """Pick the feature columns of a raw data frame and coerce them to numbers.

    Feature columns are the 2nd, 4th, 6th, ... columns of *df*, i.e. zero-based
    positions 1, 3, 5, ... . They are selected by position rather than by
    label, so a frame whose column labels repeat is handled correctly (label
    based selection would pull in every column sharing the label).

    Args:
        df: Raw frame as read from a data file.
        source: File identifier, used only in the error message.

    Returns:
        A new frame holding the selected columns under their original labels
        and row index; values that cannot be parsed as numbers become NaN.

    Raises:
        ValueError: If *df* has fewer than two columns, so no feature column exists.
    """
    feature_positions = range(1, df.shape[1], 2)
    if not feature_positions:
        raise ValueError(f"没有找到偶数列(特征列)。请检查文件: {source}")
    numeric_columns = [
        pd.to_numeric(df.iloc[:, pos], errors="coerce") for pos in feature_positions
    ]
    # concat(axis=1) keeps each Series name as its column label, duplicates included.
    return pd.concat(numeric_columns, axis=1)
def check_tabular_project(root: str, folder: str, labels: list[str], rp: ReportBuffer):
    """Run the complete data check for ``<root>/Static/<folder>`` and report into *rp*.

    The directory layout is detected first: either every label has its own
    data file directly in the directory (single-file mode) or every label has
    a sub-directory holding at least one supported file (multi-folder mode).
    If neither or both layouts are found, an error is reported and the check
    stops.

    Args:
        root: Project root directory.
        folder: Name of the data directory below ``Static/``.
        labels: Class labels to look for.
        rp: Report sink receiving every output line.
    """
    data_dir = os.path.join(root, "Static", folder)
    if not os.path.isdir(data_dir):
        rp.add(f"[ERROR] 目录不存在: {data_dir}")
        rp.add("请确认 --folder 参数正确。")
        return

    rule = "=" * 64
    for line in (
        rule,
        " Deeplearning 数据质量检查报告",
        rule,
        f" 数据目录 : {data_dir}",
        f" 标签数量 : {len(labels)}",
        f" 标签列表 : {labels}",
    ):
        rp.add(line)
    rp.add()

    # ---- Step 1: detect the data layout ----
    def _label_has_data_folder(lbl) -> bool:
        sub = os.path.join(data_dir, str(lbl))
        return os.path.isdir(sub) and any(
            _has_supported_extension(entry) for entry in os.listdir(sub)
        )

    has_all_subfolders = all(_label_has_data_folder(lbl) for lbl in labels)
    has_all_files = all(
        _find_matching_file_by_label(data_dir, lbl, DEFAULT_FILE_CLASSES) is not None
        for lbl in labels
    )

    # Exactly one of the two layouts must be present.
    if has_all_files == has_all_subfolders:
        rp.add("[ERROR] 无法自动检测数据模式,或两种模式同时存在。")
        rp.add(f" has_all_files : {has_all_files}")
        rp.add(f" has_all_subfolders: {has_all_subfolders}")
        rp.add("请确保每个 label 对应唯一的文件或唯一的子目录。")
        return

    if has_all_files:
        _check_single_file_mode(data_dir, labels, rp)
    else:
        _check_multi_folder_mode(data_dir, labels, rp)

    rp.add()
    rp.add(rule)
    rp.add(" 检查完成。")
    rp.add(rule)
def _check_single_file_mode(data_dir: str, labels: list[str], rp: ReportBuffer):
    """Check a data directory that stores one data file per label.

    For every label the matching file is located, read, reduced to its
    feature columns and stripped of rows containing NaN. The results are
    reported (column-count consistency, per-label sample counts) and then
    passed on to the balance, statistics and outlier analyses.

    Args:
        data_dir: Directory that directly contains the per-label files.
        labels: Class labels; each is expected to match one file stem.
        rp: Report sink receiving every output line.
    """
    rp.add()
    rp.add("── 数据模式: 单文件模式 ──")
    rp.add()

    # 1. Resolve the actual on-disk file name for each label.
    file_map: dict[str, str] = {}
    missing: list[str] = []
    for lbl in labels:
        match = _find_matching_file_by_label(data_dir, lbl, DEFAULT_FILE_CLASSES)
        if match:
            file_map[lbl] = match
        else:
            missing.append(lbl)
    if missing:
        rp.add(f"[WARN] 以下标签找不到对应文件: {missing}")
        rp.add(f"当前目录内容: {sorted(os.listdir(data_dir))}")
    if not file_map:
        return
    labels = [lbl for lbl in labels if lbl in file_map]

    # 2. Load each label's file and collect its cleaned feature frame.
    all_features = []  # list of (label, cleaned feature DataFrame)
    per_class_info: dict[str, dict] = {}
    col_counts: dict[str, int] = {}
    for lbl in labels:
        fname = file_map[lbl]
        file_path = os.path.join(data_dir, fname)
        warnings: list[str] = []
        info: dict[str, object] = {"label": lbl, "file": fname, "warnings": warnings}
        per_class_info[lbl] = info

        try:
            raw = _read_data_file(file_path)
        except Exception as e:  # any reader failure is reported, not fatal
            info["error"] = str(e)
            rp.add(f"[ERROR] 读取文件失败: {file_path}: {e}")
            continue
        info["raw_rows"] = raw.shape[0]
        info["raw_cols"] = raw.shape[1]

        # NaN cells present in the raw file.
        total_nan = raw.isna().sum().sum()
        if total_nan > 0:
            warnings.append(f"原始文件含 {total_nan} 个 NaN 单元格")

        try:
            features = _extract_features(raw, fname)
        except ValueError as e:
            info["error"] = str(e)
            rp.add(f"[ERROR] 特征提取失败: {file_path}: {e}")
            continue

        # Count samples after dropping rows that contain NaN.
        clean = features.dropna()
        info["feature_cols"] = features.shape[1]
        info["samples_after_dropna"] = clean.shape[0]
        info["dropped_nan_rows"] = features.shape[0] - clean.shape[0]
        col_counts[lbl] = features.shape[1]
        if clean.shape[0] == 0:
            warnings.append("去除 NaN 后无有效样本")
        else:
            all_features.append((lbl, clean))

    # Column-count consistency across labels.
    distinct_counts = set(col_counts.values())
    if len(distinct_counts) > 1:
        rp.add()
        rp.add("[WARN] 各标签的特征列数不一致!")
        for lbl, cc in col_counts.items():
            rp.add(f" {lbl}: {cc}")
        rp.add("这会导致 load_data 时补零逻辑产生差异。")
    elif distinct_counts:
        # Only claim consistency when at least one file actually produced features.
        rp.add(f"[OK] 所有标签特征列数一致: {next(iter(distinct_counts))}")

    # Per-label sample counts.
    rp.add()
    rp.add("── 各类别样本数 ──")
    sample_counts: dict[str, int] = {}
    for lbl in labels:
        info = per_class_info.get(lbl, {})
        if "error" in info:
            rp.add(f" [{lbl}] 加载失败: {info['error']}")
            continue
        n = info.get("samples_after_dropna", 0)
        sample_counts[lbl] = n
        label_warnings = info.get("warnings", [])
        # Keep warnings visually separated from the closing parenthesis.
        wflag = f" [WARN] {'; '.join(label_warnings)}" if label_warnings else ""
        rp.add(f" [{lbl}] {n} 行 (文件: {info.get('file','?')}, "
               f"原始 {info.get('raw_rows','?')} 行, "
               f"丢弃 NaN 行 {info.get('dropped_nan_rows',0)}){wflag}")

    # Balance analysis, then statistics and outliers.
    _analyze_balance(sample_counts, rp)
    _analyze_statistics(all_features, rp)
    _analyze_outliers(all_features, rp)
def _check_multi_folder_mode(data_dir: str, labels: list[str], rp: ReportBuffer):
    """Check a data directory that stores one sub-directory of data files per label.

    Every supported file in a label's sub-directory is read, reduced to its
    feature columns and stripped of rows containing NaN; the cleaned frames
    of one label are concatenated. The results are reported (column-count
    consistency, per-label sample counts) and then passed on to the balance,
    statistics and outlier analyses.

    Args:
        data_dir: Directory containing one sub-directory per label.
        labels: Class labels; each names a sub-directory of *data_dir*.
        rp: Report sink receiving every output line.
    """
    rp.add()
    rp.add("── 数据模式: 多子特征模式 ──")
    rp.add()

    all_features = []  # list of (label, combined cleaned feature DataFrame)
    per_class_info: dict[str, dict] = {}
    col_counts: dict[str, int] = {}
    for lbl in labels:
        sub = os.path.join(data_dir, str(lbl))
        if not os.path.isdir(sub):
            per_class_info[lbl] = {"error": f"子目录不存在: {sub}"}
            rp.add(f"[ERROR] {lbl}: 子目录不存在")
            continue
        files = sorted(f for f in os.listdir(sub) if _has_supported_extension(f))
        if not files:
            per_class_info[lbl] = {"error": f"子目录下无支持的文件: {sub}"}
            rp.add(f"[ERROR] {lbl}: 子目录下无 .xlsx/.xls/.csv 文件")
            continue

        class_frame_list = []
        single_file_cols = set()  # distinct feature-column counts seen in this label
        total_raw = 0
        total_dropped = 0
        failed_files = []
        for fname in files:
            file_path = os.path.join(sub, fname)
            try:
                raw = _read_data_file(file_path)
            except Exception as e:  # any reader failure is reported, not fatal
                failed_files.append(f" {fname}: {e}")
                continue
            total_raw += raw.shape[0]
            try:
                features = _extract_features(raw, f"{lbl}/{fname}")
            except ValueError as e:
                failed_files.append(f" {fname}: {e}")
                continue
            single_file_cols.add(features.shape[1])
            clean = features.dropna()
            total_dropped += features.shape[0] - clean.shape[0]
            if clean.shape[0] > 0:
                class_frame_list.append(clean)

        warnings: list[str] = []
        info: dict[str, object] = {
            "label": lbl,
            "num_files": len(files),
            "raw_rows_total": total_raw,
            "dropped_nan_rows": total_dropped,
            "warnings": warnings,
        }
        if failed_files:
            warnings.append(f"{len(failed_files)} 个文件加载失败")
            for ff in failed_files:
                rp.add(f" [WARN] {ff}")
        if len(single_file_cols) > 1:
            warnings.append(f"子文件间列数不一致: {sorted(single_file_cols)}")
        # The widest file defines the label's column count; 0 if nothing loaded.
        col_counts[lbl] = max(single_file_cols, default=0)

        if class_frame_list:
            combined = pd.concat(class_frame_list, ignore_index=True)
            info["samples_after_dropna"] = combined.shape[0]
            info["feature_cols"] = combined.shape[1]
            all_features.append((lbl, combined))
        else:
            info["samples_after_dropna"] = 0
            warnings.append("无有效样本")
        per_class_info[lbl] = info

    # Column-count consistency across labels (labels without columns are ignored).
    non_zero = {lbl: cc for lbl, cc in col_counts.items() if cc > 0}
    if non_zero and len(set(non_zero.values())) > 1:
        rp.add()
        rp.add("[WARN] 各标签的特征列数不一致(将使用零填充对齐):")
        for lbl, cc in col_counts.items():
            rp.add(f" {lbl}: {cc}")
    elif non_zero:
        rp.add(f"[OK] 所有标签特征列数一致: {next(iter(non_zero.values()))}")

    # Per-label sample counts.
    rp.add()
    rp.add("── 各类别样本数 ──")
    sample_counts: dict[str, int] = {}
    for lbl in labels:
        info = per_class_info.get(lbl, {})
        if "error" in info:
            rp.add(f" [{lbl}] 加载失败: {info['error']}")
            continue
        n = info.get("samples_after_dropna", 0)
        sample_counts[lbl] = n
        label_warnings = info.get("warnings", [])
        # Keep warnings visually separated from the closing parenthesis.
        wflag = f" [WARN] {'; '.join(label_warnings)}" if label_warnings else ""
        rp.add(f" [{lbl}] {n} 行 "
               f"(来自 {info.get('num_files','?')} 个文件, "
               f"原始 {info.get('raw_rows_total','?')} 行, "
               f"丢弃 NaN 行 {info.get('dropped_nan_rows',0)}){wflag}")

    _analyze_balance(sample_counts, rp)
    _analyze_statistics(all_features, rp)
    _analyze_outliers(all_features, rp)
# ============================================================
# 分析子模块
# ============================================================
def _analyze_balance(counts: dict[str, int], rp: ReportBuffer):
    """Report how evenly the samples are spread over the classes.

    Prints totals, the smallest/largest class, the max/min imbalance ratio
    and the coefficient of variation, followed by an estimated per-class
    train/test split at a 20% test share. A class whose estimated training
    part is empty is flagged with a warning.

    Args:
        counts: Number of usable samples per label.
        rp: Report sink receiving every output line.
    """
    rp.add()
    rp.add("── 类别平衡性分析 ──")
    if not counts:
        rp.add("无有效样本,跳过。")
        return

    values = list(counts.values())
    total = sum(values)
    n_classes = len(values)  # > 0 because counts is non-empty
    avg = total / n_classes
    min_count = min(values)
    max_count = max(values)
    rp.add(f" 总样本数 : {total}")
    rp.add(f" 类别数 : {n_classes}")
    rp.add(f" 平均每类 : {avg:.1f}")
    rp.add(f" 最少样本类 : {min(counts, key=counts.get)} ({min_count})")
    rp.add(f" 最多样本类 : {max(counts, key=counts.get)} ({max_count})")
    if min_count == 0:
        rp.add(" [ERROR] 存在样本数为 0 的类别,训练将无法进行!")
        return

    # min_count > 0 from here on, so the ratio is always finite.
    ratio = max_count / min_count
    rp.add(f" 不平衡比例 : {ratio:.2f}:1 (max/min)")
    std_val = float(np.std(values))
    cv = std_val / avg if avg > 0 else 0
    rp.add(f" 变异系数(CV): {cv:.4f}")
    if ratio > 5:
        rp.add(" [WARN] 类别严重不平衡 (>5:1),建议进行数据增强或使用类别权重。")
    elif ratio > 3:
        rp.add(" [WARN] 类别较不平衡 (>3:1),可考虑采样策略。")
    else:
        rp.add(" [OK] 类别基本平衡。")

    # Estimated train/test split.
    rp.add()
    rp.add(" ── 训练/测试划分预估 (test_size=0.2, stratify) ──")
    for lbl, cnt in sorted(counts.items()):
        # At least one sample is always assigned to the test part.
        test_n = max(1, int(cnt * 0.2))
        train_n = cnt - test_n
        rp.add(f" [{lbl}] 训练 {train_n} / 测试 {test_n} (总计 {cnt})")
        if train_n < 1:
            rp.add(f" [WARN] [{lbl}] 仅有 {cnt} 个样本,划分后训练集为空。")
def _analyze_statistics(
    all_features: list[tuple[str, pd.DataFrame]],
    rp: ReportBuffer,
):
    """Report descriptive statistics of the feature values.

    Three views are printed: statistics over all values of all classes
    (narrower classes are right-padded with zero columns to the widest one),
    statistics of the first (at most 12) feature columns, and a short
    summary per class computed on its unpadded values.

    Args:
        all_features: ``(label, feature frame)`` pairs, one per class.
        rp: Report sink receiving every output line.
    """
    rp.add()
    rp.add("── 特征统计信息 ──")
    if not all_features:
        rp.add("无有效数据,跳过。")
        return

    # Global statistics: zero-pad every class matrix to the same width.
    max_cols = max(frame.shape[1] for _, frame in all_features)

    def _pad_to_width(matrix):
        shortfall = max_cols - matrix.shape[1]
        if shortfall <= 0:
            return matrix
        filler = np.zeros((matrix.shape[0], shortfall), dtype=matrix.dtype)
        return np.hstack([matrix, filler])

    all_values = np.vstack([_pad_to_width(frame.values) for _, frame in all_features])
    rp.add(f" 全局特征维度: {all_values.shape} (样本数 × 特征数, 零填充对齐到 {max_cols} 列)")
    for caption, reducer in (
        (" 全局均值 ", np.mean),
        (" 全局标准差 ", np.std),
        (" 全局最小值 ", np.min),
        (" 全局最大值 ", np.max),
        (" 全局中位数 ", np.median),
    ):
        rp.add(f"{caption}: {reducer(all_values):.6f}")

    # Statistics per feature column (only the leading ones).
    rp.add()
    n_cols = min(all_values.shape[1], 12)
    rp.add(f" ── 前 {n_cols} 个特征维度的分布 (均值 ± 标准差) ──")
    for number, col in enumerate(all_values.T[:n_cols], start=1):
        moments = f"μ={np.mean(col):.4f} σ={np.std(col):.4f} "
        bounds = f"[{np.min(col):.4f}, {np.max(col):.4f}] "
        rp.add(f" feature{number}: " + moments + bounds + f"med={np.median(col):.4f}")

    # Short summary per class.
    rp.add()
    rp.add(" ── 各类别特征统计 ──")
    for lbl, frame in all_features:
        data = frame.values
        moments = f"μ={np.mean(data):.4f} σ={np.std(data):.4f} "
        rp.add(f" [{lbl}] " + moments + f"范围 [{np.min(data):.4f}, {np.max(data):.4f}]")
def _analyze_outliers(
    all_features: list[tuple[str, pd.DataFrame]],
    rp: ReportBuffer,
):
    """Report the share of outlier samples per class and overall, using the IQR rule.

    Within each class, a sample counts as an outlier when at least one of
    its features lies outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` of that
    feature's column, with the quartiles computed on the class itself.

    Args:
        all_features: ``(label, feature frame)`` pairs, one per class.
        rp: Report sink receiving every output line.
    """
    rp.add()
    rp.add("── 离群值检测 (基于 IQR) ──")
    if not all_features:
        rp.add("无有效数据,跳过。")
        return

    outliers_seen = 0
    samples_seen = 0
    for lbl, frame in all_features:
        data = frame.values
        n_rows = data.shape[0]
        samples_seen += n_rows

        # Column-wise quartiles and the resulting fences.
        q1, q3 = np.percentile(data, [25, 75], axis=0)
        spread = q3 - q1
        low_fence = q1 - 1.5 * spread
        high_fence = q3 + 1.5 * spread

        flagged_rows = ((data < low_fence) | (data > high_fence)).any(axis=1)
        n_outliers = int(flagged_rows.sum())
        outliers_seen += n_outliers

        pct = n_outliers / n_rows * 100 if n_rows > 0 else 0
        if pct < 10:
            status = "[OK]"
        elif pct < 25:
            status = "[WARN]"
        else:
            status = "[ERROR]"
        rp.add(f" [{lbl}] 离群样本: {n_outliers}/{n_rows} ({pct:.1f}%) {status}")

    overall_pct = outliers_seen / samples_seen * 100 if samples_seen > 0 else 0
    rp.add()
    rp.add(f" 整体离群比例: {outliers_seen}/{samples_seen} ({overall_pct:.1f}%)")
    if overall_pct > 20:
        rp.add(" [WARN] 超过 20% 数据为离群值,请确认数据清洗是否正确。")
    elif overall_pct > 10:
        rp.add(" [INFO] 离群值比例偏高,训练时可能影响收敛。")
    else:
        rp.add(" [OK] 离群值比例正常。")
# ============================================================
# 命令行入口
# ============================================================
def main():
    """Command-line entry point: parse arguments, run the check, save the report.

    The project root defaults to the parent of the directory containing this
    script unless ``--root`` is given. The process exits with status 1 when
    the project root is not an existing directory.
    """
    parser = argparse.ArgumentParser(
        description="Deeplearning 数据质量检查脚本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python Scripts/check_data.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9
python Scripts/check_data.py -f "20260408 grap" -l 1 2 3 4 5 6 7 8 9 -o report.txt
python Scripts/check_data.py -f 20260319Numbers -l 0 1 2 3 4 5 6 7 8 9 -r /path/to/project
""",
    )
    parser.add_argument(
        "-f", "--folder",
        required=True,
        help="Static/ 下的数据目录名(例如 20260319Numbers)",
    )
    parser.add_argument(
        "-l", "--labels",
        nargs="+",
        required=True,
        help="类别标签列表,空格分隔(例如 0 1 2 3 4 或 A B C)",
    )
    parser.add_argument(
        "-o", "--output",
        default=None,
        help="将报告保存到指定文件路径",
    )
    parser.add_argument(
        "-r", "--root",
        default=None,
        help="项目根目录(默认为脚本的上级目录,即 Deeplearning/)",
    )
    args = parser.parse_args()

    # Determine the project root.
    if args.root:
        root = args.root
    else:
        script_dir = Path(__file__).resolve().parent
        root = str(script_dir.parent)
    root = os.path.abspath(root)
    if not os.path.isdir(root):
        print(f"[ERROR] 项目根目录不存在: {root}")
        sys.exit(1)

    rp = ReportBuffer(output_path=args.output)
    check_tabular_project(
        root=root,
        folder=args.folder,
        labels=args.labels,
        rp=rp,
    )
    rp.save()


if __name__ == "__main__":
    main()