wzl 3 days ago
Commit
9ce5ce0ed1

+ 8 - 0
.idea/.gitignore

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 6 - 0
.idea/git_toolbox_blame.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="GitToolBoxBlameSettings">
+    <option name="version" value="2" />
+  </component>
+</project>

+ 9 - 0
.idea/misc.xml

@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Black">
+    <option name="sdkName" value="Python 3.12 (whole_dep)" />
+  </component>
+  <component name="ProjectRootManager">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/ty_zgh_data.iml" filepath="$PROJECT_DIR$/ty_zgh_data.iml" />
+    </modules>
+  </component>
+</project>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 40 - 0
add_or_remove_partition.py

@@ -0,0 +1,40 @@
+import datetime
+
+from dateutil.relativedelta import relativedelta
+
+from service import trans_service
+from utils.log.trans_log import logger
+
+if __name__ == '__main__':
+
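+    # Fetch every partitioned table so a new monthly partition can be added ahead of time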
+    datas = trans_service.get_all_partitioned_tables()
+
+    now_month = datetime.datetime.now()
+    next_month = now_month + relativedelta(months=1)
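+    # Partition names follow p<YYYYMM> (current month); the new partition's upper boundary is the first day of the next month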
+    pname = f'p{str(now_month.year) + str(now_month.month).zfill(2)}'
+    date_str = f'{str(next_month.year)}-{str(next_month.month).zfill(2)}-01'
+    for data in datas:
+        trans_service.add_partition(data['TABLE_NAME'], pname, date_str)
+
+    logger.info("添加分区成功")
+
+    save_month_dict = trans_service.get_sys_conf('online_data_save_month')
+
+    table_types = set()
+
+    for data in datas:
+        table_name = data['TABLE_NAME']
+        wind_factory = table_name.split('_')[0]
+        table_type = table_name.split('_')[1]
+        table_types.add(table_type)
+        del_month = save_month_dict.get(table_type, 12)
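+        # Drop the partition from del_month + 1 months ago, i.e. one month beyond the configured retention window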
+        pmonth = datetime.datetime.now() - relativedelta(months=del_month + 1)
+        trans_service.delelet_partition(table_name, pmonth)
+
+    for table_type in table_types:
+        del_month = save_month_dict.get(table_type, 12)
+        pmonth = datetime.datetime.now() - relativedelta(months=del_month)
+        exists_date = f'{pmonth.year}-{str(pmonth.month).zfill(2)}-01'
+        trans_service.update_expired_data(table_type, exists_date)
+
+    logger.info("删除过期分区成功")

BIN
conf/2404_表数据.xlsx


BIN
conf/2405_表数据.xlsx


BIN
conf/2406_表数据.xlsx


+ 27 - 0
conf/config.yaml

@@ -0,0 +1,27 @@
+#plt:
+#  host: 192.168.50.233
+#  port: 3306
+#  user: admin
+#  password: admin123456
+#  database: energy_prod
+
+plt:
+  host: 192.168.50.235
+  port: 4000
+  user: root
+  password: '123456'
+  database: energy
+
+
+trans:
+  host: 192.168.50.241
+  port: 4000
+  user: root
+  password: '123456'
+  database: energy_data
+
+
+# Log output directory
+log_path_dir: /home/trans/project/logs/104_parse
+
+data_base_dir: /home/trans/data

File diff suppressed because it is too large
+ 0 - 0
conf/mc_vesion.json


File diff suppressed because it is too large
+ 0 - 0
conf/scada.json


File diff suppressed because it is too large
+ 0 - 0
conf/warn_fault.json


File diff suppressed because it is too large
+ 0 - 0
conf/warn_fault_mc_code.json


+ 358 - 0
get_db_data.py

@@ -0,0 +1,358 @@
+import json
+import logging.handlers
+import os.path
+import traceback
+from datetime import datetime, timedelta
+from typing import Dict, List, Tuple
+
+import pandas as pd
+from sqlalchemy import create_engine, text
+from sqlalchemy.engine import Engine
+
+
+# Logger configuration wrapped in a function to avoid global side effects
+def setup_logger():
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging.DEBUG)
+
+    # Avoid adding handlers more than once
+    if logger.handlers:
+        return logger
+
+    log_path = "/home/caiji/project/logs/"
+    os.makedirs(log_path, exist_ok=True)
+
+    # 1. TimedRotatingFileHandler for all levels (file output)
+    all_log_handler = logging.handlers.TimedRotatingFileHandler(
+        os.path.join(log_path, "info.log"), 'D', backupCount=90
+    )
+
+    # 2. TimedRotatingFileHandler for ERROR level only
+    error_log_handler = logging.handlers.TimedRotatingFileHandler(
+        os.path.join(log_path, "error.log"), 'D', backupCount=90
+    )
+    error_log_handler.setLevel(logging.ERROR)
+
+    # 3. StreamHandler for console output
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+
+    # Log message format
+    fmt = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
+        "%Y-%m-%d %H:%M:%S"
+    )
+
+    # Apply the format to each handler
+    all_log_handler.setFormatter(fmt)
+    error_log_handler.setFormatter(fmt)
+    console_handler.setFormatter(fmt)
+
+    # Register the handlers
+    logger.addHandler(all_log_handler)
+    logger.addHandler(error_log_handler)
+    logger.addHandler(console_handler)
+
+    return logger
+
+
+logger = setup_logger()
+
+
+class DatabaseConnector:
+    """数据库连接管理器"""
+
+    _engines: Dict[str, Engine] = {}
+
+    @classmethod
+    def get_engine(cls, host='172.21.6.37', port=3306, user='envision', pwd='envision', db='envision') -> Engine:
+        """获取或创建数据库引擎"""
+        key = f"{host}:{port}:{user}:{db}"
+
+        if key not in cls._engines:
+            connection_string = f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}'
+            cls._engines[key] = create_engine(
+                connection_string,
+                pool_size=10,  # connection pool size
+                max_overflow=20,
+                pool_pre_ping=True,  # ping connections before use
+                pool_recycle=600,  # recycle connections after 10 minutes
+                echo=False  # keep False in production
+            )
+            logger.info(f"Created new engine for {key}")
+
+        return cls._engines[key]
+
+
+def generate_scada_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
+    """生成SCADA数据查询结构"""
+    try:
+        # Read all mapping files at once
+        df_2404 = pd.read_excel("conf/2404_表数据.xlsx")
+        df_2405 = pd.read_excel("conf/2405_表数据.xlsx")
+        df_scada = pd.concat([df_2404, df_2405], ignore_index=True)
+
+        # Group once for efficiency
+        grouped = df_scada.groupby(['场站标准化编号', '风机号', '历史采样表名'])
+
+        sql_data = {}
+        for (wind_farm_code, wind_turbine_name, table), group in grouped:
+            # Build the column list (source field AS standardized name)
+            columns = [f' {row["历史采样域名"]} as {row["标准化英文"]}'
+                       for _, row in group.iterrows()]
+
+            wind_turbine_name = str(wind_turbine_name)
+
+            # Populate the nested structure
+            if wind_farm_code not in sql_data:
+                sql_data[wind_farm_code] = {}
+            if wind_turbine_name not in sql_data[wind_farm_code]:
+                sql_data[wind_farm_code][wind_turbine_name] = {}
+
+            sql_data[wind_farm_code][wind_turbine_name][table] = columns
+
+        logger.info(f"Generated SCADA SQL for {len(sql_data)} wind farms")
+        return sql_data
+
+    except Exception as e:
+        logger.error(f"Error generating SCADA SQL: {e}")
+        raise
+
+
+def generate_warn_fault_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
+    """生成告警故障数据查询结构"""
+    try:
+        df_2406 = pd.read_excel('conf/2406_表数据.xlsx')
+        df_2406 = df_2406[df_2406['场站标准化编号'] != 'WOF35800080']
+
+        # Group once for efficiency
+        grouped = df_2406.groupby(['场站标准化编号', '风机号', '历史采样表名'])
+
+        warn_fault_data = {}
+        for (wind_farm_code, wind_turbine_name, table), group in grouped:
+            # Build the column list
+            columns = [f'{row["历史采样域名"]} as fault_code'
+                       for _, row in group.iterrows()]
+
+            wind_turbine_name = str(wind_turbine_name)
+
+            if wind_farm_code not in warn_fault_data:
+                warn_fault_data[wind_farm_code] = {}
+            if wind_turbine_name not in warn_fault_data[wind_farm_code]:
+                warn_fault_data[wind_farm_code][wind_turbine_name] = {}
+
+            warn_fault_data[wind_farm_code][wind_turbine_name][table] = columns
+
+        logger.info(f"Generated warning/fault SQL for {len(warn_fault_data)} wind farms")
+        return warn_fault_data
+
+    except Exception as e:
+        logger.error(f"Error generating warn/fault SQL: {e}")
+        raise
+
+
+def merge_dataframes(dataframes: List[pd.DataFrame]) -> pd.DataFrame:
+    """高效合并数据框"""
+    if not dataframes:
+        return pd.DataFrame()
+
+    # Merge pairwise in sequence
+    result = dataframes[0]
+    for df in dataframes[1:]:
+        result = pd.merge(
+            result,
+            df,
+            on=['wind_turbine_name', 'time_stamp'],
+            how='inner'  # choose the join type to suit the use case
+        )
+    return result
+
+
+def execute_query_batch(engine: Engine, queries: List[Tuple[str, Dict]]) -> List[pd.DataFrame]:
+    """批量执行查询"""
+    results = []
+
+    with engine.connect() as conn:
+        for sql, params in queries:
+            try:
+                df = pd.read_sql(
+                    text(sql),
+                    conn,
+                    params=params
+                )
+                results.append(df)
+            except Exception as e:
+                logger.error(f"Query failed: {sql[:100]}... Error: {e}")
+                # Append an empty DataFrame so processing continues
+                results.append(pd.DataFrame())
+
+    return results
+
+
+def process_wind_turbine_batch(
+        engine: Engine,
+        wind_farm_code: str,
+        wind_turbine_name: str,
+        points_data: Dict[str, List[str]],
+        begin_time: datetime,
+        end_time: datetime,
+        query_type: str
+) -> bool:
+    """处理单个风机的数据查询和保存"""
+    try:
+        begin_time_str = begin_time.strftime('%Y-%m-%d %H:%M:%S')
+        end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
+
+        # Build all queries for this turbine
+        queries = []
+        for table, cols in points_data.items():
+            if not cols:  # skip tables with no columns
+                continue
+
+            sql = f"""
+            SELECT 
+                :wind_turbine_name as wind_turbine_name,
+                occur_time as time_stamp,
+                {', '.join(cols)}
+            FROM {table}
+            WHERE occur_time >= :begin_time AND occur_time < :end_time
+            """
+
+            queries.append((sql, {
+                'wind_turbine_name': wind_turbine_name,
+                'begin_time': begin_time_str,
+                'end_time': end_time_str
+            }))
+
+        # Execute the queries
+        dfs = execute_query_batch(engine, queries)
+
+        # Merge the results
+        merged_df = merge_dataframes(dfs)
+
+        if merged_df.empty:
+            logger.warning(f"No data found for {wind_farm_code}_{wind_turbine_name}")
+            return False
+
+        # Save the data
+        save_path = os.path.join("/home/caiji/data", query_type)
+        os.makedirs(save_path, exist_ok=True)
+
+        file_name = f"{wind_farm_code}_{wind_turbine_name}_{begin_time.strftime('%Y%m%d%H%M%S')}.csv"
+        file_path = os.path.join(save_path, file_name)
+
+        # Write the merged data out as CSV
+        merged_df.to_csv(file_path, index=False)
+
+        logger.info(f"Saved {len(merged_df)} rows to {file_path}")
+        return True
+
+    except Exception as e:
+        logger.error(f"Error processing {wind_farm_code}_{wind_turbine_name}: {e}")
+        logger.error(traceback.format_exc(), exc_info=True)
+        return False
+
+
+def read_and_save_file_optimized(query_dict: Dict, query_type: str, begin_time: datetime, end_time: datetime):
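+    """Sequentially query and save data for every turbine in query_dict."""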
+    # Create the output directory
+    output_dir = os.path.join("/home/caiji/data", query_type)
+    os.makedirs(output_dir, exist_ok=True)
+
+    # Get the database engine
+    engine = DatabaseConnector.get_engine()
+
+    success_count = 0
+    failed_count = 0
+    for wind_farm_code, wind_turbine_data in query_dict.items():
+        for wind_turbine_name, points_data in wind_turbine_data.items():
+            return_data = process_wind_turbine_batch(engine, wind_farm_code, wind_turbine_name,
+                                                     points_data, begin_time, end_time, query_type)
+
+            if return_data:
+                success_count += 1
+            else:
+                failed_count += 1
+    logger.info(f"Processing completed. Success: {success_count}, Failed: {failed_count}")
+
+
+def main(begin_time: datetime, end_time: datetime):
+    """主函数"""
+    try:
+        #     # Build the query structures
+        # scada_data = generate_scada_sql()
+        # warn_fault_data = generate_warn_fault_sql()
+        #
+        # with open('conf/scada.json', 'w') as f:
+        #     f.write(json.dumps(scada_data))
+        #
+        # with open('conf/warn_fault.json', 'w') as f:
+        #     f.write(json.dumps(warn_fault_data))
+
+        with open('conf/scada.json', 'r') as f:
+            scada_data = json.load(f)
+
+        with open('conf/warn_fault.json', 'r') as f:
+            warn_fault_data = json.load(f)
+
+        # Process SCADA data
+        logger.info("Starting SCADA data processing")
+        scada_results = read_and_save_file_optimized(
+            scada_data,
+            'scada',
+            begin_time,
+            end_time
+        )
+
+        logger.info(str(scada_results))
+        logger.info("Finished SCADA data processing")
+
+        # Process warning/fault data
+        logger.info("Starting warn/fault data processing")
+        warn_results = read_and_save_file_optimized(
+            warn_fault_data,
+            'warn_fault',
+            begin_time,
+            end_time
+        )
+
+        logger.info(str(warn_results))
+        logger.info("Finished warn/fault data processing")
+        logger.info("All processing completed")
+
+    except Exception as e:
+        logger.error(traceback.format_exc(), exc_info=True)
+        logger.error(f"Main process failed: {e}", exc_info=True)
+        raise
+
+
+def parse_time_args() -> Tuple[datetime, datetime]:
+    import sys
+    now = datetime.now()
+
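+    # Default window: the whole previous day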
+    default_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
+    default_end = default_start + timedelta(days=1)
+    logger.info(sys.argv)
+    # Parse command-line arguments
+    if len(sys.argv) >= 3:
+        try:
+            # Expect time strings such as "2024-01-01 12:00:00"
+            start_time = datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S")
+            end_time = datetime.strptime(sys.argv[2], "%Y-%m-%d %H:%M:%S")
+        except ValueError:
+            logger.error("时间格式错误,请使用格式:YYYY-MM-DD HH:MM:SS")
+            # Fall back to the defaults if parsing fails
+            start_time = default_start
+            end_time = default_end
+    else:
+        # Not enough arguments; use the defaults
+        start_time = default_start
+        end_time = default_end
+
+    return start_time, end_time
+
+
+if __name__ == '__main__':
+    start_time, end_time = parse_time_args()
+    print(start_time)
+    logger.info(f'{start_time}___{end_time}')
+    main(start_time, end_time)

+ 415 - 0
get_db_data_bak.py

@@ -0,0 +1,415 @@
+import concurrent.futures
+import json
+import logging.handlers
+import os.path
+import traceback
+from datetime import datetime, timedelta
+from typing import Dict, List, Tuple, Any
+
+import pandas as pd
+from sqlalchemy import create_engine, text
+from sqlalchemy.engine import Engine
+
+
+# Logger configuration wrapped in a function to avoid global side effects
+def setup_logger():
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging.DEBUG)
+
+    # Avoid adding handlers more than once
+    if logger.handlers:
+        return logger
+
+    log_path = "/home/caiji/project/logs/"
+    os.makedirs(log_path, exist_ok=True)
+
+    # 1. TimedRotatingFileHandler for all levels (file output)
+    all_log_handler = logging.handlers.TimedRotatingFileHandler(
+        os.path.join(log_path, "info.log"), 'D', backupCount=90
+    )
+
+    # 2. TimedRotatingFileHandler for ERROR level only
+    error_log_handler = logging.handlers.TimedRotatingFileHandler(
+        os.path.join(log_path, "error.log"), 'D', backupCount=90
+    )
+    error_log_handler.setLevel(logging.ERROR)
+
+    # 3. StreamHandler for console output
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+
+    # Log message format
+    fmt = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
+        "%Y-%m-%d %H:%M:%S"
+    )
+
+    # Apply the format to each handler
+    all_log_handler.setFormatter(fmt)
+    error_log_handler.setFormatter(fmt)
+    console_handler.setFormatter(fmt)
+
+    # Register the handlers
+    logger.addHandler(all_log_handler)
+    logger.addHandler(error_log_handler)
+    logger.addHandler(console_handler)
+
+    return logger
+
+
+logger = setup_logger()
+
+
+class DatabaseConnector:
+    """数据库连接管理器"""
+
+    _engines: Dict[str, Engine] = {}
+
+    @classmethod
+    def get_engine(cls, host='172.21.6.37', port=3306, user='envision', pwd='envision', db='envision') -> Engine:
+        """获取或创建数据库引擎"""
+        key = f"{host}:{port}:{user}:{db}"
+
+        if key not in cls._engines:
+            connection_string = f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}'
+            cls._engines[key] = create_engine(
+                connection_string,
+                pool_size=10,  # connection pool size
+                max_overflow=20,
+                pool_pre_ping=True,  # ping connections before use
+                pool_recycle=600,  # recycle connections after 10 minutes
+                echo=False  # keep False in production
+            )
+            logger.info(f"Created new engine for {key}")
+
+        return cls._engines[key]
+
+
+def generate_scada_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
+    """生成SCADA数据查询结构"""
+    try:
+        # Read all mapping files at once
+        df_2404 = pd.read_excel("conf/2404_表数据.xlsx")
+        df_2405 = pd.read_excel("conf/2405_表数据.xlsx")
+        df_scada = pd.concat([df_2404, df_2405], ignore_index=True)
+
+        # Group once for efficiency
+        grouped = df_scada.groupby(['场站标准化编号', '风机号', '历史采样表名'])
+
+        sql_data = {}
+        for (wind_farm_code, wind_turbine_name, table), group in grouped:
+            # Build the column list (source field AS standardized name)
+            columns = [f' {row["历史采样域名"]} as {row["标准化英文"]}'
+                       for _, row in group.iterrows()]
+
+            wind_turbine_name = str(wind_turbine_name)
+
+            # Populate the nested structure
+            if wind_farm_code not in sql_data:
+                sql_data[wind_farm_code] = {}
+            if wind_turbine_name not in sql_data[wind_farm_code]:
+                sql_data[wind_farm_code][wind_turbine_name] = {}
+
+            sql_data[wind_farm_code][wind_turbine_name][table] = columns
+
+        logger.info(f"Generated SCADA SQL for {len(sql_data)} wind farms")
+        return sql_data
+
+    except Exception as e:
+        logger.error(f"Error generating SCADA SQL: {e}")
+        raise
+
+
+def generate_warn_fault_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
+    """生成告警故障数据查询结构"""
+    try:
+        df_2406 = pd.read_excel('conf/2406_表数据.xlsx')
+        df_2406 = df_2406[df_2406['场站标准化编号'] != 'WOF35800080']
+
+        # Group once for efficiency
+        grouped = df_2406.groupby(['场站标准化编号', '风机号', '历史采样表名'])
+
+        warn_fault_data = {}
+        for (wind_farm_code, wind_turbine_name, table), group in grouped:
+            # Build the column list
+            columns = [f'{row["历史采样域名"]} as fault_code'
+                       for _, row in group.iterrows()]
+
+            wind_turbine_name = str(wind_turbine_name)
+
+            if wind_farm_code not in warn_fault_data:
+                warn_fault_data[wind_farm_code] = {}
+            if wind_turbine_name not in warn_fault_data[wind_farm_code]:
+                warn_fault_data[wind_farm_code][wind_turbine_name] = {}
+
+            warn_fault_data[wind_farm_code][wind_turbine_name][table] = columns
+
+        logger.info(f"Generated warning/fault SQL for {len(warn_fault_data)} wind farms")
+        return warn_fault_data
+
+    except Exception as e:
+        logger.error(f"Error generating warn/fault SQL: {e}")
+        raise
+
+
+def merge_dataframes(dataframes: List[pd.DataFrame]) -> pd.DataFrame:
+    """高效合并数据框"""
+    if not dataframes:
+        return pd.DataFrame()
+
+    # Merge pairwise in sequence
+    result = dataframes[0]
+    for df in dataframes[1:]:
+        result = pd.merge(
+            result,
+            df,
+            on=['wind_farm_code', 'wind_turbine_name', 'time_stamp'],
+            how='outer'  # choose the join type to suit the use case
+        )
+    return result
+
+
+def execute_query_batch(engine: Engine, queries: List[Tuple[str, Dict]]) -> List[pd.DataFrame]:
+    """批量执行查询"""
+    results = []
+
+    with engine.connect() as conn:
+        for sql, params in queries:
+            try:
+                df = pd.read_sql(
+                    text(sql),
+                    conn,
+                    params=params
+                )
+                results.append(df)
+            except Exception as e:
+                logger.error(f"Query failed: {sql[:100]}... Error: {e}")
+                # Append an empty DataFrame so processing continues
+                results.append(pd.DataFrame())
+
+    return results
+
+
+def process_wind_turbine_batch(
+        engine: Engine,
+        wind_farm_code: str,
+        wind_turbine_name: str,
+        points_data: Dict[str, List[str]],
+        begin_time: datetime,
+        end_time: datetime,
+        query_type: str
+) -> bool:
+    """处理单个风机的数据查询和保存"""
+    try:
+        begin_time_str = begin_time.strftime('%Y-%m-%d %H:%M:%S')
+        end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
+
+        # Build all queries for this turbine
+        queries = []
+        for table, cols in points_data.items():
+            if not cols:  # skip tables with no columns
+                continue
+
+            sql = f"""
+            SELECT 
+                :wind_turbine_name as wind_turbine_name,
+                occur_time as time_stamp,
+                {', '.join(cols)}
+            FROM {table}
+            WHERE occur_time >= :begin_time AND occur_time < :end_time
+            """
+
+            queries.append((sql, {
+                'wind_turbine_name': wind_turbine_name,
+                'begin_time': begin_time_str,
+                'end_time': end_time_str
+            }))
+
+        # Execute the queries
+        dfs = execute_query_batch(engine, queries)
+
+        # Merge the results
+        merged_df = merge_dataframes(dfs)
+
+        if merged_df.empty:
+            logger.warning(f"No data found for {wind_farm_code}_{wind_turbine_name}")
+            return False
+
+        # Save the data
+        save_path = os.path.join("/home/caiji/data", query_type)
+        os.makedirs(save_path, exist_ok=True)
+
+        file_name = f"{wind_farm_code}_{wind_turbine_name}_{begin_time.strftime('%Y%m%d%H%M%S')}.csv"
+        file_path = os.path.join(save_path, file_name)
+
+        # Write the merged data out as CSV
+        merged_df.to_csv(file_path, index=False)
+
+        logger.info(f"Saved {len(merged_df)} rows to {file_path}")
+        return True
+
+    except Exception as e:
+        logger.error(f"Error processing {wind_farm_code}_{wind_turbine_name}: {e}")
+        return False
+
+
+def read_and_save_file_optimized(
+        query_dict: Dict,
+        query_type: str,
+        begin_time: datetime,
+        end_time: datetime,
+        max_workers: int = 5,
+        batch_size: int = 50
+) -> Dict[str, Any]:
+    """优化版的主处理函数,支持并行处理和批量查询"""
+
+    results = {
+        'total_turbines': 0,
+        'success': 0,
+        'failed': 0,
+        'total_rows': 0,
+        'errors': []
+    }
+
+    # Create the output directory
+    output_dir = os.path.join("/home/caiji/data", query_type)
+    os.makedirs(output_dir, exist_ok=True)
+
+    # Get the database engine
+    engine = DatabaseConnector.get_engine()
+
+    # Build the task list
+    tasks = []
+    for wind_farm_code, wind_turbine_data in query_dict.items():
+        for wind_turbine_name, points_data in wind_turbine_data.items():
+            tasks.append({
+                'wind_farm_code': wind_farm_code,
+                'wind_turbine_name': wind_turbine_name,
+                'points_data': points_data
+            })
+
+    results['total_turbines'] = len(tasks)
+    logger.info(f"Total turbines to process: {len(tasks)}")
+
+    # Process in batches
+    for i in range(0, len(tasks), batch_size):
+        batch = tasks[i:i + batch_size]
+        logger.info(f"Processing batch {i // batch_size + 1}/{(len(tasks) + batch_size - 1) // batch_size}")
+
+        # Process the batch in parallel with a thread pool
+        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+            future_to_task = {}
+            for task in batch:
+                future = executor.submit(
+                    process_wind_turbine_batch,
+                    engine,
+                    task['wind_farm_code'],
+                    task['wind_turbine_name'],
+                    task['points_data'],
+                    begin_time,
+                    end_time,
+                    query_type
+                )
+                future_to_task[future] = task
+
+            # Collect the results
+            for future in concurrent.futures.as_completed(future_to_task):
+                task = future_to_task[future]
+                try:
+                    success = future.result()
+                    if success:
+                        results['success'] += 1
+                    else:
+                        results['failed'] += 1
+                except Exception as e:
+                    results['failed'] += 1
+                    error_msg = f"{task['wind_farm_code']}_{task['wind_turbine_name']}: {str(e)}"
+                    results['errors'].append(error_msg)
+                    logger.error(f"Task failed: {error_msg}")
+
+    logger.info(f"Processing completed. Success: {results['success']}, Failed: {results['failed']}")
+    return results
+
+
+def main(begin_time: datetime, end_time: datetime):
+    """主函数"""
+    try:
+        #     # Build the query structures
+        # scada_data = generate_scada_sql()
+        # warn_fault_data = generate_warn_fault_sql()
+        #
+        # with open('conf/scada.json', 'w') as f:
+        #     f.write(json.dumps(scada_data))
+        #
+        # with open('conf/warn_fault.json', 'w') as f:
+        #     f.write(json.dumps(warn_fault_data))
+
+        with open('conf/scada.json', 'r') as f:
+            scada_data = json.load(f)
+
+        with open('conf/warn_fault.json', 'r') as f:
+            warn_fault_data = json.load(f)
+
+        # Process SCADA data
+        logger.info("Starting SCADA data processing")
+        scada_results = read_and_save_file_optimized(
+            scada_data,
+            'scada',
+            begin_time,
+            end_time,
+            max_workers=5  # tune to the database load
+        )
+
+        logger.info(str(scada_results))
+        logger.info("Finished SCADA data processing")
+
+        # Process warning/fault data
+        logger.info("Starting warn/fault data processing")
+        warn_results = read_and_save_file_optimized(
+            warn_fault_data,
+            'warn_fault',
+            begin_time,
+            end_time,
+            max_workers=5
+        )
+
+        logger.info(str(warn_results))
+        logger.info("Finished warn/fault data processing")
+        logger.info("All processing completed")
+
+    except Exception as e:
+        logger.error(traceback.format_exc(), exc_info=True)
+        logger.error(f"Main process failed: {e}", exc_info=True)
+        raise
+
+
+def parse_time_args() -> Tuple[datetime, datetime]:
+    import sys
+    now = datetime.now()
+
+    default_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
+    default_end = default_start + timedelta(days=1)
+
+    # Parse command-line arguments
+    if len(sys.argv) >= 3:
+        try:
+            # Expect time strings such as "2024-01-01 12:00:00"
+            start_time = datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S")
+            end_time = datetime.strptime(sys.argv[2], "%Y-%m-%d %H:%M:%S")
+        except ValueError:
+            logger.error("时间格式错误,请使用格式:YYYY-MM-DD HH:MM:SS")
+            # Fall back to the defaults if parsing fails
+            start_time = default_start
+            end_time = default_end
+    else:
+        # Not enough arguments; use the defaults
+        start_time = default_start
+        end_time = default_end
+
+    return start_time, end_time
+
+
+if __name__ == '__main__':
+    start_time, end_time = parse_time_args()
+    print(start_time, end_time)
+    main(start_time, end_time)

+ 303 - 0
run_data.py

@@ -0,0 +1,303 @@
+import json
+import multiprocessing
+import os
+import shutil
+import warnings
+
+from service import trans_service
+from service.plt_service import get_wind_info, get_all_wind_by_company_code
+from tool.ClassIdentifier import ClassIdentifier
+from utils.common_utils import read_file, get_all_files
+
+warnings.filterwarnings('ignore')
+
+from service.trans_service import *
+
+
+# def generate_warn_falut_code_maps(file_path):
+#     df = pd.read_excel(file_path)
+#     df['主控ID'] = df['主控ID'].str.strip()
+#     df['状态编码'] = df['状态编码'].astype(int)
+#     df['SC中文描述'] = df['SC中文描述'].str.strip()
+#     df['告警等级'] = df['告警等级'].str.strip()
+#     df['告警等级'] = df['告警等级'].fillna("告警信息")
+#
+#     result_map = dict()
+#     for index, row in df.iterrows():
+#         controller_id = row['主控ID'].strip()
+#         status_code = int(row['状态编码'])
+#         cn_des = row['SC中文描述']
+#         level = row['告警等级']
+#         if controller_id in result_map.keys():
+#             result_map[controller_id][status_code] = (cn_des, level)
+#         else:
+#             result_map[controller_id] = {status_code: (cn_des, level)}
+#
+#     return result_map
+#
+#
+# def generate_mc_version_maps(file_path):
+#     df = pd.read_excel(file_path)
+#
+#     mc_version_maps = dict()
+#     for _, data in df.iterrows():
+#         changzhan = data['场站标准化编号'].strip()
+#         wind_no = str(data['风机号'])
+#         mc_versioin = data['主控版本'].strip()
+#         if changzhan in mc_version_maps.keys():
+#             mc_version_maps[changzhan][wind_no] = mc_versioin
+#         else:
+#             mc_version_maps[changzhan] = {wind_no: mc_versioin}
+#
+#     return mc_version_maps
+
+
+def scada_read_and_save_db(file_path, wind_factory_map: dict):
+    try:
+        df = read_file(file_path)
+        if 'wind_farm_code' in df.columns:
+            del df['wind_farm_code']
+        wind_no = os.path.basename(file_path).split('_')[0]
+
+        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
+        wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(wind_no)
+        df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_col_trans).fillna(df['wind_turbine_name'])
+        wind_turbine_number = df['wind_turbine_number'].unique()[0]
+
+        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
+        df.dropna(subset=['time_stamp'], inplace=True)
+        df.sort_values(by='time_stamp', inplace=True)
+        logger.info(f"有功功率前10个 :{df.head(10)['active_power'].values}")
+        power_df = df[df['active_power'] > 0]
+        logger.info(f"{wind_turbine_number} 功率大于0的数量:{power_df.shape}")
+        power = power_df.sample(int(power_df.shape[0] / 100))['active_power'].median()
+        logger.info(f"{wind_turbine_number} 有功功率,中位数:{power}")
+
+        rated_power_and_cutout_speed_tuple = rated_power_and_cutout_speed_map.get(str(wind_turbine_number), None)
+        if rated_power_and_cutout_speed_tuple is None:
+            rated_power_and_cutout_speed_tuple = (None, None)
+
+        sec_df_origin = df.copy()
+        class_identifiler = ClassIdentifier(wind_turbine_number=wind_turbine_number, origin_df=sec_df_origin,
+                                            rated_power=rated_power_and_cutout_speed_tuple[0],
+                                            cut_out_speed=rated_power_and_cutout_speed_tuple[1])
+        sec_df = class_identifiler.run()
+        if not sec_df.empty:
+            sec_df['year'] = sec_df['time_stamp'].dt.year
+            sec_df['month'] = sec_df['time_stamp'].dt.month
+            sec_df['day'] = sec_df['time_stamp'].dt.day
+            sec_df['time_stamp'] = sec_df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
+            sec_df['year_month'] = sec_df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2),
+                                                                   axis=1)
+
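+        # Keep only rows on 10-minute boundaries for the minute-level table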
+        min_df_origin = df[df['time_stamp'].dt.minute % 10 == 0]
+        class_identifiler = ClassIdentifier(wind_turbine_number=wind_turbine_number, origin_df=min_df_origin,
+                                            rated_power=rated_power_and_cutout_speed_tuple[0],
+                                            cut_out_speed=rated_power_and_cutout_speed_tuple[1])
+        min_df = class_identifiler.run()
+        if not min_df.empty:
+            min_df['year'] = min_df['time_stamp'].dt.year
+            min_df['month'] = min_df['time_stamp'].dt.month
+            min_df['day'] = min_df['time_stamp'].dt.day
+            min_df['time_stamp'] = min_df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
+            min_df['year_month'] = min_df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2),
+                                                                   axis=1)
+        save_df_to_db(f'{wind_no}_second', sec_df)
+        save_df_to_db(f'{wind_no}_minute', min_df)
+
+        update_day_count(sec_df, wind_no, wind_factory_map[wind_no], 'second')
+        update_day_count(min_df, wind_no, wind_factory_map[wind_no], 'minute')
+        os.remove(file_path)
+    except:
+        logger.error(f"{file_path}转化失败")
+        logger.error(traceback.format_exc(), exc_info=True)
+        error_path = file_path.replace('scada', 'scada_error')
+        os.makedirs(os.path.dirname(error_path), exist_ok=True)
+        shutil.move(file_path, error_path)
+    finally:
+        logger.info(f'scada/{os.path.basename(file_path)}执行结束')
+
+
+def generate_warn_fault_data(df):
+    df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+    # Codes that indicate a normal / recovered state
+    success_codes = {0, 309001}
+    df['Is_Success'] = df['fault_code'].isin(success_codes)
+
+    # Find fault start and end times
+    result = []
+    i = 0
+    n = len(df)
+
+    while i < n:
+        current_status = df.iloc[i]['fault_code']
+        current_time = df.iloc[i]['time_stamp']
+
+        # Fault state (not a success code)
+        if current_status not in success_codes:
+            # Record the fault start
+            fault_start = current_time
+            fault_status = current_status
+
+            # Skip consecutive rows with the same fault code
+            while i < n and df.iloc[i]['fault_code'] == fault_status:
+                i += 1
+
+            # Time of the next success code, if any
+            success_time = None
+            fault_duration = None
+
+            # Look for the next success code
+            for j in range(i, n):
+                if df.iloc[j]['fault_code'] in success_codes:
+                    success_time = df.iloc[j]['time_stamp']
+                    fault_duration = int((success_time - fault_start).total_seconds())
+                    break
+
+            # If a recovery time was found, the fault duration is known
+            if success_time:
+                # Append to the results
+                result.append([
+                    df.iloc[0]['wind_turbine_name'],  # turbine name
+                    fault_start,  # fault start time
+                    fault_status,  # fault code
+                    success_time,  # recovery time
+                    fault_duration  # fault duration in seconds
+                ])
+            else:
+                # No recovery time (the fault lasts until the end of the data)
+                result.append([
+                    df.iloc[0]['wind_turbine_name'],
+                    fault_start,
+                    fault_status,
+                    None,
+                    None
+                ])
+        else:
+            # Success state; move to the next row
+            i += 1
+
+    # Build the result DataFrame
+    result_df = pd.DataFrame(result, columns=['wind_turbine_name', 'begin_time', 'fault_code', 'end_time', 'time_diff'])
+    result_df['fault_code'] = result_df['fault_code'].astype(int)
+    return result_df
+
+
+def warn_fault_read_and_save_db(file_path, wind_factory_map: dict, mc_vesion_map: dict, warn_falut_code_map: dict):
+    try:
+        wind_farm_code = os.path.basename(file_path).split('_')[0]
+        wind_turbine_name = os.path.basename(file_path).split('_')[1].replace('.csv', '')
+        df = read_file(file_path)
+        col_map = {'wind_turbine_id': 'wind_turbine_name', 'timestamp': 'time_stamp', 'warn_fault_code': 'fault_code'}
+        df.rename(columns=col_map, inplace=True)
+
+        df['fault_code'] = df['fault_code'].astype(int, errors='ignore')
+        df = df.dropna(subset=['fault_code'])
+
+        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
+        df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_factory_map).fillna(df['wind_turbine_name'])
+
+        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
+        df.dropna(subset=['time_stamp'], inplace=True)
+        df.sort_values(by='time_stamp', inplace=True)
+        df.reset_index(drop=True, inplace=True)
+
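+        # If the file starts with a success code, close out any still-open warn/fault records in the database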
+        first_code = df.loc[0, 'fault_code']
+        data_time = df.loc[0, 'time_stamp']
+        if first_code in [0, 309001]:
+            trans_service.update_warn_fault_exist_data_with_db(wind_farm_code, wind_turbine_name, data_time)
+
+        wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(wind_farm_code)
+
+        save_df = generate_warn_fault_data(df)
+        save_df['wind_farm_code'] = wind_farm_code
+        save_df['wind_farm_name'] = wind_factory_map[wind_farm_code]
+        save_df['wind_turbine_name'] = save_df['wind_turbine_name'].astype('str')
+        save_df['wind_turbine_number'] = save_df['wind_turbine_name'].map(wind_col_trans)
+        mc_vesion = mc_vesion_map[wind_farm_code][wind_turbine_name]
+        save_df['mc_version'] = mc_vesion
+
+        def get_fault_dict(x, index):
+            if x['mc_version'] not in warn_falut_code_map.keys():
+                return None
+
+            if x['fault_code'] not in warn_falut_code_map[x['mc_version']].keys():
+                return None
+            return warn_falut_code_map[x['mc_version']][x['fault_code']][index]
+
+        save_df['fault_detail'] = save_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(0,), axis=1)
+        save_df['fault_level'] = save_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(1,), axis=1)
+
+        select_cols = ['wind_turbine_number', 'wind_turbine_name', 'mc_version', 'begin_time', 'end_time', 'time_diff',
+                       'fault_code', 'fault_detail', 'fault_level']
+        warn_df = save_df[save_df['fault_level'] != '故障'][select_cols]
+        fault_df = save_df[save_df['fault_level'] == '故障'][select_cols]
+        warn_df.sort_values(by='begin_time', inplace=True)
+        fault_df.sort_values(by='begin_time', inplace=True)
+
+        if not warn_df.empty:
+            trans_service.save_df_to_db(f'{wind_farm_code}_warn', warn_df)
+            warn_max_date = warn_df['begin_time'].max()
+            add_date_str = warn_max_date.strftime('%Y-%m-%d')
+            warn_last_date_str = warn_max_date.strftime('%Y-%m-%d %H:%M:%S')
+            # wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time
+            trans_service.update_wind_farm_day_count(wind_farm_code, wind_factory_map.get(wind_farm_code, ''),
+                                                     add_date_str,
+                                                     'warn', warn_df.shape[0], warn_last_date_str, 1)
+
+        if not fault_df.empty:
+            trans_service.save_df_to_db(f'{wind_farm_code}_fault', fault_df)
+            fault_max_date = fault_df['begin_time'].max()
+            add_date_str = fault_max_date.strftime('%Y-%m-%d')
+            fault_last_date_str = fault_max_date.strftime('%Y-%m-%d %H:%M:%S')
+            trans_service.update_wind_farm_day_count(wind_farm_code, wind_factory_map.get(wind_farm_code, ''),
+                                                     add_date_str,
+                                                     'fault', fault_df.shape[0], fault_last_date_str, 1)
+
+        os.remove(file_path)
+    except:
+        logger.error(f"{file_path}转化失败")
+        logger.error(traceback.format_exc(), exc_info=True)
+        error_path = file_path.replace('warn_fault', 'warn_fault_error')
+        os.makedirs(os.path.dirname(error_path), exist_ok=True)
+        shutil.move(file_path, error_path)
+    finally:
+        logger.info(f'warn_fault/{os.path.basename(file_path)}执行结束')
+
+
+def update_day_count(df_data: pd.DataFrame, wind_farm_code: str, wind_name, data_type: str):
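+    # Aggregate row counts per day and upsert them into wind_farm_day_count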
+    df = df_data[['wind_turbine_number', 'time_stamp']].copy()
+    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
+    df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d'))
+    res_df = df.groupby('time_stamp')['wind_turbine_number'].count().reset_index()
+    res_df.rename(columns={'time_stamp': 'add_date', 'wind_turbine_number': 'count'}, inplace=True)
+    res_df['latest_data_time'] = res_df['add_date']
+    res_df['sync_status'] = 1
+    res_df['wind_farm_code'] = wind_farm_code
+    res_df['wind_farm_name'] = wind_name
+    res_df['type'] = data_type
+    for index, data in res_df.iterrows():
+        update_wind_farm_day_count(data['wind_farm_code'], data['wind_farm_name'], data['add_date'], data['type'],
+                                   data['count'], data['latest_data_time'], sync_status=1)
+
+
+if __name__ == '__main__':
+    scada_dir = r'/home/trans/data/scada'
+    warn_fault_dir = r'/home/trans/data/warn_fault'
+    scada_files = get_all_files(scada_dir)
+    warn_fault_files = get_all_files(warn_fault_dir)
+
+    wind_factory_map = get_all_wind_by_company_code('COM00002')
+
+    with open('conf/mc_vesion.json', 'r') as f:
+        mc_vesion_map = json.load(f)
+
+    with open('conf/warn_fault_mc_code.json', 'r') as f:
+        warn_falut_code_map = json.load(f)
+
+    with multiprocessing.Pool(4) as pool:
+        pool.starmap(scada_read_and_save_db, [(i, wind_factory_map) for i in scada_files])
+
+    with multiprocessing.Pool(4) as pool:
+        pool.starmap(warn_fault_read_and_save_db,
+                     [(i, wind_factory_map, mc_vesion_map, warn_falut_code_map) for i in warn_fault_files])

+ 0 - 0
service/__init__.py


+ 5 - 0
service/common_connect.py

@@ -0,0 +1,5 @@
+from utils.db.ConnectMysql import ConnectMysql
+
+plt = ConnectMysql("plt")
+
+trans = ConnectMysql("trans")

+ 45 - 0
service/plt_service.py

@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮
+
+from service.common_connect import plt
+
+
+def get_wind_info(field_code, need_rated_param=True):
+    query_sql = """
+    SELECT t.engine_code,t.engine_name,t.rated_capacity,a.rated_cut_out_windspeed 
+    from wind_engine_group t LEFT JOIN wind_engine_mill a on t.mill_type_code  = a.mill_type_code 
+    where t.field_code = %s and t.del_state = 0
+    """
+    dict_datas = plt.execute(query_sql, (field_code,))
+    wind_result = dict()
+    power_result = dict()
+    for data in dict_datas:
+        wind_result[str(data['engine_name'])] = str(data['engine_code'])
+        if need_rated_param:
+            power_result[str(data['engine_code'])] = (
+                float(data['rated_capacity']), float(data['rated_cut_out_windspeed']))
+    return wind_result, power_result
+
+
+def get_all_wind():
+    query_sql = """
+    SELECT t.code_number,t.code_name from wind_relation t where t.type = 'field' 
+    """
+    return plt.execute(query_sql)
+
+
+def get_all_wind_by_company_code(company_code):
+    query_sql = f"""
+    SELECT t.code_number,t.code_name from wind_relation t where t.parent_code = '{company_code}' and t.type = 'field' 
+    """
+    datas = plt.execute(query_sql)
+    result = dict()
+    for data in datas:
+        result[data['code_number']] = str(data['code_name'])
+
+    return result
+
+
+if __name__ == '__main__':
+    print(get_all_wind_by_company_code('COM00002'))

+ 267 - 0
service/trans_service.py

@@ -0,0 +1,267 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮
+import traceback
+
+import pandas as pd
+
+from service.common_connect import trans
+from utils.log.trans_log import logger
+
+
+def create_tmp_table(table_name):
+    create_sql = f"""
+    CREATE TABLE `{table_name}` (
+  `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
+  `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '风机原始名称',
+  `time_stamp` datetime NOT NULL COMMENT '时间戳',
+  `active_power` double DEFAULT NULL COMMENT '有功功率',
+  `rotor_speed` double DEFAULT NULL COMMENT '风轮转速',
+  `generator_speed` double DEFAULT NULL COMMENT '发电机转速',
+  `wind_velocity` double DEFAULT NULL COMMENT '风速',
+  `pitch_angle_blade_1` double DEFAULT NULL COMMENT '桨距角1',
+  `pitch_angle_blade_2` double DEFAULT NULL COMMENT '桨距角2',
+  `pitch_angle_blade_3` double DEFAULT NULL COMMENT '桨距角3',
+  `cabin_position` double DEFAULT NULL COMMENT '机舱位置',
+  `true_wind_direction` double DEFAULT NULL COMMENT '绝对风向',
+  `yaw_error1` double DEFAULT NULL COMMENT '对风角度',
+  `set_value_of_active_power` double DEFAULT NULL COMMENT '有功功率设定值',
+  `gearbox_oil_temperature` double DEFAULT NULL COMMENT '齿轮箱油温',
+  `generatordrive_end_bearing_temperature` double DEFAULT NULL COMMENT '发电机驱动端轴承温度',
+  `generatornon_drive_end_bearing_temperature` double DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
+  `cabin_temperature` double DEFAULT NULL COMMENT '机舱内温度',
+  `twisted_cable_angle` double DEFAULT NULL COMMENT '扭缆角度',
+  `front_back_vibration_of_the_cabin` double DEFAULT NULL COMMENT '机舱前后振动',
+  `side_to_side_vibration_of_the_cabin` double DEFAULT NULL COMMENT '机舱左右振动',
+  `actual_torque` double DEFAULT NULL COMMENT '实际力矩',
+  `given_torque` double DEFAULT NULL COMMENT '给定力矩',
+  `clockwise_yaw_count` double DEFAULT NULL COMMENT '顺时针偏航次数',
+  `counterclockwise_yaw_count` double DEFAULT NULL COMMENT '逆时针偏航次数',
+  `unusable` double DEFAULT NULL COMMENT '不可利用',
+  `power_curve_available` double DEFAULT NULL COMMENT '功率曲线可用',
+  `required_gearbox_speed` double DEFAULT NULL COMMENT '齿轮箱转速',
+  `inverter_speed_master_control` double DEFAULT NULL COMMENT '变频器转速(主控)',
+  `outside_cabin_temperature` double DEFAULT NULL COMMENT '环境温度',
+  `main_bearing_temperature` double DEFAULT NULL COMMENT '主轴承轴承温度',
+  `gearbox_high_speed_shaft_bearing_temperature` double DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
+  `gearboxmedium_speed_shaftbearing_temperature` double DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
+  `gearbox_low_speed_shaft_bearing_temperature` double DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
+  `generator_winding1_temperature` double DEFAULT NULL COMMENT '发电机绕组1温度',
+  `generator_winding2_temperature` double DEFAULT NULL COMMENT '发电机绕组2温度',
+  `generator_winding3_temperature` double DEFAULT NULL COMMENT '发电机绕组3温度',
+  `wind_turbine_status` double DEFAULT NULL COMMENT '风机状态1',
+  `wind_turbine_status2` double DEFAULT NULL COMMENT '风机状态2',
+  `turbulence_intensity` double DEFAULT NULL COMMENT '湍流强度'
+  )
+    """
+    trans.execute(create_sql)
+
+
+def boolean_table_exists(table_name):
+    table_sql = f"""
+    select count(1) as count from information_schema.tables where table_name = '{table_name}'
+    """
+    data = trans.execute(table_sql)[0]
+
+    if int(data['count']) == 0:
+        return False
+    return True
+
+
+def add_partition(table_name, pname, date_str):
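+    # Split the catch-all pmax partition so the new month's partition is inserted before it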
+    try:
+        sql = f"""
+        ALTER TABLE {table_name} REORGANIZE PARTITION pmax INTO (
+            PARTITION {pname} VALUES LESS THAN ('{date_str}'),
+            PARTITION pmax VALUES LESS THAN (MAXVALUE)
+        );
+        """
+        trans.execute(sql)
+        logger.info(f"添加{table_name}分区{pname}成功")
+    except:
+        logger.error(traceback.format_exc())
+
+
+def delelet_partition(table_name, pmonth):
+    pname = f'p{str(pmonth.year) + str(pmonth.month).zfill(2)}'
+
+    exists_partition_sql = f"""
+    SELECT count(1) as count from INFORMATION_SCHEMA.`PARTITIONS` t  where t.TABLE_NAME = '{table_name}' and t.PARTITION_NAME = '{pname}'
+    """
+    data = trans.execute(exists_partition_sql)[0]
+    if int(data['count']) > 0:
+        del_sql = f"""
+        ALTER TABLE {table_name} DROP PARTITION {pname}
+        """
+        trans.execute(del_sql)
+        logger.info(f"删除{table_name}分区{pname}成功")
+
+    else:
+        logger.info(f"删除{table_name}分区{pname}不存在")
+
+
+def get_all_partitioned_tables() -> list:
+    all_tables_sql = """
+    SELECT t.TABLE_NAME FROM INFORMATION_SCHEMA.`TABLES` t where t.CREATE_OPTIONS = 'partitioned'
+    """
+    return trans.execute(all_tables_sql)
+
+
+def save_df_to_db(table_name: str, df: pd.DataFrame, batch_count=1000):
+    try:
+        if 'index' in df.columns:
+            del df['index']
+        trans.execute_df_save(df, table_name, batch_count)
+    except Exception as e:
+        logger.error(traceback.format_exc())
+        raise Exception(str(e))
+
+
+def load_data_local(table_name, df):
+    columns_str = 'wind_turbine_number,wind_turbine_name,time_stamp,active_power,rotor_speed,generator_speed,wind_velocity,pitch_angle_blade_1,pitch_angle_blade_2,pitch_angle_blade_3,cabin_position,true_wind_direction,yaw_error1,set_value_of_active_power,gearbox_oil_temperature,generatordrive_end_bearing_temperature,generatornon_drive_end_bearing_temperature,cabin_temperature,twisted_cable_angle,front_back_vibration_of_the_cabin,side_to_side_vibration_of_the_cabin,actual_torque,given_torque,clockwise_yaw_count,counterclockwise_yaw_count,unusable,power_curve_available,required_gearbox_speed,inverter_speed_master_control,outside_cabin_temperature,main_bearing_temperature,gearbox_high_speed_shaft_bearing_temperature,gearboxmedium_speed_shaftbearing_temperature,gearbox_low_speed_shaft_bearing_temperature,generator_winding1_temperature,generator_winding2_temperature,generator_winding3_temperature,wind_turbine_status,wind_turbine_status2,turbulence_intensity,lab,year,month,day,year_month'
+    cols = columns_str.split(',')
+    print(cols)
+    df = df[cols]
+    # trans.execute_df_save(df, table_name, batch_count)
+    trans.safe_load_data_local(df, table_name)
+
+
+def drop_table(table_name):
+    drop_sql = f"DROP TABLE `{table_name}`"
+    try:
+        trans.execute(drop_sql)
+    except:
+        logger.error(traceback.format_exc())
+
+
+def get_yesterday_tables(yesterday):
+    query_sql = f"""
+    select * from wind_farm_day_count where add_date = '{yesterday}' and sync_status = 0
+    """
+    return trans.execute(query_sql)
+
+
+def update_sync(id):
+    update_sql = f"update wind_farm_day_count set sync_status = 1 where id = {id}"
+    trans.execute(update_sql)
+
+
+def update_wind_farm_day_count(wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time,
+                               sync_status=0):
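+    # Upsert: add to the existing day-count row for this farm/date/type if present, otherwise insert a new row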
+    select_sql = f"SELECT * from wind_farm_day_count  WHERE `wind_farm_code` = '{wind_farm_code}' and `add_date` = '{add_date}' and `type` = '{trans_type}' "
+    result = trans.execute(select_sql)
+    if result:
+        id = result[0]['id']
+        update_sql = f"update wind_farm_day_count set count = count + {count}, latest_data_time = '{latest_data_time}' where id = {id}"
+        trans.execute(update_sql)
+    else:
+        insert_sql = f"""
+        INSERT INTO `wind_farm_day_count` (
+        `wind_farm_code`,
+        `wind_farm_name`,
+        `add_date`,
+        `type`,
+        `count`,
+        `sync_status`,
+        `del_status`,
+        `latest_data_time`
+    )
+        VALUES
+        (
+            '{wind_farm_code}',
+            '{wind_farm_name}',
+            '{add_date}',
+            '{trans_type}',
+            '{count}',
+            '{sync_status}',
+            '0',
+            '{latest_data_time}'
+        )
+        """
+        trans.execute(insert_sql)
+
+
+def get_sys_conf_by_key(type, param_key, default_value=None):
+    sql = f"SELECT * from sys_conf t where t.type ='{type}'  and t.param_key = '{param_key}' and status = 1"
+    datas = trans.execute(sql)
+    if isinstance(datas, tuple):
+        return default_value
+    else:
+        return datas[0]['param_value']
+
+
+def get_sys_conf(type) -> dict:
+    sql = f"SELECT * from sys_conf t where t.type ='{type}' and status = 1"
+    datas = trans.execute(sql)
+    if isinstance(datas, tuple):
+        return {}
+    else:
+        result_dict = dict()
+        for data in datas:
+            result_dict[data['param_key']] = data['param_value']
+        return result_dict
+
+
+def read_data_from_table(table_name):
+    df = pd.read_sql_table(con=trans.get_engine(), table_name=table_name)
+    return df
+
+
+def update_warn_fault_exist_data(changzhan, seq_no, end_time):
+    updata_tables = [f'{changzhan}_warn', f'{changzhan}_fault']
+
+    for table_name in updata_tables:
+        update_sql = f"""
+        update {table_name} set end_time = '{end_time}', time_diff = TIMESTAMPDIFF(SECOND, begin_time,'{end_time}') 
+        where seq_no = {seq_no} and end_time is null
+        """
+        trans.execute(update_sql)
+        logger.info(f"更新{changzhan}故障顺序号{seq_no}成功")
+
+
+def update_warn_fault_exist_data_with_db(wind_no, wind_turbine_number, end_time):
+    updata_tables = [f'{wind_no}_warn', f'{wind_no}_fault']
+
+    for table_name in updata_tables:
+        update_sql = f"""
+        update {table_name} set end_time = '{end_time}', time_diff = TIMESTAMPDIFF(SECOND, begin_time,'{end_time}') 
+        where wind_turbine_number = {wind_turbine_number} and end_time is null
+        """
+        print(update_sql)
+        # trans.execute(update_sql)
+        logger.info(f"更新{wind_no}风机{wind_turbine_number}成功")
+
+
+def update_expired_data(table_type, exists_date):
+    update_expired_sql = f"""
+    update wind_farm_day_count set del_status = 1 where type = '{table_type}' and add_date < '{exists_date}'
+    """
+    trans.execute(update_expired_sql)
+    logger.info(f"删除类型{table_type},截止日期{exists_date}成功")
+
+
+def exists_windno_seq_fault(changzhan):
+    types = ['warn', 'fault']
+    result_dict = dict(list())
+    for type in types:
+        result_dict[type] = list()
+        query_sql = f"select * from {changzhan}_{type} where time_diff is null"
+        fault_datas = trans.execute(query_sql)
+        if isinstance(fault_datas, tuple):
+            result_dict[type] = []
+        else:
+            for data in fault_datas:
+                # t.wind_turbine_name,t.seq_no,t.fault_code
+                result_dict[type].append(f"{data['wind_turbine_name']}_{data['seq_no']}_{data['fault_code']}")
+
+    return result_dict
+
+
+def update_warn_fault_update_time(date_str, max_time):
+    sql = f"update wind_farm_day_count set update_time = '{max_time}' where add_date = '{date_str}' and type in ('warn','fault')"
+    trans.execute(sql)
+    logger.info(f"更新{date_str}的故障报警到当前时间")
+
+
+if __name__ == '__main__':
+    update_wind_farm_day_count('1', '1', '2025-04010', 100)

+ 359 - 0
tool/ClassIdentifier.py

@@ -0,0 +1,359 @@
+import datetime
+import traceback
+
+import numpy as np
+from pandas import DataFrame
+
+from utils.log.trans_log import logger
+
+
+class ClassIdentifier(object):
+    """
+    Point classification: -1 shutdown, 0 good point, 1 underperforming point, 2 overperforming point, 3 overperforming point above rated wind speed, 4 curtailment
+    """
+
+    def __init__(self, wind_turbine_number="", origin_df: DataFrame = None,
+                 wind_velocity='wind_velocity',
+                 active_power='active_power',
+                 pitch_angle_blade='pitch_angle_blade_1',
+                 rated_power=1500, cut_out_speed=20):
+        """
+        :param origin_df: The pandas DataFrame containing the input data.
+        :param wind_velocity: wind speed column
+        :param active_power: active power column
+        :param pitch_angle_blade: pitch angle column
+        :param rated_power: rated power
+        :param cut_out_speed: cut-out wind speed
+        """
+        self.wind_turbine_number = wind_turbine_number
+        self.wind_velocity = wind_velocity
+        self.active_power = active_power
+        self.pitch_angle_blade = pitch_angle_blade
+        self.rated_power = rated_power  # rated power 1500 kW; can be changed to 2000 kW
+        self.cut_out_speed = cut_out_speed
+
+        if self.rated_power is None:
+            logger.info(f"{wind_turbine_number} WARNING:rated_power配置为空的")
+            self.rated_power = 1500
+
+        if self.cut_out_speed is None:
+            logger.info(f"{cut_out_speed} WARNING:cut_out_speed配置为空的")
+            self.cut_out_speed = 20
+
+        self.df = origin_df
+
+    def identifier(self):
+        # wind speed and active power DataFrame
+        # self.df = self.df[[self.wind_velocity, self.active_power, "pitch_angle_blade_1"]]
+        self.df.reset_index(inplace=True)
+
+        if len(self.df[self.active_power].unique()) <= 10:
+            self.df['lab'] = -1
+            return self.df
+
+        wind_and_power_df_count = self.df.shape[0]
+        power_max = self.df[self.active_power].max()
+        power_rated = np.ceil(power_max / 100) * 100
+        v_cut_out = self.cut_out_speed
+        # 网格法确定风速风向分区数量,功率方向分区数量,
+        power_bin_count = int(np.ceil(power_rated / 25))  # 功率分区间隔25kW
+        velocity_bin_count = int(np.ceil(v_cut_out / 0.25))  # 风速分区间隔0.25m/s
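+        # 示例:若额定功率取整后为 2000kW、切出风速为 20m/s,则约 80 个功率区间 × 80 个风速区间(仅为示意数值)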
+
+        # 存储功率大于零的运行数据
+        power_gt_zero_array = np.zeros([wind_and_power_df_count, 2], dtype=float)
+        power_gt_zero_array_count = 0
+        for i in range(wind_and_power_df_count):
+            if self.df.loc[i, self.active_power] > 0:
+                power_gt_zero_array[power_gt_zero_array_count, 0] = self.df.loc[i, self.wind_velocity]
+                power_gt_zero_array[power_gt_zero_array_count, 1] = self.df.loc[i, self.active_power]
+
+                power_gt_zero_array_count = power_gt_zero_array_count + 1
+
+        # 统计各网格落入的散点个数
+        x_box_number = np.zeros([power_bin_count, velocity_bin_count], dtype=int)
+
+        n_which_p = -1
+        n_which_v = -1
+        for i in range(power_gt_zero_array_count):
+            for m in range(power_bin_count):
+                if m * 25 < power_gt_zero_array[i, 1] <= (m + 1) * 25:
+                    n_which_p = m
+                    break
+
+            for n in range(velocity_bin_count):
+                if (n * 0.25 + 0.125) < power_gt_zero_array[i, 0] <= ((n + 1) * 0.25 + 0.125):
+                    n_which_v = n
+                    break
+
+            if n_which_p > -1 and n_which_v > -1:
+                x_box_number[n_which_p, n_which_v] = x_box_number[n_which_p, n_which_v] + 1
+
+        # 在功率方向将网格内散点绝对个数转换为相对百分比,备用
+        power_box_percent = np.zeros([power_bin_count, velocity_bin_count], dtype=float)
+
+        # 功率方向统计
+        power_bin_sum = np.zeros(power_bin_count, dtype=int)
+
+        for i in range(power_bin_count):
+            power_bin_sum[i] = sum(x_box_number[i, :])
+            # for m in range(velocity_bin_count):
+            #     power_bin_sum[i] = power_bin_sum[i] + x_box_number[i, m]
+
+            for m in range(velocity_bin_count):
+                if power_bin_sum[i] > 0:
+                    power_box_percent[i, m] = x_box_number[i, m] / power_bin_sum[i] * 100
+
+        # 在风速方向将网格内散点绝对个数转换为相对百分比,备用
+        v_box_percent = np.zeros([power_bin_count, velocity_bin_count], dtype=float)
+        v_bin_sum = np.zeros(velocity_bin_count, dtype=int)
+
+        for i in range(velocity_bin_count):
+            v_bin_sum[i] = sum(x_box_number[:, i])
+            # for m in range(power_bin_count):
+            #     v_bin_sum[i] = v_bin_sum[i] + x_box_number[m, i]
+
+            for m in range(power_bin_count):
+                if v_bin_sum[i] > 0:
+                    v_box_percent[m, i] = x_box_number[m, i] / v_bin_sum[i] * 100
+
+        # 以水平功率带方向为准,分析每个水平功率带中,功率主带中心,即找百分比最大的网格位置。
+        p_box_max_index = np.zeros(power_bin_count, dtype=int)  # 水平功率带最大网格位置索引
+        p_box_max_p = np.zeros(power_bin_count, dtype=int)  # 水平功率带最大网格百分比
+
+        for m in range(power_bin_count):
+            # 确定每一水平功率带的最大网格位置索引即百分比值
+            p_box_max_p[m], p_box_max_index[m] = power_box_percent[m, :].max(), power_box_percent[m, :].argmax()
+
+        # 切入风速特殊处理,如果切入风速过于偏右,向左拉回
+        # todo 为什么第一行数据的索引值 > 14个就要往左拉回,还有是不是不叫切入风速,这个是 落入这个区间功率最多的个数的索引值
+        if p_box_max_index[0] > 14:
+            p_box_max_index[0] = 9
+
+        # 以水平功率带方向为基准,进行分析
+        dot_dense_left_right = np.zeros([power_bin_count, 2], dtype=int)  # 存储每一水平功率带的功率主带以最大网格为中心,向左、向右扩展的网格数
+        dot_valve = 90  # 从中心向左右对称扩展网格的散点百分比和的阈值。
+
+        for i in range(power_bin_count - 6):  # 从最下层水平功率带1开始,向上到第PNum-6个水平功率带(额定功率以下水平功率带),逐一分析
+            p_dot_dense_sum = p_box_max_p[i]  # 以中心最大水平功率带为基准,向左向右对称扩展网格,累加各网格散点百分比
+            i_spread_right = 1
+            i_spread_left = 1
+            while p_dot_dense_sum < dot_valve:
+
+                if (p_box_max_index[i] + i_spread_right) < velocity_bin_count - 1:
+                    # 向右侧扩展
+                    p_dot_dense_sum = p_dot_dense_sum + power_box_percent[i, p_box_max_index[i] + i_spread_right]
+                    i_spread_right = i_spread_right + 1
+
+                if (p_box_max_index[i] + i_spread_right) > velocity_bin_count - 1:
+                    break
+
+                if (p_box_max_index[i] - i_spread_left) > 0:
+                    # 向左侧扩展
+                    p_dot_dense_sum = p_dot_dense_sum + power_box_percent[i, p_box_max_index[i] - i_spread_left]
+                    i_spread_left = i_spread_left + 1
+
+                if (p_box_max_index[i] - i_spread_left) <= 0:
+                    break
+
+            i_spread_right = i_spread_right - 1
+            i_spread_left = i_spread_left - 1
+            # 向左右对称扩展完毕
+
+            dot_dense_left_right[i, 0] = i_spread_left
+            dot_dense_left_right[i, 1] = i_spread_right
+
+        main_band_right = np.median(dot_dense_left_right[:, 1])
+
+        # 散点向右显著延展分布的水平功率带为限功率水平带
+        # 各水平功率带是否为限功率标识,==1:是;==0:不是
+        power_limit = np.zeros([power_bin_count, 1], dtype=int)
+        width_average = 0  # 功率主带平均宽度
+
+        # todo 限功率主带判别阈值为什么要加3
+        power_limit_valve = np.ceil(main_band_right) + 3  # 限功率主带判别阈值
+        n_counter = 0
+
+        for i in range(power_bin_count - 6):
+            # 如果向右扩展网格数大于阈值,且该水平功率带点总数>20,是限功率
+            if dot_dense_left_right[i, 1] > power_limit_valve and power_bin_sum[i] > 20:
+                power_limit[i] = 1
+
+            if dot_dense_left_right[i, 1] <= power_limit_valve:
+                # 统计正常水平功率带右侧宽度
+                width_average = width_average + dot_dense_left_right[i, 1]
+                n_counter = n_counter + 1
+
+        width_average = width_average / n_counter if n_counter > 0 else 0  # 功率主带平均宽度
+
+        # 对限负荷水平功率带的最大网格较下面相邻层显著偏右,拉回
+        for i in range(1, power_bin_count - 6):
+            if power_limit[i] == 1 and abs(p_box_max_index[i] - p_box_max_index[i - 1]) > 5:
+                p_box_max_index[i] = p_box_max_index[i - 1] + 1
+
+        # 功率主带的右边界
+        curve_width = int(np.ceil(width_average) + 2)
+
+        # 数据异常需要剔除的网格标识,标识1:功率主带右侧的欠发网格;2:功率主带左侧的超发网格 3:额定功率以上的超发点
+        b_box_remove = np.zeros([power_bin_count, velocity_bin_count], dtype=int)
+
+        for m in range(power_bin_count - 6):
+            for n in range(p_box_max_index[m] + curve_width, velocity_bin_count):
+                b_box_remove[m, n] = 1
+
+            for n in range(p_box_max_index[m] - curve_width, -1, -1):
+                b_box_remove[m, n] = 2
+
+        # 确定功率主带的左上拐点,即额定风速位置的网格索引
+        curve_top = np.zeros(2, dtype=int)
+        curve_top_valve = 3  # 网格的百分比阈值
+        b_top_find = False
+        for m in range(power_bin_count - 5, -1, -1):
+            for n in range(velocity_bin_count):
+                # 如左上角网格的百分比和散点个数大于阈值。
+                if v_box_percent[m, n] > curve_top_valve and x_box_number[m, n] >= 10:
+                    curve_top[0] = m
+                    curve_top[1] = n
+                    b_top_find = True
+                    break
+
+            if b_top_find:
+                break
+
+        isolate_valve = 3
+        for m in range(power_bin_count - 6):
+            for n in range(p_box_max_index[m] + curve_width, velocity_bin_count):
+                if power_box_percent[m, n] < isolate_valve:
+                    b_box_remove[m, n] = 1
+
+        # 功率主带顶部宽度
+        curve_width_t = 2
+        for m in range(power_bin_count - curve_width_t - 1, power_bin_count):
+            for n in range(velocity_bin_count):
+                b_box_remove[m, n] = 3  # 网格为额定功率以上的超发点
+
+        # 功率主带拐点左侧的欠发网格标识
+        for m in range(power_bin_count - 5 - 1, power_bin_count):
+            for n in range(curve_top[1] - 1):
+                b_box_remove[m, n] = 2
+
+        # 以网格的标识,决定该网格内数据的标识。dzwind_and_power_sel。散点在哪个网格,此网格的标识即为该点的标识
+        # -1:停机 0:好点  1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电
+        dzwind_and_power_sel = np.zeros(power_gt_zero_array_count, dtype=int)
+        n_which_p = -1
+        n_which_v = -1
+
+        for i in range(power_gt_zero_array_count):
+            for m in range(power_bin_count):
+                if m * 25 < power_gt_zero_array[i, 1] <= (m + 1) * 25:
+                    n_which_p = m
+                    break
+
+            for n in range(velocity_bin_count):
+                if (n * 0.25 + 0.125) < power_gt_zero_array[i, 0] <= ((n + 1) * 0.25 + 0.125):
+                    n_which_v = n
+                    break
+
+            if n_which_p > -1 and n_which_v > -1:
+                if b_box_remove[n_which_p, n_which_v] == 1:
+                    dzwind_and_power_sel[i] = 1
+
+                if b_box_remove[n_which_p, n_which_v] == 2:
+                    dzwind_and_power_sel[i] = 2
+
+                if b_box_remove[n_which_p, n_which_v] == 3:
+                    dzwind_and_power_sel[i] = 0  # 3  # 额定风速以上的超发功率点认为是正常点,不再标识。
+
+        # 限负荷数据标识方法2:把数据切割为若干个窗口。对每一窗口,以第一个点为基准,连续nWindowLength个数据的功率在方差范围内,呈现显著水平分布的点
+        n_window_length = 3
+        limit_window = np.zeros(n_window_length, dtype=float)
+        power_std = 15  # 功率波动方差
+        n_window_num = int(np.floor(power_gt_zero_array_count / n_window_length))
+        power_limit_up = self.rated_power - 300
+        power_limit_low = 200
+        for i in range(n_window_num):
+            for j in range(n_window_length):
+                limit_window[j] = power_gt_zero_array[i * n_window_length + j, 1]
+
+            b_all_in_areas = 1
+            for j in range(n_window_length):
+                if limit_window[j] < power_limit_low or limit_window[j] > power_limit_up:
+                    b_all_in_areas = 0
+
+            if b_all_in_areas == 0:
+                continue
+
+            up_limit = limit_window[0] + power_std
+            low_limit = limit_window[0] - power_std
+            b_all_in_up_low = 1
+            for j in range(1, n_window_length):
+                if limit_window[j] < low_limit or limit_window[j] > up_limit:
+                    b_all_in_up_low = 0
+
+            if b_all_in_up_low == 1:
+                for j in range(n_window_length):
+                    dzwind_and_power_sel[i * n_window_length + j] = 4  # 标识窗口内的数据为限负荷数据
+
+        for i in range(power_bin_count - 6):
+            pv_left_down = np.zeros(2, dtype=float)
+            pv_right_up = np.zeros(2, dtype=float)
+
+            if (p_box_max_index[i + 1] - p_box_max_index[i]) >= 1:
+                pv_left_down[0] = (p_box_max_index[i] + curve_width) * 0.25 + 0.125
+                pv_left_down[1] = i * 25
+
+                pv_right_up[0] = (p_box_max_index[i + 1] + curve_width) * 0.25 + 0.125
+                pv_right_up[1] = (i + 1) * 25
+
+                for m in range(power_gt_zero_array_count):
+                    if pv_left_down[0] < power_gt_zero_array[m, 0] < pv_right_up[0] and \
+                            pv_left_down[1] < power_gt_zero_array[m, 1] < pv_right_up[1]:  # 在该锯齿中
+                        if (power_gt_zero_array[m, 1] - pv_left_down[1]) / (
+                                power_gt_zero_array[m, 0] - pv_left_down[0]) > (
+                                pv_right_up[1] - pv_left_down[1]) / (
+                                pv_right_up[0] - pv_left_down[0]):  # 斜率大于对角连线,则在锯齿左上三角形中,选中
+                            dzwind_and_power_sel[m] = 0
+
+        self.df.loc[:, 'lab'] = -1
+        self.df.loc[
+            self.df[self.df[self.active_power] > 0].index, 'lab'] = dzwind_and_power_sel
+
+        # 把部分欠发的优化为限电
+        # 构建条件表达式
+        cond1 = (self.df['lab'] == 1) & (
+                (self.df[self.active_power] < self.rated_power * 0.75) &
+                (self.df[self.pitch_angle_blade] > 0.5)
+        )
+        cond2 = (self.df['lab'] == 1) & (
+                (self.df[self.active_power] < self.rated_power * 0.85) &
+                (self.df[self.pitch_angle_blade] > 1.5)
+        )
+        cond3 = (self.df['lab'] == 1) & (
+                (self.df[self.active_power] < self.rated_power * 0.9) &
+                (self.df[self.pitch_angle_blade] > 2.5)
+        )
+
+        # 使用逻辑或操作符|合并条件
+        combined_condition = cond1 | cond2 | cond3
+        self.df.loc[combined_condition, 'lab'] = 4
+
+        self.df.loc[self.df[self.active_power] <= 0, 'lab'] = -1
+
+        self.df.reset_index(drop=True, inplace=True)
+        if 'index' in self.df.columns:
+            del self.df['index']
+        return self.df
+
+    def run(self):
+        # Implement your class identification logic here
+        begin = datetime.datetime.now()
+        logger.info(f"打标签开始,风机号:{self.wind_turbine_number},数量:{self.df.shape}")
+        try:
+            df = self.identifier()
+        except Exception as e:
+            logger.error(traceback.format_exc())
+            # message = str(e) + ',风机编号:' + self.wind_turbine_number
+            # raise Exception('打标签失败:' + message)
+            self.df.loc[:, 'lab'] = -999
+            return self.df
+        logger.info(f"打标签结束,{df.shape},耗时:{datetime.datetime.now() - begin}")
+        return df

+ 0 - 0
tool/__init__.py


+ 40 - 0
tool/add_or_remove_partition.py

@@ -0,0 +1,40 @@
+import datetime
+
+from dateutil.relativedelta import relativedelta
+
+from service import trans_service
+from utils.log.trans_log import logger
+
+if __name__ == '__main__':
+
+    datas = trans_service.get_all_partitioned_tables()
+
+    now_month = datetime.datetime.now()
+    next_month = now_month + relativedelta(months=1)
+    pname = f'p{str(now_month.year) + str(now_month.month).zfill(2)}'
+    date_str = f'{str(next_month.year)}-{str(next_month.month).zfill(2)}-01'
+    for data in datas:
+        trans_service.add_partition(data['TABLE_NAME'], pname, date_str)
+
+    logger.info("添加分区成功")
+
+    save_month_dict = trans_service.get_sys_conf('online_data_save_month')
+
+    table_types = set()
+
+    for data in datas:
+        table_name = data['TABLE_NAME']
+        wind_factory = table_name.split('_')[0]
+        table_type = table_name.split('_')[1]
+        table_types.add(table_type)
+        del_month = save_month_dict.get(table_type, 12)
+        pmonth = datetime.datetime.now() - relativedelta(months=del_month + 1)
+        trans_service.delelet_partition(table_name, pmonth)
+
+    for table_type in table_types:
+        del_month = save_month_dict.get(table_type, 12)
+        pmonth = datetime.datetime.now() - relativedelta(months=del_month)
+        exists_date = f'{pmonth.year}-{pmonth.month}-01'
+        trans_service.update_expired_data(table_type,exists_date)
+
+    logger.info("删除过期分区成功")

+ 9 - 0
ty_zgh_data.iml

@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
+    <exclude-output />
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="jdk" jdkName="Python 3.12 (whole_dep)" jdkType="Python SDK" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 3 - 0
utils/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/6
+# @Author  : 魏志亮

+ 173 - 0
utils/common_utils.py

@@ -0,0 +1,173 @@
+import os
+from datetime import datetime
+from pathlib import Path
+
+import chardet
+import pandas as pd
+from sqlalchemy import create_engine, text, inspect
+
+from utils.log.trans_log import logger
+
+
+def get_engine(host='192.168.50.235', port=30306, user='root', pwd='admin123456', db='datang'):
+    engine = create_engine(f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}')
+    return engine
+
+
+def print_log(message: str = '', bool_print_log=False, pre_time=None):
+    log_now = datetime.now()
+    if bool_print_log:
+        if pre_time is None:
+            logger.info(message)
+        else:
+            logger.info('%s,耗时:%s', message, log_now - pre_time)
+    return log_now
+
+
+# 获取文件编码
+def detect_file_encoding(filename):
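+    # 仅取前1000字节做编码探测;除 utf-8 系外统一按 gb18030 处理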
+    with open(filename, 'rb') as f:
+        rawdata = f.read(1000)
+    result = chardet.detect(rawdata)
+    encoding = result['encoding']
+
+    if encoding is None:
+        encoding = 'gb18030'
+
+    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
+        return 'utf-8'
+
+    return 'gb18030'
+
+
+def df_save_file(df: pd.DataFrame = None, file_name: str = '', bool_add_time: bool = True, bool_print_log: bool = True):
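+    # 将 DataFrame 保存为 csv / xlsx / parquet;bool_add_time 为 True 时在文件名前追加时间戳前缀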
+    if df is None:
+        raise Exception('df不能为None')
+
+    res_file_name = file_name
+    if bool_add_time:
+        now = datetime.now()
+        now_str = now.strftime('%Y%m%d%H%M%S')
+        res_file_name = os.path.join(os.path.dirname(file_name), now_str + '-' + os.path.basename(file_name))
+    log_now = print_log(f'开始保存df:{df.shape}到{res_file_name}', bool_print_log)
+    os.makedirs(os.path.dirname(file_name), exist_ok=True)
+    if file_name.endswith('csv'):
+        df.to_csv(res_file_name, index=False, encoding='utf8')
+    elif file_name.endswith('xls') or file_name.endswith('xlsx'):
+        df.to_excel(res_file_name, index=False)
+    elif file_name.endswith('parquet'):
+        df.to_parquet(res_file_name, index=False)
+    else:
+        raise Exception('需要添加映射')
+    print_log(f'完成保存df:{df.shape}到{res_file_name}', bool_print_log, pre_time=log_now)
+    return res_file_name
+
+
+def read_file(file_path, read_cols: list = None, header: int = 0,
+              nrows: int = None, bool_print_log: bool = True) -> pd.DataFrame:
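+    # 按扩展名分派到对应的 pandas 读取函数,支持 csv / xls / xlsx / parquet / json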
+    # 参数验证
+    if not file_path or not isinstance(file_path, str):
+        raise ValueError("文件路径不能为空且必须是字符串")
+
+    if not os.path.exists(file_path):
+        raise FileNotFoundError(f"文件不存在: {file_path}")
+
+    file_ext = file_path.lower()
+    # 构建读取参数的通用字典
+    read_params = {'header': header}
+    if nrows is not None:
+        read_params['nrows'] = nrows
+
+    try:
+        log_now = print_log(f'开始读取文件: {file_path}', bool_print_log)
+
+        # 根据文件扩展名选择读取方式
+        if file_ext.endswith('.csv'):
+            encoding = detect_file_encoding(file_path)
+            read_params['encoding'] = encoding
+
+            if read_cols:
+                df = pd.read_csv(file_path, usecols=read_cols, **read_params)
+            else:
+                df = pd.read_csv(file_path, **read_params)
+
+        elif file_ext.endswith(('.xls', '.xlsx')):
+            if read_cols:
+                df = pd.read_excel(file_path, usecols=read_cols, **read_params)
+            else:
+                df = pd.read_excel(file_path, **read_params)
+
+        elif file_ext.endswith('.parquet'):
+            # parquet文件使用不同的参数名
+            parquet_params = {'columns': read_cols} if read_cols else {}
+            df = pd.read_parquet(file_path, **parquet_params)
+
+        elif file_ext.endswith('.json'):
+            # pd.read_json 不支持 header 参数(nrows 仅在 lines=True 时有效),这里整体读取后再筛选列
+            if read_cols:
+                df = pd.read_json(file_path)[read_cols]
+            else:
+                df = pd.read_json(file_path)
+
+        else:
+            supported_formats = ['.csv', '.xls', '.xlsx', '.parquet', '.json']
+            raise ValueError(
+                f"不支持的文件格式: {os.path.splitext(file_path)[1]}\n"
+                f"支持的文件格式: {', '.join(supported_formats)}"
+            )
+
+        print_log(f'文件读取成功: {file_path}, 数据量: {df.shape}', bool_print_log, pre_time=log_now)
+        return df
+
+    except pd.errors.EmptyDataError:
+        print_log(f'文件为空: {file_path}', bool_print_log)
+        return pd.DataFrame()
+    except Exception as e:
+        print_log(f'读取文件失败: {file_path}, 错误: {str(e)}', bool_print_log)
+        raise
+
+
+def df_save_table(engine, df: pd.DataFrame, table_name, pre_save: str = None):
+    """
+    engine: 数据库引擎
+    df: DataFrame
+    table_name: 表名
+    pre_save: 删除表(DROP),清空表(TRUNCATE)或者不处理(None)
+    """
+    log_time = print_log(f'开始保存到表{table_name},数据量:{df.shape},前置处理表:{pre_save}', bool_print_log=True)
+    if pre_save is not None:
+        with engine.connect() as conn:
+            # 检查表是否存在
+            inspector = inspect(engine)
+            if table_name in inspector.get_table_names():
+                # 删除表
+                conn.execute(text(f"{pre_save}  TABLE {table_name}"))
+                conn.commit()
+
+    df.to_sql(table_name, con=engine, if_exists='append', index=False)
+    print_log(f'完成保存到表{table_name},数据量:{df.shape},前置处理表:{pre_save}', bool_print_log=True,
+              pre_time=log_time)
+
+
+def get_all_files(read_path: str, pre_filter: tuple = None):
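+    # 递归收集 read_path 下的所有文件路径;pre_filter 为后缀元组(例如 ('csv', 'xlsx'),仅为示例),None 表示不过滤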
+    result = list()
+    if os.path.isfile(read_path):
+        if pre_filter is None:
+            result.append(str(Path(read_path)))
+        else:
+            if read_path.split('.')[-1].endswith(pre_filter):
+                result.append(str(Path(read_path)))
+    else:
+        for root, dirs, files in os.walk(read_path):
+            for file in files:
+                whole_path = os.path.join(root, file)
+                if pre_filter is None:
+                    result.append(str(Path(whole_path)))
+                else:
+                    if whole_path.split('.')[-1].endswith(pre_filter):
+                        result.append(str(Path(whole_path)))
+    return result

+ 3 - 0
utils/conf/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮

+ 22 - 0
utils/conf/read_conf.py

@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮
+
+import yaml
+
+
+def yaml_conf(path, encoding='utf-8'):
+    with open(path, 'r', encoding=encoding) as f:
+        data = yaml.safe_load(f)
+    return data
+
+
+def read_conf(dict_conf, col, default_value=None):
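+    # 读取配置项:键不存在或取值为 None 时返回 default_value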
+    if col in dict_conf:
+        res = dict_conf[col]
+        if res is None and default_value is not None:
+            return default_value
+        return res
+    else:
+        return default_value
+

+ 109 - 0
utils/db/ConnectMysql.py

@@ -0,0 +1,109 @@
+import tempfile
+import time
+import traceback
+from os import path, sep
+
+import pandas as pd
+import pymysql
+from pymysql.cursors import DictCursor
+from sqlalchemy import create_engine, text
+
+from utils.conf.read_conf import yaml_conf
+from utils.log.trans_log import logger
+
+
+class ConnectMysql:
+
+    def __init__(self, connet_name):
+        conf_path = path.abspath(__file__).split("utils")[0] + 'conf' + sep + 'config.yaml'
+        self.yaml_data = yaml_conf(conf_path)
+        self.connet_name = connet_name
+        self.config = self.yaml_data[self.connet_name]
+        self.database = self.config['database']
+
+    # 从连接池中获取一个连接
+    def get_conn(self):
+        return pymysql.connect(**self.config, local_infile=True)
+
+    # 使用连接执行sql
+    def execute(self, sql, params=tuple()):
+
+        with self.get_conn() as conn:
+            with conn.cursor(cursor=DictCursor) as cursor:
+                try:
+                    cursor.execute(sql, params)
+                    logger.info(f"开始执行SQL:{cursor._executed}")
+                    conn.commit()
+                    result = cursor.fetchall()
+                    return result
+                except Exception as e:
+                    logger.error(f"执行sql:{sql},报错:{e}")
+                    logger.error(traceback.format_exc())
+                    conn.rollback()
+                    raise e
+
+    def get_engine(self):
+        config = self.config
+        username = config['user']
+        password = config['password']
+        host = config['host']
+        port = config['port']
+        dbname = config['database']
+        return create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}?local_infile=1')
+
+    def execute_df_save(self, df, table_name, batch_count=1000):
+        df.to_sql(table_name, self.get_engine(), index=False, if_exists='append', chunksize=batch_count)
+
+    def read_sql_to_df(self, sql):
+        df = pd.read_sql_query(sql, self.get_engine())
+        return df
+
+
+    def safe_load_data_local(self, df, table_name, batch_size=30000):
+        """
+        安全加载数据到TiDB,包含以下优化:
+        1. 分批处理避免内存溢出
+        2. 完善的连接管理
+        3. 错误处理和重试机制
+        """
+        total_rows = len(df)
+        success_rows = 0
+        engine = self.get_engine()
+        for i in range(0, total_rows, batch_size):
+            batch = df.iloc[i:i + batch_size]
+            retry_count = 0
+            max_retries = 4
+
+            while retry_count < max_retries:
+                try:
+                    with tempfile.NamedTemporaryFile(mode='w') as tmp:
+                        batch.to_csv(tmp, index=False, header=False, sep='\t')
+                        tmp.flush()
+
+                        with engine.begin() as conn:  # 自动提交事务
+                            # 设置当前会话内存配额
+                            conn.execute(text("SET tidb_mem_quota_query = 2147483648"))  # 2GB
+
+                            # 执行LOAD DATA
+                            conn.execute(text(f"""
+                                LOAD DATA LOCAL INFILE '{tmp.name}' 
+                                INTO TABLE {table_name} 
+                                FIELDS TERMINATED BY '\t'
+                                LINES TERMINATED BY '\n'
+                            """))
+
+                        success_rows += len(batch)
+                        logger.info(f"成功加载批次 {i // batch_size + 1}: {len(batch)} 行")
+                        break  # 成功则跳出重试循环
+
+                except Exception as e:
+                    retry_count += 1
+                    logger.info(f"批次 {i // batch_size + 1} 第 {retry_count} 次尝试失败: {str(e)}")
+                    if retry_count >= max_retries:
+                        logger.error(f"批次 {i // batch_size + 1} 达到最大重试次数")
+                        logger.error(traceback.format_exc())
+                        raise
+                    time.sleep(2 ** retry_count)  # 指数退避
+
+        logger.info(f"数据加载完成: 总计 {success_rows}/{total_rows} 行")
+        return success_rows
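+
+
+# 使用示意(连接名需与 conf/config.yaml 中的配置一致,'trans' 与 'demo_table' 仅为示例):
+#   db = ConnectMysql('trans')
+#   rows = db.execute("select 1")
+#   db.safe_load_data_local(df, 'demo_table')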

+ 3 - 0
utils/db/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮

+ 3 - 0
utils/log/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/16
+# @Author  : 魏志亮

+ 55 - 0
utils/log/trans_log.py

@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/16
+# @Author  : 魏志亮
+
+import logging
+import logging.handlers
+import os
+
+
+# 日志配置函数化,避免全局副作用
+def setup_logger(log_path: str = "/data/logs"):
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging.DEBUG)
+
+    # 避免重复添加handler
+    if logger.handlers:
+        return logger
+
+    os.makedirs(log_path, exist_ok=True)
+
+    # 1. 创建并配置TimedRotatingFileHandler(所有级别的文件输出)
+    all_log_handler = logging.handlers.TimedRotatingFileHandler(
+        os.path.join(log_path, "info.log"), 'D', backupCount=90
+    )
+
+    # 2. 创建并配置ERROR级别的TimedRotatingFileHandler
+    error_log_handler = logging.handlers.TimedRotatingFileHandler(
+        os.path.join(log_path, "error.log"), 'D', backupCount=90
+    )
+    error_log_handler.setLevel(logging.ERROR)
+
+    # 3. 创建并配置StreamHandler(控制台输出)
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+
+    # 定义日志格式
+    fmt = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
+        "%Y-%m-%d %H:%M:%S"
+    )
+
+    # 设置各个handler的格式
+    all_log_handler.setFormatter(fmt)
+    error_log_handler.setFormatter(fmt)
+    console_handler.setFormatter(fmt)
+
+    # 添加handler
+    logger.addHandler(all_log_handler)
+    logger.addHandler(error_log_handler)
+    logger.addHandler(console_handler)
+
+    return logger
+
+
+logger = setup_logger("/home/trans/project/logs/")

+ 0 - 0
utils/systeminfo/__init__.py


+ 90 - 0
utils/systeminfo/sysinfo.py

@@ -0,0 +1,90 @@
+from os import getpid, listdir, path
+
+import psutil
+
+from utils.log.trans_log import logger
+
+
+def print_memory_usage(detail=""):
+    # 获取当前进程ID
+    pid = getpid()
+    # 获取进程信息
+    py = psutil.Process(pid)
+    # 获取内存信息
+    memory_info = py.memory_info()
+    # RSS (Resident Set Size) 是进程实际占用的物理内存大小
+    memory_usage_rss = memory_info.rss
+    # VMS (Virtual Memory Size) 是进程使用的虚拟内存大小
+    memory_usage_vms = memory_info.vms
+
+    # 将字节转换为更易读的单位
+    memory_usage_rss_mb = memory_usage_rss / (1024 ** 2)
+    memory_usage_vms_mb = memory_usage_vms / (1024 ** 2)
+
+    logger.info(f"{detail},Memory usage (RSS): {memory_usage_rss_mb:.2f} MB")
+    logger.info(f"{detail},Memory usage (VMS): {memory_usage_vms_mb:.2f} MB")
+
+
+def get_cpu_count():
+    return psutil.cpu_count()
+
+
+def get_available_cpu_count_with_percent(percent: float = 1):
+    cpu_count = get_cpu_count()
+    return int(cpu_count * percent)
+
+
+def get_file_size(file_path):
+    return path.getsize(file_path)
+
+
+def get_dir_size(dir_path):
+    return sum(get_file_size(path.join(dir_path, file)) for file in listdir(dir_path) if
+               path.isfile(path.join(dir_path, file)))
+
+
+def get_available_memory_with_percent(percent: float = 1):
+    memory_info = psutil.virtual_memory()
+    return int(memory_info.available * percent)
+
+
+def get_max_file_size(file_paths: list[str]):
+    max_size = 0
+    for file_path in file_paths:
+        file_size = get_file_size(file_path)
+        if file_size > max_size:
+            max_size = file_size
+    return max_size
+
+
+def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1 / 12, cpu_percent: float = 2 / 5):
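+    # 按最大单文件大小估算可用内存可容纳的并发数,再与按比例折算的CPU可用数取较小值,且不超过文件总数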
+    max_file_size = get_max_file_size(file_paths)
+    free_memory = get_available_memory_with_percent(memory_percent)
+    count = int(free_memory / max_file_size)
+    max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
+    result = count if count <= max_cpu_count else max_cpu_count
+    if result == 0:
+        result = 1
+
+    if result > len(file_paths):
+        result = len(file_paths)
+
+    logger.info(f"总文件数:{len(file_paths)},获取最大文件大小:{str(round(max_file_size / 2 ** 20, 2))}M"
+                f"可用内存:{str(get_available_memory_with_percent(1) / 2 ** 20)}M"
+                f"总CPU数:{get_cpu_count()}CPU使用比例:{round(cpu_percent, 2)}"
+                f"CPU可用数量:{max_cpu_count},最终确定使用进程数:{result}")
+    return result
+
+
+def max_file_size_get_max_cpu_count(max_file_size, memory_percent: float = 1 / 6, cpu_percent: float = 2 / 5):
+    free_memory = get_available_memory_with_percent(memory_percent)
+    count = int(free_memory / max_file_size)
+    max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
+    result = count if count <= max_cpu_count else max_cpu_count
+    if result == 0:
+        result = 1
+    logger.info(f"获取最大文件大小:{str(round(max_file_size / 2 ** 20, 2))}M"
+                f"可用内存:{str(get_available_memory_with_percent(1) / 2 ** 20)}M"
+                f"总CPU数:{get_cpu_count()}CPU使用比例:{round(cpu_percent, 2)}"
+                f"CPU可用数量:{max_cpu_count},最终确定使用进程数:{result}")
+    return result

Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.