Переглянути джерело

添加prod,dev运行环境

wzl 11 місяців тому
батько
коміт
03f168898e

+ 27 - 12
conf/db.py

@@ -3,29 +3,44 @@
 # @Author  : 魏志亮
 
 mysql_config = \
-    {'plt': {'database': 'energy_prod',
-             'host': '192.168.50.233',
-             'password': 'admin123456',
-             'port': 3306,
-             'user': 'admin'},
+    {'plt_dev': {'database': 'energy',
+                 'host': '192.168.50.233',
+                 'password': 'admin123456',
+                 'port': 3306,
+                 'user': 'admin'},
+
+     'plt_prod': {'database': 'energy_prod',
+                  'host': '192.168.50.233',
+                  'password': 'admin123456',
+                  'port': 3306,
+                  'user': 'admin'},
+
      'plt_connect_pool_config':
          {'blocking': True,
           'charset': 'utf8mb4',
           'maxcached': 5,
-          'maxconnections': 10,
+          'maxconnections': 20,
           'maxshared': 0,
           'mincached': 2,
           'setsession': []},
-     'trans': {'database': 'energy_data_prod',
-               'host': '192.168.50.235',
-               'password': 'admin123456',
-               'port': 30306,
-               'user': 'root'},
+
+     'trans_dev': {'database': 'energy_data',
+                   'host': '192.168.50.235',
+                   'password': 'admin123456',
+                   'port': 30306,
+                   'user': 'root'},
+
+     'trans_prod': {'database': 'energy_data_prod',
+                    'host': '192.168.50.235',
+                    'password': 'admin123456',
+                    'port': 30306,
+                    'user': 'root'},
+
      'trans_connect_pool_config':
          {'blocking': True,
           'charset': 'utf8',
           'maxcached': 20,
-          'maxconnections': 10,
+          'maxconnections': 20,
           'maxshared': 0,
           'mincached': 1,
           'setsession': []}

+ 22 - 12
etl/base/WindFarms.py

@@ -8,24 +8,26 @@ import traceback
 
 from etl.base.TranseParam import TranseParam
 from service.plt_service import get_all_wind, update_trans_status_error, update_trans_status_running, \
-    update_trans_status_success
-from service.trans_service import creat_table_and_add_partition, rename_table, save_file_to_db, drop_table
+    update_trans_status_success, update_trans_transfer_progress
+from service.trans_service import creat_table_and_add_partition, save_file_to_db, drop_table
+from utils.df_utils.util import get_time_space
 from utils.file.trans_methods import *
 from utils.zip.unzip import unzip, unrar, get_desc_path
 
 
 class WindFarms(object):
 
-    def __init__(self, batch_no=None, field_code=None, params: TranseParam = None, wind_full_name=None,
+    def __init__(self, batch_no=None, batch_name=None, field_code=None, params: TranseParam = None, wind_full_name=None,
                  save_db=True, header=0):
         self.batch_no = batch_no
+        self.batch_name = batch_name
         self.field_code = field_code
         self.wind_full_name = wind_full_name
         self.save_zip = False
         self.trans_param = params
         self.exist_wind_names = multiprocessing.Manager().list()
         self.wind_col_trans = get_all_wind(self.field_code)
-        self.batch_count = 50000
+        self.batch_count = 200000
         self.save_path = None
         self.save_db = save_db
         self.lock = multiprocessing.Manager().Lock()
@@ -49,10 +51,11 @@ class WindFarms(object):
                 raise Exception("Invalid param set :" + arg)
 
     def get_save_path(self):
-        return os.path.join(self.save_path, self.batch_no, self.trans_param.read_type)
+        return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.trans_param.read_type)
 
     def get_save_tmp_path(self):
-        return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no, self.trans_param.read_type)
+        return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no + "_" + self.batch_name,
+                            self.trans_param.read_type)
 
     def get_excel_tmp_path(self):
         return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
@@ -210,8 +213,9 @@ class WindFarms(object):
     def set_statistics_data(self, df):
 
         if not df.empty:
-            min_date = pd.to_datetime(df['time_stamp']).min()
-            max_date = pd.to_datetime(df['time_stamp']).max()
+            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+            min_date = df['time_stamp'].min()
+            max_date = df['time_stamp'].max()
             with self.lock:
 
                 if 'min_date' in self.statistics_map.keys():
@@ -231,6 +235,9 @@ class WindFarms(object):
                 else:
                     self.statistics_map['total_count'] = df.shape[0]
 
+                if 'time_granularity' not in self.statistics_map.keys():
+                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
+
     def save_statistics_file(self):
         save_path = os.path.join(os.path.dirname(self.get_save_path()),
                                  self.trans_param.read_type + '_statistics.txt')
@@ -458,7 +465,7 @@ class WindFarms(object):
             # if self.trans_param.wind_name_exec:
             #     wind_name = "测试"
             #     eval(self.trans_param.wind_name_exec)
-
+            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 5, self.save_db)
             trans_print("初始化字段结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))
 
@@ -469,6 +476,7 @@ class WindFarms(object):
             trans_print("开始保存到临时路径")
             # 开始读取数据并分类保存临时文件
             self.remove_file_to_tmp_path()
+            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 20, self.save_db)
             trans_print("保存到临时路径结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))
 
@@ -479,6 +487,7 @@ class WindFarms(object):
 
             # 开始读取数据并分类保存临时文件
             self.read_file_and_save_tmp()
+            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 50, self.save_db)
             trans_print("保存到临时文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))
 
@@ -487,6 +496,7 @@ class WindFarms(object):
             trans_print("开始保存到文件")
             self.mutiprocessing_to_save_file()
             self.save_statistics_file()
+            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 70, self.save_db)
             trans_print("保存到文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))
 
@@ -497,12 +507,12 @@ class WindFarms(object):
                 self.mutiprocessing_to_save_db()
                 trans_print("保存到数据库结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                             str(datetime.datetime.now() - begin))
+        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 100, self.save_db)
         # 如果end==0 则说明只是进行了验证
         if end != 0:
             update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                        len(read_excel_files(self.get_read_tmp_path())), self.save_db)
-
-        self.delete_tmp_files()
+                                        len(read_excel_files(self.get_read_tmp_path())),
+                                        self.statistics_map['time_granularity'], self.save_db)
 
         trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
                     str(datetime.datetime.now() - begin))

+ 26 - 22
schedule_service.py

@@ -1,16 +1,10 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/11
 # @Author  : 魏志亮
+import os
 import sys
 import traceback
 
-from etl.base.TranseParam import TranseParam
-from etl.base.WindFarms import WindFarms
-from service.plt_service import get_exec_data, update_trans_status_error
-from service.trans_service import get_trans_conf
-from utils.conf.read_conf import read_conf
-from utils.log.trans_log import trans_print, set_trance_id
-
 
 def run_schedule(step=0, end=4):
     data = get_exec_data()
@@ -20,15 +14,17 @@ def run_schedule(step=0, end=4):
         trans_print("当前无任务")
     else:
         batch_no = data['batch_code']
+        batch_name = data['batch_name']
         transfer_type = data['transfer_type']
         transfer_file_addr = data['transfer_addr']
         field_code = data['field_code']
         field_name = data['field_name']
 
-        __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, save_db=True)
+        __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
+                     save_db=True)
 
 
-def run_local(step=0, end=3, batch_no=None, transfer_type=None, transfer_file_addr=None, field_name=None,
+def run_local(step=0, end=3, batch_no=None, batch_name='', transfer_type=None, transfer_file_addr=None, field_name=None,
               field_code="测试", save_db=False):
     if batch_no is None or str(batch_no).strip() == '':
         return "批次编号不能为空"
@@ -39,11 +35,12 @@ def run_local(step=0, end=3, batch_no=None, transfer_type=None, transfer_file_ad
     if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
         return "文件路径不能为空"
 
-    __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code,
+    __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
                  save_db=save_db)
 
 
-def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, field_name=None, field_code="测试",
+def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr=None, field_name=None,
+                 field_code="测试",
                  save_db=False):
     trance_id = '-'.join([batch_no, field_name, transfer_type])
     set_trance_id(trance_id)
@@ -91,7 +88,7 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
         for col in trans_cols:
             cols_trans_all[col] = read_conf(conf_map, col, '')
 
-        trans_subject = WindFarms(batch_no=batch_no, field_code=field_code,
+        trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
                                   wind_full_name=wind_full_name, save_db=save_db, header=begin_header)
 
         params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr,
@@ -110,22 +107,29 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
             update_trans_status_error(batch_no, transfer_type, message, save_db)
         finally:
             set_trance_id("")
+            trans_subject.delete_tmp_files()
 
 
 if __name__ == '__main__':
-    # step = 0
-    # end = 3
-    # batch_no = '新艾里-2024021_1'
-    # transfer_type = 'second_1'
-    # transfer_file_addr = r'/data/download/collection_data/1进行中/新艾里风电场-吉林-大唐/收资数据/sec/新艾里风场2024年一月至三月风向数据.zip'
-    # field_name = '新艾里风电场'
-    # field_code = "测试"
-    # run_local(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, save_db=False)
     env = None
-    if len(sys.argv) > 2:
+    if len(sys.argv) >= 2:
         env = sys.argv[1]
 
+    print(sys.argv)
     if env is None:
         raise Exception("请配置运行环境")
 
-    # run_schedule()
+    os.environ['env'] = env
+
+    from utils.log.trans_log import trans_print, set_trance_id
+    from etl.base.TranseParam import TranseParam
+    from etl.base.WindFarms import WindFarms
+    from service.plt_service import get_exec_data, update_trans_status_error
+    from service.trans_service import get_trans_conf
+    from utils.conf.read_conf import read_conf
+
+    run_schedule()
+
+    # run_local(4, 4, batch_no='WOF053600062-WOB00022', batch_name='test-6-24', transfer_type='second',
+    #           transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据', field_name='招远风电场',
+    #           field_code="测试", save_db=True)

+ 14 - 4
service/plt_service.py

@@ -24,13 +24,22 @@ def update_trans_status_error(batch_no, trans_type, message="", save_db=True):
         plt.execute(exec_sql, (message, batch_no, trans_type))
 
 
-def update_trans_status_success(batch_no, trans_type, wind_count=0, save_db=True):
+def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0, save_db=True):
     if save_db:
         exec_sql = """
-        update data_transfer set transfer_state = 1,trans_sys_status = 1,err_info = '',engine_count =%s,transfer_finish_time=now()  
+        update data_transfer set transfer_state = 1,trans_sys_status = 1,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()  
         where batch_code = %s  and transfer_type = %s
         """
-        plt.execute(exec_sql, (wind_count, batch_no, trans_type))
+        plt.execute(exec_sql, (wind_count, time_granularity, batch_no, trans_type))
+
+
+def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, save_db=True):
+    if save_db:
+        exec_sql = """
+        update data_transfer set transfer_progress =%s,transfer_finish_time=now()  
+        where batch_code = %s  and transfer_type = %s
+        """
+        plt.execute(exec_sql, (transfer_progress, batch_no, trans_type))
 
 
 # 获取执行的数据
@@ -38,9 +47,10 @@ def get_exec_data() -> dict:
     query_running_sql = "select 1 from data_transfer where trans_sys_status = 0"
     query_next_exdc_sql = """
     SELECT
-        t.*,a.field_name
+        t.*,a.field_name,b.batch_name
     FROM
         data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
+        inner join wind_field_batch b on t.batch_code = b.batch_code
     WHERE
         (t.trans_sys_status = -1 or ( t.trans_sys_status = 2 and t.transfer_state = 0))
     AND t.transfer_addr != ''

+ 18 - 6
service/trans_service.py

@@ -3,8 +3,11 @@
 # @Author  : 魏志亮
 import os
 
+import pandas as pd
 from pandas import DataFrame
+from sqlalchemy import create_engine
 
+from conf.db import mysql_config
 from utils.db.ConnectMysqlPool import ConnectMysqlPool
 from utils.file.trans_methods import read_file_to_df
 from utils.log.trans_log import trans_print
@@ -110,12 +113,21 @@ def drop_table(table_name, save_db=True):
             trans_print(e)
 
 
-def save_file_to_db(table_name: str, file: str, batch_count=20000):
-    save_df_to_db(table_name, read_file_to_df(file), batch_count)
-
-
-def save_df_to_db(table_name: str, df: DataFrame, batch_count=20000):
-    trans.df_batch_save(table_name, df, batch_count)
+def save_file_to_db(table_name: str, file: str, batch_count=200000):
+    # trans.df_batch_save(table_name, df, batch_count)
+    env = os.environ['env']
+
+    config = mysql_config['trans_' + env]
+    username = config['user']
+    password = config['password']
+    host = config['host']
+    port = config['port']
+    dbname = config['database']
+    engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
+    for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
+        df.to_sql(table_name, engine, if_exists='append', index=False)
+        count = (i + 1) * batch_count
+        trans_print(f"Chunk {count} written to MySQL.")
 
 
 if __name__ == '__main__':

+ 3 - 0
test/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/18
+# @Author  : 魏志亮

+ 3 - 7
utils/db/ConnectMysqlPool.py

@@ -37,14 +37,10 @@ class ConnectMysqlPool:
         db (str): 测试库名称。
         db_account (dict): 包含数据库账号信息的字典。
         """
-        file_path = os.path.join(
-            os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
-            "conf",
-            "db.yaml"
-        )
         self.yaml_data = mysql_config
         self.connet_name = connet_name
-
+        self.env = os.environ['env']
+        print("self.env", self.env)
         # 创建连接池
         self.pool = self.create_mysql_pool()
 
@@ -58,7 +54,7 @@ class ConnectMysqlPool:
         """
         pool = PooledDB(
             **self.yaml_data[self.connet_name + '_connect_pool_config'],
-            **self.yaml_data[self.connet_name],
+            **self.yaml_data[self.connet_name + "_" + self.env],
             ping=2,
             creator=pymysql
         )

+ 3 - 1
utils/df_utils/util.py

@@ -10,10 +10,12 @@ def get_time_space(df, time_str):
     """
     :return: 查询时间间隔
     """
+    begin = datetime.datetime.now()
     df1 = pd.DataFrame(df[time_str])
     df1['chazhi'] = df1[time_str].shift(-1) - df1[time_str]
     result = df1.sample(int(df1.shape[0] / 100))['chazhi'].value_counts().idxmax().seconds
     del df1
+    print(datetime.datetime.now() - begin)
     return result
 
 
@@ -29,7 +31,7 @@ def get_time_space_count(start_time: datetime.datetime, end_time: datetime.datet
 
 
 if __name__ == '__main__':
-    df = pd.read_csv(r"D:\下载\#16.csv")
+    df = pd.read_csv(r"D:\trans_data\01.csv")
     df['time_stamp'] = pd.to_datetime(df['time_stamp'])
     space = get_time_space(df, 'time_stamp')
     min = df['time_stamp'].min()

+ 2 - 5
utils/log/trans_log.py

@@ -8,9 +8,6 @@ import os
 import sys
 
 
-# 创建一个ThreadLocal对象来存储链路ID
-
-
 def set_trance_id(trace_id):
     """设置当前线程的链路ID"""
     os.environ['trace_id'] = trace_id
@@ -27,7 +24,7 @@ class ContextFilter(logging.Filter):
         return True
 
 
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("trans_data")
 logger.setLevel(logging.INFO)
 stout_handle = logging.StreamHandler(sys.stdout)
 stout_handle.setFormatter(
@@ -36,7 +33,7 @@ stout_handle.setLevel(logging.INFO)
 stout_handle.addFilter(ContextFilter())
 logger.addHandler(stout_handle)
 
-log_path = r'/data/logs/trans_data'
+log_path = r'/data/logs/trans_data_' + os.environ['env']
 file_path = os.path.join(log_path)
 
 if not os.path.exists(file_path):

+ 11 - 0
utils/tmp_use/combine_two_datas.py

@@ -0,0 +1,11 @@
+import os
+import pandas as pd
+
+from utils.file.trans_methods import read_file_to_df
+
+
+def combine_two_datas(path):
+    for file in os.listdir(path):
+        read_file = os.path.join(path, file)
+        df = read_file_to_df(read_file)
+        other_file = read_file.replace("second", "second_1").replace("新艾里-2024021", "新艾里-2024021_1")