吉山批次处理并重新存数据库.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. import datetime
  2. import multiprocessing
  3. import sys
  4. from os import *
  5. sys.path.insert(0, path.abspath(__file__).split("tmp_file")[0])
  6. from service.trans_service import save_df_to_db, drop_table, creat_min_sec_table
  7. from utils.file.trans_methods import read_file_to_df, read_files
  8. def read_and_exec(file_path):
  9. begin = datetime.datetime.now()
  10. print("开始执行:", path.basename(file_path))
  11. df = read_file_to_df(file_path)
  12. df['yaw_error1'] = df['true_wind_direction'] - 180
  13. df.to_csv(file_path, index=False, encoding='utf8')
  14. creat_min_sec_table()
  15. save_df_to_db('WOF079200018-WOB000012_second', df)
  16. print("结束执行:", path.basename(file_path), ",耗时:", datetime.datetime.now() - begin)
  17. if __name__ == '__main__':
  18. begin = datetime.datetime.now()
  19. env = 'prod'
  20. if len(sys.argv) >= 2:
  21. env = sys.argv[1]
  22. conf_path = path.abspath(f"./conf/etl_config_{env}.yaml")
  23. environ['ETL_CONF'] = conf_path
  24. environ['env'] = env
  25. drop_table("WOF079200018-WOB000012_second")
  26. read_dir = r'/data/download/collection_data/2完成/吉山风电场-江西-大唐/清理数据/WOF079200018-WOB000012_JS一期1秒24.8-10/second'
  27. all_files = read_files(read_dir)
  28. with multiprocessing.Pool(24) as pool:
  29. pool.map(read_and_exec, all_files)
  30. print("总耗时:", datetime.datetime.now() - begin)