# hebing_muti_batch.py
import multiprocessing
import sys
from os import *  # NOTE(review): wildcard import supplies `path`, `environ`, `listdir` used below — consider explicit `from os import path, environ, listdir`
import pandas as pd

# Make the project root importable: everything before the "tmp_file" directory
# that contains this script is treated as the package root.
sys.path.insert(0, path.abspath(__file__).split("tmp_file")[0])
  6. def hebing_and_save(new_batch_save_path, name, paths):
  7. df = pd.DataFrame()
  8. for path in paths:
  9. now_df = read_file_to_df(path)
  10. df = pd.concat([df, now_df])
  11. df.sort_values(by=['time_stamp'], inplace=True)
  12. create_file_path(new_batch_save_path)
  13. df.to_csv(path.join(new_batch_save_path, name), index=False, encoding='utf8')
  14. if __name__ == '__main__':
  15. env = 'prod'
  16. if len(sys.argv) >= 2:
  17. env = sys.argv[1]
  18. from utils.conf.read_conf import yaml_conf
  19. conf_path = path.abspath(__file__).split("tmp_file")[0] + f"/conf/etl_config_{env}.yaml"
  20. environ['ETL_CONF'] = conf_path
  21. yaml_config = yaml_conf(conf_path)
  22. environ['env'] = env
  23. from utils.file.trans_methods import read_file_to_df, create_file_path
  24. from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
  25. from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
  26. from service.plt_service import get_hebing_data_by_batch_no_and_type
  27. save_batch = 'WOF085500008-2-3'
  28. save_batch_name = '合并'
  29. trans_type = 'second'
  30. read_batchs = ['WOF085500008-WOB000003', 'WOF085500008-WOB000002']
  31. read_paths = list()
  32. new_batch_save_path = ''
  33. for read_data in read_batchs:
  34. data = get_hebing_data_by_batch_no_and_type(read_data, trans_type)
  35. save_db = True
  36. exec_process = None
  37. if data['transfer_type'] in ['second', 'minute']:
  38. exec_process = MinSecTrans(data=data, save_db=save_db)
  39. if data['transfer_type'] in ['fault', 'warn']:
  40. exec_process = FaultWarnTrans(data=data, save_db=save_db)
  41. if exec_process is None:
  42. raise Exception("No exec process")
  43. read_paths.append(exec_process.pathsAndTable.get_save_path())
  44. new_batch_save_path = path.join(exec_process.pathsAndTable.save_path, save_batch + "_" + save_batch_name,
  45. trans_type)
  46. file_dict = dict()
  47. for read_path in read_paths:
  48. for file in listdir(read_path):
  49. if file in file_dict:
  50. file_dict[file].append(path.join(read_path, file))
  51. else:
  52. file_dict[file] = [path.join(read_path, file)]
  53. with multiprocessing.Pool(len(file_dict.keys())) as pool:
  54. pool.starmap(hebing_and_save, [(new_batch_save_path, name, paths) for name, paths in file_dict.items()])