# -*- coding: utf-8 -*-
# @Time    : 2024/5/16
# @Author  : 魏志亮
import ast
import datetime
import os
import shutil
import warnings
from typing import List, Dict, Optional

import chardet
import pandas as pd

from conf.constants import FileTypes
from utils.log.trans_log import error, debug

warnings.filterwarnings("ignore")


# Detect the encoding of a file
def detect_file_encoding(filename: str) -> str:
    """
    Detect the text encoding of a file.

    Only two results are ever returned: ``'utf-8'`` when chardet reports a
    UTF-8 / ASCII family encoding, otherwise ``'gb18030'`` (also used when
    chardet cannot decide).

    Args:
        filename: path of the file to inspect

    Returns:
        ``'utf-8'`` or ``'gb18030'``
    """
    # The first 1000 bytes are enough for most encoding detection.
    # NOTE(review): for ``.gz`` inputs these are *compressed* bytes, so the
    # detection result is probably meaningless there -- confirm.
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)

    encoding = chardet.detect(rawdata)['encoding']
    debug("文件类型:", filename, encoding)

    utf8_family = ('utf-8', 'ascii', 'utf8', 'utf-8-sig')
    if encoding is not None and encoding.lower() in utf8_family:
        return 'utf-8'
    return 'gb18030'


def del_blank(df: Optional[pd.DataFrame] = None, cols: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Strip leading/trailing whitespace from the given string columns.

    The frame is modified in place and also returned. Columns that are
    missing from the frame or whose dtype is not ``object`` are skipped.

    Args:
        df: data frame to clean; a new empty frame is used when omitted
        cols: names of the columns to strip

    Returns:
        the (same) data frame with the columns stripped
    """
    # A fresh frame per call: a DataFrame default argument would be a single
    # shared instance handed out to every caller.
    if df is None:
        df = pd.DataFrame()

    for col in cols or []:
        if col in df.columns and df[col].dtype == object:
            df[col] = df[col].str.strip()
    return df


# Split one list into several lists
def split_array(array: List, num: int) -> List[List]:
    """
    Split a list into consecutive chunks.

    Args:
        array: the original list
        num: length of each chunk (the last chunk may be shorter)

    Returns:
        list of chunks
    """
    chunks = []
    for start in range(0, len(array), num):
        chunks.append(array[start:start + num])
    return chunks


def find_read_header(file_path: str, trans_cols: List[str],
                     resolve_col_prefix: Optional[str] = None) -> Optional[int]:
    """
    Locate the header row of a file.

    The first 20 rows are read with the default header (row 0). If at least
    two of ``trans_cols`` appear among the column labels, row 0 is the
    candidate header. The data rows are then scanned; as soon as more than
    two matches have been counted the function returns that row's
    ``index + 1``, which takes precedence over the row-0 candidate.

    Args:
        file_path: path of the file
        trans_cols: column names to look for
        resolve_col_prefix: optional Python expression (evaluated with
            ``eval``, with the name ``column`` bound to each label/cell)
            used to normalise labels before matching

    Returns:
        header row index, or ``None`` when no header was recognised

    Raises:
        NameError: if ``resolve_col_prefix`` uses a name rejected by
            ``valid_eval``
    """
    df = read_file_to_df(file_path, nrows=20)
    df.reset_index(inplace=True)

    header = None
    df_cols = df.columns
    if resolve_col_prefix:
        # SECURITY: the expression is eval'd. valid_eval only whitelists bare
        # names, so the expression must come from trusted configuration.
        valid_eval(resolve_col_prefix)
        df_cols = [eval(resolve_col_prefix) for column in df.columns]

    # Pass 1: do the column labels themselves look like the header?
    matched = 0
    for col in trans_cols:
        if col in df_cols:
            matched += 1
            if matched >= 2:
                header = 0
                break

    # Pass 2: look for the header inside the data rows.
    # NOTE(review): the counter is deliberately left as in the original, i.e.
    # it is NOT reset per row, so matches accumulate across rows, and the
    # threshold here is "> 2" while pass 1 uses ">= 2" -- confirm both are
    # intended.
    matched = 0
    for index, row in df.iterrows():
        if resolve_col_prefix:
            values = [eval(resolve_col_prefix) for column in row.values]
        else:
            values = row.values
        for col in trans_cols:
            if col in values:
                matched += 1
                if matched > 2:
                    return index + 1

    return header


def _read_csv(file_path: str, lower_path: str, read_cols: Optional[List[str]],
              header: int, nrows: Optional[int]) -> pd.DataFrame:
    """Read a plain or gzip-compressed CSV file with the detected encoding."""
    csv_kwargs = {
        'encoding': detect_file_encoding(file_path),
        'header': header,
        'nrows': nrows,
    }
    if read_cols:
        csv_kwargs['usecols'] = read_cols
    if lower_path.endswith("gz"):
        csv_kwargs['compression'] = 'gzip'
    else:
        # Malformed lines only produce a warning for uncompressed CSV files.
        csv_kwargs['on_bad_lines'] = 'warn'
    return pd.read_csv(file_path, **csv_kwargs)


def _read_excel_sheets(file_path: str, read_cols: Optional[List[str]],
                       header: int, nrows: Optional[int]) -> pd.DataFrame:
    """Read every sheet of a workbook and stack them, tagging rows with 'sheet_name'."""
    extra = {'usecols': read_cols} if read_cols else {}
    frames = []
    # The context manager closes the workbook even when a sheet fails to parse.
    with pd.ExcelFile(file_path) as xls:
        for sheet_name in xls.sheet_names:
            now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows, **extra)
            now_df['sheet_name'] = sheet_name
            frames.append(now_df)
    # One concat at the end instead of re-copying the accumulated frame per sheet.
    return pd.concat(frames) if frames else pd.DataFrame()


# Read data into a DataFrame
def read_file_to_df(file_path: str, read_cols: Optional[List[str]] = None,
                    trans_cols: Optional[List[str]] = None, nrows: Optional[int] = None,
                    not_find_header: str = 'raise',
                    resolve_col_prefix: Optional[str] = None) -> pd.DataFrame:
    """
    Read a CSV / gzip CSV / Excel file into a data frame.

    Paths ending in ``csv`` or ``gz`` (case-insensitive) are read as CSV;
    everything else is opened as an Excel workbook, all sheets are read and
    concatenated, and a ``sheet_name`` column is added.

    Args:
        file_path: path of the file
        read_cols: columns to read (all columns when empty)
        trans_cols: column names used to locate the header row; when empty
            the header is assumed to be row 0
        nrows: maximum number of rows to read
        not_find_header: ``'raise'`` raises when no header row is found;
            any other value makes the function return an empty frame
        resolve_col_prefix: expression forwarded to ``find_read_header``

    Returns:
        the data that was read (empty frame if the header was not found and
        ``not_find_header`` is not ``'raise'``)

    Raises:
        Exception: header row not found and ``not_find_header == 'raise'``
        ValueError: any error while reading the file (original error chained)
    """
    begin = datetime.datetime.now()
    debug('开始读取文件', file_path)

    header = 0
    if trans_cols:
        header = find_read_header(file_path, trans_cols, resolve_col_prefix)
        debug(os.path.basename(file_path), "读取第", header, "行")
        if header is None and not_find_header == 'raise':
            message = '未匹配到开始行,请检查并重新指定'
            debug(message)
            raise Exception(message)

    df = pd.DataFrame()
    if header is None:
        # Header not found and the caller asked not to raise.
        return df

    try:
        lower_path = str(file_path).lower()
        if lower_path.endswith(("csv", "gz")):
            df = _read_csv(file_path, lower_path, read_cols, header, nrows)
        else:
            df = _read_excel_sheets(file_path, read_cols, header, nrows)

        debug('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:',
              datetime.datetime.now() - begin)
    except Exception as e:
        error('读取文件出错', file_path, str(e))
        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
        raise ValueError(message) from e

    return df


def __build_directory_dict(directory_dict: Dict[str, List[str]], path: str,
                           filter_types: Optional[List[str]] = None) -> None:
    """
    Recursively fill ``directory_dict`` with ``directory -> [file paths]``.

    A directory gets an entry as soon as it contains at least one regular
    file, even if every file is filtered out. When a filter is given, files
    whose path contains ``"~$"`` are skipped as well.

    Args:
        directory_dict: mapping that is filled in place
        path: directory to scan
        filter_types: lower-case extensions (without the dot) to keep;
            ``None`` or empty keeps every file
    """
    for item in os.listdir(path):
        item_path = os.path.join(path, item)

        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
            continue
        if not os.path.isfile(item_path):
            continue

        files = directory_dict.setdefault(path, [])
        if not filter_types:
            files.append(item_path)
            continue

        # Extension without the leading dot, lower-cased
        ext = os.path.splitext(item_path)[1].lstrip('.').lower()
        if ext in filter_types and "~$" not in item_path:
            files.append(item_path)


# Read all Excel-type files under a path
def read_excel_files(read_path: str, filter_types: Optional[List[str]] = None) -> List[str]:
    """
    List all Excel-type files under a path.

    Args:
        read_path: file or directory to look at
        filter_types: extensions to keep; defaults to ``FileTypes.EXCEL_TYPES``

    Returns:
        list of file paths; empty when ``read_path`` does not exist, and
        ``[read_path]`` when it is itself a file (no filtering applied)
    """
    if not os.path.exists(read_path):
        return []

    if filter_types is None:
        # NOTE(review): an earlier version hard-coded
        # ['xls', 'xlsx', 'csv', 'gz'] here -- confirm the constant matches.
        filter_types = FileTypes.EXCEL_TYPES

    if os.path.isfile(read_path):
        return [read_path]

    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)

    return [path for paths in directory_dict.values() for path in paths if path]


# Read all files under a path
def read_files(read_path: str, filter_types: Optional[List[str]] = None) -> List[str]:
    """
    List all Excel-type and archive-type files under a path.

    Args:
        read_path: file or directory to look at
        filter_types: extensions to keep; defaults to
            ``FileTypes.EXCEL_TYPES`` plus ``FileTypes.ZIP_TYPES``

    Returns:
        list of file paths; ``[read_path]`` when it is itself a file

    Raises:
        FileNotFoundError: ``read_path`` does not exist (raised by
            ``os.listdir``)
    """
    if filter_types is None:
        # Copy first so the shared constant is never extended in place.
        filter_types = list(FileTypes.EXCEL_TYPES)
        filter_types.extend(FileTypes.ZIP_TYPES)

    if os.path.isfile(read_path):
        return [read_path]

    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)

    return [path1 for paths in directory_dict.values() for path1 in paths if path1]


def copy_to_new(from_path: str, to_path: str) -> None:
    """
    Copy a file, creating the destination directory first.

    ``to_path`` is treated as a file path when its last component contains a
    dot, otherwise as a directory.

    Args:
        from_path: source file path
        to_path: destination file or directory path
    """
    # Only the last component decides: a dot in a parent directory name
    # (e.g. "./out" or "v1.2/out") must not make a directory look like a file.
    is_file = '.' in os.path.basename(to_path)
    create_file_path(to_path, is_file_path=is_file)

    shutil.copy(from_path, to_path)


# Create a path
def create_file_path(read_path: str, is_file_path: bool = False) -> None:
    """
    Create a directory (and its parents) if it does not exist yet.

    Args:
        read_path: directory to create, or a file path whose parent
            directory should be created
        is_file_path: whether ``read_path`` includes a concrete file name
    """
    target_dir = os.path.dirname(read_path) if is_file_path else read_path

    # A bare file name has an empty dirname: nothing to create, and
    # os.makedirs('') would raise.
    if target_dir and not os.path.exists(target_dir):
        os.makedirs(target_dir, exist_ok=True)


def valid_eval(eval_str: str) -> bool:
    """
    Check that an expression only references whitelisted names.

    Only bare identifiers (``ast.Name`` nodes) are checked; attribute access
    on an allowed name is not restricted, so this is not a sandbox.

    Args:
        eval_str: source text to validate

    Returns:
        ``True`` when every name is allowed

    Raises:
        NameError: a name outside the whitelist is used
        SyntaxError: ``eval_str`` is not valid Python
    """
    safe_param = {"column", "wind_name", "df", "error_time", "str", "int"}
    used_names = {node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)}

    unsafe_names = used_names - safe_param
    if unsafe_names:
        # Sorted so the message is deterministic when several names offend.
        raise NameError(eval_str + " contains unsafe name :" + ','.join(sorted(unsafe_names)))
    return True


if __name__ == '__main__':
    # aa = valid_eval("column[column.find('_')+1:]")
    # print(aa)
    #
    # aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
    # print(aa)
    #
    # aa = valid_eval("'记录时间' if column == '时间' else column;from os import *; path")
    # print(aa)

    df = read_file_to_df(r"D:\data\11-12月.xls",
                         trans_cols=['风机', '时间', '有功功率', '无功功率', '功率因数', '频率'], nrows=30)

    print(df.columns)