Deeplearning/Qfunctions/loaData.py
2024-10-19 11:07:59 +08:00

122 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import pandas as pd
# Base directory; load_data joins the requested folder name onto this path.
STATIC_PATH = './Static'
# Read the xlsx files of a dataset folder; each file corresponds to one label.
# labelNames lists the label names and is required: load_data raises ValueError when it is None.
def load_data(folder, labelNames, isDir):
    """Load a labelled feature table from a dataset folder under STATIC_PATH.

    Args:
        folder: Name of the dataset folder, relative to STATIC_PATH.
        labelNames: Label names. They are passed to load_from_folder when
            ``isDir`` is truthy, otherwise to load_from_file.
        isDir: Selects the loader: truthy -> load_from_folder,
            falsy -> load_from_file.

    Returns:
        Whatever the selected loader returns (also printed to stdout).

    Raises:
        ValueError: If ``folder`` or ``labelNames`` is None, or if the
            resolved folder is not an existing directory.
    """
    # Validate the required arguments before touching the filesystem.
    if folder is None:
        raise ValueError("The 'folder' parameter is required.")
    if labelNames is None:
        raise ValueError("The 'labelNames' parameter is required if 'folder' does not contain labels.")

    # Resolve the dataset folder against the static data root.
    folder = os.path.join(STATIC_PATH, folder)
    if not os.path.isdir(folder):
        raise ValueError(f"The folder '{folder}' does not exist.")

    loader = load_from_folder if isDir else load_from_file
    data = loader(folder=folder, labelNames=labelNames)
    print(data)
    return data
def load_from_folder(folder, labelNames):
    """Load every .xlsx file found in one sub-folder per label.

    For each label name, ``folder/<labelName>`` is scanned for files ending
    in ``.xlsx``. Label names without a matching sub-directory, or whose
    sub-directory holds no such file, are skipped. All files are padded
    (with zeros) to one shared row length, the maximum over every file of
    every label, so the per-label tables have identical feature columns and
    concatenating them does not introduce NaN columns.

    Args:
        folder: Directory that contains one sub-directory per label.
        labelNames: Iterable of label / sub-directory names.

    Returns:
        A single DataFrame with the rows of all loaded files, re-indexed
        from 0.

    Raises:
        ValueError: If no .xlsx file is found for any of the labels.
    """
    # First pass: discover the files of every label, so the shared padding
    # length can be computed before anything is loaded.
    label_files = []
    for labelName in labelNames:
        subfolder = os.path.join(folder, labelName)
        if not os.path.isdir(subfolder):
            continue
        fileNames = [f for f in os.listdir(subfolder) if f.endswith('.xlsx')]
        if fileNames:
            label_files.append((labelName, subfolder, fileNames))

    if not label_files:
        # pd.concat([]) would fail with an opaque "No objects to concatenate".
        raise ValueError(f"No .xlsx files found for the given labels in '{folder}'.")

    # One padding length for the whole dataset, as load_from_file does.
    max_row_length = max(
        get_max_row_len(subfolder, fileNames)
        for _, subfolder, fileNames in label_files
    )

    # Second pass: load the files label by label, in discovery order.
    all_features = []
    for labelName, subfolder, fileNames in label_files:
        for fileName in fileNames:
            file_path = os.path.join(subfolder, fileName)
            all_features.append(load_xlsx(file_path, labelName, max_row_length, 'zero'))

    # Merge the data of all labels.
    return pd.concat(all_features, ignore_index=True)
def load_from_file(folder, labelNames):
    """Load one ``<labelName>.xlsx`` file per label from ``folder``.

    Every file is padded (with zeros) to the largest row count found among
    the files, so all resulting rows have the same number of features.

    Args:
        folder: Directory that contains the ``<labelName>.xlsx`` files.
        labelNames: Sequence of label names; each also names its file.

    Returns:
        A single DataFrame with the rows of all files, re-indexed from 0.

    Raises:
        ValueError: If ``labelNames`` is empty.
    """
    labels = list(labelNames)
    if not labels:
        # pd.concat([]) would fail with an opaque "No objects to concatenate".
        raise ValueError("'labelNames' must contain at least one label.")

    fileNames = [label + ".xlsx" for label in labels]
    # Largest row count among the files; used as the common padding length.
    max_row_length = get_max_row_len(folder, fileNames)

    all_features = [
        load_xlsx(os.path.join(folder, fileName), label, max_row_length, 'zero')
        for label, fileName in zip(labels, fileNames)
    ]
    return pd.concat(all_features, ignore_index=True)
def load_xlsx(fileName, labelName, max_row_length = 1000, fill_rule = None):
    """Turn one Excel sheet into a table of fixed-length, labelled rows.

    Steps:
      1. Read the sheet and keep every second column, starting with the
         second one (column positions 1, 3, 5, ...).
      2. Append a copy of those columns next to the originals, so every
         kept column appears twice.
      3. Drop every sheet row containing a NaN and renumber the rest.
      4. Transpose, so each column becomes one output row.
      5. Pad each output row to ``max_row_length`` via fill_to_len.
      6. Add the label and name the columns ``feature1`` ... ``featureN``
         followed by ``label``.

    Args:
        fileName: Path of the Excel file to read.
        labelName: Value stored in the ``label`` column of every row.
        max_row_length: Number of feature columns in the result.
        fill_rule: Padding rule forwarded to fill_to_len.

    Returns:
        DataFrame with ``max_row_length`` feature columns plus ``label``.

    NOTE(review): fill_to_len only pads, so a sheet with more than
    ``max_row_length`` remaining rows presumably fails at the column
    renaming step with a length mismatch; confirm callers always pass a
    large enough value.
    """
    sheet = pd.read_excel(fileName)

    # Every second column, beginning with the second one.
    selected = sheet.iloc[:, 1::2]

    # Place a copy of the selected columns to the right of the originals;
    # ignore_index renumbers the column labels 0..k-1.
    doubled = pd.concat([selected, selected.copy()], axis=1, ignore_index=True)

    # Discard incomplete rows, then make the row index contiguous again.
    cleaned = doubled.dropna().reset_index(drop=True)

    # After transposing, each original column is one sample (row).
    samples = cleaned.T

    # Pad every sample to the requested length.
    padded = samples.apply(
        lambda sample: fill_to_len(sample, max_row_length, fill_rule),
        axis=1,
    )

    padded['label'] = labelName
    feature_names = [f'feature{position + 1}' for position in range(max_row_length)]
    padded.columns = feature_names + ['label']
    return padded
def fill_to_len(row, length = 1000, rule = None):
    """Pad a Series at the end with a constant until it has ``length`` values.

    Args:
        row: The pandas Series to pad.
        length: Target number of values.
        rule: How the padding value is chosen: ``'min'`` uses the row's
            minimum, ``'mean'`` its mean; ``'zero'``, ``None`` and any other
            value pad with 0.

    Returns:
        A new Series indexed 0..n-1 holding the original values followed by
        the padding. A row that already has ``length`` or more values is
        returned re-indexed but otherwise unchanged; it is never truncated.
    """
    if rule == 'min':
        fill_value = row.min()
    elif rule == 'mean':
        fill_value = row.mean()
    else:
        # 'zero', None and unrecognised rules all pad with 0.
        fill_value = 0

    missing = length - len(row)
    if missing <= 0:
        # Nothing to append. Concatenating an empty, dtype-less Series here
        # would trigger pandas deprecation warnings and can change the
        # result dtype, so return a re-indexed copy instead.
        result = row.reset_index(drop=True)
        result.name = None
        return result

    fill_values = pd.Series([fill_value] * missing)
    return pd.concat([row, fill_values], ignore_index=True)
def get_max_row_len(folder, filenames):
    """Return the largest row count among the given Excel files.

    Args:
        folder: Directory that contains the files.
        filenames: Names of the Excel files inside ``folder``.

    Returns:
        The maximum number of rows (as parsed by ``pd.read_excel``) over all
        files, or 0 when ``filenames`` is empty.
    """
    row_counts = (
        pd.read_excel(os.path.join(folder, name)).shape[0]
        for name in filenames
    )
    return max(row_counts, default=0)
# Public API of this module: only load_data is exported by `from ... import *`.
__all__ = ['load_data']