# second_data.py
  1. import datetime
  2. import json
  3. import logging
  4. import multiprocessing
  5. import os
  6. import traceback
  7. import sys
  8. import numpy as np
  9. import pandas as pd
  10. from sqlalchemy import create_engine
  11. engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/appoint')
  12. base_dir = r'/data/logs/104'
  13. save_dir = base_dir + os.sep + 'second'
  14. log_dir = base_dir + os.sep + 'logs' + os.sep + 'second'
  15. def create_dir(save_dir, is_file=False):
  16. if is_file:
  17. save_dir = os.path.dirname(save_dir)
  18. os.makedirs(save_dir, exist_ok=True)
  19. def init_log():
  20. logger = logging.getLogger("104data")
  21. logger.setLevel(logging.INFO)
  22. stout_handle = logging.StreamHandler(sys.stdout)
  23. stout_handle.setFormatter(
  24. logging.Formatter("%(asctime)s: %(message)s"))
  25. stout_handle.setLevel(logging.INFO)
  26. logger.addHandler(stout_handle)
  27. create_dir(log_dir)
  28. file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-info.log'
  29. file_handler = logging.FileHandler(file_name, encoding='utf-8')
  30. file_handler.setFormatter(
  31. logging.Formatter("%(asctime)s: %(message)s"))
  32. file_handler.setLevel(logging.INFO)
  33. logger.addHandler(file_handler)
  34. file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-error.log'
  35. file_handler = logging.FileHandler(file_name, encoding='utf-8')
  36. file_handler.setFormatter(
  37. logging.Formatter("%(asctime)s: %(message)s"))
  38. file_handler.setLevel(logging.ERROR)
  39. logger.addHandler(file_handler)
  40. return logger
  41. logger = init_log()
  42. def get_all_mesurement_conf():
  43. sql = "select * from measurement_conf "
  44. return pd.read_sql(sql, engine)
  45. def get_all_mesurepoint_conf():
  46. sql = "select * from measurepoint_conf t where t.status = 1"
  47. return pd.read_sql(sql, engine)
  48. def df_value_to_dict(df, key='col1', value='col2'):
  49. """
  50. :param df: dataframe
  51. :param key: 字典的key,如果重复,则返回
  52. :param value: 字典的value
  53. :return:
  54. """
  55. result_dict = dict()
  56. for k, v in zip(df[key], df[value]):
  57. if k in result_dict.keys():
  58. if type(result_dict[k]) == list:
  59. result_dict[k].append(v)
  60. else:
  61. result_dict[k] = [result_dict[k]]
  62. result_dict[k].append(v)
  63. else:
  64. result_dict[k] = v
  65. return result_dict
  66. def info_print(*kwargs):
  67. message = " ".join([str(i) for i in kwargs])
  68. logger.info(message)
  69. def error_print(*kwargs):
  70. message = " ".join([str(i) for i in kwargs])
  71. logger.error(message)
  72. def exists_table(table_name):
  73. sql = f"SELECT * FROM information_schema.tables WHERE table_schema = 'appoint' AND table_name = '{table_name}'"
  74. info_print(sql)
  75. table_df = pd.read_sql_query(sql, engine)
  76. if table_df.empty:
  77. return False
  78. return True
  79. def get_data_and_save_file(table_name, save_path, measurepoint_use_dict):
  80. if not exists_table(table_name):
  81. error_print(f"{table_name} 表不存在")
  82. else:
  83. df_sql = f"SELECT * FROM {table_name}"
  84. info_print(df_sql)
  85. df = pd.read_sql_query(df_sql, engine)
  86. info_print(df.shape)
  87. data_dict = dict()
  88. for receive_time, information_object_data in zip(df['receive_time'],
  89. df['information_object_data']):
  90. json_data = json.loads(information_object_data)
  91. for k, v in json_data.items():
  92. k = int(k)
  93. wind_num = k // 103 + 1
  94. mesurepoint_num = k % 103
  95. if wind_num not in data_dict.keys():
  96. data_dict[wind_num] = dict()
  97. if receive_time not in data_dict[wind_num].keys():
  98. data_dict[wind_num][receive_time] = dict()
  99. if mesurepoint_num in measurepoint_use_dict.keys():
  100. data_dict[wind_num][receive_time][mesurepoint_num] = v
  101. datas = list()
  102. for wind_num, data in data_dict.items():
  103. for receive_time, mesurepoint_data in data.items():
  104. data = [wind_num, receive_time]
  105. for point_num in measurepoint_use_dict.keys():
  106. data.append(mesurepoint_data[point_num] if point_num in mesurepoint_data.keys() else np.nan)
  107. if len(data) > 2:
  108. datas.append(data)
  109. cols = ['风机编号', '时间']
  110. cols.extend(measurepoint_use_dict.values())
  111. result_df = pd.DataFrame(data=datas, columns=cols)
  112. result_df.sort_values(by=['风机编号', '时间'])
  113. create_dir(save_path, True)
  114. result_df.to_csv(save_path, encoding='utf8', index=False, compression='gzip')
  115. info_print("文件", save_path, '保存成功')
  116. if __name__ == '__main__':
  117. info_print("开始执行")
  118. begin = datetime.datetime.now()
  119. try:
  120. measurepoint_conf_df = get_all_mesurepoint_conf()
  121. measurepoint_use_dict = df_value_to_dict(measurepoint_conf_df, 'id', 'name')
  122. yestoday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
  123. measurement_conf_df = get_all_mesurement_conf()
  124. tables = list()
  125. for id, measurement_wind_field in zip(measurement_conf_df['id'], measurement_conf_df['measurement_wind_field']):
  126. tables.append(
  127. (f'{yestoday}_{id}', os.path.join(save_dir, measurement_wind_field, yestoday[0:4], yestoday[0:6],
  128. yestoday + '.csv.gz')))
  129. with multiprocessing.Pool(len(tables)) as pool:
  130. pool.starmap(get_data_and_save_file, [(t[0], t[1], measurepoint_use_dict) for t in tables])
  131. except Exception as e:
  132. error_print(traceback.format_exc())
  133. raise e
  134. info_print("执行结束,总耗时:", datetime.datetime.now() - begin)