trans_methods.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/16
  3. # @Author : 魏志亮
  4. import ast
  5. import datetime
  6. import os
  7. import shutil
  8. import warnings
  9. from typing import List, Dict, Optional
  10. import chardet
  11. import pandas as pd
  12. from conf.constants import FileTypes
  13. from utils.log.trans_log import error, debug
  14. warnings.filterwarnings("ignore")
# Detect a file's text encoding
  16. def detect_file_encoding(filename: str) -> str:
  17. """
  18. 检测文件编码
  19. Args:
  20. filename: 文件路径
  21. Returns:
  22. 检测到的编码
  23. """
  24. # 读取文件的前1000个字节(足够用于大多数编码检测)
  25. with open(filename, 'rb') as f:
  26. rawdata = f.read(1000)
  27. result = chardet.detect(rawdata)
  28. encoding = result['encoding']
  29. debug("文件类型:", filename, encoding)
  30. if encoding is None:
  31. encoding = 'gb18030'
  32. if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
  33. return 'utf-8'
  34. return 'gb18030'
  35. def del_blank(df: pd.DataFrame = pd.DataFrame(), cols: Optional[List[str]] = None) -> pd.DataFrame:
  36. """
  37. 删除指定列的空白字符
  38. Args:
  39. df: 数据帧
  40. cols: 要处理的列列表
  41. Returns:
  42. 处理后的数据帧
  43. """
  44. if cols is None:
  45. cols = []
  46. for col in cols:
  47. if col in df.columns and df[col].dtype == object:
  48. df[col] = df[col].str.strip()
  49. return df
# Split a list into multiple chunks
  51. def split_array(array: List, num: int) -> List[List]:
  52. """
  53. 将数组切割成多个子数组
  54. Args:
  55. array: 原始数组
  56. num: 每个子数组的长度
  57. Returns:
  58. 子数组列表
  59. """
  60. return [array[i:i + num] for i in range(0, len(array), num)]
def find_read_header(file_path: str, trans_cols: List[str], resolve_col_prefix: Optional[str] = None) -> Optional[int]:
    """
    Locate the header row of a data file.

    Reads the first 20 rows and looks for the names in `trans_cols`, first
    among the column labels, then inside each data row. `resolve_col_prefix`
    is a Python expression (vetted by valid_eval) evaluated per candidate
    name with the name bound to the variable `column`, e.g. to strip a
    prefix before matching.

    Args:
        file_path: Path of the file to inspect.
        trans_cols: Column names expected in the header row.
        resolve_col_prefix: Optional normalization expression applied to each
            column label / cell value before matching.

    Returns:
        0 when the existing column labels already match, row index + 1 when a
        header-like row is found inside the data (to be passed as `header=`
        on the real read), or None when nothing matched.
    """
    df = read_file_to_df(file_path, nrows=20)
    df.reset_index(inplace=True)
    count = 0
    header = None
    df_cols = df.columns
    if resolve_col_prefix:
        # Reject expressions referencing anything outside the whitelist
        # before eval() is ever called on them.
        valid_eval(resolve_col_prefix)
        df_cols = [eval(resolve_col_prefix) for column in df.columns]
    # Pass 1: at least two wanted names already appear as column labels,
    # so row 0 is the header.
    for col in trans_cols:
        if col in df_cols:
            count += 1
            if count >= 2:
                header = 0
                break
    count = 0
    # Pass 2: scan data rows for one containing more than two wanted names;
    # the row after it becomes the header. NOTE(review): `count` accumulates
    # across rows rather than resetting per row, and this pass can overwrite
    # a header=0 result from pass 1 — presumably real data rows never contain
    # the header strings; confirm against actual inputs.
    for index, row in df.iterrows():
        if resolve_col_prefix:
            values = [eval(resolve_col_prefix) for column in row.values]
        else:
            values = row.values
        for col in trans_cols:
            if col in values:
                count += 1
            if count > 2:
                header = index + 1
                return header
    return header
# Read file data into a DataFrame
def read_file_to_df(file_path: str, read_cols: Optional[List[str]] = None, trans_cols: Optional[List[str]] = None,
                    nrows: Optional[int] = None, not_find_header: str = 'raise',
                    resolve_col_prefix: Optional[str] = None) -> pd.DataFrame:
    """
    Read a data file (csv / gzipped csv / Excel) into a DataFrame.

    Args:
        file_path: Path of the file to read.
        read_cols: Columns to load (all columns when None).
        trans_cols: Column names used to locate the header row via
            find_read_header; when None, row 0 is assumed to be the header.
        nrows: Maximum number of rows to read.
        not_find_header: 'raise' to raise when no header row is found,
            'ignore' to return an empty DataFrame instead.
        resolve_col_prefix: Expression forwarded to find_read_header for
            normalizing names before matching.

    Returns:
        The loaded DataFrame. For Excel files, all sheets are concatenated
        with an extra 'sheet_name' column identifying the source sheet.

    Raises:
        Exception: When the header row cannot be located and
            not_find_header == 'raise'.
        ValueError: When the underlying read fails (wraps the original error
            with the file's base name).
    """
    begin = datetime.datetime.now()
    debug('开始读取文件', file_path)
    header = 0
    if trans_cols:
        header = find_read_header(file_path, trans_cols, resolve_col_prefix)
        debug(os.path.basename(file_path), "读取第", header, "行")
        if header is None:
            if not_find_header == 'raise':
                message = '未匹配到开始行,请检查并重新指定'
                debug(message)
                raise Exception(message)
            elif not_find_header == 'ignore':
                # Caller opted in to receiving an empty DataFrame instead.
                pass
    df = pd.DataFrame()
    if header is not None:
        try:
            file_path_lower = str(file_path).lower()
            if file_path_lower.endswith("csv") or file_path_lower.endswith("gz"):
                # csv / gzipped-csv branch: sniff the encoding first.
                encoding = detect_file_encoding(file_path)
                end_with_gz = file_path_lower.endswith("gz")
                if read_cols:
                    if end_with_gz:
                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip',
                                         header=header,
                                         nrows=nrows)
                    else:
                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
                                         on_bad_lines='warn', nrows=nrows)
                else:
                    if end_with_gz:
                        df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
                    else:
                        df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
            else:
                # Excel branch: read every sheet and concatenate them.
                xls = pd.ExcelFile(file_path)
                # Collect all sheet names
                sheet_names = xls.sheet_names
                for sheet_name in sheet_names:
                    if read_cols:
                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols,
                                               nrows=nrows)
                    else:
                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
                    now_df['sheet_name'] = sheet_name
                    df = pd.concat([df, now_df])
                xls.close()
            debug('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:', datetime.datetime.now() - begin)
        except Exception as e:
            error('读取文件出错', file_path, str(e))
            message = '文件:' + os.path.basename(file_path) + ',' + str(e)
            raise ValueError(message)
    return df
  166. def __build_directory_dict(directory_dict: Dict[str, List[str]], path: str,
  167. filter_types: Optional[List[str]] = None) -> None:
  168. """
  169. 构建目录文件字典
  170. Args:
  171. directory_dict: 目录文件字典
  172. path: 目录路径
  173. filter_types: 文件类型过滤器
  174. """
  175. # 遍历目录下的所有项
  176. for item in os.listdir(path):
  177. item_path = os.path.join(path, item)
  178. if os.path.isdir(item_path):
  179. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  180. elif os.path.isfile(item_path):
  181. if path not in directory_dict:
  182. directory_dict[path] = []
  183. if filter_types is None or len(filter_types) == 0:
  184. directory_dict[path].append(item_path)
  185. else:
  186. # 获取文件扩展名
  187. ext = os.path.splitext(item_path)[1].lstrip('.').lower()
  188. if ext in filter_types and "~$" not in item_path:
  189. directory_dict[path].append(item_path)
# List all Excel-type files under a path
  191. def read_excel_files(read_path: str, filter_types: Optional[List[str]] = None) -> List[str]:
  192. """
  193. 读取路径下所有的Excel文件
  194. Args:
  195. read_path: 读取路径
  196. filter_types: 文件类型过滤器
  197. Returns:
  198. 文件路径列表
  199. """
  200. if not os.path.exists(read_path):
  201. return []
  202. if filter_types is None:
  203. # filter_types = ['xls', 'xlsx', 'csv', 'gz']
  204. filter_types = FileTypes.EXCEL_TYPES
  205. if os.path.isfile(read_path):
  206. return [read_path]
  207. directory_dict = {}
  208. __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
  209. return [path for paths in directory_dict.values() for path in paths if path]
# List all files under a path
  211. def read_files(read_path: str, filter_types: Optional[List[str]] = None) -> List[str]:
  212. """
  213. 读取路径下所有的文件
  214. Args:
  215. read_path: 读取路径
  216. filter_types: 文件类型过滤器
  217. Returns:
  218. 文件路径列表
  219. """
  220. if filter_types is None:
  221. filter_types = list(FileTypes.EXCEL_TYPES)
  222. filter_types.extend(FileTypes.ZIP_TYPES)
  223. if os.path.isfile(read_path):
  224. return [read_path]
  225. directory_dict = {}
  226. __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
  227. return [path1 for paths in directory_dict.values() for path1 in paths if path1]
  228. def copy_to_new(from_path: str, to_path: str) -> None:
  229. """
  230. 复制文件到新路径
  231. Args:
  232. from_path: 源文件路径
  233. to_path: 目标文件路径
  234. """
  235. is_file = '.' in to_path
  236. create_file_path(to_path, is_file_path=is_file)
  237. shutil.copy(from_path, to_path)
# Create a directory path
  239. def create_file_path(read_path: str, is_file_path: bool = False) -> None:
  240. """
  241. 创建路径
  242. Args:
  243. read_path: 创建文件夹的路径
  244. is_file_path: 传入的path是否包含具体的文件名
  245. """
  246. if is_file_path:
  247. read_path = os.path.dirname(read_path)
  248. if not os.path.exists(read_path):
  249. os.makedirs(read_path, exist_ok=True)
  250. def valid_eval(eval_str: str) -> bool:
  251. """
  252. 验证 eval 是否包含非法的参数
  253. Args:
  254. eval_str: 要验证的表达式
  255. Returns:
  256. 是否合法
  257. """
  258. safe_param = ["column", "wind_name", "df", "error_time", "str", "int"]
  259. eval_str_names = [node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)]
  260. if not set(eval_str_names).issubset(safe_param):
  261. raise NameError(
  262. eval_str + " contains unsafe name :" + str(','.join(list(set(eval_str_names) - set(safe_param)))))
  263. return True
if __name__ == '__main__':
    # Ad-hoc manual checks, kept commented out for reference:
    # aa = valid_eval("column[column.find('_')+1:]")
    # print(aa)
    #
    # aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
    # print(aa)
    #
    # aa = valid_eval("'记录时间' if column == '时间' else column;from os import *; path")
    # print(aa)
    # Smoke test: read a local Excel file, letting trans_cols locate the
    # header row, and print the resulting columns. Requires the hard-coded
    # Windows path to exist.
    df = read_file_to_df(r"D:\data\11-12月.xls",
                         trans_cols=['风机', '时间', '有功功率', '无功功率', '功率因数', '频率'], nrows=30)
    print(df.columns)