trans_methods.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/16
  3. # @Author : 魏志亮
  4. import os
  5. import re
  6. import shutil
  7. import warnings
  8. import chardet
  9. import pandas as pd
  10. from utils.log.trans_log import trans_print
  11. warnings.filterwarnings("ignore")
  12. # 获取文件编码
  13. def detect_file_encoding(filename):
  14. # 读取文件的前1000个字节(足够用于大多数编码检测)
  15. with open(filename, 'rb') as f:
  16. rawdata = f.read(1000)
  17. result = chardet.detect(rawdata)
  18. encoding = result['encoding']
  19. trans_print("文件类型:", filename, encoding)
  20. if encoding is None:
  21. encoding = 'gb18030'
  22. if encoding and encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
  23. encoding = 'gb18030'
  24. return encoding
  25. def del_blank(df=pd.DataFrame(), cols=list()):
  26. for col in cols:
  27. if df[col].dtype == object:
  28. df[col] = df[col].str.strip()
  29. return df
  30. # 切割数组到多个数组
  31. def split_array(array, num):
  32. return [array[i:i + num] for i in range(0, len(array), num)]
  33. # 读取数据到df
  34. def read_file_to_df(file_path, read_cols=list(), header=0):
  35. trans_print('开始读取文件', file_path)
  36. df = pd.DataFrame()
  37. if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
  38. encoding = detect_file_encoding(file_path)
  39. end_with_gz = str(file_path).lower().endswith("gz")
  40. if read_cols:
  41. if end_with_gz:
  42. df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
  43. else:
  44. df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header)
  45. else:
  46. if end_with_gz:
  47. df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
  48. else:
  49. df = pd.read_csv(file_path, encoding=encoding, header=header)
  50. else:
  51. xls = pd.ExcelFile(file_path)
  52. # 获取所有的sheet名称
  53. sheet_names = xls.sheet_names
  54. for sheet in sheet_names:
  55. if read_cols:
  56. df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
  57. else:
  58. df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
  59. trans_print('文件读取成功', file_path, '文件数量', df.shape)
  60. return df
  61. def __build_directory_dict(directory_dict, path, filter_types=None):
  62. # 遍历目录下的所有项
  63. for item in os.listdir(path):
  64. item_path = os.path.join(path, item)
  65. if os.path.isdir(item_path):
  66. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  67. elif os.path.isfile(item_path):
  68. if path not in directory_dict:
  69. directory_dict[path] = []
  70. if filter_types is None or len(filter_types) == 0:
  71. directory_dict[path].append(item_path)
  72. elif str(item_path).split(".")[-1] in filter_types:
  73. if str(item_path).count("~$") == 0:
  74. directory_dict[path].append(item_path)
  75. # 读取所有文件
  76. # 读取路径下所有的excel文件
  77. def read_excel_files(read_path):
  78. directory_dict = {}
  79. __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
  80. return [path for paths in directory_dict.values() for path in paths if path]
  81. # 读取路径下所有的文件
  82. def read_files(read_path):
  83. directory_dict = {}
  84. __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])
  85. return [path for paths in directory_dict.values() for path in paths if path]
  86. def copy_to_new(from_path, to_path):
  87. is_file = False
  88. if to_path.count('.') > 0:
  89. is_file = True
  90. create_file_path(to_path, is_file_path=is_file)
  91. shutil.copy(from_path, to_path)
  92. # 创建路径
  93. def create_file_path(path, is_file_path=False):
  94. if is_file_path:
  95. path = os.path.dirname(path)
  96. if not os.path.exists(path):
  97. os.makedirs(path, exist_ok=True)
  98. # 格式化风机名称
  99. def generate_turbine_name(turbine_name='F0001', prefix='F'):
  100. strinfo = re.compile(r"[\D*]")
  101. name = strinfo.sub('', str(turbine_name))
  102. return prefix + str(int(name)).zfill(3)