# -*- coding: utf-8 -*-
"""
Merge raw per-turbine CSV exports into one standardized CSV per turbine.

Each input CSV has a two-row header. The two rows are flattened into a single
column name, mapped onto the standard column names listed in ``dianjian_str``
and every other column is dropped. All files that belong to the same turbine
(the part of the file name before ``@``) are concatenated and written to
``<save_path>/<turbine>.csv``.
"""
import copy
import datetime
import multiprocessing
import os
# Kept for backward compatibility with code that relies on these names.
# NOTE: the star import also rebinds the builtin ``open`` to ``os.open``.
from os import *  # noqa: F401,F403

import numpy as np
import pandas as pd

# One standard column per line: "<standard name>\t<unused>\t<source header>".
# Lines that do not have exactly three tab-separated fields have no source
# column and are written out as empty columns.
# NOTE(review): the tab layout was reconstructed from a whitespace-flattened
# copy of this file (assumed "name<TAB><TAB>source header", single space before
# the unit) - confirm against the real CSV headers before running.
dianjian_str = """
wind_turbine_number
time_stamp\t\t时间
active_power\t\t有功功率 kW
rotor_speed\t\t风轮转速 rpm
generator_speed\t\t发电机转速 rpm
wind_velocity\t\t风速 m/s
pitch_angle_blade_1\t\t叶片1角度 °
pitch_angle_blade_2\t\t叶片2角度 °
pitch_angle_blade_3\t\t叶片3角度 °
cabin_position\t\t机舱位置 °
true_wind_direction
yaw_error1\t\t风向 °
twisted_cable_angle
main_bearing_temperature\t\t主轴温度 ℃
gearbox_oil_temperature\t\t齿轮箱温度 ℃
gearbox_low_speed_shaft_bearing_temperature\t\t齿轮箱轴承温度 ℃
gearboxmedium_speed_shaftbearing_temperature
gearbox_high_speed_shaft_bearing_temperature\t\t齿轮箱轴承温度2 ℃
generatordrive_end_bearing_temperature\t\t发电机驱动侧轴承温度 ℃
generatornon_drive_end_bearing_temperature\t\t发电机非驱动侧轴承温度 ℃
cabin_temperature\t\t机舱温度 ℃
outside_cabin_temperature\t\t舱外温度 ℃
generator_winding1_temperature
generator_winding2_temperature
generator_winding3_temperature
front_back_vibration_of_the_cabin
side_to_side_vibration_of_the_cabin
required_gearbox_speed
inverter_speed_master_control
actual_torque
given_torque
clockwise_yaw_count
counterclockwise_yaw_count
unusable
power_curve_available
set_value_of_active_power\t\t有功功率设定 kW
wind_turbine_status
wind_turbine_status2
turbulence_intensity
"""

# Suffix of the second header row that marks the column to keep.
_SAMPLE_SUFFIX = '采样值'

# Upper bound on worker processes used by the script entry point.
_MAX_WORKERS = 20

datas = [line for line in dianjian_str.split("\n") if line]

# standard column name -> source header ('' when there is no source column)
dianjian_dict = dict()
for _line in datas:
    _fields = _line.split("\t")
    dianjian_dict[_fields[0]] = _fields[2] if len(_fields) == 3 else ''


def _strip_sample_suffix(name):
    """Return *name* without a trailing sample-value suffix, if present."""
    if name.endswith(_SAMPLE_SUFFIX):
        return name[:-len(_SAMPLE_SUFFIX)]
    return name


def read_df(file_path):
    """Read one raw CSV and return only the standard columns it contains.

    The file is read with a two-row header. For every column the two header
    cells are concatenated; an "Unnamed" first-row cell inherits the previous
    named first-row cell once, and an "Unnamed" second-row cell counts as
    empty. A trailing sample-value suffix is removed, source headers are
    renamed to their standard names and all other columns are dropped.

    Args:
        file_path: path of the CSV file to read.

    Returns:
        A new DataFrame whose columns are a subset of ``dianjian_dict`` keys.
    """
    df = pd.read_csv(file_path, header=[0, 1])

    col_nams_map = dict()
    pre_col = ""
    for tuple_col in df.columns:
        col1, col2 = str(tuple_col[0]), str(tuple_col[1])
        if col1.startswith("Unnamed"):
            # Inherit the previous named header exactly once.
            col1, pre_col = pre_col, ''
        else:
            pre_col = col1
        if col2.startswith("Unnamed"):
            col2 = ''
        col_nams_map[str(tuple_col)] = col1 + col2
    print(col_nams_map)

    df.columns = [_strip_sample_suffix(col_nams_map[str(col)]) for col in df.columns]

    # Skip standard columns without a source header: their source name is ''
    # and would otherwise capture a column whose header cells are both blank.
    rename_map = dict()
    for col, name in dianjian_dict.items():
        if name:
            rename_map.setdefault(name, col)
    df = df.rename(columns=rename_map)

    # Boolean mask instead of deleting while looping; copy so callers can
    # assign to the result without chained-assignment warnings.
    keep = df.columns.isin(list(dianjian_dict))
    return df.loc[:, keep].copy()


def get_wind_name_files(path):
    """Group the files in directory *path* by turbine name.

    The turbine name is the part of the file name before the first ``@`` with
    every ``HD`` replaced by ``HD2``.

    Args:
        path: directory to list.

    Returns:
        dict mapping turbine name to a list of full file paths, in sorted
        file-name order.
    """
    wind_files_map = dict()
    # Sorted so the concatenation order (and the output) is deterministic.
    for file in sorted(os.listdir(path)):
        # The parameter ``path`` shadows ``os.path`` here, so the module must
        # be referenced explicitly.
        full_file = os.path.join(path, file)
        key = str(file).split("@")[0].replace("HD", "HD2")
        wind_files_map.setdefault(key, []).append(full_file)
    return wind_files_map


def combine_df(save_path, wind_name, files):
    """Merge all *files* of one turbine and write ``<wind_name>.csv``.

    Each file is read with ``read_df``, indexed by ``time_stamp`` and
    de-duplicated on that index (first occurrence wins, per file). Non-empty
    frames are concatenated keeping only the columns common to all of them;
    standard columns that are missing are added as NaN and the columns are
    written in ``dianjian_dict`` order.

    Args:
        save_path: output directory (must exist).
        wind_name: turbine name, used as file name and ``wind_turbine_number``.
        files: iterable of CSV paths belonging to this turbine.
    """
    begin = datetime.datetime.now()

    frames = []
    for file in files:
        query_df = read_df(file)
        print("读取", file, query_df.shape)
        query_df['time_stamp'] = pd.to_datetime(query_df['time_stamp'])
        query_df = query_df.set_index(keys='time_stamp')
        query_df = query_df[~query_df.index.duplicated(keep='first')]
        if not query_df.empty:
            frames.append(query_df)

    # Single concat instead of growing an accumulator file by file.
    df = pd.concat(frames, join='inner') if frames else pd.DataFrame()
    df.reset_index(inplace=True)
    df['wind_turbine_number'] = wind_name

    for col in dianjian_dict:
        if col not in df.columns:
            df[col] = np.nan
    df = df[list(dianjian_dict)]

    df.to_csv(os.path.join(save_path, wind_name + ".csv"), encoding='utf-8', index=False)

    # total_seconds() rather than .seconds, which wraps around after one day.
    elapsed = int((datetime.datetime.now() - begin).total_seconds())
    print(wind_name, '整理完成', '耗时:', elapsed)


def main():
    """Combine every turbine found under the configured input directory."""
    read_path = r'/data/download/collection_data/1进行中/诺木洪风电场-甘肃-华电/收资数据/sec'
    save_path = r'/data/download/collection_data/1进行中/诺木洪风电场-甘肃-华电/收资数据/sec_采样值'
    # Alternative local (Windows) locations:
    # read_path = r'D:\trans_data\诺木洪\收资数据\min'
    # save_path = r'D:\trans_data\诺木洪\清理数据\min'

    os.makedirs(save_path, exist_ok=True)

    wind_files_map = get_wind_name_files(read_path)
    if not wind_files_map:
        return

    jobs = [(save_path, wind_name, files) for wind_name, files in wind_files_map.items()]
    with multiprocessing.Pool(min(_MAX_WORKERS, len(jobs))) as pool:
        pool.starmap(combine_df, jobs)


if __name__ == '__main__':
    main()