# Viewer metadata (not program code): 235 lines, 7.1 KiB, Python.
import os
|
||
import unicodedata
|
||
import pandas as pd
|
||
|
||
# Base directory that load_data() joins with the caller-supplied folder name.
STATIC_PATH = './Static'
|
||
|
||
|
||
def load_data(folder, labelNames, fileClass='xlsx'):
    """Load the labelled feature table stored under ``STATIC_PATH/folder``.

    The on-disk layout is detected automatically: either one file per label
    (``folder/label.ext``) or one subfolder per label (``folder/label/*.ext``).
    The detected mode and the loaded data are printed to stdout.

    Args:
        folder: Folder name relative to ``STATIC_PATH``.
        labelNames: Labels to load; each one names a file or a subfolder.
        fileClass: File extension without the leading dot.

    Returns:
        The combined data returned by ``load_from_file`` or
        ``load_from_folder``.

    Raises:
        ValueError: If ``folder`` or ``labelNames`` is ``None``, or if the
            resolved folder is not an existing directory. Errors raised by
            the layout detection propagate unchanged.
    """
    if folder is None:
        raise ValueError("The 'folder' parameter is required.")
    if labelNames is None:
        raise ValueError("The 'labelNames' parameter is required if 'folder' does not contain labels.")

    data_root = os.path.join(STATIC_PATH, folder)
    if not os.path.isdir(data_root):
        raise ValueError(f"The folder '{data_root}' does not exist.")

    # True means one subfolder per label, False means one file per label.
    use_subfolders = _detect_data_mode(folder=data_root, labelNames=labelNames, fileClass=fileClass)
    if use_subfolders:
        mode_name = 'multi-folder mode'
    else:
        mode_name = 'single-file mode'
    print(f"Auto detected data mode: {mode_name}")

    if use_subfolders:
        data = load_from_folder(folder=data_root, labelNames=labelNames, fileClass=fileClass)
    else:
        data = load_from_file(folder=data_root, labelNames=labelNames, fileClass=fileClass)

    print(data)
    return data
|
||
|
||
|
||
def load_from_folder(folder, labelNames, fileClass):
    """Load every matching file from one subfolder per label.

    Expected layout: ``folder/<label>/*.<fileClass>``. Labels whose subfolder
    is missing, or contains no matching file, are skipped.

    All files are padded (with zeros) to the length of the longest file across
    *all* labels, so every row of the result has the same feature columns.
    File names are processed in sorted order so the row order does not depend
    on the arbitrary order returned by ``os.listdir``.

    Args:
        folder: Directory that contains the label subfolders.
        labelNames: Iterable of labels; each is converted with ``str`` to
            build the subfolder name.
        fileClass: File extension without the leading dot.

    Returns:
        The concatenation (fresh 0..n-1 index) of the frames produced by
        ``load_xlsx`` for every matching file.

    Raises:
        ValueError: If no matching file is found for any label.
    """
    suffix = '.' + fileClass

    # First pass: discover the files of each label without reading them yet.
    files_by_label = []
    for labelName in labelNames:
        subfolder = os.path.join(folder, str(labelName))
        if not os.path.isdir(subfolder):
            continue
        fileNames = sorted(f for f in os.listdir(subfolder) if f.endswith(suffix))
        if fileNames:
            files_by_label.append((labelName, subfolder, fileNames))

    if not files_by_label:
        raise ValueError(
            f"No '{suffix}' files found under '{folder}' for labels {list(labelNames)}."
        )

    # The pad length must be shared by all labels; a per-label length would
    # give different labels a different number of feature columns.
    max_row_length = max(
        get_max_row_len(subfolder, fileNames)
        for _, subfolder, fileNames in files_by_label
    )

    frames = [
        load_xlsx(os.path.join(subfolder, fileName), labelName, max_row_length, 'zero')
        for labelName, subfolder, fileNames in files_by_label
        for fileName in fileNames
    ]
    return pd.concat(frames, ignore_index=True)
|
||
|
||
|
||
def load_from_file(folder, labelNames, fileClass):
    """Load one file per label from ``folder`` (``folder/<label>.<fileClass>``).

    File names are resolved with ``_find_matching_file``, so a directory entry
    may differ from the expected name in the ways that helper tolerates.
    Every file is padded with zeros to the length of the longest file.

    Args:
        folder: Directory that contains the label files.
        labelNames: Sequence of labels; each one names a file.
        fileClass: File extension without the leading dot.

    Returns:
        The concatenation (fresh 0..n-1 index) of the frames produced by
        ``load_xlsx`` for each label, in label order.

    Raises:
        FileNotFoundError: If any expected file cannot be matched.
    """
    resolved_names = []
    not_found = []
    for label in labelNames:
        wanted = f"{label}.{fileClass}"
        hit = _find_matching_file(folder, wanted)
        if hit is None:
            not_found.append(wanted)
        else:
            resolved_names.append(hit)

    if not_found:
        available = sorted(os.listdir(folder))
        raise FileNotFoundError(
            "The following files were not found (after normalization): "
            + ", ".join(not_found)
            + f". Available files: {available}"
        )

    # The pad length comes from the names that actually exist on disk.
    max_row_length = get_max_row_len(folder, resolved_names)

    frames = [
        load_xlsx(os.path.join(folder, actual_name), label, max_row_length, 'zero')
        for label, actual_name in zip(labelNames, resolved_names)
    ]
    return pd.concat(frames, ignore_index=True)
|
||
|
||
|
||
def load_xlsx(fileName, labelName, max_row_length=1000, fill_rule=None):
    """Read one Excel file and turn its data columns into labelled rows.

    Every second column, starting with the second one, is kept. Rows that
    contain a missing value in any kept column are dropped, then the table is
    transposed so each kept column becomes one row. Each row is padded or
    truncated to ``max_row_length`` by ``fill_to_len``.

    Args:
        fileName: Path of the Excel file, passed to ``pandas.read_excel``.
        labelName: Value written into the ``label`` column of every row.
        max_row_length: Number of feature columns in the result.
        fill_rule: Padding rule forwarded to ``fill_to_len``.

    Returns:
        A DataFrame with columns ``feature1`` .. ``featureN`` and ``label``.
    """
    df = pd.read_excel(fileName)

    # Build a new frame instead of mutating the iloc slice in place; in-place
    # dropna/reset_index on a slice is the chained-assignment pattern that
    # pandas warns about (SettingWithCopyWarning).
    kept_columns = df.iloc[:, 1::2].dropna().reset_index(drop=True)

    # After transposing, each original column is one row of samples.
    per_column_rows = kept_columns.T

    features = per_column_rows.apply(
        lambda row: fill_to_len(row, max_row_length, fill_rule), axis=1
    )

    # Name the feature columns from the actual width, then add the label.
    actual_columns = features.shape[1]
    features.columns = [f'feature{i + 1}' for i in range(actual_columns)]
    features['label'] = labelName

    return features
|
||
|
||
|
||
def fill_to_len(row, length=1000, rule=None):
    """Return ``row`` truncated or padded to exactly ``length`` entries.

    Args:
        row: A pandas Series of values.
        length: Target number of entries.
        rule: Padding value selector: ``'min'`` uses the row minimum,
            ``'mean'`` the row mean; ``'zero'``, ``None`` and any other value
            pad with 0.

    Returns:
        A Series with a fresh 0..n-1 index. A row that is already long enough
        is cut to its first ``length`` entries; otherwise the padding value
        is appended.
    """
    missing = length - len(row)
    if missing <= 0:
        return row.iloc[:length].reset_index(drop=True)

    if rule == 'min':
        pad_value = row.min()
    elif rule == 'mean':
        pad_value = row.mean()
    else:
        # 'zero' and every unrecognised rule pad with zeros.
        pad_value = 0

    padding = pd.Series([pad_value] * missing)
    return pd.concat([row, padding], ignore_index=True)
|
||
|
||
|
||
def get_max_row_len(folder, filenames):
    """Return the largest row count among the given Excel files.

    Args:
        folder: Directory that contains the files.
        filenames: Iterable of file names inside ``folder``.

    Returns:
        The maximum number of rows reported by ``pandas.read_excel`` for any
        file, or 0 when ``filenames`` is empty.
    """
    row_counts = (
        pd.read_excel(os.path.join(folder, name)).shape[0] for name in filenames
    )
    return max(row_counts, default=0)
|
||
|
||
|
||
# ---------- Internal helpers: file-name matching that tolerates zero-width characters and differing Unicode forms ----------
|
||
|
||
def _strip_zero_width(s: str) -> str:
    """Remove the zero-width characters U+200B, U+200C, U+200D and U+FEFF.

    Non-string input is returned unchanged.
    """
    if isinstance(s, str):
        # The third maketrans argument lists the characters to delete.
        return s.translate(str.maketrans('', '', '\u200b\u200c\u200d\ufeff'))
    return s
|
||
|
||
|
||
def _canonicalize_name(name: str) -> str:
    """Return ``name`` in NFKC form with zero-width characters removed."""
    return _strip_zero_width(unicodedata.normalize('NFKC', name))
|
||
|
||
|
||
def _normalize_for_compare(name: str) -> str:
    """Return a relaxed comparison key for ``name``.

    On top of ``_canonicalize_name``, underscores are treated as spaces,
    whitespace runs are collapsed to a single space (leading and trailing
    whitespace removed), and the result is lower-cased.
    """
    canonical = _canonicalize_name(name)
    words = canonical.replace('_', ' ').split()
    return ' '.join(words).lower()
|
||
|
||
|
||
def _find_matching_file(folder: str, expected_name: str):
    """Find the entry of ``folder`` that corresponds to ``expected_name``.

    Three passes are tried in order, each over all directory entries, and the
    first hit wins: canonical names equal; canonical names equal ignoring
    case; relaxed keys from ``_normalize_for_compare`` equal.

    Returns:
        The entry name exactly as listed by ``os.listdir``, or ``None`` when
        nothing matches or ``folder`` does not exist.
    """
    strict_target = _canonicalize_name(expected_name)
    try:
        candidates = os.listdir(folder)
    except FileNotFoundError:
        return None

    # (target key, key function) pairs, from strictest to most relaxed.
    comparison_passes = (
        (strict_target, _canonicalize_name),
        (strict_target.lower(), lambda entry: _canonicalize_name(entry).lower()),
        (_normalize_for_compare(expected_name), _normalize_for_compare),
    )
    for target, key_of in comparison_passes:
        for candidate in candidates:
            if key_of(candidate) == target:
                return candidate

    return None
|
||
|
||
|
||
def _detect_data_mode(folder: str, labelNames, fileClass: str) -> bool:
    """Auto detect data organization mode.

    Args:
        folder: Directory that holds the data.
        labelNames: Iterable of labels; it is materialized once, so a
            one-shot iterable is also accepted.
        fileClass: File extension without the leading dot.

    Returns:
        True: multi-folder mode (folder/label/*.ext)
        False: single-file mode (folder/label.ext)

    Raises:
        ValueError: If ``labelNames`` is empty, if both layouts match every
            label, or if neither layout matches every label.
    """
    labels = list(labelNames)
    if not labels:
        # Without this guard both checks below are vacuously true and the
        # caller would get the misleading "both valid layouts" error.
        raise ValueError(
            "Auto detect failed: 'labelNames' is empty; at least one label is required."
        )

    ext = f'.{fileClass}'

    def _has_label_subfolder(label) -> bool:
        # A label qualifies when its subfolder exists and holds a target file.
        subfolder = os.path.join(folder, str(label))
        return os.path.isdir(subfolder) and any(
            f.endswith(ext) for f in os.listdir(subfolder)
        )

    # Multi-folder mode: every label has a subfolder with at least one file.
    has_all_label_subfolders = all(_has_label_subfolder(label) for label in labels)

    # Single-file mode: every label resolves to a file directly in `folder`.
    has_all_label_files = all(
        _find_matching_file(folder, f"{label}.{fileClass}") is not None
        for label in labels
    )

    if has_all_label_subfolders and has_all_label_files:
        raise ValueError(
            "Auto detect found both valid layouts under the same folder. "
            "Please keep only one layout type (either subfolders or root files) for each label."
        )
    if has_all_label_subfolders:
        return True
    if has_all_label_files:
        return False

    raise ValueError(
        "Auto detect failed: neither single-file nor multi-folder layout matches all labels. "
        "Please verify folder structure and labelNames."
    )
|
||
|
||
|
||
# Only load_data is exported by `from <module> import *`.
__all__ = ['load_data']
|