Procházet zdrojové kódy

cms,温度,健康整合后first commit

chenhongyan1989 před 1 měsícem
revize
6252ba2193

+ 14 - 0
Dockerfile

@@ -0,0 +1,14 @@
# Fixed base-image tag: "3.9-s1im" (digit one) does not exist.
FROM python:3.9-slim

# Declare the workspace variable.
# Fixed: "ENV WORKDIR = /path" stores "= /path" as the value; there must be
# no spaces around "=".
ENV WORKDIR=/data/analysis/app
# Set the working directory
WORKDIR $WORKDIR
# Copy the dependency manifest first so the install layer is cached
COPY requirements.txt $WORKDIR/requirements.txt
# Install project dependencies (fixed stray space in "requirements. txt")
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code
COPY ./app $WORKDIR
# Start the service
# NOTE(review): --reload is a development flag; consider dropping it for production.
CMD ["uvicorn","main:app","--host","0.0.0.0","--port","8888","--reload"]

+ 36 - 0
app/config.py

@@ -0,0 +1,36 @@
# Connection settings for the data-conversion database
# (vibration, SCADA and similar measurement data).
class DataSettings:
    """Credentials and location of the measurement-data database."""

    db_user: str = "root"
    db_password: str = "admin123456"
    db_host: str = "192.168.50.235:30306"
    db_name: str = "energy_data_prod"
+
+
# Connection settings for the platform database
# (ledger and health base data).
class PlatformSettings:
    """Credentials and location of the platform (energy_show) database."""

    db_user: str = "admin"
    db_password: str = "admin123456"
    db_host: str = "192.168.50.233:3306"
    db_name: str = "energy_show"
+
+
# SQLAlchemy connection-pool tuning values.
class ConnectionPool:
    """Pooling parameters shared by every database engine."""

    # default number of persistent connections in the pool
    POOL_SIZE = 5
    # extra connections allowed beyond POOL_SIZE under load
    MAX_OVERFLOW = 10
    # seconds to wait for a free connection before giving up
    POOL_TIMEOUT = 360
    # recycle (reopen) connections older than this many seconds
    POOL_RECYCLE = 1800
+
class DataBase:
    """Logical names used to look up the configured database engines."""

    # measurement-data database key
    DATA_DB: str = "datadb"
    # platform database key
    PLATFORM_DB: str = "platformdb"
+
+
# Shared singleton instances imported by the rest of the application.
dataSettings = DataSettings()
platformSettings = PlatformSettings()
connectionPool = ConnectionPool()
dataBase = DataBase()

+ 27 - 0
app/database.py

@@ -0,0 +1,27 @@
+
+from sqlalchemy import create_engine
+
+from app.config import dataSettings, platformSettings, connectionPool,dataBase
+from app.logger import logger
+
# Map each logical database name to its SQLAlchemy connection URL.
DATABASE_URLS = {
    dataBase.DATA_DB: (
        f"mysql+pymysql://{dataSettings.db_user}:{dataSettings.db_password}"
        f"@{dataSettings.db_host}/{dataSettings.db_name}"
    ),
    dataBase.PLATFORM_DB: (
        f"mysql+pymysql://{platformSettings.db_user}:{platformSettings.db_password}"
        f"@{platformSettings.db_host}/{platformSettings.db_name}"
    ),
}

# One pooled engine per configured database, keyed by logical name.
databases = {
    name: create_engine(
        url,
        pool_size=connectionPool.POOL_SIZE,
        max_overflow=connectionPool.MAX_OVERFLOW,
        pool_timeout=connectionPool.POOL_TIMEOUT,
        pool_recycle=connectionPool.POOL_RECYCLE,
    )
    for name, url in DATABASE_URLS.items()
}
+
def get_engine(db_name: str):
    """Return the pooled SQLAlchemy engine registered under *db_name*.

    Raises:
        KeyError: if no engine was configured for *db_name*.
    """
    # BUG FIX: the original try/except could never fire -- dict.get() does
    # not raise, so an unknown name silently returned None and only failed
    # later at the call site.  Fail fast with a clear error instead.
    engine = databases.get(db_name)
    if engine is None:
        logger.error(f"Database '{db_name}' not found.")
        raise KeyError(f"Database '{db_name}' not found.")
    return engine

+ 0 - 0
app/exception.py


+ 13 - 0
app/logger.py

@@ -0,0 +1,13 @@
import logging
from logging.config import dictConfig
from pathlib import Path
import yaml

# The file handlers declared in logger.yaml write into app/logs/, so the
# directory must exist before dictConfig() instantiates them.
# Fixed: parents=True also creates the "app" parent when missing (the
# original mkdir raised FileNotFoundError in that case), and the directory
# is now created before the config file is even opened instead of inside
# the `with` block where it did not belong.
Path("app/logs").mkdir(parents=True, exist_ok=True)

with open("./app/logger.yaml", "r", encoding="utf-8") as file:
    config = yaml.safe_load(file)

# Install the logging configuration and expose a module-level logger.
dictConfig(config)
logger = logging.getLogger(__name__)

+ 54 - 0
app/logger.yaml

@@ -0,0 +1,54 @@
version: 1
disable_existing_loggers: false

formatters:
  default:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    datefmt: "%Y-%m-%d %H:%M:%S"
  access:
    format: '%(asctime)s - %(client_addr)s - "%(request_line)s" %(status_code)s'

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: default
    stream: ext://sys.stdout
  file:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: default
    filename: app/logs/app.log
    maxBytes: 10485760  # 10MB (comment fixed: 10485760 bytes is 10 MB, not 10 GB)
    backupCount: 5
    encoding: utf8
  error_file:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: default
    filename: app/logs/error.log
    maxBytes: 10485760  # 10MB
    backupCount: 5
    encoding: utf8

loggers:
  uvicorn:
    level: INFO
    handlers: [console, file]
    propagate: no
  # NOTE(review): uvicorn.error emits INFO-level startup messages too, but its
  # only handler (error_file) filters below ERROR -- those messages are
  # dropped. Confirm whether console/file should be attached here as well.
  uvicorn.error:
    level: INFO
    handlers: [error_file]
    propagate: no
  uvicorn.access:
    level: INFO
    handlers: [console]
    propagate: no
  sqlalchemy:
    level: WARNING
    handlers: [console, file]
    propagate: no

root:
  level: INFO
  handlers: [console, file, error_file]
+ 8 - 0
app/main.py

@@ -0,0 +1,8 @@
from fastapi import FastAPI
from app.routers import cms, temperature, health

app = FastAPI()

# Register every feature router (CMS vibration analysis, temperature
# monitoring, health assessment) on the application.
for feature_router in (cms.router, temperature.router, health.router):
    app.include_router(feature_router)

+ 16 - 0
app/models/CmsAnalysisInput.py

@@ -0,0 +1,16 @@
from pydantic import BaseModel, model_validator
from typing import List, Optional


class CmsAnalysisInput(BaseModel):
    """Request payload for the CMS analysis endpoints."""

    ids: List[int] = []
    windCode: str
    analysisType: str
    fmin: Optional[int] = None
    fmax: Optional[int] = None

    @model_validator(mode='before')
    def convert_ids(cls, values):
        """Accept a single integer for ``ids`` and wrap it in a list."""
        raw_ids = values.get('ids')
        if isinstance(raw_ids, int):
            values['ids'] = [raw_ids]
        return values

+ 25 - 0
app/models/HealthEvaluationReqAndResp.py

@@ -0,0 +1,25 @@
from typing import Dict, Optional, List

from pydantic import BaseModel


class AssessmentRequest(BaseModel):
    """Request body for a wind-farm health assessment."""

    windcode: str
    # month under assessment, formatted "YYYY-MM"
    month: str


class SubsystemResult(BaseModel):
    """Health score and feature weights for a single subsystem."""

    health_score: float
    weights: Dict[str, float]
    #  features: List[str]
    message: Optional[str] = None


class AssessmentResult(BaseModel):
    """Aggregated assessment outcome for one wind turbine."""

    engine_code: str
    wind_turbine_name: str
    mill_type: str
    total_health_score: Optional[float]
    subsystems: Dict[str, SubsystemResult]
    assessed_subsystems: List[str]

+ 20 - 0
app/models/TemperatureInput.py

@@ -0,0 +1,20 @@
from typing import List

from pydantic import BaseModel, model_validator


class TemperatureInput(BaseModel):
    """Common request payload for the temperature-analysis endpoints."""

    windCode: str
    windTurbineNumberList: List[str]
    # e.g. "2024-06-08 00:00"
    startTime: str
    # e.g. "2024-06-08 01:00"
    endTime: str

    @model_validator(mode='before')
    def normalize_fields(cls, values):
        """Coerce a single turbine number (plain str) into a one-element list."""
        turbines = values.get('windTurbineNumberList')
        if isinstance(turbines, str):
            values['windTurbineNumberList'] = [turbines]
        return values

+ 8 - 0
app/models/TemperatureThresholdInput.py

@@ -0,0 +1,8 @@
from app.models.TemperatureInput import TemperatureInput


class TemperatureThresholdInput(TemperatureInput):
    """Threshold-analysis payload: adds the mandatory pagination fields."""

    # 1-based page index (required)
    pageNo: int
    # number of records per page (required)
    pageSize: int

+ 52 - 0
app/routers/cms.py

@@ -0,0 +1,52 @@
+import json
+import traceback
+
+from fastapi import APIRouter, HTTPException
+
+from app.logger import logger
+from app.models.CmsAnalysisInput import CmsAnalysisInput
+from app.services.CMSAnalyst import CMSAnalyst
+
+router = APIRouter()
+
@router.post("/analysis/{analysisType}")
async def analyze(analysisType: str, input_data: CmsAnalysisInput):
    """Run the requested CMS analysis and return its result as a JSON string.

    The path parameter selects which CMSAnalyst method runs; unknown types
    yield HTTP 400. Analysis failures are logged and returned as a
    ``{"message": "error", ...}`` payload (status 200, as before).
    """
    analysis_map = {
        # envelope-spectrum analysis
        "envelope": "envelope_spectrum",
        # frequency-domain analysis
        "frequency": "frequency_domain",
        # time-domain analysis
        "time": "time_domain",
        # trend analysis
        "trend": "trend_analysis",
    }

    if analysisType not in analysis_map:
        raise HTTPException(status_code=400, detail="非可用的分析类型")

    try:
        cms = CMSAnalyst(input_data.fmin, input_data.fmax, input_data.windCode, input_data.ids)
        func = getattr(cms, analysis_map[analysisType])
        # BUG FIX: the original assigned func_res only when callable(func)
        # was true, leaving it unbound (NameError) otherwise.
        if not callable(func):
            raise HTTPException(status_code=500, detail="analysis handler is not callable")
        func_res = func()
        if isinstance(func_res, str):
            # some analyses return a JSON string; decode it back to a dict
            func_res = json.loads(func_res)
        if isinstance(func_res, dict):
            func_res['type'] = analysisType
        elif isinstance(func_res, list):
            func_res = {'type': analysisType, 'data': func_res}
        else:
            # any other payload type is stringified
            func_res = {'type': analysisType, 'data': str(func_res)}
        # NOTE(review): returning json.dumps() from a FastAPI handler
        # double-encodes the payload (clients receive a JSON *string*).
        # Kept as-is for wire-format compatibility with existing consumers.
        return json.dumps(func_res, ensure_ascii=False)
    except HTTPException:
        # let deliberate HTTP errors propagate instead of being swallowed below
        raise
    except Exception as e:
        logger.error(f"analyze is bad {traceback.format_exc()}")
        return {"message": "error", "detail": str(e)}

+ 139 - 0
app/routers/health.py

@@ -0,0 +1,139 @@
+from datetime import datetime
+from typing import List
+
+from fastapi import HTTPException, APIRouter
+
+from app.logger import logger
+from app.models.HealthEvaluationReqAndResp import AssessmentResult, AssessmentRequest, SubsystemResult
+
+from app.services.HealthAssessor import HealthAssessor
+from app.services.HealthDataFetcher import DataFetcher
+
+router = APIRouter()
+
+
@router.post("/health_assess", response_model=List[AssessmentResult])
async def assess_windfarm(request: AssessmentRequest):
    """Assess the monthly health of every turbine in a wind farm.

    Validates the month format, loads the farm's turbines, evaluates each
    one with HealthAssessor and returns one AssessmentResult per turbine.
    Turbines without data for the month get a placeholder result whose
    scores are all -1.

    Raises:
        HTTPException 400: malformed month (must be YYYY-MM).
        HTTPException 404: the wind farm has no turbines.
    """
    try:
        datetime.strptime(request.month, "%Y-%m")
    except ValueError:
        raise HTTPException(status_code=400, detail="无效时间格式,使用YYYY-MM格式")
    logger.info(f"开始处理风场评估请求 - 风场编码: {request.windcode}, 月份: {request.month}")

    fetcher = DataFetcher()
    assessor = HealthAssessor()
    turbines = fetcher.get_turbines(request.windcode)
    # replaced a bare debug print() with structured logging
    logger.debug(f"获取到风机列表: {len(turbines)}台")
    if turbines.empty:
        raise HTTPException(status_code=404, detail="未找到风场数据")

    # PERF FIX: the available column list depends only on the wind farm, not
    # on the turbine, so query it once instead of once per loop iteration.
    available_columns = fetcher.get_turbine_columns(request.windcode)
    if not available_columns:
        logger.warning(f"风场{request.windcode}无可用数据列")
        return []

    results: List[AssessmentResult] = []
    all_turbines = len(turbines)
    processed_count = 0

    for idx, row in turbines.iterrows():
        # BUG FIX: bind engine_code before the try block so the except
        # handler below can always log it (it was previously unbound if the
        # very first line of the try raised).
        engine_code = row.get('engine_code')
        try:
            mill_type_code = row['mill_type_code']
            wind_turbine_name = row['engine_name']
            # map the mill-type code to a model family
            mill_type_num = fetcher.get_mill_type(mill_type_code)
            mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')

            if mill_type == 'unknown':
                logger.warning(f"{engine_code}机型未知,跳过处理")
                continue

            # valid feature columns for this mill type, based on actual columns
            # NOTE(review): `assessor` is passed explicitly although the method
            # is called on the instance (self would be bound twice) -- confirm
            # the signature of _get_all_possible_features before changing.
            valid_features = assessor._get_all_possible_features(assessor, mill_type, available_columns)
            # fetch this turbine's data for the requested month
            data = fetcher.fetch_turbine_data(request.windcode, engine_code, request.month, valid_features)

            if data is None or data.empty:  # also guards against a None return
                logger.warning(f"{engine_code}在{request.month}无有效数据")
                # build a placeholder assessment: every subsystem scored -1
                empty_assessment = {
                    'engine_code': engine_code,
                    'wind_turbine_name': wind_turbine_name,
                    'mill_type': mill_type,
                    'total_health_score': -1,
                    'subsystems': {
                        name: {'health_score': -1, 'weights': {}, 'message': None}
                        for name in ('generator', 'nacelle', 'grid', 'drive_train')
                    },
                    'assessed_subsystems': ['generator', 'nacelle', 'grid', 'drive_train'],
                }
                results.append(_format_result(empty_assessment))
                processed_count += 1
                logger.info(f"处理无数据风机{engine_code}(已处理{processed_count}/{all_turbines}台)")
                continue

            # run the assessment and convert it to the response model
            assessment = assessor.assess_turbine(engine_code, data, mill_type, wind_turbine_name)
            if not assessment:  # guard against an empty assessment
                logger.warning(f"{engine_code}评估结果为空")
                continue

            formatted_result = _format_result(assessment)
            if isinstance(formatted_result, AssessmentResult):  # strict type check
                results.append(formatted_result)
                processed_count += 1
                logger.info(f"成功处理{engine_code}(已处理{processed_count}/{all_turbines}台)")
            else:
                logger.error(f"{engine_code}结果格式化失败,类型错误: {type(formatted_result)}")

        except Exception as e:
            logger.error(f"处理{engine_code}时发生异常: {str(e)}", exc_info=True)
            continue

    # `results` is always a list here, so the original "defensive" re-wrap
    # was dead code and has been removed.
    if not results:
        logger.warning(f"风场{request.windcode}在{request.month}无有效评估结果,返回空列表")
        return []

    return results
+
+
def _format_result(assessment):
    """Convert a raw assessment dict into an AssessmentResult model."""
    subsystem_models = {
        name: SubsystemResult(**payload)
        for name, payload in assessment['subsystems'].items()
    }
    return AssessmentResult(
        engine_code=assessment['engine_code'],
        wind_turbine_name=assessment['wind_turbine_name'],
        mill_type=assessment['mill_type'],
        total_health_score=assessment.get('total_health_score'),
        subsystems=subsystem_models,
        assessed_subsystems=assessment.get('assessed_subsystems', []),
    )

+ 128 - 0
app/routers/temperature.py

@@ -0,0 +1,128 @@
+from fastapi import APIRouter
+from starlette.responses import JSONResponse
+
+from app.models.TemperatureInput import TemperatureInput
+from app.models.TemperatureThresholdInput import TemperatureThresholdInput
+from app.services.MSET_Temp import MSET_Temp
+
+router = APIRouter()
+
@router.post("/temperature/threshold")
async def route_threshold(input_data: TemperatureThresholdInput):
    """
    Threshold analysis endpoint (paginated).
    - Input:
      {
        "windCode": "WOF046400029",
        "windTurbineNumberList": ["WOG01312"],
        "startTime": "2023-11-01 00:00",
        "endTime": "2023-11-05 12:00",
        "pageNo": 2,
        "pageSize": 20
      }

    - Output:
      {
        "data": {
          "type": "temperature_threshold",
          "records": [
            {
              "time_stamp": "2024-06-08 00:05:00",
              "temp_channel": "主轴承温度",
              "SPRT_score": 0.123,
              "status": "正常"
            },
            ...
          ],
          "totalSize": 741
        },
        "code": 200,
        "message": "success"
      }
    """
    try:
        analyzer = MSET_Temp(
            windCode=input_data.windCode,
            windTurbineNumberList=input_data.windTurbineNumberList,
            startTime=input_data.startTime,
            endTime=input_data.endTime
        )
        records = analyzer.check_threshold().to_dict(orient="records")

        total = len(records)
        # BUG FIX: pageNo <= 0 previously produced a negative slice start and
        # returned the wrong page; clamp it to the first page instead.
        page_no = max(input_data.pageNo, 1)
        start = (page_no - 1) * input_data.pageSize
        paginated = records[start:start + input_data.pageSize]

        return {
            "data": {
                "type": "temperature_threshold",
                "records": paginated,
                "totalSize": total
            },
            "code": 200,
            "message": "success"
        }
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "code": 500,
                "message": "analysis failed",
                "detail": str(e)
            }
        )
+
# SPRT trend analysis
@router.post("/SPRT/trend")
async def route_trend(input_data: TemperatureInput):
    """
    Trend analysis endpoint.
      - Input:
        {
          "windCode": "WOF091200030",
          "windTurbineNumberList": ["WOG01351"],
          "startTime": "2023-11-01 00:00",
          "endTime": "2023-11-05 12:00"
        }
      - Output:
        {
          "data": {
            "type": "SPRT_trend",
            "main_bearing": {"timestamps": [...], "values": [...]},
            "gearbox_oil": {"timestamps": [...], "values": [...]},
            "generator_drive_end": {"timestamps": [...], "values": [...]},
            "generator_nondrive_end": {"timestamps": [...], "values": [...]}
          },
          "code": 200,
          "message": "success"
        }
    """
    try:
        analyzer = MSET_Temp(
            windCode=input_data.windCode,
            windTurbineNumberList=input_data.windTurbineNumberList,
            startTime=input_data.startTime,
            endTime=input_data.endTime
        )
        # get_trend() already returns a dict shaped like {"data": {...four keys...}}
        trend = analyzer.get_trend()

        # assemble the final response payload
        payload = {"type": "SPRT_trend"}
        payload.update(trend.get("data", {}))
        return {
            "data": payload,
            "code": 200,
            "message": "success"
        }
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "code": 500,
                "message": "analysis failed",
                "detail": str(e)
            }
        )

+ 421 - 0
app/services/CMSAnalyst.py

@@ -0,0 +1,421 @@
+import ast
+import json
+import math
+
+import numpy as np
+import pandas as pd
+from scipy.signal import hilbert
+
+from app.config import dataBase
+from app.database import get_engine
+
+
class CMSAnalyst:
    """Vibration (CMS) analysis over raw waveform rows of a wind turbine.

    Loads one or more waveform records from the per-wind-farm
    ``<windcode>_wave`` table and exposes envelope-spectrum,
    frequency-domain, time-domain and trend analyses on them.
    """

    def __init__(self, fmin, fmax, table_name, ids):
        # fmin / fmax: band-pass frequency limits in Hz (None -> 0 / +inf)
        # table_name: wind-farm code; the queried table is "<code>_wave"
        # ids: primary keys of the waveform rows to analyse
        self.datas = self._get_by_id(table_name, ids)
        self.datas = [df[['mesure_data', 'time_stamp', 'sampling_frequency', 'wind_turbine_number', 'rotational_speed',
                          'mesure_point_name']] for df in self.datas]
        # A single id yields a one-element list, so the one-shot analyses use datas[0].
        self.data_filter = self.datas[0]
        # The waveform column is a stringified Python list; parse it to an array.
        self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'][0]))
        self.envelope_spectrum_m = self.data.shape[0]
        self.envelope_spectrum_n = 1
        self.fs = int(self.data_filter['sampling_frequency'].iloc[0])
        self.envelope_spectrum_t = np.arange(self.envelope_spectrum_m) / self.fs
        self.fmin = fmin if fmin is not None else 0
        self.fmax = fmax if fmax is not None else float('inf')
        self.envelope_spectrum_y = self._bandpass_filter(self.data)
        self.f, self.HP = self._calculate_envelope_spectrum(self.envelope_spectrum_y)
        self.wind_code = self.data_filter['wind_turbine_number'].iloc[0]
        self.rpm_Gen = self.data_filter['rotational_speed'].iloc[0]
        self.mesure_point_name = self.data_filter['mesure_point_name'].iloc[0]
        # generator rotation frequency in Hz (rpm / 60)
        self.fn_Gen = round(self.rpm_Gen / 60, 2)

        self.CF = self.Characteristic_Frequency()
        self.CF = pd.DataFrame(self.CF, index=[0])
        if self.CF['type'].iloc[0] == 'bearing':
            n_rolls_m = self.CF['n_rolls'].iloc[0]
            d_rolls_m = self.CF['d_rolls'].iloc[0]
            D_diameter_m = self.CF['D_diameter'].iloc[0]
            theta_deg_m = self.CF['theta_deg'].iloc[0]
            self.bearing_frequencies = self.calculate_bearing_frequencies(n_rolls_m, d_rolls_m, D_diameter_m,
                                                                          theta_deg_m, self.rpm_Gen)
        # NOTE(review): this line runs unconditionally, but
        # self.bearing_frequencies is only assigned inside the 'bearing'
        # branch above -- a non-bearing measurement point raises
        # AttributeError here. Confirm whether non-bearing points can occur.
        self.bearing_frequencies = pd.DataFrame(self.bearing_frequencies, index=[0])
        (
            self.frequency_domain_analysis_t,
            self.frequency_domain_analysis_f,
            self.frequency_domain_analysis_m,
            self.frequency_domain_analysis_mag,
            self.frequency_domain_analysis_Xrms,
        ) = self._calculate_spectrum(self.data)

        # time_domain_analysis: time axis in seconds
        self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs

    def _get_by_id(self, windcode, ids):
        """Load one DataFrame per id from the "<windcode>_wave" table."""
        df_res = []
        engine = get_engine(dataBase.DATA_DB)
        for id in ids:
            table_name = windcode + '_wave'
            # NOTE(review): values are interpolated directly into the SQL
            # string; prefer bound parameters if ids can be untrusted input.
            lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} "
            df = pd.read_sql(lastday_df_sql, engine)
            df_res.append(df)
        return df_res

    # envelope_spectrum_analysis
    def _bandpass_filter(self, data):
        """Band-pass filter implemented by zeroing FFT bins outside [fmin, fmax]."""
        m = data.shape[0]
        ni = round(self.fmin * self.envelope_spectrum_m / self.fs + 1)
        if self.fmax == float('inf'):
            na = m
        else:
            na = round(self.fmax * m / self.fs + 1)
        col = 1
        y = np.zeros((self.envelope_spectrum_m, col))
        z = np.fft.fft(data)
        a = np.zeros(self.envelope_spectrum_m, dtype=complex)
        # keep the selected band on both the positive- and negative-frequency
        # sides so the inverse FFT is (nearly) real
        a[ni:na] = z[ni:na]
        a[self.envelope_spectrum_m - na + 1: self.envelope_spectrum_m - ni + 1] = z[self.envelope_spectrum_m - na + 1: self.envelope_spectrum_m - ni + 1]
        z = np.fft.ifft(a)
        y[:, 0] = np.real(z)

        return y

    def _calculate_envelope_spectrum(self, y):
        """Compute the envelope spectrum (FFT of the Hilbert envelope)."""
        m, n = y.shape
        HP = np.zeros((m, n))
        col = 1
        for p in range(col):
            # Hilbert envelope of the demeaned signal, then single-sided
            # FFT amplitude of the demeaned envelope
            H = np.abs(hilbert(y[:, p] - np.mean(y[:, p])))
            HP[:, p] = np.abs(np.fft.fft(H - np.mean(H))) * 2 / m
        f = np.fft.fftfreq(m, d=1 / self.fs)
        return f, HP

    def envelope_spectrum(self):
        """Build the envelope-spectrum payload (dict) for plotting."""
        # keep only the positive-frequency half of the spectrum
        positive_frequencies = self.f[: self.envelope_spectrum_m // 2]
        positive_HP = self.HP[: self.envelope_spectrum_m // 2, 0]

        x = positive_frequencies
        y = positive_HP
        title = "包络谱"
        xaxis = "频率(Hz)"
        yaxis = "加速度(m/s^2)"
        Xrms = np.sqrt(np.mean(y ** 2))  # RMS acceleration (effective value)
        rpm_Gen = round(self.rpm_Gen, 2)
        # first-order bearing fault frequencies; higher orders are multiples
        BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2)
        BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2)
        BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2)
        FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2)
        fn_Gen = round(self.fn_Gen, 2)
        _3P_1X = round(self.fn_Gen, 2) * 3

        if self.CF['type'].iloc[0] == 'bearing':
            result = {
                "fs": self.fs,
                "Xrms": round(Xrms, 2),
                "x": list(x),
                "y": list(y),
                "title": title,
                "xaxis": xaxis,
                "yaxis": yaxis,
                "rpm_Gen": round(rpm_Gen, 2),  # rotational speed, r/min
                "BPFI": [{"Xaxis": BPFI_1X, "val": "1BPFI"}, {"Xaxis": BPFI_1X * 2, "val": "2BPFI"},
                         {"Xaxis": BPFI_1X * 3, "val": "3BPFI"}, {"Xaxis": BPFI_1X * 4, "val": "4BPFI"},
                         {"Xaxis": BPFI_1X * 5, "val": "5BPFI"}, {"Xaxis": BPFI_1X * 6, "val": "6BPFI"}],
                "BPFO": [{"Xaxis": BPFO_1X, "val": "1BPFO"}, {"Xaxis": BPFO_1X * 2, "val": "2BPFO"},
                         {"Xaxis": BPFO_1X * 3, "val": "3BPFO"}, {"Xaxis": BPFO_1X * 4, "val": "4BPFO"},
                         {"Xaxis": BPFO_1X * 5, "val": "5BPFO"}, {"Xaxis": BPFO_1X * 6, "val": "6BPFO"}],
                "BSF": [{"Xaxis": BSF_1X, "val": "1BSF"}, {"Xaxis": BSF_1X * 2, "val": "2BSF"},
                        {"Xaxis": BSF_1X * 3, "val": "3BSF"}, {"Xaxis": BSF_1X * 4, "val": "4BSF"},
                        {"Xaxis": BSF_1X * 5, "val": "5BSF"}, {"Xaxis": BSF_1X * 6, "val": "6BSF"}],
                "FTF": [{"Xaxis": FTF_1X, "val": "1FTF"}, {"Xaxis": FTF_1X * 2, "val": "2FTF"},
                        {"Xaxis": FTF_1X * 3, "val": "3FTF"}, {"Xaxis": FTF_1X * 4, "val": "4FTF"},
                        {"Xaxis": FTF_1X * 5, "val": "5FTF"}, {"Xaxis": FTF_1X * 6, "val": "6FTF"}],
                "fn_Gen": [{"Xaxis": fn_Gen, "val": "1X"}, {"Xaxis": fn_Gen * 2, "val": "2X"},
                           {"Xaxis": fn_Gen * 3, "val": "3X"}, {"Xaxis": fn_Gen * 4, "val": "4X"},
                           {"Xaxis": fn_Gen * 5, "val": "5X"}, {"Xaxis": fn_Gen * 6, "val": "6X"}],
                "B3P": _3P_1X,
            }
        # NOTE(review): `result` is only assigned inside the 'bearing' branch;
        # a non-bearing point raises NameError here.
        return result

    def _calculate_spectrum(self, data):
        """Compute time axis, frequency axis, length, magnitude spectrum and RMS."""
        m = data.shape[0]
        n = 1
        t = np.arange(m) / self.fs
        mag = np.zeros((m, n))
        Xrms = np.sqrt(np.mean(data ** 2))  # RMS acceleration (effective value)
        # single-sided amplitude spectrum of the demeaned signal
        mag = np.abs(np.fft.fft(data - np.mean(data))) * 2 / m
        f = np.fft.fftfreq(m, d=1 / self.fs)
        return t, f, m, mag, Xrms

    def frequency_domain(self):
        """Build the frequency-domain payload (JSON string) for plotting."""
        # keep only the positive-frequency half of the spectrum
        positive_frequencies = self.frequency_domain_analysis_f[
                               : self.frequency_domain_analysis_m // 2
                               ]
        positive_mag = self.frequency_domain_analysis_mag[
                       : self.frequency_domain_analysis_m // 2
                       ]

        x = positive_frequencies
        y = positive_mag
        title = "频域信号"
        xaxis = "频率(Hz)"
        yaxis = "加速度(m/s^2)"
        Xrms = self.frequency_domain_analysis_Xrms

        rpm_Gen = round(self.rpm_Gen, 2)
        BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2)
        BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2)
        BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2)
        FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2)
        fn_Gen = round(self.fn_Gen, 2)
        _3P_1X = round(self.fn_Gen, 2) * 3

        if self.CF['type'].iloc[0] == 'bearing':
            result = {
                "fs": self.fs,
                "Xrms": round(Xrms, 2),
                "x": list(x),
                "y": list(y),
                "title": title,
                "xaxis": xaxis,
                "yaxis": yaxis,
                "rpm_Gen": round(rpm_Gen, 2),  # rotational speed, r/min
                "BPFI": [{"Xaxis": BPFI_1X, "val": "1BPFI"}, {"Xaxis": BPFI_1X * 2, "val": "2BPFI"},
                         {"Xaxis": BPFI_1X * 3, "val": "3BPFI"}, {"Xaxis": BPFI_1X * 4, "val": "4BPFI"},
                         {"Xaxis": BPFI_1X * 5, "val": "5BPFI"}, {"Xaxis": BPFI_1X * 6, "val": "6BPFI"}],
                "BPFO": [{"Xaxis": BPFO_1X, "val": "1BPFO"}, {"Xaxis": BPFO_1X * 2, "val": "2BPFO"},
                         {"Xaxis": BPFO_1X * 3, "val": "3BPFO"}, {"Xaxis": BPFO_1X * 4, "val": "4BPFO"},
                         {"Xaxis": BPFO_1X * 5, "val": "5BPFO"}, {"Xaxis": BPFO_1X * 6, "val": "6BPFO"}],
                "BSF": [{"Xaxis": BSF_1X, "val": "1BSF"}, {"Xaxis": BSF_1X * 2, "val": "2BSF"},
                        {"Xaxis": BSF_1X * 3, "val": "3BSF"}, {"Xaxis": BSF_1X * 4, "val": "4BSF"},
                        {"Xaxis": BSF_1X * 5, "val": "5BSF"}, {"Xaxis": BSF_1X * 6, "val": "6BSF"}],
                "FTF": [{"Xaxis": FTF_1X, "val": "1FTF"}, {"Xaxis": FTF_1X * 2, "val": "2FTF"},
                        {"Xaxis": FTF_1X * 3, "val": "3FTF"}, {"Xaxis": FTF_1X * 4, "val": "4FTF"},
                        {"Xaxis": FTF_1X * 5, "val": "5FTF"}, {"Xaxis": FTF_1X * 6, "val": "6FTF"}],
                "fn_Gen": [{"Xaxis": fn_Gen, "val": "1X"}, {"Xaxis": fn_Gen * 2, "val": "2X"},
                           {"Xaxis": fn_Gen * 3, "val": "3X"}, {"Xaxis": fn_Gen * 4, "val": "4X"},
                           {"Xaxis": fn_Gen * 5, "val": "5X"}, {"Xaxis": fn_Gen * 6, "val": "6X"}],
                "B3P": _3P_1X,
            }
        # NOTE(review): like envelope_spectrum(), `result` is unbound for a
        # non-bearing point and raises NameError below.
        result = json.dumps(result, ensure_ascii=False)
        return result

    # time_domain_analysis
    def time_domain(self):
        """Build the time-domain payload (JSON string) with waveform statistics."""
        x = self.time_domain_analysis_t
        y = self.data
        rpm_Gen = self.rpm_Gen
        title = "时间域信号"
        xaxis = "时间(s)"
        yaxis = "加速度(m/s^2)"
        # statistics displayed beside the plot
        mean_value = np.mean(y)  # mean
        max_value = np.max(y)  # maximum
        min_value = np.min(y)  # minimum
        Xrms = np.sqrt(np.mean(y ** 2))  # RMS acceleration (effective value)
        Xp = (max_value - min_value) / 2  # peak (single-sided maximum)
        Xpp = max_value - min_value  # peak-to-peak
        Cf = Xp / Xrms  # crest factor
        Sf = Xrms / mean_value  # shape factor
        If = Xp / np.mean(np.abs(y))  # impulse factor
        Xr = np.mean(np.sqrt(np.abs(y))) ** 2  # square-root amplitude
        Ce = Xp / Xr  # clearance factor
        # mean of (|y| - mean)^3 over all samples
        sum_abs_diff_cubed_3 = np.mean((np.abs(y) - mean_value) ** 3)
        # skewness factor
        Cw = sum_abs_diff_cubed_3 / (Xrms ** 3)
        # mean of (|y| - mean)^4 over all samples
        sum_abs_diff_cubed_4 = np.mean((np.abs(y) - mean_value) ** 4)
        # kurtosis factor
        Cq = sum_abs_diff_cubed_4 / (Xrms ** 4)
        result = {

            "x": list(x),
            "y": list(y),
            "title": title,
            "xaxis": xaxis,
            "yaxis": yaxis,
            "fs": self.fs,
            "Xrms": round(Xrms, 2),  # effective (RMS) value
            "mean_value": round(mean_value, 2),  # mean
            "max_value": round(max_value, 2),  # maximum
            "min_value": round(min_value, 2),  # minimum
            "Xp": round(Xp, 2),  # peak
            "Xpp": round(Xpp, 2),  # peak-to-peak
            "Cf": round(Cf, 2),  # crest factor
            "Sf": round(Sf, 2),  # shape factor
            "If": round(If, 2),  # impulse factor
            "Ce": round(Ce, 2),  # clearance factor
            "Cw": round(Cw, 2),  # skewness factor
            "Cq": round(Cq, 2),  # kurtosis factor
            "rpm_Gen": round(rpm_Gen, 2),  # rotational speed, r/min

        }

        result = json.dumps(result, ensure_ascii=False)

        return result

    # trend_analysis

    def trend_analysis(self):
        """Per-record statistics over every loaded waveform (JSON string)."""

        all_stats = []

        # cumulative-sum integration (acceleration -> velocity)
        def _integrate(data, dt):
            return np.cumsum(data) * dt

        # statistical indicators for one waveform
        def _calculate_stats(data):
            mean_value = np.mean(data)
            max_value = np.max(data)
            min_value = np.min(data)
            Xrms = np.sqrt(np.mean(data ** 2))  # RMS acceleration (effective value)
            Xp = (max_value - min_value) / 2  # peak (single-sided maximum)
            Cf = Xp / Xrms  # crest factor
            Sf = Xrms / mean_value  # shape factor
            If = Xp / np.mean(np.abs(data))  # impulse factor
            Xr = np.mean(np.sqrt(np.abs(data))) ** 2  # square-root amplitude
            Ce = Xp / Xr  # clearance factor

            # mean of (|data| - mean)^3 over all samples
            sum_abs_diff_cubed_3 = np.mean((np.abs(data) - mean_value) ** 3)
            # skewness factor
            Cw = sum_abs_diff_cubed_3 / (Xrms ** 3)
            # mean of (|data| - mean)^4 over all samples
            sum_abs_diff_cubed_4 = np.mean((np.abs(data) - mean_value) ** 4)
            # kurtosis factor
            Cq = sum_abs_diff_cubed_4 / (Xrms ** 4)

            return {
                "fs": self.fs,  # sampling frequency
                "Mean": round(mean_value, 2),  # mean
                "Max": round(max_value, 2),  # maximum
                "Min": round(min_value, 2),  # minimum
                "Xrms": round(Xrms, 2),  # effective (RMS) value
                "Xp": round(Xp, 2),  # peak
                "If": round(If, 2),  # impulse factor
                "Cf": round(Cf, 2),  # crest factor
                "Sf": round(Sf, 2),  # shape factor
                "Ce": round(Ce, 2),  # clearance factor
                "Cw": round(Cw, 2),  # skewness factor
                "Cq": round(Cq, 2),  # kurtosis factor
                # velocity_rms and time_stamp are appended by the caller below
            }

        for data in self.datas:
            # NOTE(review): the sampling frequency is read from data_filter
            # (the first record) rather than from the current `data` row --
            # confirm all rows share one sampling frequency.
            fs = int(self.data_filter['sampling_frequency'].iloc[0])
            dt = 1 / fs
            time_stamp = data['time_stamp'][0]
            data = np.array(ast.literal_eval(data['mesure_data'][0]))

            velocity = _integrate(data, dt)
            velocity_rms = np.sqrt(np.mean(velocity ** 2))
            stats = _calculate_stats(data)
            stats["velocity_rms"] = round(velocity_rms, 2)  # velocity RMS
            stats["time_stamp"] = str(time_stamp)  # timestamp

            all_stats.append(stats)

        all_stats = json.dumps(all_stats, ensure_ascii=False)
        return all_stats

    def Characteristic_Frequency(self):
        """Look up bearing/gear parameters needed for the fault frequencies."""
        # 1. Derive the component name from the measurement-point name.
        str1 = self.mesure_point_name
        str2 = ["main_bearing", "front_main_bearing", "rear_main_bearing", "generator_non_drive_end"]
        # NOTE(review): the loop variable shadows the builtin `str`, and
        # `parts` stays unbound when no candidate matches str1 -- that case
        # raises NameError below. Confirm inputs always match one candidate.
        for str in str2:
            if str in str1:
                parts = str
                if parts == "front_main_bearing":
                    parts = "front_bearing"
                elif parts == "rear_main_bearing":
                    parts = "rear_bearing"
        # 2. In the platform DB 'energy_show', map engine_code to its
        #    mill_type_code via table 'wind_engine_group'.
        engine = get_engine(dataBase.PLATFORM_DB)
        engine_code = self.wind_code
        # NOTE(review): engine_code is interpolated without quotes; if it is a
        # string code the SQL is malformed -- confirm and prefer bound params.
        df_sql2 = f"SELECT * FROM wind_engine_group where engine_code = {engine_code} "
        df2 = pd.read_sql(df_sql2, engine)
        mill_type_code = df2['mill_type_code'].iloc[0]
        # 3. Look up the component's brand/model in 'unit_bearings' by mill_type_code.
        df_sql3 = f"SELECT * FROM unit_bearings where mill_type_code = {mill_type_code} "
        df3 = pd.read_sql(df_sql3, engine)
        # NOTE(review): 'front_bearing' is hard-coded here; presumably `parts`
        # was intended so rear/main bearings use their own columns -- confirm.
        brand = 'front_bearing' + '_brand'
        model = 'front_bearing' + '_model'
        _brand = df3[brand].iloc[0]
        _model = df3[model].iloc[0]

        # 4. Fetch the geometric parameters from 'unit_dict_brand_model'.
        df_sql4 = f"SELECT * FROM unit_dict_brand_model where manufacture = %s AND model_number = %s"
        # NOTE(review): a flat tuple (_brand, _model) is the usual shape for
        # two placeholders; the extra list nesting may not be accepted by the
        # driver -- confirm against pandas.read_sql/pymysql.
        params = [(_brand, _model)]
        df4 = pd.read_sql(df_sql4, engine, params=params)
        if 'bearing' in parts:
            n_rolls = df4['rolls_number'].iloc[0]
            d_rolls = df4['rolls_diameter'].iloc[0]
            D_diameter = df4['circle_diameter'].iloc[0]
            theta_deg = df4['theta_deg'].iloc[0]
            result = {
                "type": 'bearing',
                "n_rolls": round(n_rolls, 2),
                "d_rolls": round(d_rolls, 2),
                "D_diameter": round(D_diameter, 2),
                "theta_deg": round(theta_deg, 2),
            }
            return result
        # NOTE(review): falls through to an implicit None for non-bearing
        # parts, which later breaks pd.DataFrame(self.CF) in __init__.

    def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm):
        """
        Compute the characteristic fault frequencies of a rolling bearing.

        Parameters:
        n (int): number of rolling elements
        d (float): rolling-element diameter (mm)
        D (float): bearing pitch diameter (rolling-element centre circle, mm)
        theta_deg (float): contact angle (degrees)
        rpm (float): shaft speed (revolutions per minute)

        Returns:
        dict: characteristic frequencies in Hz
        """
        # convert the contact angle to radians
        theta = math.radians(theta_deg)

        # Only the d/D ratio enters the formulas, so the mm units cancel and
        # no unit conversion is required.
        ratio = d / D

        # shaft rotation frequency (revolutions per second)
        f_r = rpm / 60.0

        # characteristic fault frequencies
        BPFI = n / 2 * (1 + ratio * math.cos(theta)) * f_r  # inner-race fault frequency
        BPFO = n / 2 * (1 - ratio * math.cos(theta)) * f_r  # outer-race fault frequency
        BSF = (D / (2 * d)) * (1 - (ratio ** 2) * (math.cos(theta) ** 2)) * f_r  # rolling-element fault frequency
        FTF = 0.5 * (1 - ratio * math.cos(theta)) * f_r  # cage fault frequency

        return {
            "BPFI": round(BPFI, 2),
            "BPFO": round(BPFO, 2),
            "BSF": round(BSF, 2),
            "FTF": round(FTF, 2),

        }

+ 370 - 0
app/services/HealthAssessor.py

@@ -0,0 +1,370 @@
+from functools import lru_cache
+from typing import Dict, List
+
+import numpy as np
+import pandas as pd
+from sklearn.neighbors import BallTree
+
+from app.logger import logger
+
+
class HealthAssessor:
    """Turbine health assessment: CRITIC feature weighting + MSET/SPRT scoring.

    Health scores are on a 0-100 scale; -1 marks a subsystem that could not
    be assessed (insufficient features or data).
    """

    def __init__(self):
        # Feature configuration per subsystem.  'fixed' entries are exact
        # column names expected in the data table; 'keywords' entries are
        # fuzzy matching rules applied to whatever columns are available.
        self.subsystem_config = {
            # Generator
            'generator': {
                # Doubly-fed induction generator (DFIG)
                'dfig': {
                    'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
                              'generator_winding3_temperature', 'generatordrive_end_bearing_temperature',
                              'generatornon_drive_end_bearing_temperature'],
                },
                # Direct-drive
                'direct': {
                    'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
                              'generator_winding3_temperature', 'main_bearing_temperature'],
                }
            },
            # Nacelle system
            'nacelle': {
                'fixed': ['front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin',
                          'cabin_position', 'cabin_temperature'],
            },
            # Grid environment
            'grid': {
                'fixed': ['reactive_power', 'active_power', 'grid_a_phase_current',
                          'grid_b_phase_current', 'grid_c_phase_current'],
            },
            # Drive train (only assessed for DFIG machines, see assess_turbine)
            'drive_train': {
                'fixed': ['main_bearing_temperature'],
                'keywords': [
                    {'include': ['gearbox', 'temperature'], 'exclude': [], 'min_count': 2},
                ]
            }
        }

        # MSET implementation embedded from the reference source code.
        self.mset = self._create_mset_core()

    def _create_mset_core(self):
        """Build and return the embedded MSET core computation module."""
        class MSETCore:
            def __init__(self):
                # Memory matrix D (representative healthy states).
                self.matrixD = None
                # BallTree over matrixD for nearest-neighbour lookups.
                self.normalDataBallTree = None
                # Residuals of the healthy training data (baseline for SPRT).
                self.healthyResidual = None

            def calcSimilarity(self, x, y):
                """Similarity in (0, 1]: 1 / (1 + Euclidean distance)."""
                diff = np.array(x) - np.array(y)
                return 1/(1 + np.sqrt(np.sum(diff**2)))

            def genDLMatrix(self, trainDataset, dataSize4D=60, dataSize4L=5):
                """Build the memory matrix D from the training set and
                precompute the healthy residuals.  Always returns 0.

                NOTE(review): dataSize4L is accepted but never used here —
                confirm whether an L matrix was intended.
                """
                m, n = trainDataset.shape

                # Seed D with the per-feature extreme points (row index of
                # each column's min and max), deduplicated.
                min_indices = np.argmin(trainDataset, axis=0)
                max_indices = np.argmax(trainDataset, axis=0)
                unique_indices = np.unique(np.concatenate([min_indices, max_indices]))
                self.matrixD = trainDataset[unique_indices].copy()

                # Top up D with randomly chosen remaining rows until it has
                # (up to) dataSize4D rows.
                remaining_indices = np.setdiff1d(np.arange(m), unique_indices)
                np.random.shuffle(remaining_indices)
                needed = max(0, dataSize4D - len(unique_indices))
                if needed > 0:
                    self.matrixD = np.vstack([self.matrixD, trainDataset[remaining_indices[:needed]]])

                # BallTree with the same parameters as the reference source;
                # distance = 1 - similarity (custom metric).
                self.normalDataBallTree = BallTree(
                    self.matrixD,
                    leaf_size=4,
                    metric=lambda i,j: 1-self.calcSimilarity(i,j)  # custom similarity
                )

                # Baseline residuals computed over the whole training set.
                self.healthyResidual = self.calcResidualByLocallyWeightedLR(trainDataset)
                return 0

            def calcResidualByLocallyWeightedLR(self, newStates):
                """Estimate each state by locally weighted regression over its
                nearest neighbours in D and return (estimate - observation).

                NOTE(review): self.matrixD[iList[0]] applies the FIRST row's
                neighbour indices to every query row — this looks like a bug
                (iList should be indexed per row); confirm against the
                reference implementation.
                """
                if len(newStates.shape) == 1:
                    newStates = newStates.reshape(-1, 1)

                dist, iList = self.normalDataBallTree.query(
                    newStates,
                    k=min(10, len(self.matrixD)),
                    return_distance=True
                )
                # Inverse-distance weights, normalised per query row.
                weights = 1/(dist + 1e-5)
                weights /= weights.sum(axis=1)[:, np.newaxis]

                est_X = np.sum(weights[:, :, np.newaxis] * self.matrixD[iList[0]], axis=1)
                return est_X - newStates

            def calcSPRT(self, newsStates, feature_weight, alpha=0.1, beta=0.1, decisionGroup=1):
                """Run a vectorised SPRT over the weighted residuals and map the
                statistic to a 0-100 health score list (len == len(newsStates)).

                NOTE(review): si here simplifies to (mean - mu0)^2 / (2*sigma0^2),
                which is always >= 0, and both normalisation branches map into
                (0, 1] — the sign (normal vs abnormal direction) is lost.
                Confirm against the reference SPRT formula.
                """
                stateResidual = self.calcResidualByLocallyWeightedLR(newsStates)
                weightedStateResidual = np.dot(stateResidual, feature_weight)
                weightedHealthyResidual = np.dot(self.healthyResidual, feature_weight)

                # Healthy-residual distribution parameters.
                mu0 = np.mean(weightedHealthyResidual)
                sigma0 = np.std(weightedHealthyResidual)

                # Vectorised computation over a rolling window.
                n = len(newsStates)
                if n < decisionGroup:
                    return [50]  # neutral score when there is too little data

                rolling_mean = np.convolve(weightedStateResidual, np.ones(decisionGroup)/decisionGroup, 'valid')
                si = (rolling_mean - mu0) * (rolling_mean + mu0 - 2*mu0) / (2*sigma0**2)

                # Wald SPRT decision thresholds.
                lowThres = np.log(beta/(1-alpha))
                highThres = np.log((1-beta)/alpha)

                si = np.clip(si, lowThres, highThres)
                si = np.where(si > 0, si/highThres, si/lowThres)
                flag = 100 - si*100

                # Pad (edge mode) so the output has one score per input row.
                if len(flag) < n:
                    flag = np.pad(flag, (0, n-len(flag)), mode='edge')

                return flag.tolist()

            def CRITIC_prepare(self, data, flag=1):
                """Min-max normalisation; temperature-like columns are treated
                as negative indicators (lower is better), all others positive.
                """
                data = data.astype(float)
                numeric_cols = data.select_dtypes(include=[np.number]).columns
                # TODO confirm which indicators should be normalised positively
                # vs negatively; currently only 'temperature' columns are negative.
                negative_cols = [col for col in numeric_cols
                                 if any(kw in col for kw in ['temperature'])]
                positive_cols = list(set(numeric_cols) - set(negative_cols))

                # Negative normalisation: (max - x) / (max - min)
                if negative_cols:
                    max_val = data[negative_cols].max()
                    min_val = data[negative_cols].min()
                    data[negative_cols] = (max_val - data[negative_cols]) / (max_val - min_val).replace(0, 1e-5)

                # Positive normalisation: (x - min) / (max - min)
                if positive_cols:
                    max_val = data[positive_cols].max()
                    min_val = data[positive_cols].min()
                    data[positive_cols] = (data[positive_cols] - min_val) / (max_val - min_val).replace(0, 1e-5)

                return data

            def CRITIC(self, data):
                """CRITIC objective weighting: std (contrast) x conflict
                (1 - |correlation|), normalised to sum to 1.
                """
                data_norm = self.CRITIC_prepare(data.copy())
                std = data_norm.std(ddof=0).clip(lower=0.01)
                corr = np.abs(np.corrcoef(data_norm.T))
                np.fill_diagonal(corr, 0)
                conflict = np.sum(1 - corr, axis=1)
                info = std * conflict
                weights = info / info.sum()
                return pd.Series(weights, index=data.columns)

            def ahp(self, matrix):
                """AHP weights: principal eigenvector of the pairwise
                comparison matrix, plus its eigenvalue (for consistency checks).
                """
                eigenvalue, eigenvector = np.linalg.eig(matrix)
                max_idx = np.argmax(eigenvalue)
                weight = eigenvector[:, max_idx].real
                return weight / weight.sum(), eigenvalue[max_idx].real

        return MSETCore()


    def assess_turbine(self, engine_code, data, mill_type, wind_turbine_name):
        """Assess one turbine: score each configured subsystem, then combine
        the valid subsystem scores (equal weights) into a total health score.
        """
        results = {
            "engine_code": engine_code,
            "wind_turbine_name": wind_turbine_name,
            "mill_type": mill_type,
            "total_health_score": None,
            "subsystems": {},
            "assessed_subsystems": []
        }

        # Subsystems to assess: (name, config, minimum feature count).
        # drive_train is only assessed for DFIG machines.
        subsystems_to_assess = [
            ('generator', self.subsystem_config['generator'][mill_type], 1),
            ('nacelle', self.subsystem_config['nacelle'],1),
            ('grid', self.subsystem_config['grid'], 1),
            ('drive_train', self.subsystem_config['drive_train'] if mill_type == 'dfig' else None,1)
        ]

        for subsystem, config, min_features in subsystems_to_assess:
            if config is None:
                continue

            features = self._get_subsystem_features(config, data)

            # Feature 1: always emit a result, even when features are missing.
            if len(features) >= min_features:
                assessment = self._assess_subsystem(data[features])
            else:
                assessment = {
                    'health_score': -1,  # -1 marks "not assessable"
                    'weights': {},
                    'message': f'Insufficient features (required {min_features}, got {len(features)})'
                }

            # Feature 3: strip the 'features' key from the output payload.
            if 'features' in assessment:
                del assessment['features']

            results["subsystems"][subsystem] = assessment

        # Whole-machine health score (new field name).
        if results["subsystems"]:
            # Only subsystems with a numeric, non-negative score participate.
            valid_subsystems = [
                k for k, v in results["subsystems"].items()
                if isinstance(v['health_score'], (int, float)) and v['health_score'] >= 0
            ]

            if valid_subsystems:
                weights = self._get_subsystem_weights(valid_subsystems)
                health_scores = [results["subsystems"][sys]['health_score'] for sys in valid_subsystems]
                results["total_health_score"] = float(np.dot(health_scores, weights))
                results["assessed_subsystems"] = valid_subsystems

        return results




    def _get_all_possible_features(self,assessor, mill_type, available_columns):
        """
        Collect every feature column that could be used, based on the columns
        that actually exist in the database table.

        Parameters:
            assessor: a HealthAssessor instance whose subsystem_config is read
                      (NOTE(review): this is a method, so passing an external
                      assessor in addition to self looks redundant — confirm)
            mill_type: machine type ('dfig' / 'direct')
            available_columns: list of column names present in the table
        """
        features = []
        available_columns_lower = [col.lower() for col in available_columns]  # case-insensitive matching

        for subsys_name, subsys_config in assessor.subsystem_config.items():
            # Resolve the per-subsystem config (generator is keyed by mill type;
            # drive_train only applies to DFIG machines).
            if subsys_name == 'generator':
                config = subsys_config.get(mill_type, {})
            elif subsys_name == 'drive_train' and mill_type != 'dfig':
                continue
            else:
                config = subsys_config

            # Fixed features: exact (case-sensitive) column-name match.
            if 'fixed' in config:
                for f in config['fixed']:
                    if f in available_columns:
                        features.append(f)

            # Keyword features: include/exclude substring rules.
            if 'keywords' in config:
                for rule in config['keywords']:
                    matched = []
                    include_kws = [kw.lower() for kw in rule['include']]
                    exclude_kws = [ex.lower() for ex in rule.get('exclude', [])]

                    for col in available_columns:
                        col_lower = col.lower()
                        # Column must contain every include keyword...
                        include_ok = all(kw in col_lower for kw in include_kws)
                        # ...and none of the exclude keywords.
                        exclude_ok = not any(ex in col_lower for ex in exclude_kws)

                        if include_ok and exclude_ok:
                            matched.append(col)

                    if len(matched) >= rule.get('min_count', 1):
                        features.extend(matched)

        return list(set(features))  # de-duplicate

    def _get_subsystem_features(self, config: Dict, data: pd.DataFrame) -> List[str]:
        """Final feature-selection step: fixed columns plus keyword-matched
        columns, each required to be at least 10% non-null in `data`."""
        available_features = []

        # Fixed features (require > 10% non-null values).
        if 'fixed' in config:
            for f in config['fixed']:
                if f in data.columns and data[f].notna().mean() > 0.1:
                    available_features.append(f)
        logger.info(f"匹配到的固定特征: {available_features}")
        # Keyword-matched features.
        if 'keywords' in config:
            for rule in config['keywords']:
                matched = [
                    col for col in data.columns
                    if all(kw.lower() in col.lower() for kw in rule['include'])
                       and not any(ex.lower() in col.lower() for ex in rule.get('exclude', []))
                       and data[col].notna().mean() > 0.1  # data-validity check
                ]
                if len(matched) >= rule.get('min_count', 1):
                    available_features.extend(matched)
        logger.info(f"匹配到的关键词特征: {available_features}")
        return list(set(available_features))

    def _assess_subsystem(self, data: pd.DataFrame) -> Dict:
        """Score one subsystem (kept logically identical to the reference
        source): normalise, compute CRITIC weights, run MSET/SPRT."""
        # Drop rows with any missing value.
        clean_data = data.dropna()
        if len(clean_data) < 20:  # not enough data to assess
            return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': 'Insufficient data'}

        try:
            # Min-max normalisation.
            # NOTE(review): CRITIC below calls CRITIC_prepare again on this
            # already-normalised frame (double normalisation) — confirm intended.
            normalized_data = self.mset.CRITIC_prepare(clean_data)

            # CRITIC feature weights.
            weights = self.mset.CRITIC(normalized_data)

            # MSET health evaluation.
            health_score = self._run_mset_assessment(normalized_data.values, weights.values)

            return {
                'health_score': float(health_score),
                'weights': weights.to_dict(),
                'features': list(data.columns)
            }
        except Exception as e:
            return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': str(e)}

    @lru_cache(maxsize=10)
    def _get_mset_model(self, train_data: tuple):
        """Build (or fetch from cache) an MSET model for the given training data.

        NOTE(review): lru_cache on an instance method keys on self and keeps
        the instance alive for the cache's lifetime (ruff B019) — consider a
        per-instance cache instead.
        """
        # lru_cache requires hashable arguments, hence the tuple-of-tuples input.
        arr = np.array(train_data)
        model = self._create_mset_core()
        model.genDLMatrix(arr)
        return model

    def _run_mset_assessment(self, data: np.ndarray, weights: np.ndarray) -> float:
        """Run the MSET/SPRT assessment and return the mean health score."""
        # First half trains the model, second half is scored.
        split_idx = len(data) // 2
        train_data = data[:split_idx]
        test_data = data[split_idx:]

        # Cached model (training data converted to a hashable tuple-of-tuples).
        model = self._get_mset_model(tuple(map(tuple, train_data)))

        # SPRT health flags over the test half.
        flags = model.calcSPRT(test_data, weights)
        return np.mean(flags)

    def _get_subsystem_weights(self, subsystems: List[str]) -> np.ndarray:
        """Return an equal-weight vector over the given subsystems."""
        n = len(subsystems)
        if n == 0:
            return np.array([])

        # Equal weighting across all assessed subsystems.
        return np.ones(n) / n

+ 118 - 0
app/services/HealthDataFetcher.py

@@ -0,0 +1,118 @@
+import traceback
+
+import pandas as pd
+from sqlalchemy import inspect
+
+from app.config import dataBase
+from app.database import get_engine
+from app.logger import logger
+
+
class DataFetcher:
    """Read-only query helpers for wind-farm metadata (platform DB) and
    minute-level turbine time-series data (data DB).

    All public methods return an empty result ([] or empty DataFrame) or None
    on failure instead of raising, so callers can batch over many farms.
    """

    def get_turbine_columns(self, windcode):
        """
        Return all column names of the given wind farm's minute-data table.

        :param windcode: wind-farm code (e.g. "WF001")
        :return: list of column names (e.g. ["timestamp", "temp1", "vibration1"]);
                 empty list on any error
        """
        # NOTE: the original class defined this method twice; the later
        # duplicate (which silently shadowed this one) has been removed.
        table_name = f"{windcode}_minute"
        try:
            inspector = inspect(get_engine(dataBase.DATA_DB))
            columns = inspector.get_columns(table_name)
            return [col['name'] for col in columns]
        except Exception:
            # format_exc() returns the traceback text; the original logged
            # traceback.print_exc() inside an f-string, which logs "None".
            logger.error(f"Error fetching columns for {table_name}: {traceback.format_exc()}")
            return []

    def get_turbines(self, windcode):
        """
        Return all turbines of a wind farm.

        Looks up engine_code, mill_type_code and engine_name in table
        'wind_engine_group' by field_code.  The farm code is passed as a bind
        parameter instead of being interpolated into the SQL string
        (prevents SQL injection).
        """
        query = """
        SELECT engine_code, mill_type_code, engine_name
        FROM wind_engine_group
        WHERE field_code = %s
        """
        return pd.read_sql(query, get_engine(dataBase.PLATFORM_DB), params=(windcode,))

    def get_mill_type(self, mill_type_code):
        """
        Return the drive type (curved_motion_type) for a mill type code.

        Looks up table 'wind_engine_mill'; returns None when the code is
        unknown.  Parameterized for the same injection-safety reason as
        get_turbines.
        """
        query = """
        SELECT curved_motion_type
        FROM wind_engine_mill
        WHERE mill_type_code = %s
        """
        result = pd.read_sql(query, get_engine(dataBase.PLATFORM_DB), params=(mill_type_code,))
        return result.iloc[0, 0] if not result.empty else None

    def fetch_turbine_data(self, windcode, engine_code, month, features):
        """Fetch one month of minute data for a turbine (parameterized version).

        Args:
            windcode: wind-farm code (e.g. "WF001"); used to build the table
                name, so it is validated to be alphanumeric/underscore only
            engine_code: turbine code (e.g. "WT001")
            month: month string in "YYYY-MM" format
            features: list of column names to select
        Returns:
            pd.DataFrame: query result; empty DataFrame on any error
        """
        try:
            # 1. Validate the table-name component (identifiers cannot be bound
            #    as SQL parameters, so reject anything but [A-Za-z0-9_]).
            if not windcode or not all(c.isalnum() or c == '_' for c in windcode):
                raise ValueError(f"非法风场编号: {windcode}")

            # 2. Convert and validate the month (also reject month 00 / 13+,
            #    which the original range check let through).
            year_month_int = int(month.replace('-', ''))
            if not (100001 <= year_month_int <= 999912 and 1 <= year_month_int % 100 <= 12):
                raise ValueError("月份格式应为YYYY-MM")

            # 3. Validate feature column names (identifiers, see step 1).
            safe_features = []
            for feat in features:
                if isinstance(feat, str) and all(c.isalnum() or c == '_' for c in feat):
                    safe_features.append(f'`{feat}`')  # wrap in backticks
                else:
                    logger.info(f"警告:忽略非法特征名 '{feat}'")

            if not safe_features:
                logger.info("错误:无有效特征列")
                return pd.DataFrame()

            # 4. Build the parameterized query (values bound via %s).
            query = f"""
            SELECT `year_month`, {','.join(safe_features)} 
            FROM `{windcode}_minute` 
            WHERE `wind_turbine_number` = %s
            AND `year_month` = %s
            """

            logger.info(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year_month_int})")

            # 5. Execute with bound parameters.
            return pd.read_sql(query, get_engine(dataBase.DATA_DB),
                               params=(engine_code, year_month_int))

        except ValueError:
            logger.error(f"输入参数错误: {traceback.format_exc()}")
            return pd.DataFrame()
        except Exception:
            logger.error(f"数据库查询失败: {traceback.format_exc()}")
            return pd.DataFrame()

+ 286 - 0
app/services/MSET_Temp.py

@@ -0,0 +1,286 @@
+import math
+
+import numpy as np
+import pandas as pd
+from sqlalchemy import text
+from sklearn.neighbors import BallTree
+
+from app.config import dataBase
+from app.database import get_engine
+
+
class MSET_Temp:
    """
    Temperature trend / threshold analysis based on MSET + SPRT.
    Rows are selected by the wind_turbine_number column and a time_stamp
    range; the SPRT threshold is fixed at 0.99 and calcSPRT outputs scores
    in [-1, 1].
    """

    def __init__(self, windCode: str, windTurbineNumberList: list[str], startTime: str, endTime: str):
        """
        :param windCode: machine/farm code used to build the table name,
            e.g. "WOG01312" -> table "WOG01312_minute"
        :param windTurbineNumberList: wind_turbine_number values to query
        :param startTime: range start (string), format "YYYY-MM-DD HH:MM"
        :param endTime: range end (string), format "YYYY-MM-DD HH:MM"
        """
        self.windCode = windCode.strip()
        self.windTurbineNumberList = windTurbineNumberList
        # Kept as strings; bound as SQL parameters in _get_data_by_filter.
        self.startTime = startTime
        self.endTime   = endTime

        # D/L matrix state, populated by genDLMatrix.
        self.matrixD = None
        self.matrixL = None
        self.healthyResidual = None
        self.normalDataBallTree = None


    def _get_data_by_filter(self) -> pd.DataFrame:
        """
        Query all rows for the configured turbines within the time range,
        returned as one DataFrame sorted by time_stamp ascending.

        NOTE(review): the table name and the turbine IN-list are interpolated
        into the SQL string (only the time range is bound as parameters) —
        injection risk if windCode / turbine numbers come from user input.
        """
        table_name = f"{self.windCode}_minute"
        engine = get_engine(dataBase.DATA_DB)
        # Build the IN-list fragment: ('WT1','WT2',...)
        turbines = ",".join(f"'{wt.strip()}'" for wt in self.windTurbineNumberList)
        sql = text(f"""
            SELECT *
            FROM {table_name}
            WHERE wind_turbine_number IN ({turbines})
              AND time_stamp BETWEEN :start AND :end
            ORDER BY time_stamp ASC
        """)

        df = pd.read_sql(sql, engine, params={"start": self.startTime, "end": self.endTime})
        return df

    def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
        """
        Similarity of vectors x and y in (0, 1]:
          - m='euc' -> based on Euclidean distance
          - m='cbd' -> based on city-block (Manhattan) distance
        Returns 0.0 when the vectors have different lengths.
        """
        if len(x) != len(y):
            return 0.0

        if m == 'cbd':
            arr = [1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)]
            return float(np.sum(arr) / len(arr))
        else:
            diffsq = [(p - q) ** 2 for p, q in zip(x, y)]
            return float(1.0 / (1.0 + math.sqrt(np.sum(diffsq))))

    def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
        """
        Build the D/L matrices from trainDataset:
          - returns -1 if there are fewer than dataSize4D + dataSize4L samples
          - otherwise builds matrixD / matrixL, computes healthyResidual via
            locally weighted regression, and returns 0

        NOTE(review): beyond the size check, dataSize4L is unused — matrixL is
        simply the whole training set; confirm intended.
        """
        m, n = trainDataset.shape
        if m < dataSize4D + dataSize4L:
            return -1

        # Step 1: seed matrixD with each feature's min and max sample.
        self.matrixD = []
        selectIndex4D = []
        for i in range(n):
            col_i = trainDataset[:, i]
            idx_min = np.argmin(col_i)
            idx_max = np.argmax(col_i)
            self.matrixD.append(trainDataset[idx_min, :].tolist())
            selectIndex4D.append(idx_min)
            self.matrixD.append(trainDataset[idx_max, :].tolist())
            selectIndex4D.append(idx_max)

        # Step 2: greedily add the remaining sample with the largest average
        # distance to matrixD, until matrixD has dataSize4D rows.
        while len(selectIndex4D) < dataSize4D:
            freeList = list(set(range(len(trainDataset))) - set(selectIndex4D))
            distAvg = []
            for idx in freeList:
                tmp = trainDataset[idx, :]
                dlist = [1.0 - self.calcSimilarity(x, tmp) for x in self.matrixD]
                distAvg.append(np.mean(dlist))
            select_id = freeList[int(np.argmax(distAvg))]
            self.matrixD.append(trainDataset[select_id, :].tolist())
            selectIndex4D.append(select_id)

        self.matrixD = np.array(self.matrixD)

        # BallTree over matrixD for the locally weighted regression;
        # distance = 1 - similarity.
        self.normalDataBallTree = BallTree(
            self.matrixD,
            leaf_size=4,
            metric=lambda a, b: 1.0 - self.calcSimilarity(a, b)
        )

        # Step 3: all training samples become matrixL.
        self.matrixL = trainDataset.copy()

        # Step 4: healthy residuals via locally weighted regression.
        self.healthyResidual = self.calcResidualByLocallyWeightedLR(self.matrixL)
        return 0

    def calcResidualByLocallyWeightedLR(self, newStates: np.ndarray) -> np.ndarray:
        """
        For each sample in newStates, estimate it as the inverse-distance
        weighted combination of its 20 nearest neighbours in matrixD and
        return the residual matrix (estimate - observation),
        shaped [len(newStates), n_features].
        """
        est_list = []
        for x in newStates:
            dist, idxs = self.normalDataBallTree.query([x], k=20, return_distance=True)
            w = 1.0 / (dist[0] + 1e-1)
            w = w / np.sum(w)
            est = np.sum([w_i * self.matrixD[j] for w_i, j in zip(w, idxs[0])], axis=0)
            est_list.append(est)
        est_arr = np.reshape(np.array(est_list), (len(est_list), -1))
        return est_arr - newStates

    def calcSPRT(
            self,
            newsStates: np.ndarray,
            feature_weight: np.ndarray,
            alpha: float = 0.1,
            beta: float = 0.1,
            decisionGroup: int = 5
    ) -> list[float]:
        """
        Run a Wald SPRT over newsStates; returns a score list of length
        len(newsStates) - decisionGroup + 1, each score in [-1, 1]:
          - closer to  1 -> more "abnormal (dangerous)"
          - closer to -1 -> more "normal"
        """
        # 1) Residuals, collapsed to a scalar per sample by feature weights.
        stateRes = self.calcResidualByLocallyWeightedLR(newsStates)
        weightedStateResidual = [np.dot(x, feature_weight) for x in stateRes]
        weightedHealthyResidual = [np.dot(x, feature_weight) for x in self.healthyResidual]

        # 2) Healthy-residual distribution statistics.
        mu0 = float(np.mean(weightedHealthyResidual))
        sigma0 = float(np.std(weightedHealthyResidual))

        # 3) SPRT decision thresholds.
        lowThres = np.log(beta / (1.0 - alpha))    # < 0
        highThres = np.log((1.0 - beta) / alpha)   # > 0

        flags: list[float] = []
        length = len(weightedStateResidual)
        for i in range(0, length - decisionGroup + 1):
            # Log-likelihood-ratio statistic over a sliding window.
            segment = weightedStateResidual[i : i + decisionGroup]
            mu1 = float(np.mean(segment))
            si = (
                    np.sum(segment) * (mu1 - mu0) / (sigma0**2)
                    - decisionGroup * ((mu1**2) - (mu0**2)) / (2.0 * (sigma0**2))
            )

            # Clamp si to [lowThres, highThres]...
            si = max(min(si, highThres), lowThres)
            # ...then normalise each sign to [-1, 1].
            if si > 0:
                norm_si = float(si / highThres)
            else:
                norm_si = float(si / lowThres)
            flags.append(norm_si)

        return flags

    def check_threshold(self) -> pd.DataFrame:
        """
        Threshold analysis (threshold 0.99), long format:
        returns rows for every channel present in the data; missing channels
        are skipped automatically.
        """
        THRESHOLD = 0.99
        df = self._get_data_by_filter()
        if df.empty:
            return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])

        # The four temperature channels (English column names).
        temp_cols_all = [
            'main_bearing_temperature',
            'gearbox_oil_temperature',
            'generatordrive_end_bearing_temperature',
            'generatornon_drive_end_bearing_temperature'
        ]
        # Keep only the columns that actually exist.
        temp_cols = [c for c in temp_cols_all if c in df.columns]
        if not temp_cols:
            return pd.DataFrame(columns=["time_stamp", "temp_channel", "SPRT_score", "status"])

        # Coerce to numeric values and datetimes.
        df[temp_cols] = df[temp_cols].apply(pd.to_numeric, errors='coerce')
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
        records = []
        # English -> Chinese display-name mapping.
        cn_map = {
            'main_bearing_temperature': '主轴承温度',
            'gearbox_oil_temperature': '齿轮箱油温',
            'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度',
            'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度'
        }

        for col in temp_cols:
            sub = df[['time_stamp', col]].dropna()
            if sub.empty:
                continue
            arr = sub[col].values.reshape(-1,1)
            ts  = sub['time_stamp'].dt.strftime("%Y-%m-%d %H:%M:%S").tolist()
            # First half trains, second half is scored.
            half = len(arr) // 2
            train = arr[:half]
            test  = arr[half:]
            # Skip the channel when there is not enough training data.
            if self.genDLMatrix(train, dataSize4D=60, dataSize4L=5) != 0:
                continue

            flags = self.calcSPRT(test, np.array([1.0]), decisionGroup=1)
            for i, score in enumerate(flags):
                records.append({
                    "time_stamp": ts[half + i],
                    "temp_channel": cn_map[col],
                    "SPRT_score": score,
                    "status": "危险" if score > THRESHOLD else "正常"
                })

        return pd.DataFrame(records, columns=["time_stamp", "temp_channel", "SPRT_score", "status"])

    def get_trend(self) -> dict:
        """
        Trend analysis: each channel is scored independently; missing data or
        insufficient training yields an empty structure for that channel.
        """
        df = self._get_data_by_filter()
        # English column name -> output field name.
        key_map = {
            'main_bearing_temperature': 'main_bearing',
            'gearbox_oil_temperature': 'gearbox_oil',
            'generatordrive_end_bearing_temperature': 'generator_drive_end',
            'generatornon_drive_end_bearing_temperature': 'generator_nondrive_end'
        }
        # Pre-populate the result skeleton.
        result = {v: {} for v in key_map.values()}

        if df.empty:
            return {"data": result, "code": 200, "message": "success"}

        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
        for col, out_key in key_map.items():
            if col not in df.columns:
                continue
            sub = df[['time_stamp', col]].dropna()
            if sub.empty:
                continue
            vals = pd.to_numeric(sub[col], errors='coerce').values
            ts_list = sub['time_stamp'].dt.strftime("%Y-%m-%d %H:%M:%S").tolist()
            half = len(vals) // 2
            train = vals[:half].reshape(-1,1)
            test  = vals[half:].reshape(-1,1)
            # Empty score list when training data is insufficient.
            # NOTE(review): in that case "timestamps" is still populated while
            # "values" is empty, so the two lists differ in length — confirm
            # consumers tolerate this.
            if self.genDLMatrix(train, dataSize4D=60, dataSize4L=5) != 0:
                flags = []
            else:
                flags = self.calcSPRT(test, np.array([1.0]), decisionGroup=1)
            result[out_key] = {
                "timestamps": ts_list[half:],
                "values": flags
            }

        return {"data": result, "code": 200, "message": "success"}

+ 47 - 0
requirements.txt

@@ -0,0 +1,47 @@
+annotated-types==0.7.0
+anyio==4.9.0
+asgiref==3.8.1
+attrs==25.3.0
+click==8.2.1
+coverage==7.9.0
+ecdsa==0.19.1
+fastapi==0.115.13
+greenlet==3.2.3
+h11==0.16.0
+httptools==0.6.4
+idna==3.10
+iniconfig==2.1.0
+joblib==1.5.1
+numpy==2.3.0
+packaging==25.0
+pandas==2.3.0
+passlib==1.7.4
+pluggy==1.6.0
+py==1.11.0
+pyasn1==0.6.1
+pydantic==2.11.7
+pydantic_core==2.33.2
+PyMySQL==1.1.1
+pytest==6.2.5
+pytest-cov==2.12.1
+python-dateutil==2.9.0.post0
+python-dotenv==0.19.2
+python-jose==3.5.0
+pytz==2025.2
+PyYAML==6.0.2
+rsa==4.9.1
+scikit-learn==1.7.0
+scipy==1.15.3
+six==1.17.0
+sniffio==1.3.1
+SQLAlchemy==2.0.41
+starlette==0.46.2
+threadpoolctl==3.6.0
+toml==0.10.2
+typing-inspection==0.4.1
+typing_extensions==4.14.0
+tzdata==2025.2
+uvicorn==0.15.0
+uvloop==0.21.0
+watchfiles==1.0.5
+websockets==15.0.1