# -*- coding: utf-8 -*-
# @Time    : 2024/5/16
# @Author  : 魏志亮
"""File helpers: encoding detection, tabular file loading and directory scans."""
import datetime
import gzip
import os
import re
import shutil
import warnings
import zlib

import chardet
import pandas as pd

from utils.log.trans_log import trans_print

warnings.filterwarnings("ignore")

# A gzip stream always starts with these two magic bytes.
_GZIP_MAGIC = b'\x1f\x8b'
# Number of leading bytes handed to chardet for encoding detection.
_ENCODING_SAMPLE_SIZE = 1000
# Matches every non-digit character (used to isolate a turbine number).
_NON_DIGIT = re.compile(r"\D")


def detect_file_encoding(filename):
    """Guess the text encoding of a file from its first 1000 bytes.

    If the file starts with the gzip magic bytes, the sample is taken from
    the decompressed stream; detecting on compressed bytes says nothing
    about the text inside.

    Args:
        filename: Path of the file to inspect.

    Returns:
        'utf-8-sig' when chardet reports a UTF-8 byte-order mark, 'utf-8'
        when it reports UTF-8 or ASCII, and 'gb18030' in every other case
        (including when chardet cannot decide).
    """
    with open(filename, 'rb') as f:
        rawdata = f.read(_ENCODING_SAMPLE_SIZE)

    if rawdata.startswith(_GZIP_MAGIC):
        try:
            with gzip.open(filename, 'rb') as gz:
                rawdata = gz.read(_ENCODING_SAMPLE_SIZE)
        except (OSError, EOFError, zlib.error):
            # Not a readable gzip stream after all: keep the raw sample so
            # detection still returns a best-effort answer.
            pass

    result = chardet.detect(rawdata)
    encoding = result['encoding']

    trans_print("文件类型:", filename, encoding)

    if encoding is None:
        return 'gb18030'

    normalized = encoding.lower()
    if normalized == 'utf-8-sig':
        # Keep the BOM-aware codec so the BOM is not decoded as data.
        return 'utf-8-sig'
    if normalized in ('utf-8', 'ascii', 'utf8'):
        return 'utf-8'

    return 'gb18030'


def del_blank(df=None, cols=None):
    """Strip leading/trailing whitespace from the given object-dtype columns.

    The frame is modified in place and also returned.

    Args:
        df: DataFrame to clean; a new empty frame is used when omitted.
        cols: Column names to process; columns whose dtype is not ``object``
            are left untouched.

    Returns:
        The same DataFrame that was passed in.
    """
    if df is None:
        df = pd.DataFrame()
    if cols is None:
        cols = ()

    for col in cols:
        if df[col].dtype == object:
            df[col] = df[col].str.strip()
    return df


def split_array(array, num):
    """Split a sequence into consecutive chunks of at most ``num`` items."""
    return [array[i:i + num] for i in range(0, len(array), num)]


def _count_matches(wanted_cols, container):
    """Return how many entries of ``wanted_cols`` are present in ``container``."""
    return sum(1 for col in wanted_cols if col in container)


def find_read_header(file_path, trans_cols):
    """Locate the header row of a file by looking for known column names.

    The first 20 rows are read with the default header. A row counts as the
    header when it contains at least two of ``trans_cols``.

    Args:
        file_path: File to inspect.
        trans_cols: Candidate column names expected in the header row.

    Returns:
        0 when the default header already matches, otherwise the index of the
        matching row plus one (row 0 was consumed as the header of the
        preview read), or None when no row matches.
    """
    trans_print(trans_cols)
    df = read_file_to_df(file_path, nrows=20)
    df.reset_index(inplace=True)

    if _count_matches(trans_cols, df.columns) >= 2:
        return 0

    for index, row in df.iterrows():
        values = list(row.values)
        if isinstance(row.name, tuple):
            values.extend(row.name)
        # Count per row: matches scattered over different rows must not add
        # up to a false header hit.
        if _count_matches(trans_cols, values) >= 2:
            return index + 1

    return None


def read_file_to_df(file_path, read_cols=None, header=0, trans_cols=None, nrows=None):
    """Load a csv / gz / Excel file into a DataFrame.

    Paths ending in "csv" or "gz" (case-insensitive) are read with
    ``pandas.read_csv`` using the detected encoding; anything else is treated
    as an Excel workbook, every sheet is read, tagged with a ``sheet_name``
    column and concatenated.

    Args:
        file_path: File to read.
        read_cols: Columns to load; all columns when empty or omitted.
        header: Row number of the header; overridden when ``trans_cols`` is
            given.
        trans_cols: Candidate column names used to auto-detect the header row.
        nrows: Maximum number of rows to read (per sheet for Excel).

    Returns:
        The loaded DataFrame.

    Raises:
        Exception: ``trans_cols`` was given but no header row was found.
        ValueError: Reading failed; the message carries the file name and the
            underlying error, which is chained as the cause.
    """
    begin = datetime.datetime.now()
    trans_print('开始读取文件', file_path)

    if trans_cols:
        header = find_read_header(file_path, trans_cols)
        trans_print(os.path.basename(file_path), "读取第", header, "行")
        if header is None:
            message = '未匹配到开始行,请检查并重新指定'
            trans_print(message)
            raise Exception(message)

    try:
        # usecols=None is the pandas default and means "all columns".
        usecols = read_cols if read_cols else None
        lower_path = str(file_path).lower()

        if lower_path.endswith(("csv", "gz")):
            csv_kwargs = {
                'encoding': detect_file_encoding(file_path),
                'usecols': usecols,
                'header': header,
                'nrows': nrows,
            }
            if lower_path.endswith("gz"):
                csv_kwargs['compression'] = 'gzip'
            else:
                # Malformed lines are only tolerated for uncompressed csv,
                # exactly as before.
                csv_kwargs['on_bad_lines'] = 'warn'
            df = pd.read_csv(file_path, **csv_kwargs)
        else:
            frames = []
            # The context manager closes the workbook handle when done.
            with pd.ExcelFile(file_path) as xls:
                for sheet_name in xls.sheet_names:
                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header,
                                           usecols=usecols, nrows=nrows)
                    now_df['sheet_name'] = sheet_name
                    frames.append(now_df)
            # Concatenate once instead of growing the frame sheet by sheet.
            df = pd.concat(frames) if frames else pd.DataFrame()

        trans_print('文件读取成功', file_path, '文件数量', df.shape, '耗时', datetime.datetime.now() - begin)
    except Exception as e:
        trans_print('读取文件出错', file_path, str(e))
        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
        raise ValueError(message) from e

    return df


def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively collect files under ``path`` into ``directory_dict``.

    Args:
        directory_dict: Mapping of directory -> list of file paths, filled in
            place. A directory gets an entry as soon as it contains a file,
            even if no file passes the filter.
        path: Directory to walk.
        filter_types: Allowed extensions without the dot (compared
            case-insensitively). When empty or None every file is collected.
            With a filter, paths containing "~$" are skipped.
    """
    wanted = {str(file_type).lower() for file_type in filter_types} if filter_types else None

    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            files = directory_dict.setdefault(path, [])
            if wanted is None:
                files.append(item_path)
                continue
            extension = str(item_path).rsplit(".", 1)[-1].lower()
            if extension in wanted and "~$" not in str(item_path):
                files.append(item_path)


def _list_files(read_path, filter_types):
    """Return every file under ``read_path`` whose extension is in ``filter_types``."""
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
    return [path for paths in directory_dict.values() for path in paths if path]


def read_excel_files(read_path):
    """Return all xls / xlsx / csv / gz files found under ``read_path``."""
    return _list_files(read_path, ['xls', 'xlsx', 'csv', 'gz'])


def read_files(read_path):
    """Return all xls / xlsx / csv / gz / zip / rar files found under ``read_path``."""
    return _list_files(read_path, ['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])


def copy_to_new(from_path, to_path):
    """Copy ``from_path`` to ``to_path``, creating the target directory first.

    ``to_path`` is treated as a file path when it contains a dot anywhere,
    otherwise as a directory.
    """
    # NOTE(review): a dot in a directory name also makes this look like a
    # file path - confirm callers never pass such paths.
    is_file = '.' in to_path
    create_file_path(to_path, is_file_path=is_file)
    shutil.copy(from_path, to_path)


def create_file_path(path, is_file_path=False):
    """Create a directory (and its parents) if it does not exist yet.

    Args:
        path: Directory to create, or a file path when ``is_file_path`` is
            True, in which case its parent directory is created.
        is_file_path: Whether ``path`` points at a file.
    """
    if is_file_path:
        path = os.path.dirname(path)

    # A bare file name has an empty dirname: the current directory already
    # exists and os.makedirs('') would raise.
    if path and not os.path.exists(path):
        os.makedirs(path, exist_ok=True)


def generate_turbine_name(turbine_name='F0001', prefix='F'):
    """Normalise a turbine name to ``prefix`` + zero-padded number.

    All non-digit characters are dropped, the remaining digits are parsed as
    an integer and padded to at least three digits, e.g. 'F0001' -> 'F001'.

    Raises:
        ValueError: ``turbine_name`` contains no digits.
    """
    digits = _NON_DIGIT.sub('', str(turbine_name))
    if not digits:
        raise ValueError(f"no digits found in turbine name: {turbine_name!r}")
    return prefix + str(int(digits)).zfill(3)


if __name__ == '__main__':
    # files = read_excel_files(r'D:\trans_data\10.xls')
    # for file in files:
    file = r'D:\trans_data\新艾里风电场10号风机.csv'
    read_file_to_df(file, trans_cols=
    ['', '风向', '时间', '设备号', '机舱方向总角度', '$folder[2]', '发电机转速30秒平均值', '机组运行模式', '机舱旋转角度', '主轴转速', '变桨角度30秒平均值', '记录时间',
     '发电机功率30秒平均值', '风速30秒平均值'])