
Add Huadian configuration

魏志亮 1 week ago
commit 9c9afe3994

+ 26 - 0
conf/etl_config_huadian.yaml

@@ -0,0 +1,26 @@
+plt:
+  database: energy
+  host: 192.168.0.1
+  password: admin123456
+  port: 3306
+  user: root
+trans:
+  database: energy_data_prod
+  host: 192.168.0.2
+  password: admin123456
+  port: 3306
+  user: root
+
+# To keep cleaned output under the original path, configure this: the name below
+# is used as the split point when creating the cleaned-data folder
+etl_origin_path_contain: 收资数据
+# To save cleaned output to a separate location instead, configure this path
+save_path:
+
+log_path_dir: /data/collection_data/logs
+
+# Temp file directory; some client hosts ship with a /tmp that is too small, so we configure our own
+tmp_base_path: /data/collection_data/tmp
+
+run_batch_count: 1
+
+archive_path: /data/collection_data/archive
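For reference, a minimal sketch of how a config like this gets picked up, assuming PyYAML and the same ETL_CONF environment variable that utils/tmp_util/表添加注释.py sets below (load_config itself is a hypothetical helper, not part of this commit):

import os
import yaml  # assumption: PyYAML is available

def load_config():
    # Read the YAML file pointed to by ETL_CONF, falling back to the new Huadian config
    conf_path = os.environ.get('ETL_CONF', 'conf/etl_config_huadian.yaml')
    with open(conf_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)

config = load_config()
print(config['plt']['host'], config['trans']['database'])  # 192.168.0.1 energy_data_prod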

+ 1 - 0
nutika_package.sh

@@ -1,3 +1,4 @@
 #!/bin/bash
+cd /home/wzl/project/no_batch_data_trans
 nuitka --standalone --onefile --static-libpython=yes  --include-data-files=./conf/*=./conf/  --output-dir=/home/wzl/project/install_package --remove-output app_run.py
 
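A note on the one-line change above: nuitka is invoked with the relative data-files path ./conf/*, so the added cd guarantees the repo's conf/ directory is bundled no matter where the script is invoked from. With --standalone --onefile the result should be a single self-contained binary under /home/wzl/project/install_package, and --remove-output drops the intermediate build directories afterwards.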

+ 37 - 0
utils/tmp_util/合并文件.py

@@ -0,0 +1,37 @@
+import multiprocessing
+import os
+
+import pandas as pd
+
+read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/收资数据/整改复核数据/2025年06月19日16时17分41秒'
+
+
+def read_and_save(wind_no, files, save_dir):
+    # Read every CSV belonging to one turbine and concatenate them
+    df = pd.concat([pd.read_csv(file) for file in files])
+
+    # Write the merged result as <wind_no>.csv
+    df.to_csv(os.path.join(save_dir, f'{wind_no}.csv'), index=False, encoding='utf-8')
+
+
+if __name__ == '__main__':
+
+    save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/收资数据/整改复核数据/合并202506191654'
+
+    os.makedirs(save_dir, exist_ok=True)
+
+    # Group files by turbine number: the part of the file name before the
+    # first '(' is taken as the turbine id
+    wind_dicts = dict()
+    for file in os.listdir(read_dir):
+        wind_no = file.split('(')[0]
+        if wind_no not in wind_dicts:
+            wind_dicts[wind_no] = [os.path.join(read_dir, file)]
+        else:
+            wind_dicts[wind_no].append(os.path.join(read_dir, file))
+
+    # Merge each turbine's files in parallel across 20 worker processes
+    with multiprocessing.Pool(20) as pool:
+        pool.starmap(read_and_save, [(key, files, save_dir) for key, files in wind_dicts.items()])
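Two quiet assumptions in the merge script worth noting: the grouping rule expects every raw file to carry its turbine number before a literal '(' in the name (a file with no '(' becomes its own group keyed by the full file name, extension included), and pd.concat expects all parts of a group to share the same column layout.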

+ 100 - 0
utils/tmp_util/整理INSERT到批量INSERT.py

@@ -0,0 +1,100 @@
+# coding=utf-8
+
+
+import re
+from collections import defaultdict
+
+import pymysql
+
+
+def read_sql_inserts(file_path):
+    """Generator that yields the INSERT statements of a dump file one line at a time."""
+    with open(file_path, 'r', encoding='utf-8') as f:
+        for line in f:
+            line = line.strip()
+            # Case-insensitive prefilter, matching the IGNORECASE regex below
+            if line.upper().startswith('INSERT INTO'):
+                yield line
+
+
+def process_large_sql_file(input_file, batch_size=10000):
+    table_data = defaultdict(lambda: {
+        'columns': None,
+        'value_rows': []
+    })
+
+    insert_pattern = re.compile(
+        r'INSERT\s+INTO\s+`?([a-zA-Z_][a-zA-Z0-9_]*)`?\s*\((.*?)\)\s*VALUES\s*\((.*?)\);',
+        re.IGNORECASE
+    )
+
+    # Stream the statements through the generator so the file is never fully loaded
+    for insert_stmt in read_sql_inserts(input_file):
+        match = insert_pattern.match(insert_stmt)
+        if match:
+            table_name = match.group(1)
+            columns = match.group(2)
+            values = match.group(3)
+
+            if table_data[table_name]['columns'] is None:
+                table_data[table_name]['columns'] = columns
+
+            table_data[table_name]['value_rows'].append(values)
+
+    # Build batched INSERT statements, up to batch_size rows each
+    batch_inserts = {}
+    for table_name, data in table_data.items():
+        columns = data['columns']
+        value_rows = data['value_rows']
+
+        for i in range(0, len(value_rows), batch_size):
+            batch_values = value_rows[i:i + batch_size]
+            batch_insert = f"INSERT INTO `{table_name}` ({columns}) VALUES\n"
+            batch_insert += ",\n".join([f"({values})" for values in batch_values])
+            batch_insert += ";"
+
+            if table_name not in batch_inserts:
+                batch_inserts[table_name] = []
+            batch_inserts[table_name].append(batch_insert)
+
+    return batch_inserts
+
+
+def execute_batch_inserts(db_config, batch_inserts):
+    """直接执行批量INSERT到数据库"""
+    connection = pymysql.connect(**db_config)
+    try:
+        with connection.cursor() as cursor:
+            for table_name, inserts in batch_inserts.items():
+                for index, insert_sql in enumerate(inserts):
+                    cursor.execute(insert_sql)
+                    print(f"表 {table_name},共 {len(inserts)} 个, 第 {index + 1} 个批量INSERT语句执行成功")
+        connection.commit()
+    finally:
+        connection.close()
+
+
+# Database connection settings
+db_config = {
+    'host': '192.168.50.235',
+    'user': 'root',
+    'password': 'admin123456',
+    'db': 'wtlivedb_1',
+    'charset': 'utf8mb4'
+}
+
+"""
+移除INSERT 语句 其他的就是建表语句了
+cat file |grep -v 'INSERT ' > create_talbe.sql
+下面是 INSERT 转化为  BATCH INSERT 的脚本
+"""
+
+if __name__ == "__main__":
+    input_file = "wtlivedb.sql"
+
+    # 使用
+    batch_inserts = process_large_sql_file("input.sql")
+    execute_batch_inserts(db_config, batch_inserts)
+
+    # Print summary statistics
+    for table_name, inserts in batch_inserts.items():
+        print(f"Table '{table_name}': {len(inserts)} batched INSERT statements")

+ 49 - 0
utils/tmp_util/表添加注释.py

@@ -0,0 +1,49 @@
+import os
+import sys
+
+# Default environment; override with the first CLI argument
+env = 'tidbprod'
+if len(sys.argv) >= 2:
+    env = sys.argv[1]
+
+# ETL_CONF must be set before the import below so the connections
+# are built against the right config file
+conf_path = os.path.abspath(__file__).split("energy-data-trans")[0] + f"/energy-data-trans/conf/etl_config_{env}.yaml"
+os.environ['ETL_CONF'] = conf_path
+os.environ['env'] = env
+
+from service.common_connect import trans, plt
+
+
+def get_all_tables():
+    query_sql = """
+    SELECT
+        t.TABLE_NAME
+    FROM
+        information_schema.`TABLES` t
+    WHERE
+        t.TABLE_SCHEMA = 'energy_data_prod'
+    """
+
+    return trans.execute(query_sql)
+
+
+def get_all_wind_company():
+    """Return a dict mapping each active wind farm's field_code to its field_name."""
+    query_sql = "SELECT t.field_code,t.field_name FROM wind_field t where t.del_state = 0"
+    datas = plt.execute(query_sql)
+    result_dict = dict()
+    for data in datas:
+        result_dict[data['field_code']] = data['field_name']
+
+    return result_dict
+
+
+if __name__ == '__main__':
+    code_name_dict = get_all_wind_company()
+    tables = get_all_tables()
+    for table in tables:
+        table_name = table['TABLE_NAME']
+
+        # Table names look like '<field_code>_...'; attach the wind farm's
+        # name as the table comment when the code is known
+        if table_name.startswith('WOF'):
+            field_code = table_name.split('_')[0].split('-')[0]
+            if field_code in code_name_dict:
+                update_sql = f"ALTER TABLE `{table_name}` COMMENT = '{code_name_dict[field_code]}'"
+                trans.execute(update_sql)
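Usage ties back to the new file in this commit: running, say, python utils/tmp_util/表添加注释.py huadian resolves conf/etl_config_huadian.yaml through the env argument and then comments every WOF* table in energy_data_prod with its wind farm's name.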