# -*- coding: utf-8 -*-
"""
大唐玉湖数据整理_2.py

Created on Tue Jul 9 16:28:48 2024
@author: Administrator
"""
import gzip
import io
import multiprocessing
import os
from datetime import datetime
from os import *  # NOTE: also imports os.open, which shadows the built-in open()

import chardet
import numpy as np
import pandas as pd

pd.options.mode.copy_on_write = True
  13. # 获取文件编码
  14. def detect_file_encoding(filename):
  15. # 读取文件的前1000个字节(足够用于大多数编码检测)
  16. with open(filename, 'rb') as f:
  17. rawdata = f.read(1000)
  18. result = chardet.detect(rawdata)
  19. encoding = result['encoding']
  20. if encoding is None:
  21. encoding = 'gb18030'
  22. if encoding and encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
  23. encoding = 'gb18030'
  24. return encoding
  25. # 读取数据到df
  26. def read_file_to_df(file_path, read_cols=list(), header=0):
  27. df = pd.DataFrame()
  28. if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
  29. encoding = detect_file_encoding(file_path)
  30. end_with_gz = str(file_path).lower().endswith("gz")
  31. if read_cols:
  32. if end_with_gz:
  33. df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
  34. else:
  35. df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header, on_bad_lines='warn')
  36. else:
  37. if end_with_gz:
  38. df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
  39. else:
  40. df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
  41. else:
  42. xls = pd.ExcelFile(file_path)
  43. # 获取所有的sheet名称
  44. sheet_names = xls.sheet_names
  45. for sheet in sheet_names:
  46. if read_cols:
  47. df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
  48. else:
  49. df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
  50. return df
  51. def __build_directory_dict(directory_dict, path, filter_types=None):
  52. # 遍历目录下的所有项
  53. for item in listdir(path):
  54. item_path = path.join(path, item)
  55. if path.isdir(item_path):
  56. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  57. elif path.isfile(item_path):
  58. if path not in directory_dict:
  59. directory_dict[path] = []
  60. if filter_types is None or len(filter_types) == 0:
  61. directory_dict[path].append(item_path)
  62. elif str(item_path).split(".")[-1] in filter_types:
  63. if str(item_path).count("~$") == 0:
  64. directory_dict[path].append(item_path)
  65. # 读取所有文件
  66. # 读取路径下所有的excel文件
  67. def read_excel_files(read_path):
  68. directory_dict = {}
  69. __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
  70. return [path for paths in directory_dict.values() for path in paths if path]
  71. # 创建路径
  72. def create_file_path(path, is_file_path=False):
  73. if is_file_path:
  74. path = path.dirname(path)
  75. if not path.exists(path):
  76. makedirs(path, exist_ok=True)
  77. def split_index(split_data: str, split_str: str):
  78. count = split_data.find(split_str)
  79. if count > -1:
  80. return split_data[count + len(split_str):]
  81. else:
  82. return split_str
  83. def replece_col_to_biaozhun(col):
  84. for k, v in dianjian_dict.items():
  85. if col.find(k) > -1:
  86. col = col.replace(k, v)
  87. return col
  88. return col
  89. def row_to_datas(row, pv_dict, inverter_cols, df_cols):
  90. row_datas = list(list())
  91. for xiangbian in pv_dict.keys():
  92. for nibianqi in pv_dict[xiangbian].keys():
  93. for pv in pv_dict[xiangbian][nibianqi]:
  94. datas = [np.nan] * 14
  95. datas[0] = row['时间']
  96. datas[1] = xiangbian
  97. datas[2] = nibianqi
  98. datas[3] = pv
  99. datas_4_col = "_".join([str(xiangbian), str(nibianqi), str(pv), '支路输出电压'])
  100. if datas_4_col in df_cols:
  101. datas[4] = row[datas_4_col]
  102. else:
  103. datas[4] = np.nan
  104. datas_5_col = "_".join([str(xiangbian), str(nibianqi), str(pv), '支路输出电流'])
  105. if datas_5_col in df_cols:
  106. datas[5] = row[datas_5_col]
  107. else:
  108. datas[5] = np.nan
  109. row_datas.append(datas)
  110. for xiangbian in pv_dict.keys():
  111. for nibianqi in pv_dict[xiangbian].keys():
  112. datas = [np.nan] * 14
  113. datas[0] = row['时间']
  114. datas[1] = xiangbian
  115. datas[2] = nibianqi
  116. datas[3] = 0
  117. for index, col_name in enumerate(inverter_cols):
  118. col = '_'.join([str(xiangbian), str(nibianqi), col_name])
  119. if col in df_cols:
  120. datas[index + 6] = row[col]
  121. else:
  122. datas[index + 6] = np.nan
  123. row_datas.append(datas)
  124. return row_datas
  125. def df_to_biaozhun(df):
  126. pv_cols = ['支路输出电压', '支路输出电流']
  127. inverter_cols = ['逆变器总发电量', '逆变器输出有功功率', '逆变器温度', '交流输出电压', '逆变器输出电流A相', '逆变器输出电流B相', '逆变器输出电流C相', '逆变器转换效率']
  128. # 从列名获取箱变->逆变器->PV等的字典
  129. pv_dict = dict(dict())
  130. for col in df.columns:
  131. for pv_col in pv_cols:
  132. if str(col).endswith(pv_col):
  133. datas = col.split("_")
  134. xiangbian = datas[0]
  135. nibiangqi = datas[1]
  136. pv = datas[2]
  137. if xiangbian in pv_dict.keys():
  138. if nibiangqi in pv_dict[xiangbian]:
  139. pv_dict[xiangbian][nibiangqi].add(pv)
  140. else:
  141. pv_dict[xiangbian][nibiangqi] = set([pv])
  142. else:
  143. pv_dict[xiangbian] = {nibiangqi: set([pv])}
  144. results = df.apply(row_to_datas, args=(pv_dict, inverter_cols, df.columns), axis=1)
  145. df_datas = results.to_list()
  146. df_datas = [da for data in df_datas for da in data]
  147. df_cols = ["时间", "箱变", "逆变器", "支路"]
  148. df_cols.extend(pv_cols)
  149. df_cols.extend(inverter_cols)
  150. df = pd.DataFrame(df_datas, columns=df_cols)
  151. type_conver_list = []
  152. type_conver_list.extend(pv_cols)
  153. type_conver_list.extend(inverter_cols)
  154. for type_conver in type_conver_list:
  155. df[type_conver] = pd.to_numeric(df[type_conver], errors='coerce')
  156. return df
  157. def read_and_save_csv(file_path, save_path):
  158. begin = datetime.now()
  159. base_name = path.basename(file_path)
  160. print('开始', base_name)
  161. df = read_file_to_df(file_path)
  162. for col in df.columns:
  163. for del_col in del_cols:
  164. if col.find(del_col) > -1:
  165. del df[col]
  166. df['时间'] = pd.to_datetime(df['时间'])
  167. xiangbian = [col for col in df.columns if str(col).startswith('#') and str(col).find('逆变器') > -1][0].split("逆变器")[
  168. 0].replace("#", "")
  169. df.columns = [xiangbian + "_" + split_index(df_col, "逆变器").replace('PV', "").replace("()", "").replace("-",
  170. "_") if df_col.startswith(
  171. "#") else df_col for df_col in
  172. df.columns]
  173. df.columns = [col.replace("输入", "_输入") for col in df.columns]
  174. df.columns = [replece_col_to_biaozhun(col) for col in df.columns]
  175. df = df_to_biaozhun(df)
  176. # df.sort_values(by=['时间', "箱变", "逆变器", "支路"], inplace=True)
  177. # save_file = path.join(save_path, path.basename(file_path))
  178. # create_file_path(save_file, True)
  179. # df.to_csv(save_file, encoding='utf-8', index=False)
  180. print('结束', base_name, '耗时:' + str(datetime.now() - begin))
  181. return df
  182. dianjian_data_str = """
  183. 输入电压 支路输出电压
  184. 输入电流 支路输出电流
  185. 功率因数
  186. 总发电量 逆变器总发电量
  187. 无功功率
  188. 有功功率 逆变器输出有功功率
  189. 机内温度 逆变器温度
  190. 电网AB线电压 交流输出电压
  191. 电网A相电流 逆变器输出电流A相
  192. 电网BC线电压
  193. 电网B相电流 逆变器输出电流B相
  194. 电网CA线电压
  195. 电网C相电流 逆变器输出电流C相
  196. 逆变器效率 逆变器转换效率
  197. """
  198. dianjian_dict = {}
  199. del_cols = []
  200. for data in dianjian_data_str.split("\n"):
  201. if data:
  202. datas = data.split("\t")
  203. if len(datas) == 2 and datas[1]:
  204. dianjian_dict[datas[0]] = datas[1]
  205. else:
  206. del_cols.append(datas[0])
  207. if __name__ == '__main__':
  208. path = r'/data/download/大唐玉湖性能分析离线分析/05整理数据/逆变器数据'
  209. save_path = r'/data/download/大唐玉湖性能分析离线分析/06整理数据/逆变器数据'
  210. # path = r'D:\trans_data\大唐玉湖性能分析离线分析\test\yuanshi'
  211. # save_path = r'D:\trans_data\大唐玉湖性能分析离线分析\test\zhengli'
  212. all_files = read_excel_files(path)
  213. all_datas = list(all_files)
  214. all_datas.sort()
  215. print(all_datas)
  216. # for file in all_datas:
  217. # read_and_save_csv(file, save_path)
  218. with multiprocessing.Pool(40) as pool:
  219. dfs = pool.starmap(read_and_save_csv, [(i, save_path) for i in all_datas])
  220. saved_pv_df = pd.concat(dfs)
  221. saved_pv_df.sort_values(by=['时间', "箱变", "逆变器", "支路"], inplace=True)
  222. save_file = path.join(save_path, "合并.csv")
  223. create_file_path(save_file, True)
  224. saved_pv_df.sort_values(by=['时间', "箱变", "逆变器", "支路"], inplace=True)
  225. saved_pv_df.to_csv(save_file, encoding='utf-8', index=False)