import datetime
import json
import logging
import multiprocessing
import os
import sys
import traceback

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

# NOTE(review): credentials are hard-coded; move them to an environment
# variable or config file so they are not committed to source control.
engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/appoint')

base_dir = r'/data/logs/104'
save_dir = base_dir + os.sep + 'second'
log_dir = base_dir + os.sep + 'logs' + os.sep + 'second'


def create_dir(save_dir, is_file=False):
    """Create the directory ``save_dir`` (or its parent when ``is_file``).

    :param save_dir: directory path, or a file path when is_file=True
    :param is_file: when True, treat save_dir as a file and create its parent
    """
    if is_file:
        save_dir = os.path.dirname(save_dir)
    os.makedirs(save_dir, exist_ok=True)


def init_log():
    """Configure the "104data" logger: stdout plus monthly info/error files.

    :return: the configured logging.Logger
    """
    logger = logging.getLogger("104data")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s: %(message)s")

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    stdout_handler.setLevel(logging.INFO)
    logger.addHandler(stdout_handler)

    create_dir(log_dir)
    month = datetime.datetime.now().strftime('%Y%m')
    # One monthly file for INFO-and-above and one for ERROR-and-above.
    for suffix, level in (('-info.log', logging.INFO), ('-error.log', logging.ERROR)):
        file_handler = logging.FileHandler(log_dir + os.sep + month + suffix,
                                           encoding='utf-8')
        file_handler.setFormatter(formatter)
        file_handler.setLevel(level)
        logger.addHandler(file_handler)
    return logger


logger = init_log()


def get_all_mesurement_conf():
    """Load every row of measurement_conf as a DataFrame."""
    sql = "select * from measurement_conf "
    return pd.read_sql(sql, engine)


def get_all_mesurepoint_conf():
    """Load the enabled (status = 1) rows of measurepoint_conf as a DataFrame."""
    sql = "select * from measurepoint_conf t where t.status = 1"
    return pd.read_sql(sql, engine)


def df_value_to_dict(df, key='col1', value='col2'):
    """Build a dict mapping ``df[key]`` values to ``df[value]`` values.

    When a key occurs more than once, its values are collected into a list,
    so each entry maps to either a scalar or a list.

    :param df: source DataFrame
    :param key: column whose values become the dict keys
    :param value: column whose values become the dict values
    :return: dict of key -> value (or key -> list of values for duplicates)
    """
    result_dict = dict()
    for k, v in zip(df[key], df[value]):
        if k in result_dict:
            if isinstance(result_dict[k], list):
                result_dict[k].append(v)
            else:
                # Second occurrence: promote the scalar to a list.
                result_dict[k] = [result_dict[k], v]
        else:
            result_dict[k] = v
    return result_dict


def info_print(*args):
    """Log all positional arguments, space-joined, at INFO level."""
    logger.info(" ".join(str(i) for i in args))


def error_print(*args):
    """Log all positional arguments, space-joined, at ERROR level."""
    logger.error(" ".join(str(i) for i in args))


def exists_table(table_name):
    """Return True when ``table_name`` exists in the ``appoint`` schema.

    NOTE(review): table_name is interpolated into the SQL string. Here it only
    comes from internally generated names; parameterize before reusing this
    with any externally supplied value.

    :param table_name: table name to look up
    :return: True when the table exists, False otherwise
    """
    sql = f"SELECT * FROM information_schema.tables WHERE table_schema = 'appoint' AND table_name = '{table_name}'"
    info_print(sql)
    table_df = pd.read_sql_query(sql, engine)
    return not table_df.empty


def get_data_and_save_file(table_name, save_path, measurepoint_use_dict):
    """Export one table of raw 104-protocol rows to a gzip-compressed CSV.

    Each source row holds ``information_object_data``, a JSON blob mapping a
    combined point id to a value; the combined id encodes the turbine number
    and the measure-point number (103 points per turbine, as encoded by the
    ``// 103`` / ``% 103`` arithmetic below). Only points present in
    ``measurepoint_use_dict`` are kept.

    :param table_name: source MySQL table, e.g. '<yyyymmdd>_<conf id>'
    :param save_path: target .csv.gz path (parent dirs are created)
    :param measurepoint_use_dict: measure-point number -> column-name mapping
    """
    if not exists_table(table_name):
        error_print(f"{table_name} 表不存在")
        return

    df_sql = f"SELECT * FROM {table_name}"
    info_print(df_sql)
    df = pd.read_sql_query(df_sql, engine)
    info_print(df.shape)

    # data_dict[turbine_num][receive_time][point_num] = value
    data_dict = dict()
    for receive_time, information_object_data in zip(df['receive_time'],
                                                     df['information_object_data']):
        json_data = json.loads(information_object_data)
        for k, v in json_data.items():
            k = int(k)
            wind_num = k // 103 + 1    # turbine number (1-based)
            mesurepoint_num = k % 103  # point number within the turbine
            per_time = data_dict.setdefault(wind_num, dict()).setdefault(receive_time, dict())
            if mesurepoint_num in measurepoint_use_dict:
                per_time[mesurepoint_num] = v

    rows = list()
    for wind_num, by_time in data_dict.items():
        for receive_time, point_values in by_time.items():
            row = [wind_num, receive_time]
            for point_num in measurepoint_use_dict:
                # Missing points are padded with NaN so columns stay aligned.
                row.append(point_values.get(point_num, np.nan))
            if len(row) > 2:
                rows.append(row)

    cols = ['风机编号', '时间']
    cols.extend(measurepoint_use_dict.values())
    result_df = pd.DataFrame(data=rows, columns=cols)
    # BUG FIX: sort_values returns a new frame; the original discarded the
    # result, so the CSV was written unsorted.
    result_df = result_df.sort_values(by=['风机编号', '时间'])
    create_dir(save_path, True)
    result_df.to_csv(save_path, encoding='utf8', index=False, compression='gzip')
    info_print("文件", save_path, '保存成功')


if __name__ == '__main__':
    info_print("开始执行")
    begin = datetime.datetime.now()
    try:
        measurepoint_conf_df = get_all_mesurepoint_conf()
        measurepoint_use_dict = df_value_to_dict(measurepoint_conf_df, 'id', 'name')
        yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
        measurement_conf_df = get_all_mesurement_conf()
        # One (table_name, save_path) pair per configured wind field.
        tables = list()
        for conf_id, measurement_wind_field in zip(measurement_conf_df['id'],
                                                   measurement_conf_df['measurement_wind_field']):
            tables.append((
                f'{yesterday}_{conf_id}',
                os.path.join(save_dir, measurement_wind_field,
                             yesterday[0:4], yesterday[0:6], yesterday + '.csv.gz'),
            ))
        # BUG FIX: Pool(0) raises ValueError when no tables are configured;
        # also cap the worker count at the CPU count instead of spawning one
        # process per table.
        if tables:
            with multiprocessing.Pool(min(len(tables), os.cpu_count() or 1)) as pool:
                pool.starmap(get_data_and_save_file,
                             [(t[0], t[1], measurepoint_use_dict) for t in tables])
    except Exception as e:
        error_print(traceback.format_exc())
        raise e
    info_print("执行结束,总耗时:", datetime.datetime.now() - begin)