"""One-off script: recompute ``yaw_error1`` for every second-level file of one
turbine, rewrite each file as CSV, and reload the data into the database.

Usage: ``python <this file> [env]`` where ``env`` defaults to ``prod``.
"""
import datetime
import multiprocessing
import sys
from os import *

# Prepend the portion of this file's absolute path that precedes "tmp_file"
# to sys.path; presumably this is the project root that makes the
# ``service`` / ``utils`` imports below resolvable -- confirm.
sys.path.insert(0, path.abspath(__file__).split("tmp_file")[0])

from service.trans_service import save_df_to_db, drop_table, creat_min_sec_table
from utils.file.trans_methods import read_file_to_df, read_files

# Table that is dropped once by the driver and then filled by every worker.
_SECOND_TABLE = 'WOF079200018-WOB000012_second'


def read_and_exec(file_path):
    """Process a single data file and store its rows in the database.

    Steps, in order:
      1. load ``file_path`` through ``read_file_to_df``;
      2. add/overwrite column ``yaw_error1`` as ``true_wind_direction - 180``;
      3. write the frame back to ``file_path`` as UTF-8 CSV without the index;
      4. call ``creat_min_sec_table()``;
      5. hand the frame to ``save_df_to_db`` for the second-level table.

    Progress and elapsed time are printed to stdout.

    :param file_path: path of the file to read and overwrite.
    """
    started_at = datetime.datetime.now()
    file_name = path.basename(file_path)
    print("开始执行:", file_name)

    frame = read_file_to_df(file_path)
    frame['yaw_error1'] = frame['true_wind_direction'] - 180

    # NOTE(review): the input file is overwritten in place with CSV content,
    # whatever format it was read from -- confirm this is intended.
    frame.to_csv(file_path, index=False, encoding='utf8')

    # NOTE(review): invoked once per file inside every worker process;
    # presumably idempotent (create-if-missing) -- confirm.
    creat_min_sec_table()
    save_df_to_db(_SECOND_TABLE, frame)

    elapsed = datetime.datetime.now() - started_at
    print("结束执行:", file_name, ",耗时:", elapsed)


if __name__ == '__main__':
    script_started_at = datetime.datetime.now()

    # Optional first CLI argument picks the environment; default is 'prod'.
    env = sys.argv[1] if len(sys.argv) >= 2 else 'prod'

    # Export the configuration location and environment name before any
    # project call or worker process is started.
    environ['ETL_CONF'] = path.abspath(f"./conf/etl_config_{env}.yaml")
    environ['env'] = env

    # Start from an empty table: drop it before the workers refill it.
    drop_table(_SECOND_TABLE)

    read_dir = r'/data/download/collection_data/2完成/吉山风电场-江西-大唐/清理数据/WOF079200018-WOB000012_JS一期1秒24.8-10/second'
    all_files = read_files(read_dir)

    # Fan the files out over 24 worker processes; map() blocks until all
    # files have been handled.
    with multiprocessing.Pool(24) as workers:
        workers.map(read_and_exec, all_files)

    print("总耗时:", datetime.datetime.now() - script_started_at)