# -*- coding: utf-8 -*-
# @Time : 2024/5/15
# @Author : 魏志亮

import datetime
import os
import shutil

import pandas as pd

from base.TranseParam import TranseParam
from utils.db.trans_mysql import *
from utils.log.trans_log import logger
from utils.trans_methods import *
from utils.zip.unzip import unzip


class WindFarms(object):
    """Conversion job that reads source files for one wind farm and exports them.

    The job reads every file found under ``trans_param.read_path``, optionally
    renames columns, derives ``year``/``month``/``day`` columns from the time
    column and writes one CSV per distinct value of the wind-turbine column
    below ``<save_path>/<name>/<batch_no>/<read_type>/``.

    The attached parameter object is expected to expose ``read_path``,
    ``read_type``, ``read_cols``, ``wind_col``, ``time_col`` and ``cols_tran``
    (these are the only attributes this class reads).
    """

    def __init__(self, name, batch_no=None, save_path=None, params: "TranseParam" = None):
        """Store the job identity and its optional conversion parameters.

        Args:
            name: Wind farm name; also a directory component of the output path.
            batch_no: Batch identifier; also a directory component of the
                output path and the table name used by ``save_to_db``.
            save_path: Root directory for exported files.
            params: Conversion parameters; may be attached later with
                ``set_trans_param``.
        """
        self.name = name
        self.batch_no = batch_no
        self.save_path = save_path
        self.begin = datetime.datetime.now()
        self.next_time = datetime.datetime.now()
        # When True, ``read_path`` is unzipped into a temp folder before reading.
        self.is_zip = False
        # When True, exported CSV files are gzip-compressed (``.csv.gz``).
        self.save_zip = False
        self.trans_param = params

    def set_trans_param(self, params: "TranseParam"):
        """Attach (or replace) the conversion parameters used by ``run``."""
        self.trans_param = params

    def __params_valid(self, not_null_list=None):
        """Raise if any value in ``not_null_list`` is ``None`` or ``''``.

        Args:
            not_null_list: Iterable of values that must be set; ``None`` is
                treated as an empty list.

        Raises:
            ValueError: For the first missing value. The message carries the
                position and repr of the value (the original concatenated the
                value itself, which failed with ``TypeError`` for ``None``).
        """
        for position, arg in enumerate(not_null_list or ()):
            if arg is None or arg == '':
                raise ValueError(
                    "Invalid param set : required parameter #%d is %r" % (position, arg))

    def __get_save_path(self):
        """Return ``<save_path>/<name>/<batch_no>``."""
        return os.path.join(self.save_path, self.name, self.batch_no)

    def __get_zip_tmp_path(self):
        """Return the folder that receives unzipped input files."""
        return os.path.join(self.__get_save_path(), 'save_tmp')

    def __get_read_tmp_path(self):
        """Return the ``read_tmp`` folder below the batch directory."""
        return os.path.join(self.__get_save_path(), 'read_tmp')

    def __get_csv_save_path(self, filename):
        """Return the full output path of the CSV written for ``filename``.

        The suffix is ``.csv.gz`` when ``save_zip`` is set and ``.csv``
        otherwise, so the file name always matches the actual content.
        """
        suffix = '.csv.gz' if self.save_zip else '.csv'
        return os.path.join(self.__get_save_path(), self.trans_param.read_type,
                            str(filename) + suffix)

    def __report_error(self, message):
        """Record an ``error`` status with ``message`` for the current batch."""
        update_transe_status(self.batch_no, self.trans_param.read_type, "error", message)

    def get_excel_files(self):
        """Return the list of input files, unzipping the source first if needed.

        When ``is_zip`` is set, ``trans_param.read_path`` is extracted into the
        temp folder and then re-pointed at that folder.

        Raises:
            Exception: Whatever error object ``unzip`` reports on failure.
        """
        if self.is_zip:
            tmp_path = self.__get_zip_tmp_path()
            is_success, e = unzip(self.trans_param.read_path, tmp_path)
            if not is_success:
                raise e
            self.trans_param.read_path = tmp_path

        return read_excel_files(self.trans_param.read_path)

    def read_excel_to_df(self, file):
        """Load ``file`` into a DataFrame restricted to ``trans_param.read_cols``."""
        return read_file_to_df(file, self.trans_param.read_cols)

    def save_to_csv(self, df, filename):
        """Write the rows of ``df`` whose wind column equals ``filename`` to CSV.

        Args:
            df: Frame containing ``trans_param.wind_col``; it may hold rows for
                several turbines, only the matching ones are written.
            filename: Wind-turbine identifier; used as the file stem.
        """
        save_path = self.__get_csv_save_path(filename)
        create_file_path(save_path, is_file_path=True)
        rows = df[df[self.trans_param.wind_col] == filename]
        # Compression is explicit in both cases so the output never depends on
        # pandas inferring it from the file suffix.
        rows.to_csv(save_path, compression='gzip' if self.save_zip else None, index=False)
        trans_print("保存" + str(filename) + ".csv成功")

    def save_to_db(self, df, filename):
        """Append ``df`` to the database table named after the batch number.

        Args:
            df: Rows to append.
            filename: Only used in the progress message.
        """
        connection = engine.connect()
        try:
            df.to_sql(name=str(self.batch_no), con=connection, index=False, if_exists='append',
                      chunksize=1000000)
        finally:
            # Always hand the connection back, even when the insert fails.
            connection.close()
        trans_print("文件:", filename, "保存数据库成功")

    def run(self):
        """Execute the whole conversion: read, transform and export.

        Failures while listing files, reading files or saving are logged,
        recorded through ``update_transe_status`` and re-raised.

        Raises:
            ValueError: If no parameters are attached or a required value is
                empty.
            Exception: Any error raised while reading or saving.
        """
        if self.trans_param is None:
            # Fail with a clear message instead of an AttributeError below.
            raise ValueError("Invalid param set : trans_param is None")

        trans_print("开始执行", self.name, self.trans_param.read_type)
        self.__params_valid([self.name, self.batch_no, self.save_path, self.trans_param.read_type,
                             self.trans_param.read_path,
                             self.trans_param.time_col, self.trans_param.wind_col])

        # Collect the input files.
        try:
            all_files = self.get_excel_files()
            trans_print('读取文件数量:', len(all_files))
        except Exception as e:
            logger.exception(e)
            message = "读取文件列表错误:" + str(self.trans_param.read_path) + ",系统返回错误:" + str(e)
            self.__report_error(message)
            raise

        # Read every file, then concatenate once (concat inside the loop
        # re-copies all previously read rows on each iteration).
        frames = []
        for file in all_files:
            try:
                frame = self.read_excel_to_df(file)
            except Exception as e:
                logger.exception(e)
                message = "读取文件错误:" + str(file) + ",系统返回错误:" + str(e)
                self.__report_error(message)
                raise
            if frame is not None:
                frames.append(frame)
        df = pd.concat(frames) if frames else pd.DataFrame()

        # Rename source columns to their target names.
        cols_tran = self.trans_param.cols_tran
        if cols_tran:
            # cols_tran maps target -> source; rename() needs source -> target.
            real_cols_trans = {v: k for k, v in cols_tran.items() if v}

            logger.info("包含转换字段,开始处理转换字段")
            df.rename(columns=real_cols_trans, inplace=True)
            if self.trans_param.wind_col in real_cols_trans:
                self.trans_param.wind_col = real_cols_trans[self.trans_param.wind_col]
            # Same remap for the time column, but only when its configured
            # name disappeared through the rename.
            time_col = self.trans_param.time_col
            if time_col in real_cols_trans and time_col not in df.columns:
                self.trans_param.time_col = real_cols_trans[time_col]

            # Every target column must exist, even without a source column.
            for k in cols_tran:
                if k not in df.columns:
                    df[k] = None

        # Derive year / month / day columns and sort chronologically.
        if self.trans_param.time_col:
            time_col = self.trans_param.time_col
            logger.info("包含时间字段,开始处理时间字段,添加年月日")
            df[time_col] = pd.to_datetime(df[time_col])
            df['year'] = df[time_col].dt.year
            df['month'] = df[time_col].dt.month
            df['day'] = df[time_col].dt.day
            df.sort_values(by=time_col, inplace=True)
            logger.info("处理时间字段结束")

        # Export one file per wind turbine.
        try:
            names = set(df[self.trans_param.wind_col])
            trans_print(names, self.trans_param.wind_col)
            for filename in names:
                # save_to_csv selects the matching rows itself.
                self.save_to_csv(df, filename)
                # Database export is currently disabled; see save_to_db.
        except Exception as e:
            logger.exception(e)
            message = "保存文件错误:" + str(self.save_path) + ",系统返回错误:" + str(e)
            self.__report_error(message)
            raise

        update_transe_status(self.batch_no, self.trans_param.read_type, "success", "")

        if self.is_zip:
            trans_print("开始删除解压进临时文件夹")
            shutil.rmtree(self.__get_zip_tmp_path())
            trans_print("删除解压进临时文件夹删除成功")


if __name__ == '__main__':
    # Ad-hoc manual run. The second positional argument of WindFarms is
    # ``batch_no``; no conversion parameters are attached here.
    wind_farm = WindFarms("test", batch_no="test_path")
    wind_farm.run()