"""
Merge ("hebing", 合并) the per-file outputs of several transfer batches into a
single combined batch: every file name that appears in the save directories of
the source batches is concatenated, sorted by time_stamp, and written to the
new batch's save path.
"""
import multiprocessing
import os
import sys

import pandas as pd

# Make the project root importable; this script appears to live under a
# "tmp_file" directory inside the project tree.
sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])


def hebing_and_save(new_batch_save_path, name, paths):
    """Concatenate every file named `name` found in `paths`, sort the result
    by time_stamp, and save it as CSV under `new_batch_save_path`.

    Note: read_file_to_df and create_file_path are bound at module scope by
    the imports inside the __main__ block below, so worker processes only see
    them under a fork-based multiprocessing start method (the default on
    Linux); under spawn they would be undefined.
    """
    # Collect the frames first and concatenate once; calling pd.concat inside
    # the loop would copy the accumulated data on every iteration.
    frames = [read_file_to_df(path) for path in paths]
    df = pd.concat(frames)
    df.sort_values(by=['time_stamp'], inplace=True)
    create_file_path(new_batch_save_path)
    df.to_csv(os.path.join(new_batch_save_path, name), index=False, encoding='utf-8')


if __name__ == '__main__':
    # Environment name (e.g. 'prod', 'dev'); defaults to 'prod'.
    env = 'prod'
    if len(sys.argv) >= 2:
        env = sys.argv[1]

    from utils.conf.read_conf import yaml_conf

    # Point ETL_CONF at the environment-specific config before importing the
    # ETL modules, which read it at import time.
    conf_path = os.path.abspath(__file__).split("tmp_file")[0] + f"/conf/etl_config_{env}.yaml"
    os.environ['ETL_CONF'] = conf_path
    yaml_config = yaml_conf(conf_path)
    os.environ['env'] = env

    from utils.file.trans_methods import read_file_to_df, create_file_path
    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
    from service.plt_service import get_hebing_data_by_batch_no_and_type

    save_batch = 'WOF085500008-2-3'
    save_batch_name = '合并'  # "merged"; used in the output directory name
    trans_type = 'second'
    read_batches = ['WOF085500008-WOB000003', 'WOF085500008-WOB000002']

    read_paths = list()
    new_batch_save_path = ''

    for read_batch in read_batches:
        data = get_hebing_data_by_batch_no_and_type(read_batch, trans_type)
        save_db = True
        exec_process = None
        if data['transfer_type'] in ['second', 'minute']:
            exec_process = MinSecTrans(data=data, save_db=save_db)
        elif data['transfer_type'] in ['fault', 'warn']:
            exec_process = FaultWarnTrans(data=data, save_db=save_db)
        if exec_process is None:
            raise ValueError(f"unsupported transfer_type: {data['transfer_type']}")

        read_paths.append(exec_process.pathsAndTable.get_save_path())
        # Overwritten on every iteration with the same base save_path; the
        # last batch wins, and all source batches are expected to share it.
        new_batch_save_path = os.path.join(exec_process.pathsAndTable.save_path,
                                           save_batch + "_" + save_batch_name, trans_type)

    # Map each file name to the list of source paths containing it, so files
    # with the same name across batches get merged together.
    file_dict = dict()
    for read_path in read_paths:
        for file in os.listdir(read_path):
            file_dict.setdefault(file, []).append(os.path.join(read_path, file))

    # One worker per distinct file name; for large batches consider capping
    # this at os.cpu_count().
    with multiprocessing.Pool(len(file_dict)) as pool:
        pool.starmap(hebing_and_save,
                     [(new_batch_save_path, name, paths) for name, paths in file_dict.items()])