| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/5/16
- # @Author : 魏志亮
- import ast
- import datetime
- import os
- import shutil
- import warnings
- from typing import List, Dict, Optional
- import chardet
- import pandas as pd
- from conf.constants import FileTypes
- from utils.log.trans_log import error, debug
- warnings.filterwarnings("ignore")
def detect_file_encoding(filename: str) -> str:
    """
    Detect the text encoding of a file.

    Samples the first 1000 bytes (enough for most encoding detection) and
    runs chardet over them. Anything not recognized as a UTF-8 family
    encoding falls back to 'gb18030'.

    Args:
        filename: Path of the file to probe.

    Returns:
        'utf-8' for UTF-8/ASCII-like files, otherwise 'gb18030'.
    """
    with open(filename, 'rb') as handle:
        sample = handle.read(1000)
    detected = chardet.detect(sample)['encoding']
    debug("文件类型:", filename, detected)
    # chardet may fail to detect anything; default to the broad GBK superset.
    if detected is None:
        detected = 'gb18030'
    if detected.lower() in ('utf-8', 'ascii', 'utf8', 'utf-8-sig'):
        return 'utf-8'
    return 'gb18030'
def del_blank(df: Optional[pd.DataFrame] = None, cols: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Strip leading/trailing whitespace from the given string columns.

    Args:
        df: DataFrame to process (an empty DataFrame when omitted).
        cols: Column names to strip; names that are missing from the frame
            or whose column is not of object dtype are skipped.

    Returns:
        The same DataFrame, with the selected columns stripped in place.
    """
    # Fix: the original used a mutable default argument (df=pd.DataFrame()),
    # which is evaluated once and shared across calls. Use a None sentinel
    # and build a fresh frame per call instead.
    if df is None:
        df = pd.DataFrame()
    if cols is None:
        cols = []
    for col in cols:
        # Only object (string) columns support the .str accessor.
        if col in df.columns and df[col].dtype == object:
            df[col] = df[col].str.strip()
    return df
def split_array(array: List, num: int) -> List[List]:
    """
    Split a list into consecutive chunks of at most ``num`` elements.

    Args:
        array: The original list.
        num: Maximum length of each chunk.

    Returns:
        A list of chunks; the last chunk may be shorter than ``num``.
    """
    chunks = []
    for start in range(0, len(array), num):
        chunks.append(array[start:start + num])
    return chunks
def find_read_header(file_path: str, trans_cols: List[str], resolve_col_prefix: Optional[str] = None) -> Optional[int]:
    """
    Locate the header row of a tabular file.

    Reads the first 20 rows headerless and looks for a row that matches at
    least two (first pass) / more than two (second pass) of ``trans_cols``.

    Args:
        file_path: Path of the file to inspect.
        trans_cols: Column names to match against.
        resolve_col_prefix: Optional Python expression, evaluated per column
            with the name bound to ``column``, used to normalize column
            names before matching (validated by valid_eval first).

    Returns:
        0 when the first row already matches, the 1-based data row index
        when a later row matches, or None when no header row was found.
    """
    df = read_file_to_df(file_path, nrows=20)
    df.reset_index(inplace=True)
    count = 0
    header = None
    df_cols = df.columns
    if resolve_col_prefix:
        # Whitelist-check the expression before evaluating it; eval here
        # sees each original name as `column`.
        valid_eval(resolve_col_prefix)
        df_cols = [eval(resolve_col_prefix) for column in df.columns]
    # Pass 1: does the existing header row itself already match?
    for col in trans_cols:
        if col in df_cols:
            count += 1
            if count >= 2:
                header = 0
                break
    count = 0
    # Pass 2: scan data rows for a row containing the expected column names.
    # NOTE(review): `count` is not reset per row, so matches accumulate
    # across rows — confirm whether this cross-row counting is intended.
    for index, row in df.iterrows():
        if resolve_col_prefix:
            values = [eval(resolve_col_prefix) for column in row.values]
        else:
            values = row.values
        for col in trans_cols:
            if col in values:
                count += 1
                if count > 2:
                    # Header found at this row; data starts one row later.
                    header = index + 1
                    return header
    return header
def read_file_to_df(file_path: str, read_cols: Optional[List[str]] = None, trans_cols: Optional[List[str]] = None,
                    nrows: Optional[int] = None, not_find_header: str = 'raise',
                    resolve_col_prefix: Optional[str] = None) -> pd.DataFrame:
    """
    Read a CSV / gzipped CSV / Excel file into a DataFrame.

    Args:
        file_path: Path of the file to read.
        read_cols: Columns to read (all columns when None).
        trans_cols: Column names used to locate the header row via
            find_read_header; when omitted, row 0 is assumed to be the header.
        nrows: Maximum number of rows to read (all rows when None).
        not_find_header: What to do when no header row is matched:
            'raise' raises, 'ignore' returns an empty DataFrame.
        resolve_col_prefix: Optional expression forwarded to find_read_header
            to normalize column names before matching.

    Returns:
        The DataFrame that was read. For Excel files all sheets are
        concatenated and a 'sheet_name' column records the source sheet.

    Raises:
        Exception: header row not found and not_find_header == 'raise'.
        ValueError: the underlying read failed.
    """
    begin = datetime.datetime.now()
    debug('开始读取文件', file_path)
    header = 0
    if trans_cols:
        header = find_read_header(file_path, trans_cols, resolve_col_prefix)
        debug(os.path.basename(file_path), "读取第", header, "行")
        if header is None:
            if not_find_header == 'raise':
                message = '未匹配到开始行,请检查并重新指定'
                debug(message)
                raise Exception(message)
            elif not_find_header == 'ignore':
                pass

    df = pd.DataFrame()
    if header is not None:
        try:
            file_path_lower = str(file_path).lower()
            if file_path_lower.endswith("csv") or file_path_lower.endswith("gz"):
                encoding = detect_file_encoding(file_path)
                end_with_gz = file_path_lower.endswith("gz")
                if read_cols:
                    if end_with_gz:
                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip',
                                         header=header,
                                         nrows=nrows)
                    else:
                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
                                         on_bad_lines='warn', nrows=nrows)
                else:
                    if end_with_gz:
                        df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
                    else:
                        df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
            else:
                # Fix: use a context manager so the Excel file handle is
                # released even when reading a sheet raises — the original
                # only called xls.close() on the success path, leaking the
                # handle on error.
                with pd.ExcelFile(file_path) as xls:
                    # Read every sheet and concatenate, tagging each row
                    # with its source sheet.
                    for sheet_name in xls.sheet_names:
                        if read_cols:
                            now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols,
                                                   nrows=nrows)
                        else:
                            now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
                        now_df['sheet_name'] = sheet_name
                        df = pd.concat([df, now_df])
            debug('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:', datetime.datetime.now() - begin)
        except Exception as e:
            error('读取文件出错', file_path, str(e))
            message = '文件:' + os.path.basename(file_path) + ',' + str(e)
            raise ValueError(message)
    return df
def __build_directory_dict(directory_dict: Dict[str, List[str]], path: str,
                           filter_types: Optional[List[str]] = None) -> None:
    """
    Recursively collect files under ``path`` into ``directory_dict``.

    Args:
        directory_dict: Mapping of directory path -> list of file paths,
            filled in place; a directory gets a key as soon as it contains
            any file, even if every file is filtered out.
        path: Directory to walk.
        filter_types: Lowercase extensions (without dot) to keep; None or an
            empty list keeps everything. Temp files containing "~$" are
            always skipped when filtering.
    """
    no_filter = filter_types is None or len(filter_types) == 0
    for entry in os.listdir(path):
        entry_path = os.path.join(path, entry)
        if os.path.isdir(entry_path):
            __build_directory_dict(directory_dict, entry_path, filter_types=filter_types)
            continue
        if not os.path.isfile(entry_path):
            continue
        bucket = directory_dict.setdefault(path, [])
        if no_filter:
            bucket.append(entry_path)
            continue
        suffix = os.path.splitext(entry_path)[1].lstrip('.').lower()
        if suffix in filter_types and "~$" not in entry_path:
            bucket.append(entry_path)
def read_excel_files(read_path: str, filter_types: Optional[List[str]] = None) -> List[str]:
    """
    Collect all Excel-like files under a path.

    Args:
        read_path: A file or directory to scan.
        filter_types: Extensions to keep; defaults to FileTypes.EXCEL_TYPES.

    Returns:
        A flat list of matching file paths; empty when the path does not
        exist, or just [read_path] when it is a single file.
    """
    if not os.path.exists(read_path):
        return []
    if filter_types is None:
        filter_types = FileTypes.EXCEL_TYPES
    if os.path.isfile(read_path):
        return [read_path]
    collected: Dict[str, List[str]] = {}
    __build_directory_dict(collected, read_path, filter_types=filter_types)
    return [fp for files in collected.values() for fp in files if fp]
def read_files(read_path: str, filter_types: Optional[List[str]] = None) -> List[str]:
    """
    Collect all data files (Excel types plus archive types) under a path.

    Args:
        read_path: A file or directory to scan.
        filter_types: Extensions to keep; defaults to
            FileTypes.EXCEL_TYPES plus FileTypes.ZIP_TYPES.

    Returns:
        A flat list of matching file paths; empty when the path does not
        exist, or just [read_path] when it is a single file.
    """
    # Fix: read_excel_files guards against a missing path, but this sibling
    # previously let os.listdir raise FileNotFoundError instead. Mirror that
    # guard for consistency and robustness.
    if not os.path.exists(read_path):
        return []
    if filter_types is None:
        filter_types = list(FileTypes.EXCEL_TYPES)
        filter_types.extend(FileTypes.ZIP_TYPES)
    if os.path.isfile(read_path):
        return [read_path]
    directory_dict: Dict[str, List[str]] = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
    return [path1 for paths in directory_dict.values() for path1 in paths if path1]
def copy_to_new(from_path: str, to_path: str) -> None:
    """
    Copy a file to a new location, creating missing directories first.

    A destination containing a dot is treated as a file path (its parent
    directory is created); otherwise the destination itself is created as a
    directory before copying.

    Args:
        from_path: Source file path.
        to_path: Destination path.
    """
    # NOTE(review): the dot heuristic misclassifies dotted directory names
    # (e.g. "out.v2/") and extensionless files — confirm against callers.
    looks_like_file = '.' in to_path
    create_file_path(to_path, is_file_path=looks_like_file)
    shutil.copy(from_path, to_path)
def create_file_path(read_path: str, is_file_path: bool = False) -> None:
    """
    Ensure that a directory exists, creating it (and parents) if needed.

    Args:
        read_path: The path to create.
        is_file_path: When True, ``read_path`` includes a file name and only
            its parent directory is created.
    """
    target = os.path.dirname(read_path) if is_file_path else read_path
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)
def valid_eval(eval_str: str) -> bool:
    """
    Validate that an eval expression only references whitelisted names.

    Walks the AST of ``eval_str`` and collects every bare identifier; any
    name outside the whitelist is rejected.

    Args:
        eval_str: The expression to validate.

    Returns:
        True when every referenced name is whitelisted.

    Raises:
        NameError: when the expression references a non-whitelisted name.
    """
    allowed = {"column", "wind_name", "df", "error_time", "str", "int"}
    used = {node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)}
    illegal = used - allowed
    if illegal:
        raise NameError(
            eval_str + " contains unsafe name :" + str(','.join(list(illegal))))
    return True
if __name__ == '__main__':
    # Ad-hoc manual checks, kept for reference. The valid_eval probes below
    # are commented out; the live call reads a hard-coded local Windows
    # path, so this block only runs on the author's machine.
    # aa = valid_eval("column[column.find('_')+1:]")
    # print(aa)
    #
    # aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
    # print(aa)
    #
    # aa = valid_eval("'记录时间' if column == '时间' else column;from os import *; path")
    # print(aa)
    df = read_file_to_df(r"D:\data\11-12月.xls",
                         trans_cols=['风机', '时间', '有功功率', '无功功率', '功率因数', '频率'], nrows=30)
    print(df.columns)
|