baiyushan_20240906.py

import os
from multiprocessing import Pool

import chardet
import pandas as pd


# Detect a file's text encoding
def detect_file_encoding(filename):
    # Read the first 1000 bytes of the file (enough for most encoding detection)
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    result = chardet.detect(rawdata)
    encoding = result['encoding']

    if encoding is None:
        encoding = 'gb18030'

    # GB18030 is a superset of GB2312, so widen narrow or Windows code page
    # guesses to avoid decode errors on rarer characters
    if encoding.lower() == 'gb2312' or encoding.lower().startswith('windows'):
        encoding = 'gb18030'
    return encoding
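
# A minimal sanity check for the detector (a sketch; 'sample.csv' is a
# hypothetical GBK-encoded file, not part of this dataset):
#
#     enc = detect_file_encoding('sample.csv')       # typically 'gb18030' here
#     df = pd.read_csv('sample.csv', encoding=enc)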


# Read one data file into a DataFrame
def read_file_to_df(file_path, read_cols=None, header=0):
    df = pd.DataFrame()
    if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
        encoding = detect_file_encoding(file_path)
        end_with_gz = str(file_path).lower().endswith("gz")
        if read_cols:
            if end_with_gz:
                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
            else:
                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header, on_bad_lines='warn')
        else:
            if end_with_gz:
                df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
            else:
                df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
    else:
        xls = pd.ExcelFile(file_path)
        # Get all sheet names and concatenate every sheet into one DataFrame
        sheet_names = xls.sheet_names
        for sheet in sheet_names:
            if read_cols:
                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
            else:
                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
    return df
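
# read_file_to_df is invoked below via Pool.starmap, so each argument tuple
# maps positionally onto (file_path, read_cols, header). A hypothetical call:
#
#     read_file_to_df('10min.csv.gz', ['设备名称', '时间'], 1)
#
# reads the gzipped CSV with the second row as the header, keeping two columns.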


def __build_directory_dict(directory_dict, path, filter_types=None):
    # Walk every entry under the directory, recursing into subdirectories
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            if path not in directory_dict:
                directory_dict[path] = []
            if filter_types is None or len(filter_types) == 0:
                directory_dict[path].append(item_path)
            elif str(item_path).split(".")[-1] in filter_types:
                # Skip Excel lock/temp files such as "~$book.xlsx"
                if str(item_path).count("~$") == 0:
                    directory_dict[path].append(item_path)


# Collect all data files (xls/xlsx/csv/gz, not only Excel) under a directory tree
def read_excel_files(read_path):
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
    return [path for paths in directory_dict.values() for path in paths if path]
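
# __build_directory_dict fills its dict in place, mapping each directory to the
# matching files directly inside it; read_excel_files then flattens that into a
# single list. With a hypothetical layout:
#     {'/data/a': ['/data/a/x.csv'], '/data/a/b': ['/data/a/b/y.xlsx']}
# the result is ['/data/a/x.csv', '/data/a/b/y.xlsx'].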


# Create a directory; with is_file_path=True, create the file's parent directory
def create_file_path(path, is_file_path=False):
    if is_file_path:
        path = os.path.dirname(path)
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)
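
# With is_file_path=True only the parent is created, e.g. (hypothetical path)
#     create_file_path('/tmp/out/F1.csv', is_file_path=True)   # creates /tmp/out
# __main__ below passes a directory, so the default is used.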


# Read turbine status records and snap start times onto the 10-minute grid
def read_status(status_path):
    all_files = read_excel_files(status_path)

    with Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df, [(file, ['设备名称', '状态码', '开始时间'], 2) for file in all_files])
    df = pd.concat(dfs)

    # Keep only status codes 3 and 5
    df = df[df['状态码'].isin([3, 5])]
    df['开始时间'] = pd.to_datetime(df['开始时间'])
    # Snap to the first 10-minute boundary strictly after the raw start time
    df['处理后时间'] = (df['开始时间'] + pd.Timedelta(minutes=10)).apply(
        lambda x: f"{x.year}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)} {str(x.hour).zfill(2)}:{x.minute // 10}0:00")
    df['处理后时间'] = pd.to_datetime(df['处理后时间'])

    # Clip to the study window: drop rows before it, cap times at its end
    df = df[df['处理后时间'] >= '2023-09-01 00:00:00']
    df.loc[df['处理后时间'] >= '2024-09-01 00:00:00', '处理后时间'] = pd.Timestamp('2024-09-01 00:00:00')

    df.sort_values(by=['设备名称', '处理后时间'], inplace=True)
    return df
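
# Worked example of the snapping above: a status starting at 12:07:30 becomes
# 12:17:30 after adding 10 minutes, and minute // 10 -> 1 renders "..12:10:00",
# i.e. the first 10-minute boundary strictly after the start; an exact boundary
# such as 12:10:00 moves up to 12:20:00.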


# Read fault records; device names like "#1" identify individual turbines
def read_fault_data(fault_path):
    all_files = read_excel_files(fault_path)

    with Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df, [(file, ['设备名称', '故障开始时间'], 2) for file in all_files])
    df = pd.concat(dfs)

    df = df[df['设备名称'].str.startswith("#")]
    df['故障开始时间'] = pd.to_datetime(df['故障开始时间'])
    # Same 10-minute snapping as in read_status
    df['处理后故障开始时间'] = (df['故障开始时间'] + pd.Timedelta(minutes=10)).apply(
        lambda x: f"{x.year}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)} {str(x.hour).zfill(2)}:{x.minute // 10}0:00")
    df['处理后故障开始时间'] = pd.to_datetime(df['处理后故障开始时间'])

    df = df[(df['处理后故障开始时间'] >= '2023-09-01 00:00:00') & (df['处理后故障开始时间'] < '2024-09-01 00:00:00')]
    df.sort_values(by=['设备名称', '处理后故障开始时间'], inplace=True)
    return df
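
# Snapping fault starts and status times onto the same grid is what lets them
# be matched later by plain >=/<= comparisons against the 10-minute data.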


# Read the 10-minute operational data (wind speed, grid-side active power)
def read_10min_data(data_path):
    all_files = read_excel_files(data_path)

    with Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df,
                           [(file, ['设备名称', '时间', '平均风速(m/s)', '平均网侧有功功率(kW)'], 1) for file in all_files])
    df = pd.concat(dfs)

    df['时间'] = pd.to_datetime(df['时间'])
    df = df[(df['时间'] >= '2023-09-01 00:00:00') & (df['时间'] < '2024-09-01 00:00:00')]
    df.sort_values(by=['设备名称', '时间'], inplace=True)
    return df
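
# All three readers use the same fan-out pattern: each Pool worker parses one
# file, and pd.concat merges the per-file frames. Pool(20) assumes the host can
# afford 20 concurrent readers; shrink it on smaller machines.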


# For one turbine, pull the 10-minute rows inside each fault window and save them
def select_data_and_save(name, fault_df, origin_df):
    df = pd.DataFrame()
    for i in range(fault_df.shape[0]):
        fault = fault_df.iloc[i]
        con1 = origin_df['时间'] >= fault['处理后故障开始时间']
        con2 = origin_df['时间'] <= fault['结束时间']
        df = pd.concat([df, origin_df[con1 & con2]])

    name = name.replace('#', 'F')
    df.drop_duplicates(inplace=True)
    # save_path is set in __main__; forked worker processes inherit it on Linux
    df.to_csv(save_path + os.sep + name + '.csv', index=False, encoding='utf8')
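
# Example: a turbine named '#12' is written as F12.csv under save_path. Note
# the reliance on the fork start method for the global save_path; under spawn
# (Windows/macOS) it would have to be passed as an explicit argument instead.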


if __name__ == '__main__':
    base_path = r'/data/download/白玉山/需要整理的数据'
    save_path = base_path + os.sep + 'sele_data_202409261135'
    create_file_path(save_path)

    status_df = read_status(base_path + os.sep + '设备状态')
    fault_df = read_fault_data(base_path + os.sep + '故障')
    data_df = read_10min_data(base_path + os.sep + '十分钟')

    # Intermediate snapshots; overwritten with the final versions below
    status_df.to_csv(base_path + os.sep + '设备状态' + '.csv', index=False, encoding='utf8')
    fault_df.to_csv(base_path + os.sep + '故障' + '.csv', index=False, encoding='utf8')
    data_df.to_csv(base_path + os.sep + '十分钟' + '.csv', index=False, encoding='utf8')

    print(status_df.shape)
    print(fault_df.shape)
    print(data_df.shape)

    # A fault's end time is the earliest status (code 3 or 5) recorded for the
    # same turbine at or after the fault start; NaT if none exists
    fault_list = list()
    for i in range(fault_df.shape[0]):
        data = fault_df.iloc[i]
        con1 = status_df['设备名称'] == data['设备名称']
        con2 = status_df['处理后时间'] >= data['处理后故障开始时间']
        fault_list.append(status_df[con1 & con2]['处理后时间'].min())
    fault_df['结束时间'] = fault_list

    status_df.to_csv(base_path + os.sep + '设备状态' + '.csv', index=False, encoding='utf8')
    fault_df.to_csv(base_path + os.sep + '故障' + '.csv', index=False, encoding='utf8')
    data_df.to_csv(base_path + os.sep + '十分钟' + '.csv', index=False, encoding='utf8')

    # Split faults and 10-minute data per turbine, then extract windows in parallel
    names = set(fault_df['设备名称'])
    fault_map = dict()
    data_map = dict()
    for name in names:
        fault_map[name] = fault_df[fault_df['设备名称'] == name]
        data_map[name] = data_df[data_df['设备名称'] == name]

    with Pool(20) as pool:
        pool.starmap(select_data_and_save, [(name, fault_map[name], data_map[name]) for name in names])
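
# To run this against another dataset, only base_path and the subdirectory
# names ('设备状态', '故障', '十分钟') need to match the layout on disk; the save
# directory and all per-turbine outputs are derived from them.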