123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/5/15
- # @Author : 魏志亮
- import datetime
- import shutil
- from base.TranseParam import TranseParam
- from utils.db.trans_mysql import *
- from utils.log.trans_log import logger
- from utils.trans_methods import *
- from utils.zip.unzip import unzip
class WindFarms(object):
    """Convert one batch of raw wind-farm files into per-turbine CSV files.

    ``run()`` lists the input files under ``trans_param.read_path``, loads
    them into a single DataFrame, renames columns according to
    ``trans_param.cols_tran``, adds ``year``/``month``/``day`` columns derived
    from ``trans_param.time_col`` and writes one CSV per distinct value of
    ``trans_param.wind_col``. Progress and failures are reported through
    ``update_transe_status``.
    """

    # The annotation is quoted so that defining the class does not require the
    # project type to be resolvable at import time.
    def __init__(self, name, batch_no=None, save_path=None, params: "TranseParam" = None):
        """Store the batch identity, the output root and the conversion parameters."""
        self.name = name
        self.batch_no = batch_no
        self.save_path = save_path
        self.begin = datetime.datetime.now()
        self.next_time = datetime.datetime.now()
        # When True, ``trans_param.read_path`` is unzipped to a temp folder first.
        self.is_zip = False
        # When True, output files are gzip-compressed (``.csv.gz``).
        self.save_zip = False
        self.trans_param = params

    def set_trans_param(self, params: "TranseParam"):
        """Replace the conversion parameters used by ``run()``."""
        self.trans_param = params

    def __params_valid(self, not_null_list=None):
        """Raise ``Exception`` if any entry of *not_null_list* is ``None`` or ``''``."""
        for arg in not_null_list or ():
            if arg is None or arg == '':
                # str() is required: concatenating None directly raised a
                # TypeError and hid the intended validation error.
                raise Exception("Invalid param set :" + str(arg))

    def __get_save_path(self):
        """Return ``<save_path>/<name>/<batch_no>``."""
        return os.path.join(self.save_path, self.name, self.batch_no)

    def __get_zip_tmp_path(self):
        """Return the folder that zipped input is extracted into."""
        return os.path.join(self.__get_save_path(), 'save_tmp')

    def __get_read_tmp_path(self):
        """Return the ``read_tmp`` folder below the batch save path."""
        return os.path.join(self.__get_save_path(), 'read_tmp')

    def __report_failure(self, prefix, target, error):
        """Log *error* and mark the batch as failed; must be called from an ``except`` block."""
        logger.exception(error)
        message = prefix + target + ",系统返回错误:" + str(error)
        update_transe_status(self.batch_no, self.trans_param.read_type, "error", message)

    def get_excel_files(self):
        """Return the input file list, unzipping the source first when ``is_zip`` is set.

        Side effect: after a successful unzip ``trans_param.read_path`` is
        redirected to the extraction folder.

        Raises:
            Whatever error object ``unzip`` reports when extraction fails.
        """
        if self.is_zip:
            tmp_path = self.__get_zip_tmp_path()
            is_success, e = unzip(self.trans_param.read_path, tmp_path)
            if not is_success:
                raise e
            self.trans_param.read_path = tmp_path
        return read_excel_files(self.trans_param.read_path)

    def read_excel_to_df(self, file):
        """Load *file* through ``read_file_to_df`` using ``trans_param.read_cols``."""
        return read_file_to_df(file, self.trans_param.read_cols)

    def save_to_csv(self, df, filename):
        """Write the rows of *df* whose ``wind_col`` equals *filename* to a CSV file.

        The file is stored at
        ``<save_path>/<name>/<batch_no>/<read_type>/<filename>.csv`` and gets a
        ``.csv.gz`` suffix plus gzip compression when ``save_zip`` is True.
        """
        # The original had the suffixes swapped and passed compression='.gzip',
        # which pandas rejects as an unrecognized compression type.
        suffix = '.csv.gz' if self.save_zip else '.csv'
        save_path = os.path.join(self.__get_save_path(), self.trans_param.read_type,
                                 str(filename) + suffix)
        create_file_path(save_path, is_file_path=True)

        subset = df[df[self.trans_param.wind_col] == filename]
        if self.save_zip:
            subset.to_csv(save_path, compression='gzip', index=False)
        else:
            subset.to_csv(save_path, index=False)
        trans_print("保存" + str(filename) + ".csv成功")

    def save_to_db(self, df, filename):
        """Append *df* to the database table named after ``batch_no``."""
        # Use the connection as a context manager so it is closed afterwards
        # instead of being leaked on every call.
        with engine.connect() as conn:
            df.to_sql(name=str(self.batch_no), con=conn, index=False, if_exists='append',
                      chunksize=1000000)
        trans_print("文件:", filename, "保存数据库成功")

    def __rename_columns(self, df):
        """Apply ``trans_param.cols_tran`` (target name -> source name) to *df* in place."""
        cols_tran = self.trans_param.cols_tran
        # Invert to source -> target and skip targets without a source column.
        real_cols_trans = {v: k for k, v in cols_tran.items() if v}

        logger.info("包含转换字段,开始处理转换字段")
        df.rename(columns=real_cols_trans, inplace=True)
        # Keep wind_col pointing at the column after it has been renamed.
        # NOTE(review): time_col is not remapped here, so it is presumably
        # already given as a post-rename name - confirm with the caller.
        if self.trans_param.wind_col in real_cols_trans:
            self.trans_param.wind_col = real_cols_trans[self.trans_param.wind_col]

        # Every target column must exist, even when no source column feeds it.
        for k in cols_tran:
            if k not in df.columns:
                df[k] = None

    def __add_date_columns(self, df):
        """Parse ``time_col`` and add ``year``/``month``/``day``; sort *df* by time in place."""
        time_col = self.trans_param.time_col
        logger.info("包含时间字段,开始处理时间字段,添加年月日")
        df[time_col] = pd.to_datetime(df[time_col])
        df['year'] = df[time_col].dt.year
        df['month'] = df[time_col].dt.month
        df['day'] = df[time_col].dt.day
        df.sort_values(by=time_col, inplace=True)
        logger.info("处理时间字段结束")

    def run(self):
        """Execute the whole conversion for this batch.

        Raises:
            Exception: when ``trans_param`` is missing or a required parameter
                is ``None`` or empty.
            Any error raised while listing, reading or saving files; in those
            cases the batch status is set to ``"error"`` before re-raising.
        """
        # Fail with a clear message instead of an AttributeError on None.
        if self.trans_param is None:
            raise Exception("Invalid param set :trans_param")
        params = self.trans_param

        trans_print("开始执行", self.name, params.read_type)
        self.__params_valid([self.name, self.batch_no, self.save_path, params.read_type,
                             params.read_path, params.time_col, params.wind_col])

        # Collect the input files.
        try:
            all_files = self.get_excel_files()
            trans_print('读取文件数量:', len(all_files))
        except Exception as e:
            self.__report_failure("读取文件列表错误:", params.read_path, e)
            raise

        # Read every file, then concatenate once: growing a DataFrame inside
        # the loop re-copies all previously read rows for each new file.
        frames = []
        for file in all_files:
            try:
                frames.append(self.read_excel_to_df(file))
            except Exception as e:
                self.__report_failure("读取文件错误:", file, e)
                raise
        try:
            df = pd.concat(frames) if frames else pd.DataFrame()
        except Exception as e:
            self.__report_failure("读取文件错误:", params.read_path, e)
            raise

        # Rename columns to their target names.
        if params.cols_tran:
            self.__rename_columns(df)

        # Add year / month / day columns.
        if params.time_col:
            self.__add_date_columns(df)

        # Write one file per distinct wind_col value.
        try:
            names = set(df[params.wind_col])
            trans_print(names, params.wind_col)
            for filename in names:
                # save_to_csv selects the matching rows itself.
                self.save_to_csv(df, filename)
        except Exception as e:
            self.__report_failure("保存文件错误:", self.save_path, e)
            raise

        update_transe_status(self.batch_no, params.read_type, "success", "")

        if self.is_zip:
            trans_print("开始删除解压进临时文件夹")
            shutil.rmtree(self.__get_zip_tmp_path())
            trans_print("删除解压进临时文件夹删除成功")
if __name__ == '__main__':
    # Manual entry point: build a converter and run it once.
    # NOTE(review): "test_path" is the second positional argument, so it binds
    # to batch_no rather than save_path, and no conversion parameters are
    # passed - confirm this is the intended smoke-test setup.
    wind_farm = WindFarms("test", "test_path")
    wind_farm.run()
|