12345678910111213141516171819202122232425262728293031323334353637383940 |
import datetime
import multiprocessing
import os
import sys
from os import *  # NOTE(review): star import also shadows builtins such as `open`; prefer explicit `os.<name>`

# Prepend the part of this file's absolute path before the first "tmp_file"
# (presumably the project root) to sys.path so the project-local imports
# below resolve when this file is run as a script.
sys.path.insert(0, path.abspath(__file__).split("tmp_file")[0])

from service.trans_service import save_df_to_db, drop_table, creat_min_sec_table
from utils.file.trans_methods import read_file_to_df, read_files
def read_and_exec(file_path, table_name='WOF079200018-WOB000012_second'):
    """Add a ``yaw_error1`` column to one data file, rewrite it, and save it to the DB.

    Steps, in order:
      1. read ``file_path`` into a DataFrame with ``read_file_to_df``;
      2. set ``yaw_error1 = true_wind_direction - 180``;
      3. overwrite ``file_path`` with the result as UTF-8 CSV (no index column);
      4. call ``creat_min_sec_table()`` and then ``save_df_to_db(table_name, df)``.

    Intended as a ``multiprocessing.Pool.map`` worker, so the file path is the
    only required argument.

    Args:
        file_path: Path of the file to process; the file is overwritten in place.
        table_name: Table passed to ``save_df_to_db``. The default is the table
            name the ``__main__`` block passes to ``drop_table``; keep them in sync.

    Raises:
        Any exception from reading, writing or saving is re-raised unchanged
        after the failing file name has been printed.
    """
    started_at = datetime.datetime.now()
    file_name = os.path.basename(file_path)
    print("开始执行:", file_name)
    try:
        df = read_file_to_df(file_path)
        # NOTE(review): the domain meaning of subtracting 180 from
        # true_wind_direction is not established here -- confirm against the data spec.
        df['yaw_error1'] = df['true_wind_direction'] - 180
        # Write the enriched frame back over the source file so the file on disk
        # matches what is loaded into the DB.
        # NOTE(review): this always writes CSV; if read_file_to_df also accepts
        # other formats, the original file would be replaced by plain CSV under
        # its old name -- confirm the inputs are CSV.
        df.to_csv(file_path, index=False, encoding='utf8')
        # NOTE(review): runs once per file in every worker process; presumably
        # idempotent (create-if-not-exists) -- verify in service.trans_service.
        creat_min_sec_table()
        save_df_to_db(table_name, df)
    except Exception:
        # The exception Pool.map re-raises in the parent does not carry the
        # worker's input argument, so report which file failed before re-raising.
        print("执行失败:", file_name)
        raise
    print("结束执行:", file_name, ",耗时:", datetime.datetime.now() - started_at)
def main():
    """Reload the ``WOF079200018-WOB000012_second`` table from one directory of files.

    Command line: ``python <this script> [env]``. ``env`` (default ``'prod'``)
    selects ``./conf/etl_config_<env>.yaml`` relative to the current working
    directory. Before any other work, the config's absolute path is exported as
    ``ETL_CONF`` and the environment name as ``env`` in ``os.environ``.

    ``drop_table`` is then called on the target table. Next, every file that
    ``read_files`` returns for the hard-coded source directory is passed to
    ``read_and_exec`` on a pool of 24 worker processes. The total elapsed time
    is printed at the end.
    """
    started_at = datetime.datetime.now()

    # The first positional CLI argument, when present, overrides the default environment.
    env = sys.argv[1] if len(sys.argv) >= 2 else 'prod'
    # Exported through the process environment. The pool below is created
    # afterwards, so its workers inherit these values (presumably this is how
    # project helpers locate their config).
    environ['ETL_CONF'] = path.abspath(f"./conf/etl_config_{env}.yaml")
    environ['env'] = env

    # Called once, before any worker runs, on the same table name that
    # read_and_exec saves into.
    drop_table("WOF079200018-WOB000012_second")

    source_dir = r'/data/download/collection_data/2完成/吉山风电场-江西-大唐/清理数据/WOF079200018-WOB000012_JS一期1秒24.8-10/second'
    files = read_files(source_dir)
    with multiprocessing.Pool(24) as workers:
        workers.map(read_and_exec, files)

    print("总耗时:", datetime.datetime.now() - started_at)


if __name__ == '__main__':
    main()
|