import os
import unicodedata

import pandas as pd

# Root directory under which all data folders live.
STATIC_PATH = './Static'


def load_data(folder, labelNames, isDir=True, fileClass='xlsx'):
    """Load labelled spreadsheet data from ``STATIC_PATH/<folder>``.

    Parameters
    ----------
    folder : str
        Sub-directory of ``STATIC_PATH`` containing the data.
    labelNames : list[str]
        One label per class. With ``isDir=True`` each label is a sub-folder
        of ``folder``; with ``isDir=False`` each label is a single file
        named ``<label>.<fileClass>`` directly inside ``folder``.
    isDir : bool, optional
        Selects the folder-per-label vs. file-per-label layout.
    fileClass : str, optional
        File extension (without the dot) of the data files.

    Returns
    -------
    pandas.DataFrame
        Concatenated feature rows with a trailing ``label`` column.

    Raises
    ------
    ValueError
        If ``folder``/``labelNames`` is missing or the folder does not exist.
    """
    if folder is None:
        raise ValueError("The 'folder' parameter is required.")
    if labelNames is None:
        raise ValueError("The 'labelNames' parameter is required if 'folder' does not contain labels.")

    folder = os.path.join(STATIC_PATH, folder)
    if not os.path.isdir(folder):
        raise ValueError(f"The folder '{folder}' does not exist.")

    if isDir:
        data = load_from_folder(folder=folder, labelNames=labelNames, fileClass=fileClass)
    else:
        data = load_from_file(folder=folder, labelNames=labelNames, fileClass=fileClass)
    # Kept from the original implementation: echoes the loaded frame for debugging.
    print(data)
    return data


def load_from_folder(folder, labelNames, fileClass):
    """Load one sub-folder of files per label and concatenate everything.

    NOTE(review): ``max_row_length`` is computed per label sub-folder, so two
    labels whose files differ in length produce different feature-column
    counts before the final concat (yielding NaN columns). Behavior is kept
    as-is; confirm whether a global maximum is intended.
    """
    all_features = []
    suffix = '.' + fileClass
    for labelName in labelNames:
        subfolder = os.path.join(folder, labelName)
        if not os.path.isdir(subfolder):
            continue  # silently skip labels without a matching sub-folder (original behavior)
        fileNames = [f for f in os.listdir(subfolder) if f.endswith(suffix)]
        max_row_length = get_max_row_len(subfolder, fileNames)
        features = [
            load_xlsx(os.path.join(subfolder, fileName), labelName, max_row_length, 'zero')
            for fileName in fileNames
        ]
        if features:
            all_features.append(pd.concat(features, ignore_index=True))
    if not all_features:
        raise ValueError(f"No '{fileClass}' files found under '{folder}' for labels {labelNames}.")
    # Merge the data of all labels into a single frame.
    return pd.concat(all_features, ignore_index=True)


def load_from_file(folder, labelNames, fileClass):
    """Load one file per label from ``folder``, matching names robustly.

    Expected names are ``<label>.<fileClass>``; matching strips zero-width
    characters, applies Unicode normalization and is case-insensitive
    (see :func:`_find_matching_file`).

    Raises
    ------
    FileNotFoundError
        If any expected file is missing even after normalization.
    """
    expected_names = [f"{labelName}.{fileClass}" for labelName in labelNames]
    actual_file_names = []
    missing = []
    for expected in expected_names:
        match = _find_matching_file(folder, expected)
        if match is None:
            missing.append(expected)
        else:
            actual_file_names.append(match)
    if missing:
        available = sorted(os.listdir(folder))
        raise FileNotFoundError(
            "The following files were not found (after normalization): "
            + ", ".join(missing)
            + f". Available files: {available}"
        )

    # Longest row count across the actually-matched files drives the padding.
    max_row_length = get_max_row_len(folder, actual_file_names)
    all_features = []
    for i, fileName in enumerate(actual_file_names):
        file_path = os.path.join(folder, fileName)
        all_features.append(load_xlsx(file_path, labelNames[i], max_row_length, 'zero'))
    return pd.concat(all_features, ignore_index=True)


def load_xlsx(fileName, labelName, max_row_length=1000, fill_rule=None):
    """Read one spreadsheet and return rows of padded features plus a label.

    Every second column (index 1, 3, 5, ...) of the sheet is taken as a
    feature series; each series becomes one output row, padded to
    ``max_row_length`` via :func:`fill_to_len`.
    """
    df = pd.read_excel(fileName)
    # Take the odd-indexed columns; .copy() avoids SettingWithCopy issues
    # when the slice is subsequently modified.
    features = df.iloc[:, 1::2].copy()
    features = features.dropna().reset_index(drop=True)
    # Transpose so each original column becomes one sample row.
    features = features.T
    features = features.apply(lambda row: fill_to_len(row, max_row_length, fill_rule), axis=1)
    # Capture the column count BEFORE appending the label column.
    actual_columns = features.shape[1]
    features['label'] = labelName
    features.columns = [f'feature{i + 1}' for i in range(actual_columns)] + ['label']
    return features


def fill_to_len(row, length=1000, rule=None):
    """Pad ``row`` (a Series) to ``length`` elements.

    ``rule`` selects the pad value: ``'min'`` → row minimum, ``'mean'`` →
    row mean, anything else (including ``'zero'``/``None``) → 0. Rows that
    are already ``length`` or longer are returned unchanged (no truncation).
    """
    if rule == 'min':
        fill_value = row.min()
    elif rule == 'mean':
        fill_value = row.mean()
    else:
        fill_value = 0
    pad = pd.Series([fill_value] * max(0, length - len(row)))
    return pd.concat([row, pad], ignore_index=True)


def get_max_row_len(folder, filenames):
    """Return the largest row count among the given spreadsheet files."""
    max_len = 0
    for filename in filenames:
        df = pd.read_excel(os.path.join(folder, filename))
        max_len = max(max_len, df.shape[0])
    return max_len


# Public API: all non-underscore helpers are exported alongside load_data.
__all__ = [
    'load_data',
    'load_from_folder',
    'load_from_file',
    'load_xlsx',
    'fill_to_len',
    'get_max_row_len',
]


# ---------- Internal helpers: match file names containing zero-width
# characters or differing Unicode forms ----------

def _strip_zero_width(s: str) -> str:
    """Remove common zero-width characters (U+200B/U+200C/U+200D/U+FEFF)."""
    if not isinstance(s, str):
        return s
    return s.translate({
        0x200B: None,  # ZERO WIDTH SPACE
        0x200C: None,  # ZERO WIDTH NON-JOINER
        0x200D: None,  # ZERO WIDTH JOINER
        0xFEFF: None,  # ZERO WIDTH NO-BREAK SPACE
    })


def _canonicalize_name(name: str) -> str:
    """Normalize to NFKC and strip zero-width characters."""
    name = unicodedata.normalize('NFKC', name)
    return _strip_zero_width(name)


def _normalize_for_compare(name: str) -> str:
    """Relaxed form for loose comparison.

    Lower-cases, treats underscores as spaces (file names often substitute
    underscores for spaces) and collapses runs of whitespace.
    """
    n = _canonicalize_name(name)
    n = n.replace('_', ' ')
    n = ' '.join(n.split())
    return n.lower()


def _find_matching_file(folder: str, expected_name: str):
    """Find a directory entry matching ``expected_name``, or ``None``.

    Tries, in order: exact match after canonicalization, case-insensitive
    match, then the relaxed underscore/whitespace-folded match (e.g.
    "Crocodile grain" vs "Crocodile_grain").
    """
    expected = _canonicalize_name(expected_name)
    try:
        entries = os.listdir(folder)
    except FileNotFoundError:
        return None

    for f in entries:
        if _canonicalize_name(f) == expected:
            return f

    expected_lower = expected.lower()
    for f in entries:
        if _canonicalize_name(f).lower() == expected_lower:
            return f

    expected_relaxed = _normalize_for_compare(expected_name)
    for f in entries:
        if _normalize_for_compare(f) == expected_relaxed:
            return f
    return None