123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- import datetime
- import multiprocessing
- import os
- import traceback
- import pandas as pd
- from etl.base import TransParam
- from etl.base.PathsAndTable import PathsAndTable
- from service.plt_service import update_trans_transfer_progress
- from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
- create_file_path, read_file_to_df
- from utils.log.trans_log import trans_print
- from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
class ReadAndSaveTmp(object):
    """Read raw excel/csv source files and split them into per-turbine
    temporary CSV files, applying the column mapping described by
    ``trans_param``.

    Heavy work is fanned out over ``multiprocessing`` pools; the
    cross-process state (set of already-created temp files plus the lock
    protecting it) lives in a ``multiprocessing.Manager``.
    """

    def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
        self.pathsAndTable = pathsAndTable
        self.trans_param = trans_param
        # Turbine names whose temp CSV already exists (shared across worker
        # processes so the header is written exactly once per turbine).
        self.exist_wind_names = multiprocessing.Manager().list()
        # Guards the check-and-append on exist_wind_names.
        self.lock = multiprocessing.Manager().Lock()

    def _save_to_tmp_csv_by_name(self, df, name):
        """Append ``df`` to the temp CSV of turbine ``name``.

        The CSV header is emitted only the first time a given turbine is
        seen; subsequent chunks are appended header-less.
        """
        save_name = str(name) + '.csv'
        save_path = os.path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
        create_file_path(save_path, is_file_path=True)
        # Atomically decide whether this turbine's file already exists.
        # The write itself happens outside the lock: within one batch each
        # name is handled by a single worker (see save_to_tmp_csv), so two
        # processes never write the same file concurrently.
        with self.lock:
            contains_name = name in self.exist_wind_names
            if not contains_name:
                self.exist_wind_names.append(name)
        if contains_name:
            df.to_csv(save_path, index=False, encoding='utf8', mode='a',
                      header=False)
        else:
            df.to_csv(save_path, index=False, encoding='utf8')

    def df_save_to_tmp_file(self, df=pd.DataFrame()):
        """Normalize ``df``'s columns per the configured mapping, drop
        unmapped columns and blank/invalid rows, then dispatch the frame to
        the per-turbine temp CSVs.
        """
        if self.trans_param.is_vertical_table:
            # Vertical tables were already normalized in read_excel_to_df.
            pass
        else:
            # Column conversion: cols_tran maps {target_field: source_column}.
            same_col = {}
            if self.trans_param.cols_tran:
                cols_tran = self.trans_param.cols_tran
                real_cols_trans = dict()
                for k, v in cols_tran.items():
                    if v and not v.startswith("$"):
                        if v not in real_cols_trans:
                            real_cols_trans[v] = k
                        else:
                            # Several target fields share one source column;
                            # remember the extras so their values can be
                            # copied after the rename below.
                            value = real_cols_trans[v]
                            if value in same_col:
                                same_col[value].append(k)
                            else:
                                same_col[value] = [k]
                trans_print("包含转换字段,开始处理转换字段")
                df.rename(columns=real_cols_trans, inplace=True)
                # Copy values for target fields that share a source column.
                for key, cols in same_col.items():
                    for col in cols:
                        df[col] = df[key]
                # Drop every column that is not a known target field
                # (single drop call instead of one drop per column).
                del_keys = set(df.columns) - set(cols_tran.keys())
                if del_keys:
                    df.drop(columns=list(del_keys), inplace=True)
            df = del_blank(df, ['wind_turbine_number'])
            # Fixed: was `df['time_stamp'].isna() == False`.
            df = df[df['time_stamp'].notna()]
            if self.trans_param.wind_name_exec:
                # NOTE(security): wind_name_exec is a configured Python
                # expression evaluated with eval(); it must never come from
                # untrusted input.
                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
                df['wind_turbine_number'] = eval(exec_str)
        self.save_to_tmp_csv(df)

    def save_to_tmp_csv(self, df):
        """Split ``df`` by turbine and save each slice in parallel."""
        names = set(df['wind_turbine_number'].values)
        if names:
            trans_print("开始保存", str(names), "到临时文件")
            # Each distinct turbine name is assigned to exactly one worker,
            # so per-file writes never race.
            with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
                pool.starmap(self._save_to_tmp_csv_by_name,
                             [(df[df['wind_turbine_number'] == name], name) for name in names])
            del df
            trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))

    def read_file_and_save_tmp(self):
        """Read every source file and produce the per-turbine temp CSVs.

        Two modes:

        * ``merge_columns``: frames with identical column signatures are
          concatenated, deduplicated on (time_stamp, turbine) and then the
          groups are merged side-by-side before a single save.
        * default: files are processed in CPU-sized batches, each frame
          saved independently; transfer progress (20% -> 50%) is reported
          after every batch.
        """
        all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
        if self.trans_param.merge_columns:
            dfs_list = list()
            index_keys = [self.trans_param.cols_tran['time_stamp']]
            wind_col = self.trans_param.cols_tran['wind_turbine_number']
            if str(wind_col).startswith("$"):
                # Turbine name is derived (from file/folder name), so the
                # frame already carries the normalized column name.
                wind_col = 'wind_turbine_number'
            index_keys.append(wind_col)
            df_map = dict()
            # TODO: needs optimisation — every frame is held in memory at once.
            with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
                dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])
            # Group frames by their exact column signature.
            for df in dfs:
                key = '-'.join(df.columns)
                if key in df_map:
                    df_map[key] = pd.concat([df_map[key], df])
                else:
                    df_map[key] = df
            for k, df in df_map.items():
                df.drop_duplicates(inplace=True)
                df.set_index(keys=index_keys, inplace=True)
                # Keep the first row for duplicated (time_stamp, turbine) keys.
                df = df[~df.index.duplicated(keep='first')]
                dfs_list.append(df)
            df = pd.concat(dfs_list, axis=1)
            df.reset_index(inplace=True)
            try:
                self.df_save_to_tmp_file(df)
            except Exception as e:
                trans_print(traceback.format_exc())
                message = "合并列出现错误:" + str(e)
                raise ValueError(message)
        else:
            split_count = use_files_get_max_cpu_count(all_files)
            all_arrays = split_array(all_files, split_count)
            for index, arr in enumerate(all_arrays):
                try:
                    with multiprocessing.Pool(split_count) as pool:
                        dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
                    for df in dfs:
                        self.df_save_to_tmp_file(df)
                except Exception as e:
                    trans_print(traceback.format_exc())
                    message = "整理临时文件,系统返回错误:" + str(e)
                    raise ValueError(message)
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(20 + 30 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)

    def read_excel_to_df(self, file_path):
        """Read a single source file into a DataFrame and resolve the
        '$'-style derived columns:

        * ``$file`` / ``$file[a:b]`` — value taken from the file name,
        * ``src_col,$file_date[a:b]`` — date prefix from the file name is
          prepended to a source column,
        * ``$folder[n]`` — value taken from the n-th parent folder name.
        """
        # Real (non-derived) source columns; a mapped value may look like
        # "col,$file_date[a:b]", hence the split on ",".
        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
        trans_dict = {}
        for k, v in self.trans_param.cols_tran.items():
            if v and not str(v).startswith("$"):
                trans_dict[v] = k
        if self.trans_param.is_vertical_table:
            vertical_cols = self.trans_param.vertical_cols
            df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header)
            # Keep only the attribute rows that are mapped to target fields.
            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
            # Translate attribute names to target field names; unmapped
            # entries keep their original value.
            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
                df[self.trans_param.vertical_key])
            return df
        else:
            trans_dict = dict()
            for k, v in self.trans_param.cols_tran.items():
                # Fixed precedence bug: the original condition
                # `v and v.startswith("$") or v.find(",") > 0` raised
                # AttributeError for v=None because the `or` arm ran unguarded.
                if v and (v.startswith("$") or v.find(",") > 0):
                    trans_dict[v] = k
            if self.trans_param.merge_columns:
                df = read_file_to_df(file_path, header=self.trans_param.header)
            else:
                if self.trans_param.need_valid_cols:
                    df = read_file_to_df(file_path, read_cols, header=self.trans_param.header)
                else:
                    df = read_file_to_df(file_path, header=self.trans_param.header)
            # Resolve column-name prefixes via the configured expression.
            if self.trans_param.resolve_col_prefix:
                # NOTE(security): resolve_col_prefix is a configured Python
                # expression evaluated with eval(); keep it trusted.
                columns_dict = dict()
                for column in df.columns:
                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
                df.rename(columns=columns_dict, inplace=True)
            for k, v in trans_dict.items():
                if k.startswith("$file"):
                    # File name without its extension.
                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                    if k == "$file":
                        df[v] = str(file)
                    elif k.startswith("$file["):
                        datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
                        if len(datas) != 2:
                            raise Exception("字段映射出现错误 :" + str(trans_dict))
                        df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
                elif k.find("$file_date") > 0:
                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
                    if len(datas) != 2:
                        raise Exception("字段映射出现错误 :" + str(trans_dict))
                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
                    # Prepend the date from the file name to the time column.
                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
                elif k.startswith("$folder"):
                    folder = file_path
                    # Walk the requested number of levels up the path.
                    cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
                    for i in range(cengshu):
                        folder = os.path.dirname(folder)
                    df[v] = str(str(folder).split(os.sep)[-1]).strip()
            return df

    def run(self):
        """Entry point: produce temp files and report progress (50%)."""
        trans_print("开始保存数据到临时文件")
        begin = datetime.datetime.now()
        self.read_file_and_save_tmp()
        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 50,
                                       self.pathsAndTable.save_db)
        trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)
|