
Develop fault alerting

wzl, 9 months ago
commit 7e7ddd8ad7

+ 6 - 0
app_run.py

@@ -128,6 +128,12 @@ if __name__ == '__main__':
     if len(sys.argv) >= 3:
         run_count = int(sys.argv[2])
 
+    conf_path = '/data/config/etl_config.yaml'
+    if len(sys.argv) >= 4:
+        conf_path = sys.argv[3]
+
+    os.environ['ETL_CONF'] = conf_path
+
     from utils.log.trans_log import trans_print, set_trance_id
     from etl.base.TransParam import TransParam
     from etl.base.WindFarms import WindFarms
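
Note: with this change, app_run.py accepts an optional third command-line argument naming the config file and exports it as ETL_CONF before the ETL modules are imported (the logger reads the variable at import time; see utils/log/trans_log.py below). A hypothetical invocation, with env and run_count parsed as above: python app_run.py prod 20 /data/config/etl_config_datang.yaml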

+ 7 - 1
conf/etl_config.yaml

@@ -42,4 +42,10 @@ trans_prod:
 # To keep results under the original path, configure this; the name below is used as the split point when creating the cleaned-data folder
 etl_origin_path_contain: 收资数据
 # To save to a separate location, configure this path instead
-save_path:
+save_path:
+
+
+log_path_dir: /data/logs
+
+# Temporary file location; some client companies provide a /tmp that is too small, so it can be configured here
+tmp_base_path: /tmp
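
Both new keys are consumed later in this commit: log_path_dir by utils/log/trans_log.py and tmp_base_path by etl/base/PathsAndTable.py.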

+ 50 - 0
conf/etl_config_datang.yaml

@@ -0,0 +1,50 @@
+plt_connect_pool_config:
+  blocking: true
+  charset: utf8mb4
+  maxcached: 5
+  maxconnections: 20
+  maxshared: 0
+  mincached: 2
+  setsession: [ ]
+plt_dev:
+  database: energy
+  host: 172.16.37.22
+  password: admin123456
+  port: 3306
+  user: root
+plt_prod:
+  database: energy_prod
+  host: 172.16.37.22
+  password: admin123456
+  port: 3306
+  user: root
+trans_connect_pool_config:
+  blocking: true
+  charset: utf8
+  maxcached: 20
+  maxconnections: 20
+  maxshared: 0
+  mincached: 1
+  setsession: [ ]
+trans_dev:
+  database: energy_data
+  host: 172.16.37.24
+  password: admin123456
+  port: 3306
+  user: root
+trans_prod:
+  database: energy_data_prod
+  host: 172.16.37.24
+  password: admin123456
+  port: 3306
+  user: root
+
+# To keep results under the original path, configure this; the name below is used as the split point when creating the cleaned-data folder
+etl_origin_path_contain: 收资数据
+# To save to a separate location, configure this path instead
+save_path:
+
+log_path_dir: /data/collection_data/logs
+
+# Temporary file location; some client companies provide a /tmp that is too small, so it can be configured here
+tmp_base_path: /data/collection_data/tmp
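
This per-client file (presumably for the 大唐/Datang sites that appear in the test paths below) is the kind of config the new third command-line argument is meant to point at, in place of the default /data/config/etl_config.yaml.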

+ 5 - 6
etl/base/PathsAndTable.py

@@ -1,12 +1,9 @@
 import os
 import shutil
-import tempfile
-
-import yaml
 
 from service.trans_service import drop_table, creat_table_and_add_partition
-from utils.log.trans_log import trans_print
 from utils.conf.read_conf import *
+from utils.log.trans_log import trans_print
 
 
 class PathsAndTable(object):
@@ -21,7 +18,9 @@ class PathsAndTable(object):
         self.save_zip = save_zip
         self.multi_pool_count = 6
 
-        yaml_config = yaml_conf(r"/data/config/etl_config.yaml")
+        yaml_config = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
+
+        self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp")
 
         save_path_conf = read_conf(yaml_config, "save_path")
         if save_path_conf:
@@ -39,7 +38,7 @@ class PathsAndTable(object):
         return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
 
     def get_save_tmp_path(self):
-        return os.path.join(tempfile.gettempdir(), self.field_name, self.batch_no + "_" + self.batch_name,
+        return os.path.join(self.tmp_base_path, self.field_name, self.batch_no + "_" + self.batch_name,
                             self.read_type)
 
     def get_excel_tmp_path(self):
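
For context, these calls assume that yaml_conf loads the whole YAML file and that read_conf falls back to a default when a key is missing. A minimal sketch of that contract (an assumption about utils.conf.read_conf; the real module may differ):

    import os
    import yaml

    def yaml_conf(path):
        # Load the full YAML config into a dict.
        with open(path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def read_conf(cfg, key, default=None):
        # Return the configured value, or the default when the key is absent or empty.
        value = cfg.get(key) if cfg else None
        return default if value is None else value

    # Resolution order introduced by this commit: ETL_CONF first, then the
    # historical hard-coded path; tmp_base_path replaces tempfile.gettempdir().
    conf = yaml_conf(os.environ.get('ETL_CONF', '/data/config/etl_config.yaml'))
    tmp_base_path = read_conf(conf, 'tmp_base_path', '/tmp')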

+ 13 - 7
test_app_run.py

@@ -118,7 +118,7 @@ if __name__ == '__main__':
     if len(sys.argv) >= 2:
         env = sys.argv[1]
     else:
-        env = 'dev'
+        env = 'prod'
     print(sys.argv)
     if env is None:
         raise Exception("请配置运行环境")
@@ -129,6 +129,12 @@ if __name__ == '__main__':
     if len(sys.argv) >= 3:
         run_count = int(sys.argv[2])
 
+    conf_path = '/data/config/etl_config.yaml'
+    if len(sys.argv) >= 4:
+        conf_path = sys.argv[3]
+
+    os.environ['ETL_CONF'] = conf_path
+
     from utils.log.trans_log import trans_print, set_trance_id
     from etl.base.TransParam import TransParam
     from etl.base.WindFarms import WindFarms
@@ -142,10 +148,10 @@ if __name__ == '__main__':
     #           transfer_file_addr=r'D:\trans_data\密马风电场\收资数据\minute', field_name='密马风电场',
     #           field_code="WOF035200003", save_db=False)
 
-    run_local(4, 4, batch_no='WOF053600062-WOB000010', batch_name='ZYFDC000013', transfer_type='second',
-              transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据', field_name='招远风电场',
-              field_code="WOF053600062", save_db=True)
+    #run_local(4, 4, batch_no='WOF053600062-WOB0000111-test', batch_name='ZYFDC000013-test', transfer_type='second',
+    #          transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据1111', field_name='招远风电场',
+    #          field_code="WOF053600062", save_db=False)
 
-    # run_local(0, 3, batch_no='WOF043600007-WOB000001', batch_name='XALFDC0814', transfer_type='second',
-    #           transfer_file_addr=r'D:\trans_data\新艾里风电场\收资数据\1号风机', field_name='新艾里风电场',
-    #           field_code="WOF043600007", save_db=False)
+    run_local(0, 4, batch_no='WOF079700019-WOB000004', batch_name='JHS标签min-after', transfer_type='minute',
+               transfer_file_addr=r'/data/download/collection_data/2完成/金华山风电场-江西-大唐/收资数据/调整后/min', field_name='金华山风电场',
+               field_code="WOF079700019", save_db=True)

+ 2 - 1
utils/db/ConnectMysql.py

@@ -12,7 +12,8 @@ from utils.log.trans_log import trans_print
 class ConnectMysql:
 
     def __init__(self, connet_name):
-        self.yaml_data = yaml_conf("/data/config/etl_config.yaml")
+        #self.yaml_data = yaml_conf("/data/config/etl_config.yaml")
+        self.yaml_data = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
         self.connet_name = connet_name
         if 'env' in os.environ:
             self.env = os.environ['env']
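
With the YAML sections above, connet_name plus the env suffix presumably selects a block such as plt_prod or trans_dev; the actual composition happens outside this hunk. A sketch under that assumption:

    import os

    def resolve_db_conf(yaml_data, connet_name):
        # e.g. connet_name='plt' with env='prod' selects the plt_prod section.
        env = os.environ.get('env', 'dev')
        return yaml_data[connet_name + '_' + env]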

+ 9 - 3
utils/file/trans_methods.py

@@ -49,6 +49,7 @@ def split_array(array, num):
 def find_read_header(file_path, trans_cols):
     print(trans_cols)
     df = read_file_to_df(file_path, nrows=20)
+    df.reset_index(inplace=True)
     count = 0
     for col in trans_cols:
         if col in df.columns:
@@ -57,13 +58,18 @@ def find_read_header(file_path, trans_cols):
                 return 0
 
     count = 0
-
+    values = list()
     for index, row in df.iterrows():
+        values = list(row.values)
+        if type(row.name) == tuple:
+            values.extend(list(row.name))
         for col in trans_cols:
-            if col in row.values:
+            if col in values:
                 count = count + 1
-                if count > 2:
+                if count >= 2:
                     return index + 1
+                    
+
 
     return None
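
The values handling added here matters when pandas parses leading columns into the DataFrame index: for a MultiIndex, each row's name is a tuple holding those index cells, and row.values alone would miss header text sitting in them. A standalone illustration (synthetic data, not from the repo):

    import pandas as pd

    # Simulate a file whose first two columns were parsed into the index.
    df = pd.DataFrame({'a': ['time', 1], 'b': ['speed', 2], 'c': ['power', 3]})
    df = df.set_index(['a', 'b'])

    for index, row in df.iterrows():
        values = list(row.values)          # only the 'c' column survives here
        if type(row.name) == tuple:        # MultiIndex rows have tuple names
            values.extend(list(row.name))  # recover the index cells as well
        print(values)                      # ['power', 'time', 'speed'], then [3, 1, 2]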
 

+ 202 - 0
utils/file/trans_methods.py_1

@@ -0,0 +1,202 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/16
+# @Author  : 魏志亮
+import datetime
+import os
+import shutil
+import warnings
+
+import chardet
+import pandas as pd
+
+from utils.log.trans_log import trans_print
+
+warnings.filterwarnings("ignore")
+
+
+# Detect the file's text encoding
+def detect_file_encoding(filename):
+    # Read the first 1000 bytes of the file (enough for most encoding detection)
+    with open(filename, 'rb') as f:
+        rawdata = f.read(1000)
+    result = chardet.detect(rawdata)
+    encoding = result['encoding']
+
+    trans_print("文件类型:", filename, encoding)
+
+    if encoding is None:
+        encoding = 'gb18030'
+
+    if encoding.lower() in ['utf-8', 'ascii', 'utf8']:
+        return 'utf-8'
+
+    return 'gb18030'
+
+
+def del_blank(df=pd.DataFrame(), cols=list()):
+    for col in cols:
+        if df[col].dtype == object:
+            df[col] = df[col].str.strip()
+    return df
+
+
+# Split an array into chunks of size num
+def split_array(array, num):
+    return [array[i:i + num] for i in range(0, len(array), num)]
+
+
+def find_read_header(file_path, trans_cols):
+    df = read_file_to_df(file_path, nrows=20)
+    count = 0
+    header = None
+    for col in trans_cols:
+        if col in df.columns:
+            count = count + 1
+            if count >= 2:
+                header = 0
+                break
+
+    count = 0
+
+    values = list()
+    for index, row in df.iterrows():
+        values = list(row.values)
+        if type(row.name) == tuple:
+            values.extend(list(row.name))
+        for col in trans_cols:
+            if col in values:
+                count = count + 1
+                if count > 2:
+                    header = index + 1
+                    break
+
+    read_cols = []
+    for col in values:
+        if col in trans_cols:
+            read_cols.append(col)
+
+    return header, read_cols
+
+
+# Read a file into a DataFrame
+def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None):
+    begin = datetime.datetime.now()
+    trans_print('开始读取文件', file_path)
+    header = 0
+    find_cols = list()
+    if trans_cols:
+        header, find_cols = find_read_header(file_path, trans_cols)
+        trans_print(os.path.basename(file_path), "读取第", header, "行")
+        if header is None:
+            message = '未匹配到开始行,请检查并重新指定'
+            trans_print(message)
+            raise Exception(message)
+
+    read_cols.extend(find_cols)
+
+    try:
+        df = pd.DataFrame()
+        if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
+            encoding = detect_file_encoding(file_path)
+            end_with_gz = str(file_path).lower().endswith("gz")
+            if read_cols:
+                if end_with_gz:
+                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header,
+                                     nrows=nrows)
+                else:
+                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
+                                     on_bad_lines='warn', nrows=nrows)
+            else:
+
+                if end_with_gz:
+                    df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
+                else:
+                    df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
+
+        else:
+            xls = pd.ExcelFile(file_path, engine="calamine")
+            # Get all the sheet names
+            sheet_names = xls.sheet_names
+            for sheet_name in sheet_names:
+                if read_cols:
+                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols, nrows=nrows)
+                else:
+                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
+
+                now_df['sheet_name'] = sheet_name
+                df = pd.concat([df, now_df])
+            xls.close()
+        trans_print('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:', datetime.datetime.now() - begin)
+    except Exception as e:
+        trans_print('读取文件出错', file_path, str(e))
+        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
+        raise ValueError(message)
+
+    return df
+
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # Walk every entry under the directory
+    for item in os.listdir(path):
+        item_path = os.path.join(path, item)
+        if os.path.isdir(item_path):
+            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+        elif os.path.isfile(item_path):
+            if path not in directory_dict:
+                directory_dict[path] = []
+
+            if filter_types is None or len(filter_types) == 0:
+                directory_dict[path].append(item_path)
+            elif str(item_path).split(".")[-1] in filter_types:
+                if str(item_path).count("~$") == 0:
+                    directory_dict[path].append(item_path)
+
+
+# Collect all spreadsheet-type files under a path
+def read_excel_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# Collect all files under a path
+def read_files(read_path):
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+def copy_to_new(from_path, to_path):
+    is_file = False
+    if to_path.count('.') > 0:
+        is_file = True
+
+    create_file_path(to_path, is_file_path=is_file)
+
+    shutil.copy(from_path, to_path)
+
+
+# Create the directory path
+def create_file_path(path, is_file_path=False):
+    if is_file_path:
+        path = os.path.dirname(path)
+
+    if not os.path.exists(path):
+        os.makedirs(path, exist_ok=True)
+
+
+if __name__ == '__main__':
+    datas = read_excel_files(r"D:\data\清理数据\招远风电场\WOF053600062-WOB000009_ZYFDC000012\minute")
+    for data in datas:
+        print(data)
+
+    print("*" * 20)
+
+    datas = read_excel_files(r"D:\data\清理数据\招远风电场\WOF053600062-WOB000009_ZYFDC000012\minute\WOG00066.csv.gz")
+    for data in datas:
+        print(data)

+ 11 - 3
utils/log/trans_log.py

@@ -1,3 +1,4 @@
+
 # -*- coding: utf-8 -*-
 # @Time    : 2024/5/16
 # @Author  : 魏志亮
@@ -7,6 +8,8 @@ import logging
 import os
 import sys
 
+from utils.conf.read_conf import read_conf, yaml_conf
+
 
 def set_trance_id(trace_id):
     """设置当前线程的链路ID"""
@@ -28,12 +31,16 @@ logger = logging.getLogger("etl_tools")
 logger.setLevel(logging.INFO)
 stout_handle = logging.StreamHandler(sys.stdout)
 stout_handle.setFormatter(
-    logging.Formatter("%(asctime)s-%(trace_id)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
+    logging.Formatter("%(asctime)s-%(trace_id)s: %(message)s"))
 stout_handle.setLevel(logging.INFO)
 stout_handle.addFilter(ContextFilter())
 logger.addHandler(stout_handle)
 
-log_path = r'/data/logs/etl_tools_' + (os.environ['env'] if 'env' in os.environ else 'dev')
+config = yaml_conf(os.environ['ETL_CONF'])
+log_path_dir = read_conf(config, 'log_path_dir', "/data/logs")
+
+log_path = log_path_dir + os.sep + r'etl_tools_' + (os.environ['env'] if 'env' in os.environ else 'dev')
+
 file_path = os.path.join(log_path)
 
 if not os.path.exists(file_path):
@@ -42,7 +49,7 @@ file_name = file_path + os.sep + str(datetime.date.today()) + '.log'
 
 file_handler = logging.FileHandler(file_name, encoding='utf-8')
 file_handler.setFormatter(
-    logging.Formatter("%(asctime)s-%(trace_id)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
+    logging.Formatter("%(asctime)s-%(trace_id)s: %(message)s"))
 file_handler.setLevel(logging.INFO)
 file_handler.addFilter(ContextFilter())
 logger.addHandler(file_handler)
@@ -50,3 +57,4 @@ logger.addHandler(file_handler)
 
 def trans_print(*args):
     logger.info("  ".join([str(a) for a in args]))
+
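
One import-order consequence of the logging change: trans_log.py now reads os.environ['ETL_CONF'] at import time and raises KeyError when the variable is unset, which is why app_run.py and test_app_run.py export ETL_CONF before their first import of trans_print. A more forgiving lookup would mirror the fallback already used in ConnectMysql.py (a sketch, not what this commit does; yaml_conf is already imported above):

    config = yaml_conf(os.environ.get('ETL_CONF', '/data/config/etl_config.yaml'))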