import datetime from service.common_connect import plt def update_timeout_wave_trans_data(): sql = """ UPDATE wave_data_transfer SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2 WHERE TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6 AND trans_sys_status = 0 """ plt.execute(sql) def update_wave_trans_status_running(id, schedule_exec=True): if schedule_exec: exec_sql = """ update wave_data_transfer set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now(),err_info='', engine_count =0,time_granularity=0,transfer_finish_time=null, data_min_time= null,data_max_time= null,transfer_data_count=null where id = %s """ plt.execute(exec_sql, id) def update_wave_trans_status_error(id, message="", save_db=True): if save_db: exec_sql = """ update wave_data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now() where id = %s """ message = message if len(message) <= 200 else message[0:200] plt.execute(exec_sql, (message, id)) def update_wave_trans_status_success(id, wind_count=0, time_granularity=0, min_date=datetime.datetime.now(), max_date=datetime.datetime.now(), total_count=0, save_db=True): if save_db: if min_date is not None: exec_sql = """ update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now(), data_min_time= %s,data_max_time= %s,transfer_data_count=%s where id = %s """ plt.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, id)) else: exec_sql = """ update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now() where id = %s """ plt.execute(exec_sql, (wind_count, time_granularity, id)) def update_wave_trans_transfer_progress(id, transfer_progress=0, save_db=True): if save_db: exec_sql = """ update wave_data_transfer set transfer_progress = %s where id = %s """ plt.execute(exec_sql, (int(transfer_progress), id)) def create_wave_table(table_name, save_db=True): if save_db: exec_sql = f""" CREATE TABLE `{table_name}` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号', `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号', `time_stamp` datetime DEFAULT NULL COMMENT '时间', `rotational_speed` float DEFAULT NULL COMMENT '转速', `sampling_frequency` varchar(50) DEFAULT NULL COMMENT '采样频率', `mesure_point_name` varchar(100) DEFAULT NULL COMMENT '测点名称', `mesure_data` mediumtext COMMENT '测点数据', PRIMARY KEY (`id`), KEY `wind_turbine_number` (`wind_turbine_number`), KEY `time_stamp` (`time_stamp`), KEY `mesure_point_name` (`mesure_point_name`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 """ plt.execute(exec_sql) # 获取执行的数据 def get_wave_exec_data(run_count: int = 1) -> dict: query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0" query_next_exec_sql = """ SELECT t.*,a.field_name,b.batch_name FROM wave_data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code inner join wind_field_batch b on t.batch_code = b.batch_code WHERE t.trans_sys_status in (-1,1,2) and t.transfer_state = 0 AND t.transfer_addr != '' ORDER BY t.update_time LIMIT 1 """ data = plt.execute(query_running_sql) now_count = int(data[0]['count']) if now_count >= run_count: return None else: data = plt.execute(query_next_exec_sql) if type(data) == tuple: return {} return data[0]