# 大唐玉湖数据整理_1.py
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 9 16:28:48 2024
@author: Administrator
"""
import multiprocessing
import os
from datetime import datetime
from os import *

import chardet
import pandas as pd

pd.options.mode.copy_on_write = True
  12. # 获取文件编码
  13. def detect_file_encoding(filename):
  14. # 读取文件的前1000个字节(足够用于大多数编码检测)
  15. with open(filename, 'rb') as f:
  16. rawdata = f.read(1000)
  17. result = chardet.detect(rawdata)
  18. encoding = result['encoding']
  19. if encoding is None:
  20. encoding = 'gb18030'
  21. if encoding and encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
  22. encoding = 'gb18030'
  23. return encoding
  24. # 读取数据到df
  25. def read_file_to_df(file_path, read_cols=list(), header=0):
  26. df = pd.DataFrame()
  27. if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
  28. encoding = detect_file_encoding(file_path)
  29. end_with_gz = str(file_path).lower().endswith("gz")
  30. if read_cols:
  31. if end_with_gz:
  32. df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
  33. else:
  34. df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header, on_bad_lines='warn')
  35. else:
  36. if end_with_gz:
  37. df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
  38. else:
  39. df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
  40. else:
  41. xls = pd.ExcelFile(file_path)
  42. # 获取所有的sheet名称
  43. sheet_names = xls.sheet_names
  44. for sheet in sheet_names:
  45. if read_cols:
  46. df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
  47. else:
  48. df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
  49. return df
  50. def __build_directory_dict(directory_dict, path, filter_types=None):
  51. # 遍历目录下的所有项
  52. for item in listdir(path):
  53. item_path = path.join(path, item)
  54. if path.isdir(item_path):
  55. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  56. elif path.isfile(item_path):
  57. if path not in directory_dict:
  58. directory_dict[path] = []
  59. if filter_types is None or len(filter_types) == 0:
  60. directory_dict[path].append(item_path)
  61. elif str(item_path).split(".")[-1] in filter_types:
  62. if str(item_path).count("~$") == 0:
  63. directory_dict[path].append(item_path)
  64. # 读取所有文件
  65. # 读取路径下所有的excel文件
  66. def read_excel_files(read_path):
  67. directory_dict = {}
  68. __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
  69. return [path for paths in directory_dict.values() for path in paths if path]
  70. # 创建路径
  71. def create_file_path(path, is_file_path=False):
  72. if is_file_path:
  73. path = path.dirname(path)
  74. if not path.exists(path):
  75. makedirs(path, exist_ok=True)
  76. def generate_df(pv_df, col):
  77. if col != '时间':
  78. xiangbian = col.split("逆变器")[0].replace("#", "")
  79. nibianqi = col.split("-")[0].split('逆变器')[1]
  80. pv_index = col.split("-")[1].replace("PV", "")
  81. now_df = pv_df[['时间', col + '输入电流()', col + '输入电压()']]
  82. now_df.loc[:, '箱变'] = xiangbian
  83. now_df.loc[:, '逆变器'] = nibianqi
  84. now_df.loc[:, 'PV'] = pv_index
  85. now_df.columns = [df_col.replace(col, "").replace("()", "") for df_col in now_df.columns]
  86. now_df['输入电流'] = now_df['输入电流'].astype(float)
  87. now_df['输入电压'] = now_df['输入电压'].astype(float)
  88. print(xiangbian, nibianqi, pv_index, now_df.shape)
  89. return now_df
  90. return pd.DataFrame()
  91. def split_index(split_data: str, split_str: str):
  92. count = split_data.find(split_str)
  93. if count > -1:
  94. return split_data[count + len(split_str):]
  95. else:
  96. return split_str
  97. def replece_col_to_biaozhun(col):
  98. for k, v in dianjian_dict.items():
  99. if col.find(k) > -1:
  100. col = col.replace(k, v)
  101. return col
  102. return col
  103. def read_and_save_csv(file_path, save_path):
  104. begin = datetime.now()
  105. base_name = path.basename(file_path)
  106. print('开始', base_name)
  107. df = read_file_to_df(file_path)
  108. for col in df.columns:
  109. for del_col in del_cols:
  110. if col.find(del_col) > -1:
  111. del df[col]
  112. df['时间'] = pd.to_datetime(df['时间'])
  113. xiangbian = [col for col in df.columns if str(col).startswith('#') and str(col).find('逆变器') > -1][0].split("逆变器")[
  114. 0].replace("#", "")
  115. df.columns = [xiangbian + "_" + split_index(df_col, "逆变器").replace('PV', "").replace("()", "").replace("-",
  116. "_") if df_col.startswith(
  117. "#") else df_col for df_col in
  118. df.columns]
  119. df.columns = [col.replace("输入", "_输入") for col in df.columns]
  120. df.columns = [replece_col_to_biaozhun(col) for col in df.columns]
  121. # saved_pv_df = pd.concat(dfs)
  122. df.sort_values(by=['时间'], inplace=True)
  123. save_file = path.join(save_path, path.basename(file_path))
  124. create_file_path(save_file, True)
  125. df.to_csv(save_file, encoding='utf-8', index=False)
  126. print('结束', base_name, '耗时:' + str(datetime.now() - begin))
  127. dianjian_data_str = """
  128. 输入电压 支路输出电压
  129. 输入电流 支路输出电流
  130. 功率因数
  131. 总发电量 逆变器总发电量
  132. 无功功率
  133. 有功功率 逆变器输出有功功率
  134. 机内温度 逆变器温度
  135. 电网AB线电压 交流输出电压
  136. 电网A相电流 逆变器输出电流A相
  137. 电网BC线电压
  138. 电网B相电流 逆变器输出电流B相
  139. 电网CA线电压
  140. 电网C相电流 逆变器输出电流C相
  141. 逆变器效率 逆变器转换效率
  142. """
  143. dianjian_dict = {}
  144. del_cols = []
  145. for data in dianjian_data_str.split("\n"):
  146. if data:
  147. datas = data.split("\t")
  148. if len(datas) == 2 and datas[1]:
  149. dianjian_dict[datas[0]] = datas[1]
  150. else:
  151. del_cols.append(datas[0])
  152. if __name__ == '__main__':
  153. path = r'/data/download/大唐玉湖性能分析离线分析/05整理数据/逆变器数据'
  154. save_path = r'/data/download/大唐玉湖性能分析离线分析/06整理数据/逆变器数据'
  155. all_files = read_excel_files(path)
  156. all_datas = list(all_files)
  157. all_datas.sort()
  158. print(all_datas)
  159. #
  160. # for file in all_datas:
  161. # read_and_save_csv(file, save_path)
  162. with multiprocessing.Pool(20) as pool:
  163. pool.starmap(read_and_save_csv, [(i, save_path) for i in all_datas])