wzl 11 hónapja
szülő
commit
7bc964ea57

+ 2 - 1
.gitignore

@@ -1,4 +1,5 @@
 logs
 *.pyc
 *.iml
-.idea
+.idea
+test

+ 32 - 0
conf/db.py

@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/19
+# @Author  : 魏志亮
+
+mysql_config = \
+    {'plt': {'database': 'energy_prod',  # connection arguments for the "plt" pool
+             'host': '192.168.50.233',
+             'password': 'admin123456',  # NOTE(review): plain-text credential committed to the repo
+             'port': 3306,
+             'user': 'admin'},
+     'plt_connect_pool_config':  # pool options unpacked into PooledDB alongside the 'plt' entry
+         {'blocking': True,
+          'charset': 'utf8mb4',
+          'maxcached': 5,
+          'maxconnections': 10,
+          'maxshared': 0,
+          'mincached': 2,
+          'setsession': []},
+     'trans': {'database': 'energy_data_prod',  # connection arguments for the "trans" pool
+               'host': '192.168.50.235',
+               'password': 'admin123456',  # NOTE(review): plain-text credential committed to the repo
+               'port': 30306,
+               'user': 'root'},
+     'trans_connect_pool_config':  # pool options unpacked into PooledDB alongside the 'trans' entry
+         {'blocking': True,
+          'charset': 'utf8',
+          'maxcached': 20,
+          'maxconnections': 10,
+          'maxshared': 0,
+          'mincached': 1,
+          'setsession': []}
+     }

+ 0 - 34
conf/db.yaml

@@ -1,34 +0,0 @@
-plt_connect_pool_config:
-  charset: "utf8mb4"
-  mincached: 2 # 初始化时,连接池中至少创建空闲的链接,0表示不创建
-  maxcached: 5 # 连接池允许最大的连接数, 0和None表示不限制连接数
-  maxshared: 0 # 连接池最多可共享的连接数量,0和None表示全部共享。PS:pymysql不支持事务
-  maxconnections: 10 # 连接池最大并发连接数量
-  blocking: True # 连接池中没有可用连接后,是否阻塞等待
-  setsession: [ ] # 开始会话前执行的命令列表
-
-plt:
-  host: 192.168.50.233
-  port: 3306
-  user: admin
-  password: admin123456
-  database: energy
-
-
-trans_connect_pool_config:
-  charset: "utf8"
-  mincached: 1 # 初始化时,连接池中至少创建空闲的链接,0表示不创建
-  maxcached: 20 # 连接池允许最大的连接数, 0和None表示不限制连接数
-  maxshared: 0 # 连接池最多可共享的连接数量,0和None表示全部共享。PS:pymysql不支持事务
-  maxconnections: 10 # 连接池最大并发连接数量
-  blocking: True # 连接池中没有可用连接后,是否阻塞等待
-  setsession: [ ] # 开始会话前执行的命令列表
-
-
-trans:
-  host: 192.168.50.233
-  port: 3306
-  user: admin
-  password: admin123456
-  database: energy_data
-

+ 79 - 61
etl/base/WindFarms.py

@@ -4,20 +4,20 @@
 import datetime
 import multiprocessing
 import tempfile
+import traceback
 
 from etl.base.TranseParam import TranseParam
 from service.plt_service import get_all_wind, update_trans_status_error, update_trans_status_running, \
     update_trans_status_success
-from service.trans_service import creat_table_and_add_partition, rename_table, save_file_to_db
+from service.trans_service import creat_table_and_add_partition, rename_table, save_file_to_db, drop_table
 from utils.file.trans_methods import *
-from utils.log.trans_log import logger
 from utils.zip.unzip import unzip, unrar, get_desc_path
 
 
 class WindFarms(object):
 
     def __init__(self, batch_no=None, field_code=None, params: TranseParam = None, wind_full_name=None,
-                 save_db=True):
+                 save_db=True, header=0):
         self.batch_no = batch_no
         self.field_code = field_code
         self.wind_full_name = wind_full_name
@@ -30,6 +30,7 @@ class WindFarms(object):
         self.save_db = save_db
         self.lock = multiprocessing.Manager().Lock()
         self.statistics_map = multiprocessing.Manager().dict()
+        self.header = header
 
     def set_trans_param(self, params: TranseParam):
         self.trans_param = params
@@ -81,9 +82,14 @@ class WindFarms(object):
                     df.drop(key, axis=1, inplace=True)
 
         df = del_blank(df, ['wind_turbine_number'])
+        df = df[df['time_stamp'].isna() == False]
+        if self.trans_param.wind_name_exec:
+            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+            df['wind_turbine_number'] = eval(exec_str)
+
         self.save_to_tmp_csv(df, file)
 
-    def get_and_remove(self, file):
+    def get_and_remove(self, file, thead_local=None):
 
         to_path = self.get_excel_tmp_path()
         if str(file).endswith("zip"):
@@ -94,19 +100,21 @@ class WindFarms(object):
                 is_success, e = unzip(file, get_desc_path(desc_path))
                 self.trans_param.has_zip = True
                 if not is_success:
-                    raise e
+                    # raise e
+                    pass
         elif str(file).endswith("rar"):
             desc_path = file.replace(self.trans_param.read_path, to_path)
             is_success, e = unrar(file, get_desc_path(desc_path))
             self.trans_param.has_zip = True
             if not is_success:
-                raise e
+                # raise e
+                pass
         else:
             copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
 
-    def read_excel_to_df(self, file):
+    def read_excel_to_df(self, file_path):
 
-        read_cols = [v for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
+        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
 
         trans_dict = {}
         for k, v in self.trans_param.cols_tran.items():
@@ -115,11 +123,10 @@ class WindFarms(object):
 
         if self.trans_param.is_vertical_table:
             vertical_cols = self.trans_param.vertical_cols
-            df = read_file_to_df(file, vertical_cols)
+            df = read_file_to_df(file_path, vertical_cols, header=self.header)
             df = df[df[self.trans_param.vertical_key].isin(read_cols)]
             df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                                self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
-
             df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
                 df[self.trans_param.vertical_key])
 
@@ -128,16 +135,16 @@ class WindFarms(object):
         else:
             trans_dict = dict()
             for k, v in self.trans_param.cols_tran.items():
-                if v and v.startswith("$"):
+                if v and v.startswith("$") or v.find(",") > 0:
                     trans_dict[v] = k
 
             if self.trans_param.merge_columns:
-                df = read_file_to_df(file)
+                df = read_file_to_df(file_path, header=self.header)
             else:
                 if self.trans_param.need_valid_cols:
-                    df = read_file_to_df(file, read_cols)
+                    df = read_file_to_df(file_path, read_cols, header=self.header)
                 else:
-                    df = read_file_to_df(file)
+                    df = read_file_to_df(file_path, header=self.header)
 
             # 处理列名前缀问题
             if self.trans_param.resolve_col_prefix:
@@ -148,16 +155,23 @@ class WindFarms(object):
 
             for k, v in trans_dict.items():
                 if k.startswith("$file"):
-                    file_name = ".".join(os.path.basename(file).split(".")[0:-1])
+                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                     if k == "$file":
-                        df[v] = str(file_name)
-                    else:
+                        df[v] = str(file)
+                    elif k.startswith("$file["):
                         datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
                         if len(datas) != 2:
                             raise Exception("字段映射出现错误 :" + str(trans_dict))
-                        df[v] = str(file_name[int(datas[0]):int(datas[1])]).strip()
+                        df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
+                elif k.find("$file_date") > 0:
+                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
+                    if len(datas) != 2:
+                        raise Exception("字段映射出现错误 :" + str(trans_dict))
+                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
+                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
+
                 elif k.startswith("$folder"):
-                    folder = file
+                    folder = file_path
                     cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
                     for i in range(cengshu):
                         folder = os.path.dirname(folder)
@@ -243,8 +257,22 @@ class WindFarms(object):
 
         df = df[self.trans_param.cols_tran.keys()]
 
+        # 转化风机名称
+        trans_print("开始转化风机名称")
+        # if self.trans_param.wind_name_exec:
+        #     exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+        # df['wind_turbine_number'] = eval(exec_str)
+        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
+        df['wind_turbine_number'] = df['wind_turbine_number'].map(
+            self.wind_col_trans).fillna(
+            df['wind_turbine_number'])
+
+        wind_col_name = str(df['wind_turbine_number'].values[0])
         # 添加年月日
-        trans_print("包含时间字段,开始处理时间字段,添加年月日", filename)
+        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
+        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
+        df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
+        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
         df['time_stamp'] = pd.to_datetime(df['time_stamp'])
         df['year'] = df['time_stamp'].dt.year
         df['month'] = df['time_stamp'].dt.month
@@ -254,18 +282,13 @@ class WindFarms(object):
             lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
         trans_print("处理时间字段结束")
 
-        # 转化风机名称
-        trans_print("开始转化风机名称")
-        if self.trans_param.wind_name_exec:
-            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
-            df['wind_turbine_number'] = eval(exec_str)
-
-        df['wind_turbine_number'] = df['wind_turbine_number'].map(
-            self.wind_col_trans).fillna(
-            df['wind_turbine_number'])
-        trans_print("转化风机名称结束")
+        # 如果包含*号,祛除
+        trans_print(wind_col_name, "过滤星号前大小:", df.shape[0])
+        mask = ~df.applymap(lambda x: isinstance(x, str) and '*' in x).any(axis=1)
+        df = df[mask]
+        trans_print(wind_col_name, "过滤星号后大小:", df.shape[0])
 
-        wind_col_name = str(df['wind_turbine_number'].values[0])
+        trans_print(wind_col_name, "转化风机名称结束")
 
         if self.save_zip:
             save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv.gz')
@@ -297,14 +320,12 @@ class WindFarms(object):
 
             trans_print('读取文件数量:', len(all_files))
         except Exception as e:
-            logger.exception(e)
+            trans_print(traceback.format_exc())
             message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
-            raise e
+            raise ValueError(message)
         return all_files
 
     def read_file_and_save_tmp(self):
-
         all_files = read_excel_files(self.get_excel_tmp_path())
         if self.trans_param.merge_columns:
             dfs_list = list()
@@ -335,25 +356,23 @@ class WindFarms(object):
             try:
                 self.df_save_to_tmp_file(df, "")
             except Exception as e:
-                logger.exception(e)
+                trans_print(traceback.format_exc())
                 message = "合并列出现错误:" + str(e)
-                update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
-                raise e
+                raise ValueError(message)
 
         else:
-            all_arrays = split_array(all_files, 6)
+            split_count = 6
+            all_arrays = split_array(all_files, split_count)
             for arr in all_arrays:
-                with multiprocessing.Pool(6) as pool:
+                with multiprocessing.Pool(split_count) as pool:
                     dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
                 try:
                     for df in dfs:
                         self.df_save_to_tmp_file(df)
                 except Exception as e:
-                    logger.exception(e)
+                    trans_print(traceback.format_exc())
                     message = "整理临时文件,系统返回错误:" + str(e)
-                    update_trans_status_error(self.batch_no, self.trans_param.read_type, message,
-                                              self.save_db)
-                    raise e
+                    raise ValueError(message)
 
     def mutiprocessing_to_save_file(self):
         # 开始保存到正式文件
@@ -362,12 +381,10 @@ class WindFarms(object):
         try:
             with multiprocessing.Pool(6) as pool:
                 pool.starmap(self.save_to_csv, [(file,) for file in all_tmp_files])
-
         except Exception as e:
-            logger.exception(e)
+            trans_print(traceback.format_exc())
             message = "保存文件错误,系统返回错误:" + str(e)
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
-            raise e
+            raise ValueError(message)
 
         trans_print("结束保存到excel文件")
 
@@ -384,10 +401,9 @@ class WindFarms(object):
                              [(table_name, file, self.batch_count) for file in all_saved_files])
 
         except Exception as e:
-            logger.exception(e)
+            trans_print(traceback.format_exc())
             message = "保存到数据库错误,系统返回错误:" + str(e)
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
-            raise e
+            raise ValueError(message)
         trans_print("结束保存到数据库文件")
 
     def _rename_file(self):
@@ -414,9 +430,11 @@ class WindFarms(object):
         trans_print("删除临时文件夹删除成功")
 
     def delete_batch_db(self):
-        table_name = "_".join([self.batch_no, self.trans_param.read_type])
-        renamed_table_name = "del_" + table_name + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
-        rename_table(table_name, renamed_table_name)
+        """Drop this batch's table (``<batch_no>_<read_type>``) when ``self.save_db`` is truthy."""
+        if self.save_db:
+            # Dropped outright rather than renamed to a timestamped ``del_*`` backup table.
+            table_name = "_".join([self.batch_no, self.trans_param.read_type])
+            drop_table(table_name, self.save_db)
 
     def run(self, step=0, end=3):
         begin = datetime.datetime.now()
@@ -433,13 +451,13 @@ class WindFarms(object):
             self.params_valid([self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
                                self.trans_param.read_path, self.wind_full_name])
 
-            if self.trans_param.resolve_col_prefix:
-                column = "测试"
-                eval(self.trans_param.resolve_col_prefix)
-
-            if self.trans_param.wind_name_exec:
-                wind_name = "测试"
-                eval(self.trans_param.wind_name_exec)
+            # if self.trans_param.resolve_col_prefix:
+            #     column = "测试"
+            #     eval(self.trans_param.resolve_col_prefix)
+            #
+            # if self.trans_param.wind_name_exec:
+            #     wind_name = "测试"
+            #     eval(self.trans_param.wind_name_exec)
 
             trans_print("初始化字段结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))

+ 28 - 15
schedule_service.py

@@ -1,18 +1,18 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/11
 # @Author  : 魏志亮
-import multiprocessing
 import sys
+import traceback
 
 from etl.base.TranseParam import TranseParam
 from etl.base.WindFarms import WindFarms
 from service.plt_service import get_exec_data, update_trans_status_error
 from service.trans_service import get_trans_conf
 from utils.conf.read_conf import read_conf
-from utils.log.trans_log import init_log, trans_print, logger
+from utils.log.trans_log import trans_print, set_trance_id
 
 
-def run_schedule(step=0, end=3):
+def run_schedule(step=0, end=4):
     data = get_exec_data()
     if data is None:
         trans_print("当前有任务在执行")
@@ -33,7 +33,7 @@ def run_local(step=0, end=3, batch_no=None, transfer_type=None, transfer_file_ad
     if batch_no is None or str(batch_no).strip() == '':
         return "批次编号不能为空"
 
-    if transfer_type not in ['second', 'minute']:
+    if transfer_type not in ['second', 'minute', 'second_1']:
         return "查询类型错误"
 
     if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
@@ -45,7 +45,8 @@ def run_local(step=0, end=3, batch_no=None, transfer_type=None, transfer_file_ad
 
 def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, field_name=None, field_code="测试",
                  save_db=False):
-    init_log(batch_no, field_name, transfer_type)
+    trance_id = '-'.join([batch_no, field_name, transfer_type])
+    set_trance_id(trance_id)
     conf_map = get_trans_conf(field_name, transfer_type)
     if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
         message = f"未找到{field_name}的{transfer_type}配置"
@@ -65,6 +66,8 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
         vertical_value = read_conf(conf_map, 'vertical_col_value')
         need_valid_cols = not merge_columns
 
+        begin_header = read_conf(conf_map, 'begin_header', 0)
+
         cols_trans_all = dict()
         trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
                       'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
@@ -89,7 +92,7 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
             cols_trans_all[col] = read_conf(conf_map, col, '')
 
         trans_subject = WindFarms(batch_no=batch_no, field_code=field_code,
-                                  wind_full_name=wind_full_name, save_db=save_db)
+                                  wind_full_name=wind_full_name, save_db=save_db, header=begin_header)
 
         params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr,
                              cols_tran=cols_trans_all,
@@ -102,17 +105,27 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
         try:
             trans_subject.run(step=step, end=end)
         except Exception as e:
-            logger.exception(e)
+            trans_print(traceback.format_exc())
             message = "系统返回错误:" + str(e)
             update_trans_status_error(batch_no, transfer_type, message, save_db)
+        finally:
+            set_trance_id("")
 
 
 if __name__ == '__main__':
-    step = 4
-    end = 4
-    batch_no = 'WOF063100040-WOB00008'
-    transfer_type = 'second'
-    transfer_file_addr = r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据'
-    field_name = '招远风电场'
-    field_code = "测试"
-    run_local(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, save_db=True)
+    # step = 0
+    # end = 3
+    # batch_no = '新艾里-2024021_1'
+    # transfer_type = 'second_1'
+    # transfer_file_addr = r'/data/download/collection_data/1进行中/新艾里风电场-吉林-大唐/收资数据/sec/新艾里风场2024年一月至三月风向数据.zip'
+    # field_name = '新艾里风电场'
+    # field_code = "测试"
+    # run_local(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, save_db=False)
+    env = None
+    if len(sys.argv) > 1:  # argv[0] is the script path, so a single CLI argument already gives len == 2
+        env = sys.argv[1]
+
+    if env is None:
+        raise Exception("请配置运行环境")
+
+    # run_schedule()

+ 2 - 2
service/plt_service.py

@@ -9,7 +9,7 @@ plt = ConnectMysqlPool("plt")
 def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
     if schedule_exec:
         exec_sql = """
-        update data_transfer set trans_sys_status = 0,transfer_finish_time=now()  
+        update data_transfer set trans_sys_status = 0 
         where batch_code = %s  and transfer_type = %s
         """
         plt.execute(exec_sql, (batch_no, trans_type))
@@ -63,7 +63,7 @@ def get_all_wind(field_code):
     dict_datas = plt.execute(query_sql, (field_code,))
     result = dict()
     for data in dict_datas:
-        result[data['engine_name']] = data['engine_code']
+        result[str(data['engine_name'])] = str(data['engine_code'])
     return result
 
 

+ 23 - 9
service/trans_service.py

@@ -1,10 +1,13 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/7
 # @Author  : 魏志亮
+import os
+
 from pandas import DataFrame
 
 from utils.db.ConnectMysqlPool import ConnectMysqlPool
 from utils.file.trans_methods import read_file_to_df
+from utils.log.trans_log import trans_print
 
 trans = ConnectMysqlPool("trans")
 
@@ -89,12 +92,22 @@ def creat_table_and_add_partition(table_name, count, read_type):
     trans.execute(create_sql)
 
 
-def rename_table(table_name, renamed_table_name):
-    rename_sql = f"RENAME TABLE {table_name} TO {renamed_table_name}"
-    try:
-        trans.execute(rename_sql)
-    except Exception as e:
-        print(e)
+def rename_table(table_name, renamed_table_name, save_db=True):
+    """Rename ``table_name`` to ``renamed_table_name`` when ``save_db`` is truthy; failures are only logged."""
+    if save_db:
+        try:
+            trans.execute(f"RENAME TABLE {table_name} TO {renamed_table_name}")
+        except Exception as err:
+            trans_print(err)
+
+
+def drop_table(table_name, save_db=True):
+    """Drop the back-quoted ``table_name`` when ``save_db`` is truthy; failures are only logged."""
+    if save_db:
+        try:
+            trans.execute(f"drop TABLE `{table_name}` ")
+        except Exception as err:
+            trans_print(err)
 
 
 def save_file_to_db(table_name: str, file: str, batch_count=20000):
@@ -106,6 +119,7 @@ def save_df_to_db(table_name: str, df: DataFrame, batch_count=20000):
 
 
 if __name__ == '__main__':
-    get_trans_conf('唐龙三期风电场', 'second')
-
-    save_file_to_db("test_唐龙-定时任务测试_second", r"D:\transdata\test\唐龙三期风电场-安徽-大唐\清理数据\test_唐龙-定时任务测试\second\C02.csv")
+    # Manually load two CSV files from one fixed directory into a single table.
+    base_dir = r"/data/download/collection_data/2完成/招远风电场-山东-大唐/清理数据/WOF063100040-WOB00013/second"
+    for csv_name in ("WOG00030.csv", "WOG00034.csv"):
+        save_file_to_db("WOF063100040-WOB00013_second", os.sep.join((base_dir, csv_name)), batch_count=100000)

+ 6 - 0
utils/conf/read_conf.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/7
 # @Author  : 魏志亮
+
 import yaml
 
 
@@ -18,3 +19,8 @@ def read_conf(dict_conf, col, default_value=None):
         return res
     else:
         return default_value
+
+
+if __name__ == '__main__':  # ad-hoc check: pretty-print whatever yaml_conf returns for the DB config path
+    from pprint import pprint
+    pprint(yaml_conf("../../conf/db.yaml"))  # NOTE(review): conf/db.yaml appears to be deleted by this same commit — confirm the path

+ 12 - 10
utils/db/ConnectMysqlPool.py

@@ -7,7 +7,7 @@ import os
 from pandas import DataFrame
 from pymysql.cursors import DictCursor
 
-from utils.conf.read_conf import yaml_conf
+from conf.db import mysql_config
 from utils.log.trans_log import trans_print
 
 
@@ -42,7 +42,7 @@ class ConnectMysqlPool:
             "conf",
             "db.yaml"
         )
-        self.yaml_data = yaml_conf(file_path)
+        self.yaml_data = mysql_config
         self.connet_name = connet_name
 
         # 创建连接池
@@ -59,6 +59,7 @@ class ConnectMysqlPool:
         pool = PooledDB(
             **self.yaml_data[self.connet_name + '_connect_pool_config'],
             **self.yaml_data[self.connet_name],
+            ping=2,
             creator=pymysql
         )
         return pool
@@ -95,7 +96,7 @@ class ConnectMysqlPool:
                     result = cursor.fetchall()
                     return result
                 except Exception as e:
-                    print(f"执行sql:{sql},报错:{e}")
+                    trans_print(f"执行sql:{sql},报错:{e}")
                     conn.rollback()
                     raise e
 
@@ -111,7 +112,7 @@ class ConnectMysqlPool:
                     cursor.execute(insert_sql, tuple(params.values()))
                     conn.commit()
                 except Exception as e:
-                    print(f"执行sql:{insert_sql},报错:{e}")
+                    trans_print(f"执行sql:{insert_sql},报错:{e}")
                     conn.rollback()
                     raise e
 
@@ -131,12 +132,13 @@ class ConnectMysqlPool:
                 with conn.cursor() as cursor:
                     try:
                         query_df = df.iloc[i:i + batch_count]
-                        values = [tuple(data) for data in query_df.values]
-                        cursor.executemany(insert_sql, values)
-                        conn.commit()
-                        result = cursor.fetchall()
-                        print(
-                            "总条数" + str(df.shape[0]) + ",已保存:" + str(i + batch_count))
+                        if not query_df.empty:
+                            values = [tuple(data) for data in query_df.values]
+                            cursor.executemany(insert_sql, values)
+                            conn.commit()
+                            result = cursor.fetchall()
+                            trans_print(
+                                "总条数" + str(df.shape[0]) + ",已保存:" + str(i + batch_count))
                     except Exception as e:
                         conn.rollback()
                         raise e

+ 3 - 0
utils/df_utils/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/21
+# @Author  : 魏志亮

+ 39 - 0
utils/df_utils/util.py

@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/21
+# @Author  : 魏志亮
+import datetime
+
+import pandas as pd
+
+
+def get_time_space(df, time_str):
+    """Return the most frequent gap between consecutive ``df[time_str]`` values, as ``Timedelta.seconds``.
+
+    About 1% of the gaps are sampled at random, but never fewer than one, and the trailing empty gap
+    is dropped first, so short frames no longer fail on an empty sample. Needs at least two rows.
+    """
+    gaps = (df[time_str].shift(-1) - df[time_str]).dropna()
+    sampled = gaps.sample(max(1, min(len(gaps), int(df.shape[0] / 100))))
+    return sampled.value_counts().idxmax().seconds
+
+
+def get_time_space_count(start_time: datetime.datetime, end_time: datetime.datetime, time_space=1):
+    """Count the points from ``start_time`` to ``end_time`` when spaced ``time_space`` seconds apart.
+
+    The starting point itself is counted, hence the ``+ 1``; sub-second parts of the span are ignored.
+    :return: number of points as an int
+    """
+    span = end_time - start_time
+    whole_seconds = span.seconds + span.days * 24 * 60 * 60
+    return int(whole_seconds / time_space) + 1
+
+
+if __name__ == '__main__':  # manual check against a local CSV file
+    df = pd.read_csv(r"D:\下载\#16.csv")
+    df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+    space = get_time_space(df, 'time_stamp')
+    earliest = df['time_stamp'].min()  # named so the builtins min/max are not shadowed
+    latest = df['time_stamp'].max()
+    result = get_time_space_count(earliest, latest, space)
+    print(df.shape)
+    print(space, earliest, latest, result)

+ 7 - 7
utils/file/trans_methods.py

@@ -45,7 +45,7 @@ def split_array(array, num):
 
 
 # 读取数据到df
-def read_file_to_df(file_path, read_cols=list()):
+def read_file_to_df(file_path, read_cols=list(), header=0):
     trans_print('开始读取文件', file_path)
     df = pd.DataFrame()
     if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
@@ -53,15 +53,15 @@ def read_file_to_df(file_path, read_cols=list()):
         end_with_gz = str(file_path).lower().endswith("gz")
         if read_cols:
             if end_with_gz:
-                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip')
+                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
             else:
-                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols)
+                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header)
         else:
 
             if end_with_gz:
-                df = pd.read_csv(file_path, encoding=encoding, compression='gzip')
+                df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
             else:
-                df = pd.read_csv(file_path, encoding=encoding)
+                df = pd.read_csv(file_path, encoding=encoding, header=header)
 
     else:
         xls = pd.ExcelFile(file_path)
@@ -69,9 +69,9 @@ def read_file_to_df(file_path, read_cols=list()):
         sheet_names = xls.sheet_names
         for sheet in sheet_names:
             if read_cols:
-                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, usecols=read_cols)])
+                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
             else:
-                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet)])
+                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
 
     trans_print('文件读取成功', file_path, '文件数量', df.shape)
 

+ 35 - 13
utils/log/trans_log.py

@@ -2,31 +2,53 @@
 # @Time    : 2024/5/16
 # @Author  : 魏志亮
 
+import datetime
 import logging
 import os
-import datetime
 import sys
 
+
+# The trace id is kept in os.environ (process-wide); no ThreadLocal object is created.
+
+
+def set_trance_id(trace_id):
+    """Store the trace id process-wide in ``os.environ['trace_id']`` (not per thread); ``None`` stores ''."""
+    os.environ['trace_id'] = '' if trace_id is None else str(trace_id)
+
+
+class ContextFilter(logging.Filter):
+    """Logging filter that stamps each record with the trace id read from ``os.environ``."""
+
+    def filter(self, record):
+        """Set ``record.trace_id`` and return True so the record is always kept."""
+        # Fall back to an empty string when no trace id has been set.
+        record.trace_id = os.environ.get('trace_id', '')
+
+        return True
+
+
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 stout_handle = logging.StreamHandler(sys.stdout)
-stout_handle.setFormatter(logging.Formatter("%(asctime)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
+stout_handle.setFormatter(
+    logging.Formatter("%(asctime)s-%(trace_id)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
 stout_handle.setLevel(logging.INFO)
+stout_handle.addFilter(ContextFilter())
 logger.addHandler(stout_handle)
 
+log_path = r'/data/logs/trans_data'
+file_path = os.path.join(log_path)
 
-def init_log(batch_no, name, type):
-    log_path = r'/data/logs/trans_data'
-    file_path = os.path.join(log_path, str(name), str(batch_no), str(type))
-
-    if not os.path.exists(file_path):
-        os.makedirs(file_path, exist_ok=True)
-    file_name = file_path + os.sep + str(datetime.date.today()) + '.log'
+if not os.path.exists(file_path):
+    os.makedirs(file_path, exist_ok=True)
+file_name = file_path + os.sep + str(datetime.date.today()) + '.log'
 
-    file_handler = logging.FileHandler(file_name, encoding='utf-8')
-    file_handler.setFormatter(logging.Formatter("%(asctime)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
-    file_handler.setLevel(logging.INFO)
-    logger.addHandler(file_handler)
+file_handler = logging.FileHandler(file_name, encoding='utf-8')
+file_handler.setFormatter(
+    logging.Formatter("%(asctime)s-%(trace_id)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
+file_handler.setLevel(logging.INFO)
+file_handler.addFilter(ContextFilter())
+logger.addHandler(file_handler)
 
 
 def trans_print(*args):

+ 15 - 6
utils/zip/unzip.py

@@ -2,10 +2,12 @@
 # @Time    : 2024/5/17
 # @Author  : 魏志亮
 import os
+import traceback
 import zipfile
 
 import rarfile
 
+from utils.file.trans_methods import detect_file_encoding
 from utils.log.trans_log import trans_print, logger
 
 
@@ -28,18 +30,26 @@ def unzip(zip_filepath, dest_path):
     trans_print("解压到:", dest_path)
 
     try:
-        with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
-            zip_ref.extractall(dest_path)
+        if detect_file_encoding(zip_filepath).startswith("gb"):
+            try:
+                with __support_gbk(zipfile.ZipFile(zip_filepath, 'r'))  as zip_ref:
+                    zip_ref.extractall(dest_path)
+            except:
+                with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
+                    zip_ref.extractall(dest_path)
+        else:
+            with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
+                zip_ref.extractall(dest_path)
+
     except zipfile.BadZipFile as e:
-        logger.exception(e)
+        trans_print(traceback.format_exc())
         is_success = False
-        message = str(e)
         trans_print('不是zip文件:', zip_filepath)
         return is_success, e
 
     # 遍历解压后的文件
     dest_path = dest_path
-    print('解压再次读取', dest_path)
+    trans_print('解压再次读取', dest_path)
     if is_success:
         for root, dirs, files in os.walk(dest_path):
             for file in files:
@@ -83,7 +93,6 @@ def unrar(rar_file_path, dest_dir):
     except Exception as e:
         logger.exception(e)
         is_success = False
-        message = str(e)
         trans_print('不是rar文件:', rar_file_path)
         return is_success, e