# Deeplearning/Qfunctions/loadData.py
#
# Load labelled feature data from Excel (.xlsx) spreadsheets stored under
# ./Static.
#
# NOTE: label names / file names in this dataset may contain ambiguous or
# zero-width Unicode characters; the filename-matching helpers at the bottom
# of this module normalize for that.
import os
import unicodedata
import pandas as pd
STATIC_PATH = './Static'
# Read all xlsx files from a folder; each file corresponds to one label.
# labelNames gives the label names; if omitted, they default to the file names.
def load_data(folder, labelNames=None, isDir=True, fileClass='xlsx'):
    """Load labelled spreadsheet data from a folder under STATIC_PATH.

    Each label corresponds either to a sub-directory of *folder*
    (``isDir=True``) or to a single ``<label>.<fileClass>`` file inside
    *folder* (``isDir=False``).

    Parameters
    ----------
    folder : str
        Folder name, relative to STATIC_PATH. Required.
    labelNames : list[str] | None
        Label names. When None they are derived from the folder contents
        (sub-directory names for isDir=True, file stems for isDir=False),
        as the module comment always promised.
    isDir : bool
        Whether each label lives in its own sub-directory.
    fileClass : str
        File extension without the leading dot (default 'xlsx').

    Returns
    -------
    pandas.DataFrame
        Concatenated features with a trailing 'label' column.

    Raises
    ------
    ValueError
        If *folder* is None, does not exist, or no labels can be derived.
    """
    if folder is None:
        raise ValueError("The 'folder' parameter is required.")
    folder = os.path.join(STATIC_PATH, folder)
    if not os.path.isdir(folder):
        raise ValueError(f"The folder '{folder}' does not exist.")
    # Derive label names from the folder layout when not supplied.
    if labelNames is None:
        if isDir:
            labelNames = sorted(
                entry for entry in os.listdir(folder)
                if os.path.isdir(os.path.join(folder, entry))
            )
        else:
            suffix = '.' + fileClass
            labelNames = sorted(
                name[:-len(suffix)] for name in os.listdir(folder)
                if name.endswith(suffix)
            )
        if not labelNames:
            raise ValueError("The 'labelNames' parameter is required if 'folder' does not contain labels.")
    if not isDir:
        data = load_from_file(folder=folder, labelNames=labelNames, fileClass=fileClass)
    else:
        data = load_from_folder(folder=folder, labelNames=labelNames, fileClass=fileClass)
    # Debug print of the full DataFrame removed; callers get the data back.
    return data
def load_from_folder(folder, labelNames, fileClass):
    """Load one sub-directory of spreadsheets per label and concatenate them.

    For each label in *labelNames*, reads every ``*.fileClass`` file inside
    ``folder/<label>``; rows from each file are padded to the longest row
    count within that label's folder (zero-fill). Missing label folders are
    skipped (best-effort, matching historical behaviour).

    Raises
    ------
    ValueError
        If no files were loaded at all — previously this surfaced as
        pandas' opaque "No objects to concatenate" error from pd.concat([]).
    """
    all_features = []
    suffix = '.' + fileClass
    for labelName in labelNames:
        subfolder = os.path.join(folder, labelName)
        if not os.path.isdir(subfolder):
            continue  # tolerate absent label folders
        fileNames = [f for f in os.listdir(subfolder) if f.endswith(suffix)]
        if not fileNames:
            continue
        # Pad every file in this label's folder to the same row count.
        max_row_length = get_max_row_len(subfolder, fileNames)
        features = [
            load_xlsx(os.path.join(subfolder, fileName), labelName, max_row_length, 'zero')
            for fileName in fileNames
        ]
        all_features.append(pd.concat(features, ignore_index=True))
    if not all_features:
        # Clear diagnostic instead of pd.concat([])'s generic ValueError.
        raise ValueError(
            f"No '{fileClass}' files found under '{folder}' for labels {list(labelNames)}."
        )
    # Merge the per-label frames into one DataFrame.
    return pd.concat(all_features, ignore_index=True)
def load_from_file(folder, labelNames, fileClass):
    """Load one spreadsheet per label directly from *folder*.

    Expects a file named ``<label>.<fileClass>`` for every label; matching
    is robust against zero-width characters, Unicode normalization forms,
    case differences, and underscore/space variation (see
    _find_matching_file). All files are padded to the longest row count
    among them before concatenation.

    Raises
    ------
    FileNotFoundError
        If any expected file cannot be matched in *folder*.
    """
    resolved, not_found = [], []
    for labelName in labelNames:
        wanted = f"{labelName}.{fileClass}"
        found = _find_matching_file(folder, wanted)
        if found is None:
            not_found.append(wanted)
        else:
            resolved.append(found)
    if not_found:
        available = sorted(os.listdir(folder))
        raise FileNotFoundError(
            "The following files were not found (after normalization): "
            + ", ".join(not_found)
            + f". Available files: {available}"
        )
    # Pad every file to the longest row count seen across the matched files.
    max_row_length = get_max_row_len(folder, resolved)
    frames = [
        load_xlsx(os.path.join(folder, name), label, max_row_length, 'zero')
        for label, name in zip(labelNames, resolved)
    ]
    return pd.concat(frames, ignore_index=True)
def load_xlsx(fileName, labelName, max_row_length=1000, fill_rule=None):
    """Read one Excel file into a feature DataFrame tagged with a label.

    The odd-indexed columns (2nd, 4th, ...) of the sheet are taken as
    feature series; rows containing NaN are dropped; each series is then
    padded or truncated to *max_row_length* (see fill_to_len) and becomes
    one row of the result. Columns are named feature1..featureN plus a
    final 'label' column holding *labelName*.
    """
    raw = pd.read_excel(fileName)
    # Keep every other column starting from the second, drop NaN rows,
    # and transpose so each selected column becomes one sample row.
    series = raw.iloc[:, 1::2].dropna().reset_index(drop=True).T
    # Normalize every sample to the requested length.
    padded = series.apply(lambda r: fill_to_len(r, max_row_length, fill_rule), axis=1)
    n_cols = padded.shape[1]
    padded['label'] = labelName
    padded.columns = [f'feature{i + 1}' for i in range(n_cols)] + ['label']
    return padded
def fill_to_len(row, length=1000, rule=None):
    """Pad or truncate a pandas Series to exactly *length* entries.

    A longer series is truncated; a shorter one is extended with a fill
    value chosen by *rule*: 'min' -> row minimum, 'mean' -> row mean,
    'zero', None, or any other value -> 0. The returned Series always
    carries a fresh 0..length-1 index.
    """
    if len(row) >= length:
        return row.iloc[:length].reset_index(drop=True)
    if rule == 'min':
        pad = row.min()
    elif rule == 'mean':
        pad = row.mean()
    else:
        pad = 0
    tail = pd.Series([pad] * (length - len(row)))
    return pd.concat([row, tail], ignore_index=True)
def get_max_row_len(folder, filenames):
    """Return the largest row count among the given spreadsheet files.

    Each file is opened with pandas.read_excel, so this makes one full
    read pass per file. An empty *filenames* list yields 0.
    """
    lengths = (pd.read_excel(os.path.join(folder, name)).shape[0] for name in filenames)
    return max(lengths, default=0)
# ---------- Internal helpers: match filenames containing zero-width characters or differing Unicode forms ----------
def _strip_zero_width(s: str) -> str:
# 移除常见零宽字符U+200B, U+200C, U+200D, U+FEFF
if not isinstance(s, str):
return s
return s.translate({
0x200B: None,
0x200C: None,
0x200D: None,
0xFEFF: None,
})
def _canonicalize_name(name: str) -> str:
    """Normalize *name* to NFKC form and strip zero-width characters."""
    return _strip_zero_width(unicodedata.normalize('NFKC', name))
def _normalize_for_compare(name: str) -> str:
    """Canonicalize *name* for relaxed comparison.

    Underscores become spaces, whitespace runs collapse to single spaces,
    and the result is lower-cased.
    """
    collapsed = ' '.join(_canonicalize_name(name).replace('_', ' ').split())
    return collapsed.lower()
def _find_matching_file(folder: str, expected_name: str):
    """Find the directory entry in *folder* that matches *expected_name*.

    Strategies are tried in decreasing order of strictness, each applied
    to every entry before falling back to the next:
      1. canonical equality (NFKC-normalized, zero-width chars stripped);
      2. case-insensitive canonical equality;
      3. relaxed equality (underscores as spaces, whitespace collapsed,
         lower-cased).
    Returns the actual entry name as stored on disk, or None when the
    folder does not exist or no entry matches.
    """
    try:
        entries = os.listdir(folder)
    except FileNotFoundError:
        return None
    target = _canonicalize_name(expected_name)
    strategies = (
        (_canonicalize_name, target),
        (lambda e: _canonicalize_name(e).lower(), target.lower()),
        (_normalize_for_compare, _normalize_for_compare(expected_name)),
    )
    for key, wanted in strategies:
        for entry in entries:
            if key(entry) == wanted:
                return entry
    return None
# Only load_data is the module's public API; the loaders and name-matching
# helpers above are internal.
__all__ = ['load_data']