WindFarms.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import datetime
  5. import shutil
  6. from base.TranseParam import TranseParam
  7. from utils.db.trans_mysql import *
  8. from utils.log.trans_log import logger
  9. from utils.trans_methods import *
  10. from utils.zip.unzip import unzip
  11. class WindFarms(object):
  12. def __init__(self, name, batch_no=None, save_path=None, params: TranseParam = None):
  13. self.name = name
  14. self.batch_no = batch_no
  15. self.save_path = save_path
  16. self.begin = datetime.datetime.now()
  17. self.next_time = datetime.datetime.now()
  18. self.is_zip = False
  19. self.save_zip = False
  20. self.trans_param = params
  21. def set_trans_param(self, params: TranseParam):
  22. self.trans_param = params
  23. def __params_valid(self, not_null_list=list()):
  24. for arg in not_null_list:
  25. if arg is None or arg == '':
  26. raise Exception("Invalid param set :" + arg)
  27. def __get_save_path(self):
  28. return os.path.join(self.save_path, self.name, self.batch_no)
  29. def __get_zip_tmp_path(self):
  30. return os.path.join(self.__get_save_path(), 'save_tmp')
  31. def __get_read_tmp_path(self):
  32. return os.path.join(self.__get_save_path(), 'read_tmp')
  33. def get_excel_files(self):
  34. if self.is_zip:
  35. is_success, e = unzip(self.trans_param.read_path, self.__get_zip_tmp_path())
  36. if is_success:
  37. self.trans_param.read_path = self.__get_zip_tmp_path()
  38. else:
  39. raise e
  40. return read_excel_files(self.trans_param.read_path)
  41. def read_excel_to_df(self, file):
  42. return read_file_to_df(file, self.trans_param.read_cols)
  43. def save_to_csv(self, df, filename):
  44. save_name = str(filename) + ('.csv' if self.save_zip else '.csv.gz')
  45. save_path = os.path.join(self.save_path, self.name, self.batch_no, self.trans_param.read_type,
  46. save_name)
  47. create_file_path(save_path, is_file_path=True)
  48. if self.save_zip:
  49. df[df[self.trans_param.wind_col] == filename].to_csv(save_path, compression='.gzip', index=False)
  50. else:
  51. df[df[self.trans_param.wind_col] == filename].to_csv(save_path, index=False)
  52. trans_print("保存" + str(filename) + ".csv成功")
  53. def save_to_db(self, df, filename):
  54. df.to_sql(name=str(self.batch_no), con=engine.connect(), index=False, if_exists='append',
  55. chunksize=1000000)
  56. trans_print("文件:", filename, "保存数据库成功")
  57. def run(self):
  58. trans_print("开始执行", self.name, self.trans_param.read_type)
  59. self.__params_valid([self.name, self.batch_no, self.save_path, self.trans_param.read_type,
  60. self.trans_param.read_path,
  61. self.trans_param.time_col, self.trans_param.wind_col])
  62. # 读取文件
  63. try:
  64. all_files = self.get_excel_files()
  65. trans_print('读取文件数量:', len(all_files))
  66. except Exception as e:
  67. logger.exception(e)
  68. message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
  69. update_transe_status(self.batch_no, self.trans_param.read_type, "error", message)
  70. raise e
  71. # 开始读取数据
  72. df = pd.DataFrame()
  73. for file in all_files:
  74. try:
  75. df = pd.concat([df, self.read_excel_to_df(file)])
  76. except Exception as e:
  77. logger.exception(e)
  78. message = "读取文件错误:" + file + ",系统返回错误:" + str(e)
  79. update_transe_status(self.batch_no, self.trans_param.read_type, "error", message)
  80. raise e
  81. # 转换字段
  82. if self.trans_param.cols_tran:
  83. cols_tran = self.trans_param.cols_tran
  84. real_cols_trans = dict()
  85. for k, v in cols_tran.items():
  86. if v:
  87. real_cols_trans[v] = k
  88. logger.info("包含转换字段,开始处理转换字段")
  89. df.rename(columns=real_cols_trans, inplace=True)
  90. if self.trans_param.wind_col in real_cols_trans.keys():
  91. self.trans_param.wind_col = real_cols_trans[self.trans_param.wind_col]
  92. for k in cols_tran.keys():
  93. if k not in df.columns:
  94. df[k] = None
  95. # 添加年月日
  96. if self.trans_param.time_col:
  97. logger.info("包含时间字段,开始处理时间字段,添加年月日")
  98. df[self.trans_param.time_col] = pd.to_datetime(df[self.trans_param.time_col])
  99. df['year'] = df[self.trans_param.time_col].dt.year
  100. df['month'] = df[self.trans_param.time_col].dt.month
  101. df['day'] = df[self.trans_param.time_col].dt.day
  102. df.sort_values(by=self.trans_param.time_col, inplace=True)
  103. logger.info("处理时间字段结束")
  104. # 开始保存
  105. try:
  106. names = set(df[self.trans_param.wind_col])
  107. trans_print(names, self.trans_param.wind_col)
  108. for filename in names:
  109. self.save_to_csv(df[df[self.trans_param.wind_col] == filename], filename)
  110. # self.save_to_db(df[df[self.trans_param.wind_col] == filename], filename)
  111. except Exception as e:
  112. logger.exception(e)
  113. message = "保存文件错误:" + self.save_path + ",系统返回错误:" + str(e)
  114. update_transe_status(self.batch_no, self.trans_param.read_type, "error", message)
  115. raise e
  116. update_transe_status(self.batch_no, self.trans_param.read_type, "success", "")
  117. if self.is_zip:
  118. trans_print("开始删除解压进临时文件夹")
  119. shutil.rmtree(self.__get_zip_tmp_path())
  120. trans_print("删除解压进临时文件夹删除成功")
  121. if __name__ == '__main__':
  122. aa = WindFarms("test", "test_path")
  123. aa.run()