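"""
Export yesterday's per-wind-field measurement tables from the `appoint` MySQL
database: each table row carries a JSON payload that is unpacked into one
record per (turbine, timestamp) and written out as a gzipped CSV, with one
worker process per table.
"""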
import datetime
import json
import logging
import multiprocessing
import os
import sys
import traceback

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/appoint')

base_dir = r'/data/logs/104'
save_dir = os.path.join(base_dir, 'second')          # gzipped CSV output tree
log_dir = os.path.join(base_dir, 'logs', 'second')   # monthly log files


def create_dir(save_dir, is_file=False):
    """Create save_dir, or its parent directory when is_file is True."""
    if is_file:
        save_dir = os.path.dirname(save_dir)
    os.makedirs(save_dir, exist_ok=True)


def init_log():
    """Log INFO to stdout plus monthly <YYYYMM>-info.log / <YYYYMM>-error.log files."""
    logger = logging.getLogger("104data")
    logger.setLevel(logging.INFO)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
    stdout_handler.setLevel(logging.INFO)
    logger.addHandler(stdout_handler)

    create_dir(log_dir)
    month = datetime.datetime.now().strftime('%Y%m')

    info_handler = logging.FileHandler(os.path.join(log_dir, f'{month}-info.log'), encoding='utf-8')
    info_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
    info_handler.setLevel(logging.INFO)
    logger.addHandler(info_handler)

    error_handler = logging.FileHandler(os.path.join(log_dir, f'{month}-error.log'), encoding='utf-8')
    error_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
    error_handler.setLevel(logging.ERROR)
    logger.addHandler(error_handler)
    return logger


logger = init_log()


def get_all_measurement_conf():
    sql = "select * from measurement_conf"
    return pd.read_sql(sql, engine)


def get_all_measurepoint_conf():
    # status = 1 selects only the measurement points currently in use
    sql = "select * from measurepoint_conf t where t.status = 1"
    return pd.read_sql(sql, engine)


def df_value_to_dict(df, key='col1', value='col2'):
    """
    Build a dict from two DataFrame columns.

    :param df: source DataFrame
    :param key: column providing the dict keys; when a key repeats, its
                values are accumulated into a list
    :param value: column providing the dict values
    :return: dict of key -> value (or key -> list of values for repeated keys)
    """
    result_dict = dict()
    for k, v in zip(df[key], df[value]):
        if k in result_dict:
            if not isinstance(result_dict[k], list):
                result_dict[k] = [result_dict[k]]
            result_dict[k].append(v)
        else:
            result_dict[k] = v
    return result_dict
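
# Example: for rows (id=1, name='speed'), (id=2, name='power'),
# df_value_to_dict(df, 'id', 'name') -> {1: 'speed', 2: 'power'};
# a repeated id would collect its names into a list instead.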


def info_print(*args):
    logger.info(" ".join(str(i) for i in args))


def error_print(*args):
    logger.error(" ".join(str(i) for i in args))


def exists_table(table_name):
    sql = ("SELECT * FROM information_schema.tables "
           f"WHERE table_schema = 'appoint' AND table_name = '{table_name}'")
    info_print(sql)
    return not pd.read_sql_query(sql, engine).empty


def get_data_and_save_file(table_name, save_path, measurepoint_use_dict):
    if not exists_table(table_name):
        error_print(f"table {table_name} does not exist")
        return
    df_sql = f"SELECT * FROM {table_name}"
    info_print(df_sql)
    df = pd.read_sql_query(df_sql, engine)
    info_print(df.shape)

    # Unpack the JSON payload; each integer key encodes both the turbine
    # number and the point number (every turbine occupies a block of 103 IDs).
    data_dict = dict()
    for receive_time, information_object_data in zip(df['receive_time'],
                                                     df['information_object_data']):
        json_data = json.loads(information_object_data)
        for k, v in json_data.items():
            k = int(k)
            wind_num = k // 103 + 1
            measurepoint_num = k % 103
            if wind_num not in data_dict:
                data_dict[wind_num] = dict()
            if receive_time not in data_dict[wind_num]:
                data_dict[wind_num][receive_time] = dict()
            if measurepoint_num in measurepoint_use_dict:
                data_dict[wind_num][receive_time][measurepoint_num] = v

    # Flatten to one row per (turbine, timestamp), one column per point.
    rows = list()
    for wind_num, time_data in data_dict.items():
        for receive_time, measurepoint_data in time_data.items():
            row = [wind_num, receive_time]
            for point_num in measurepoint_use_dict:
                row.append(measurepoint_data.get(point_num, np.nan))
            if len(row) > 2:  # guards against an empty point configuration
                rows.append(row)

    cols = ['turbine_no', 'time']  # originally 风机编号 / 时间
    cols.extend(measurepoint_use_dict.values())
    result_df = pd.DataFrame(data=rows, columns=cols)
    result_df = result_df.sort_values(by=['turbine_no', 'time'])
    create_dir(save_path, True)
    result_df.to_csv(save_path, encoding='utf-8', index=False, compression='gzip')
    info_print("file", save_path, "saved")


if __name__ == '__main__':
    info_print("starting")
    begin = datetime.datetime.now()
    try:
        measurepoint_conf_df = get_all_measurepoint_conf()
        measurepoint_use_dict = df_value_to_dict(measurepoint_conf_df, 'id', 'name')
        yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
        measurement_conf_df = get_all_measurement_conf()
        tables = list()
        for conf_id, wind_field in zip(measurement_conf_df['id'],
                                       measurement_conf_df['measurement_wind_field']):
            # One source table per day and config (e.g. 20240101_3); output goes to
            # <save_dir>/<wind_field>/<YYYY>/<YYYYMM>/<YYYYMMDD>.csv.gz
            tables.append((f'{yesterday}_{conf_id}',
                           os.path.join(save_dir, wind_field, yesterday[0:4],
                                        yesterday[0:6], yesterday + '.csv.gz')))
        with multiprocessing.Pool(len(tables)) as pool:
            pool.starmap(get_data_and_save_file,
                         [(t[0], t[1], measurepoint_use_dict) for t in tables])
    except Exception:
        error_print(traceback.format_exc())
        raise
    info_print("finished, total time:", datetime.datetime.now() - begin)