1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
import multiprocessing
import sys
# NOTE(review): wildcard import is relied on below for `path`, `environ`
# and `listdir`; replacing it with explicit imports would require touching
# every use site in this file, so it is kept as-is.
from os import *
import pandas as pd
# Make the project root importable (everything before "tmp_file" in this
# file's absolute path), so the `utils`, `etl` and `service` packages resolve.
sys.path.insert(0, path.abspath(__file__).split("tmp_file")[0])
def hebing_and_save(new_batch_save_path, name, paths):
    """Merge all CSV parts that share the file name *name* into one file.

    Reads every file in *paths* with ``read_file_to_df``, concatenates them,
    sorts the rows by ``time_stamp``, ensures the output directory exists via
    ``create_file_path`` and writes ``<new_batch_save_path>/<name>`` as UTF-8
    CSV without the index.

    Runs inside multiprocessing workers; ``read_file_to_df`` and
    ``create_file_path`` are imported at module level by the ``__main__``
    block — presumably inherited via fork (would break under the 'spawn'
    start method; TODO confirm target platform).
    """
    # BUG FIX: the loop variable was previously named `path`, shadowing the
    # os.path module (pulled in by `from os import *`). After the loop,
    # `path.join(new_batch_save_path, name)` therefore hit str.join on the
    # last file path and raised TypeError.
    frames = [read_file_to_df(file_path) for file_path in paths]
    # Concatenate once instead of growing the DataFrame per file (O(n) vs O(n^2)).
    df = pd.concat(frames) if frames else pd.DataFrame()
    df.sort_values(by=['time_stamp'], inplace=True)
    create_file_path(new_batch_save_path)
    df.to_csv(path.join(new_batch_save_path, name), index=False, encoding='utf8')
if __name__ == '__main__':
    # Environment selection: defaults to 'prod', overridable by first CLI arg.
    env = 'prod'
    if len(sys.argv) >= 2:
        env = sys.argv[1]

    # The config path and env vars must be set up BEFORE the project imports
    # below — presumably those modules read ETL_CONF/env at import time
    # (TODO confirm).
    from utils.conf.read_conf import yaml_conf

    conf_path = path.abspath(__file__).split("tmp_file")[0] + f"/conf/etl_config_{env}.yaml"
    environ['ETL_CONF'] = conf_path
    yaml_config = yaml_conf(conf_path)
    environ['env'] = env

    from utils.file.trans_methods import read_file_to_df, create_file_path
    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
    from service.plt_service import get_hebing_data_by_batch_no_and_type

    # Target (merged) batch identity and the source batches to merge.
    save_batch = 'WOF085500008-2-3'
    save_batch_name = '合并'
    trans_type = 'second'
    read_batchs = ['WOF085500008-WOB000003', 'WOF085500008-WOB000002']

    # Resolve the save path of each source batch; the merged output directory
    # is derived from the last batch's base save path.
    read_paths = list()
    new_batch_save_path = ''
    for read_data in read_batchs:
        data = get_hebing_data_by_batch_no_and_type(read_data, trans_type)
        save_db = True
        exec_process = None
        if data['transfer_type'] in ['second', 'minute']:
            exec_process = MinSecTrans(data=data, save_db=save_db)
        if data['transfer_type'] in ['fault', 'warn']:
            exec_process = FaultWarnTrans(data=data, save_db=save_db)
        if exec_process is None:
            raise Exception("No exec process")
        read_paths.append(exec_process.pathsAndTable.get_save_path())
        new_batch_save_path = path.join(exec_process.pathsAndTable.save_path,
                                        save_batch + "_" + save_batch_name,
                                        trans_type)

    # Group same-named files across all source batch directories:
    # file name -> list of full paths to merge.
    file_dict = dict()
    for read_path in read_paths:
        for file in listdir(read_path):
            if file in file_dict:
                file_dict[file].append(path.join(read_path, file))
            else:
                file_dict[file] = [path.join(read_path, file)]

    # BUG FIX: Pool(0) raises ValueError when no files were found; also cap
    # the worker count at cpu_count() so a large batch does not spawn one
    # process per file and oversubscribe the machine.
    if file_dict:
        pool_size = min(len(file_dict), multiprocessing.cpu_count())
        with multiprocessing.Pool(pool_size) as pool:
            pool.starmap(hebing_and_save,
                         [(new_batch_save_path, name, paths)
                          for name, paths in file_dict.items()])
|