# 陕西建工陕西智华.py (3.5 KB)
  1. import os
  2. import sys
  3. from concurrent.futures.thread import ThreadPoolExecutor
  4. sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
  5. import datetime
  6. import multiprocessing
  7. import pandas as pd
  8. from utils.file.trans_methods import read_files, copy_to_new, read_excel_files, read_file_to_df
  9. from utils.zip.unzip import get_desc_path, unzip
  10. def get_real_path(win_path):
  11. return win_path.replace(r'Z:', r'/data/download').replace("\\", '/')
  12. def unzip_or_remove(file, tmp_dir):
  13. if str(file).endswith("zip"):
  14. unzip(file, tmp_dir)
  15. else:
  16. copy_to_new(file, file.replace(file, tmp_dir))
  17. def read_file_to_df_and_select(file_path):
  18. select_cols = ['Timestamp', 'Los', 'Distance', 'HWS(hub)', 'HWS(hub)status', 'DIR(hub)', 'DIR(hub)status']
  19. df = read_file_to_df(file_path, read_cols=select_cols)
  20. condition1 = df['HWS(hub)status'] > 0.8
  21. condition2 = df['DIR(hub)status'] > 0.8
  22. condition3 = df['Distance'].isin([70, 90])
  23. df = df[condition1 & condition2 & condition3]
  24. return df
  25. def read_month_data_and_select(month, files, gonglv_df):
  26. with ThreadPoolExecutor(max_workers=10) as executor:
  27. dfs = list(executor.map(read_file_to_df_and_select, files))
  28. df = pd.concat(dfs, ignore_index=True)
  29. df['Time1'] = df['Timestamp'].apply(lambda x: x.split('.')[0])
  30. df['Time1'] = pd.to_datetime(df['Time1'], errors='coerce')
  31. df['Time1'] = df['Time1'].apply(
  32. lambda x: x + datetime.timedelta(seconds=10 - x.second % 10) if x.second % 10 != 0 else x)
  33. del gonglv_df['month']
  34. result_df = pd.merge(df, gonglv_df, left_on='Time1', right_on='Time1')
  35. result_df.sort_values(by='Time1', inplace=True)
  36. save_dir = get_real_path('Z:\偏航误差验证数据\整理结果')
  37. # save_dir = r'D:\data\pianhang\result'
  38. result_df.to_csv(os.path.join(save_dir, f'{month}.csv'), encoding='utf8', index=False)
  39. if __name__ == '__main__':
  40. read_dir = 'Z:\偏航误差验证数据\新华佳县雷达数据'
  41. read_dir = get_real_path(read_dir)
  42. tmp_dir = get_real_path(r'Z:\偏航误差验证数据\tmp_data')
  43. gonglv_dir = get_real_path(r'Z:\偏航误差验证数据\陕西建工陕西智华\report\output')
  44. # read_dir = r'D:\data\pianhang\1'
  45. # tmp_dir = r'D:\data\pianhang\tmp'
  46. # gonglv_dir = r'D:\data\pianhang\2'
  47. gonglv_files = read_excel_files(gonglv_dir)
  48. with multiprocessing.Pool(20) as pool:
  49. dfs = pool.starmap(read_file_to_df, [(i, ['collect_time', 'a0216']) for i in gonglv_files])
  50. gonglv_df = pd.concat(dfs, ignore_index=True)
  51. gonglv_df.columns = ['Time1', '功率']
  52. gonglv_df['Time1'] = pd.to_datetime(gonglv_df['Time1'], errors='coerce')
  53. gonglv_df['month'] = gonglv_df['Time1'].dt.month
  54. all_files = read_files(tmp_dir)
  55. all_files = [i for i in all_files if str(os.path.basename(i)).startswith('WindSpeed2024')]
  56. # with multiprocessing.Pool(20) as pool:
  57. # pool.starmap(unzip_or_remove, [(file, tmp_dir) for file in all_files])
  58. month_map = dict()
  59. for file in all_files:
  60. base_name = os.path.basename(file)
  61. month = base_name[13:15]
  62. if month in month_map.keys():
  63. month_map[month].append(file)
  64. else:
  65. month_map[month] = [file]
  66. excel_files = read_excel_files(tmp_dir)
  67. with multiprocessing.Pool(5) as pool:
  68. pool.starmap(read_month_data_and_select,
  69. [(month, files, gonglv_df[gonglv_df['month'] == int(month)]) for month, files in
  70. month_map.items()])