# -*- coding: utf-8 -*-
# @Time : 2024/5/16
# @Author : 魏志亮
import ast
import datetime
import os
import shutil
import warnings

import chardet
import pandas as pd

from utils.common import excel_types, zip_types
from utils.log.trans_log import trans_print

warnings.filterwarnings("ignore")

# Detect the file encoding
def detect_file_encoding(filename):
    # Read the first 1000 bytes of the file (enough for most encoding detection)
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    result = chardet.detect(rawdata)
    encoding = result['encoding']
    trans_print("File encoding:", filename, encoding)
    if encoding is None:
        encoding = 'gb18030'
    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
        return 'utf-8'
    return 'gb18030'
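
# Minimal usage sketch (the path below is hypothetical): anything not detected as a
# UTF-8 family encoding falls back to gb18030, a superset of GBK/GB2312 that covers
# most Chinese-encoded CSVs.
# encoding = detect_file_encoding('data/example.csv')
# df = pd.read_csv('data/example.csv', encoding=encoding)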

# Strip leading/trailing whitespace from the given object-dtype columns
def del_blank(df, cols=None):
    for col in cols or []:
        if df[col].dtype == object:
            df[col] = df[col].str.strip()
    return df
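
# Illustrative example: del_blank(pd.DataFrame({'a': [' x ', 'y ']}), cols=['a'])
# returns the frame with column 'a' equal to ['x', 'y']; columns not listed in
# cols, and non-object columns, pass through unchanged.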

# Split a list into chunks of at most `num` items
def split_array(array, num):
    return [array[i:i + num] for i in range(0, len(array), num)]
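
# Example: split_array(list(range(7)), 3) -> [[0, 1, 2], [3, 4, 5], [6]]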

# Locate the header row: 0 if at least two target columns already parse as column
# names; otherwise index + 1 of the first scanned row containing more than two
# target names; None if nothing matches within the first 20 rows.
def find_read_header(file_path, trans_cols, resolve_col_prefix=None):
    df = read_file_to_df(file_path, nrows=20)
    df.reset_index(inplace=True)
    count = 0
    header = None
    df_cols = df.columns
    if resolve_col_prefix:
        valid_eval(resolve_col_prefix)
        df_cols = [eval(resolve_col_prefix) for column in df.columns]
    for col in trans_cols:
        if col in df_cols:
            count = count + 1
            if count >= 2:
                header = 0
                break
    count = 0
    for index, row in df.iterrows():
        if resolve_col_prefix:
            values = [eval(resolve_col_prefix) for column in row.values]
        else:
            values = row.values
        for col in trans_cols:
            if col in values:
                count = count + 1
                if count > 2:
                    header = index + 1
                    break
    return header
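
# Illustrative sketch (sheet layout hypothetical): for an Excel sheet whose first
# two rows are free-text remarks and whose third row holds the real column names,
# find_read_header(path, trans_cols=['时间', '有功功率', '无功功率', '风机'])
# returns 2: the first file row is consumed as parsed column names, so the header
# row appears at scan index 1, and index + 1 maps back to file row 2, which
# read_file_to_df then uses as header=2.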

# Read a data file into a DataFrame
def read_file_to_df(file_path, read_cols=None, trans_cols=None, nrows=None, not_find_header='raise',
                    resolve_col_prefix=None):
    begin = datetime.datetime.now()
    trans_print('Start reading file', file_path)
    header = 0
    if trans_cols:
        header = find_read_header(file_path, trans_cols, resolve_col_prefix)
        trans_print(os.path.basename(file_path), 'header row:', header)
        if header is None:
            if not_find_header == 'raise':
                message = 'No header row matched, please check the file and specify one'
                trans_print(message)
                raise Exception(message)
            elif not_find_header == 'ignore':
                pass
    df = pd.DataFrame()
    if header is not None:
        try:
            if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
                encoding = detect_file_encoding(file_path)
                end_with_gz = str(file_path).lower().endswith("gz")
                if read_cols:
                    if end_with_gz:
                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip',
                                         header=header, nrows=nrows)
                    else:
                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
                                         on_bad_lines='warn', nrows=nrows)
                else:
                    if end_with_gz:
                        df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header,
                                         nrows=nrows)
                    else:
                        df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn',
                                         nrows=nrows)
            else:
                xls = pd.ExcelFile(file_path)
                # Read every sheet and tag each row with its sheet name
                sheet_names = xls.sheet_names
                for sheet_name in sheet_names:
                    if read_cols:
                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols,
                                               nrows=nrows)
                    else:
                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
                    now_df['sheet_name'] = sheet_name
                    df = pd.concat([df, now_df])
                xls.close()
            trans_print('File read OK:', file_path, 'shape:', df.shape, 'elapsed:', datetime.datetime.now() - begin)
        except Exception as e:
            trans_print('Failed to read file', file_path, str(e))
            message = 'file: ' + os.path.basename(file_path) + ', ' + str(e)
            raise ValueError(message)
    return df
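
# Minimal usage sketches (paths and column names hypothetical):
# df = read_file_to_df('turbine.csv.gz', read_cols=['时间', '有功功率'])
# df = read_file_to_df('turbine.xlsx', trans_cols=['时间', '有功功率', '风机'],
#                      not_find_header='ignore')  # empty DataFrame when no header matches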

def __build_directory_dict(directory_dict, path, filter_types=None):
    # Walk every entry under the directory, recursing into subdirectories
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            if path not in directory_dict:
                directory_dict[path] = []
            if filter_types is None or len(filter_types) == 0:
                directory_dict[path].append(item_path)
            elif str(item_path).split(".")[-1] in filter_types:
                # Skip Excel lock files such as '~$book.xlsx'
                if str(item_path).count("~$") == 0:
                    directory_dict[path].append(item_path)

# Collect all tabular files (Excel/CSV/gz) under a path
def read_excel_files(read_path, filter_types=None):
    if not os.path.exists(read_path):
        return []
    if filter_types is None:
        filter_types = ['xls', 'xlsx', 'csv', 'gz']
    if os.path.isfile(read_path):
        return [read_path]
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
    return [path for paths in directory_dict.values() for path in paths if path]
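
# Illustrative sketch (directory layout hypothetical): for a tree
#   data/2024/a.csv, data/2024/b.xlsx, data/readme.txt
# read_excel_files('data') returns ['data/2024/a.csv', 'data/2024/b.xlsx'];
# readme.txt is filtered out by extension and '~$' lock files are skipped.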

# Collect all supported files (tabular plus archives) under a path
def read_files(read_path, filter_types=None):
    if filter_types is None:
        filter_types = list(excel_types)
        filter_types.extend(zip_types)
    if os.path.isfile(read_path):
        return [read_path]
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
    return [path for paths in directory_dict.values() for path in paths if path]

def copy_to_new(from_path, to_path):
    # Treat the target as a file path when it contains an extension dot
    is_file = to_path.count('.') > 0
    create_file_path(to_path, is_file_path=is_file)
    shutil.copy(from_path, to_path)

# Create a directory path
def create_file_path(read_path, is_file_path=False):
    """
    Create the directory for the given path.
    :param read_path: path whose directory should be created
    :param is_file_path: whether the path includes a file name
    """
    if is_file_path:
        read_path = os.path.dirname(read_path)
    if not os.path.exists(read_path):
        os.makedirs(read_path, exist_ok=True)
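
# Example: create_file_path('/tmp/out/report.csv', is_file_path=True) creates '/tmp/out',
# while create_file_path('/tmp/out', is_file_path=False) creates '/tmp/out' itself.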

def valid_eval(eval_str):
    """
    Verify that an eval expression references only whitelisted names.
    """
    safe_param = ["column", "wind_name", "df", "error_time", "str", "int"]
    eval_str_names = [node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)]
    if not set(eval_str_names).issubset(safe_param):
        raise NameError(
            eval_str + " contains unsafe name: " + ','.join(set(eval_str_names) - set(safe_param)))
    return True
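
# Example: valid_eval("column[column.find('_') + 1:]") returns True (only 'column'
# is referenced), while valid_eval("__import__('os').system('ls')") raises NameError
# because __import__ is not in safe_param.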

if __name__ == '__main__':
    # aa = valid_eval("column[column.find('_')+1:]")
    # print(aa)
    #
    # aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
    # print(aa)
    #
    # aa = valid_eval("'记录时间' if column == '时间' else column;from os import *; path")
    # print(aa)

    df = read_file_to_df(r"D:\data\11-12月.xls",
                         trans_cols=['风机', '时间', '有功功率', '无功功率', '功率因数', '频率'], nrows=30)
    print(df.columns)