import math
import os
from typing import Dict

import joblib
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from sqlalchemy import text

from app.config import dataBase
from app.database import get_engine


class MSET_Temp:
    """
    MSET + SPRT temperature analysis.

    - Offline training: genDLMatrix -> save_model
    - Online inference: load_model -> predict_SPRT

    A usage sketch is provided at the bottom of this module.
    """

    def __init__(self,
                 windCode: str,
                 windTurbineNumberList: list[str],
                 startTime: str,
                 endTime: str):
        self.windCode = windCode.strip()
        self.windTurbineNumberList = windTurbineNumberList or []
        self.startTime = startTime
        self.endTime = endTime

        # Set by offline training or by load_model
        self.matrixD = None
        self.healthyResidual = None
        self.normalDataBallTree = None

        # SPRT parameters (set during offline training)
        self.feature_weight: np.ndarray | None = None
        self.alpha: float = 0.1
        self.beta: float = 0.1
    def _get_data_by_filter(self) -> pd.DataFrame:
        """
        Online inference helper: pull data for self.windTurbineNumberList within
        [startTime, endTime]; if the list is empty, pull data for the whole farm.
        """
        table = f"{self.windCode}_minute"
        engine = get_engine(dataBase.DATA_DB)

        if self.windTurbineNumberList:
            turbines = ",".join(f"'{t}'" for t in self.windTurbineNumberList)
            cond = f"wind_turbine_number IN ({turbines}) AND time_stamp BETWEEN :start AND :end"
        else:
            cond = "time_stamp BETWEEN :start AND :end"

        sql = text(f"SELECT * FROM {table} WHERE {cond} ORDER BY time_stamp ASC")
        return pd.read_sql(sql, engine, params={"start": self.startTime, "end": self.endTime})
    def calcSimilarity(self, x: np.ndarray, y: np.ndarray, m: str = 'euc') -> float:
        """Similarity in (0, 1]: 'cbd' uses a city-block form, otherwise Euclidean."""
        if len(x) != len(y):
            return 0.0
        if m == 'cbd':
            return float(np.mean([1.0 / (1.0 + abs(p - q)) for p, q in zip(x, y)]))
        diffsq = np.sum((x - y) ** 2)
        return float(1.0 / (1.0 + math.sqrt(diffsq)))
    def genDLMatrix(self, trainDataset: np.ndarray, dataSize4D=100, dataSize4L=50) -> int:
        """
        Offline training: build matrixD, the BallTree over it, and healthyResidual.
        Returns 0 on success, -1 if trainDataset has fewer than dataSize4D + dataSize4L rows.
        """
        m, n = trainDataset.shape
        if m < dataSize4D + dataSize4L:
            return -1

        # Step 1: the rows holding each feature's minimum and maximum go into D
        D_idx, D = [], []
        for i in range(n):
            col = trainDataset[:, i]
            for idx in (np.argmin(col), np.argmax(col)):
                D.append(trainDataset[idx].tolist())
                D_idx.append(idx)

        # Step 2: greedily add the sample least similar to the current D until dataSize4D rows
        while len(D_idx) < dataSize4D:
            free = list(set(range(m)) - set(D_idx))
            scores = [(np.mean([1 - self.calcSimilarity(trainDataset[i], d) for d in D]), i)
                      for i in free]
            _, pick = max(scores)
            D.append(trainDataset[pick].tolist())
            D_idx.append(pick)
        self.matrixD = np.array(D)

        # BallTree over D, using 1 - similarity as the distance
        self.normalDataBallTree = BallTree(
            self.matrixD,
            leaf_size=4,
            metric=lambda a, b: 1.0 - self.calcSimilarity(a, b)
        )

        # Residuals of the healthy training data against their MSET estimates
        ests = []
        for x in trainDataset:
            dist, idxs = self.normalDataBallTree.query([x], k=20, return_distance=True)
            w = 1.0 / (dist[0] + 1e-1)
            w /= w.sum()
            ests.append(np.sum([wi * self.matrixD[j] for wi, j in zip(w, idxs[0])], axis=0))
        self.healthyResidual = np.array(ests) - trainDataset

        return 0
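
    # Estimation step used above (and reused in calcSPRT): each state vector x is
    # approximated by a weighted average of its k = 20 nearest rows of matrixD,
    # with weights w_i proportional to 1 / (distance_i + 0.1), i.e.
    #     x_hat = sum_i w_i * D_i,    residual = x_hat - x,
    # so healthyResidual holds the residuals of the healthy training data itself.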
    def calcSPRT(self,
                 newsStates: np.ndarray,
                 feature_weight: np.ndarray,
                 alpha: float = 0.1,
                 beta: float = 0.1,
                 decisionGroup: int = 5) -> list[float]:
        """
        Wald SPRT scores for a window of new states.
        """
        # Residuals of the new states against their MSET estimates
        ests = []
        for x in newsStates:
            dist, idxs = self.normalDataBallTree.query([x], k=20, return_distance=True)
            w = 1.0 / (dist[0] + 1e-1)
            w /= w.sum()
            ests.append(np.sum([wi * self.matrixD[j] for wi, j in zip(w, idxs[0])], axis=0))
        resN = np.array(ests) - newsStates

        # Project the residuals onto the feature weights
        wN = [np.dot(r, feature_weight) for r in resN]
        wH = [np.dot(r, feature_weight) for r in self.healthyResidual]
        mu0, sigma0 = np.mean(wH), np.std(wH)

        low = math.log(beta / (1 - alpha))
        high = math.log((1 - beta) / alpha)

        flags = []
        for i in range(len(wN) - decisionGroup + 1):
            seg = wN[i:i + decisionGroup]
            mu1 = np.mean(seg)
            si = (sum(seg) * (mu1 - mu0) / sigma0 ** 2
                  - decisionGroup * ((mu1 ** 2 - mu0 ** 2) / (2 * sigma0 ** 2)))
            si = max(min(si, high), low)
            flags.append(si / high if si > 0 else si / low)
        return flags
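
    # Notation used in calcSPRT: with the healthy weighted residuals treated as
    # N(mu0, sigma0^2) and mu1 the mean of the current window of length
    # n = decisionGroup, the Gaussian log-likelihood ratio is
    #     si = (sum(seg) * (mu1 - mu0) - n * (mu1**2 - mu0**2) / 2) / sigma0**2,
    # clipped to the Wald boundaries low = ln(beta / (1 - alpha)) and
    # high = ln((1 - beta) / alpha), then divided by the boundary on its side of
    # zero so each flag falls in [0, 1].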
    def predict_SPRT(self,
                     newsStates: np.ndarray,
                     decisionGroup: int = 5) -> list[float]:
        """
        Online inference using the offline-trained matrixD, healthyResidual,
        feature_weight, alpha and beta.
        """
        return self.calcSPRT(
            newsStates,
            self.feature_weight,
            alpha=self.alpha,
            beta=self.beta,
            decisionGroup=decisionGroup
        )
    def save_model(self, path: str):
        """
        Save matrixD, healthyResidual, feature_weight, alpha and beta.
        """
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        joblib.dump({
            'matrixD': self.matrixD,
            'healthyResidual': self.healthyResidual,
            'feature_weight': self.feature_weight,
            'alpha': self.alpha,
            'beta': self.beta,
        }, path)
    @classmethod
    def load_model(cls, path: str) -> 'MSET_Temp':
        """
        Load a saved model and rebuild the BallTree.
        """
        data = joblib.load(path)
        inst = cls('', [], '', '')
        inst.matrixD = data['matrixD']
        inst.healthyResidual = data['healthyResidual']
        inst.feature_weight = data['feature_weight']
        inst.alpha = data['alpha']
        inst.beta = data['beta']
        inst.normalDataBallTree = BallTree(
            inst.matrixD,
            leaf_size=4,
            metric=lambda a, b: 1.0 - inst.calcSimilarity(a, b)
        )
        return inst
    def query_surrounding_data(self, timestamp: str, minutes_around: int = 250) -> Dict:
        """
        Query minute-level data within +/- minutes_around minutes of a given timestamp.

        Args:
            timestamp: centre time, formatted 'yyyy-mm-dd HH:MM:SS'
            minutes_around: how many minutes before and after the centre time to query

        Returns:
            {
                'record_count': int,
                'records': List[Dict],
                'columns_mapping': Dict[str, str]  # English-to-Chinese column-name mapping
            }
        """
        # English-to-Chinese column-name mapping (display names kept in Chinese)
        cn_map = {
            'wind_turbine_name': '风机名称',
            'time_stamp': '时间',
            'active_power': '有功功率(kW)',
            'rotor_speed': '风轮转速(rpm)',
            'generator_speed': '发电机转速(rpm)',
            'wind_velocity': '风速(m/s)',
            'pitch_angle_blade_1': '桨距角1(°)',
            'pitch_angle_blade_2': '桨距角2(°)',
            'pitch_angle_blade_3': '桨距角3(°)',
            'cabin_position': '机舱位置(°)',
            'true_wind_direction': '绝对风向(°)',
            'yaw_error1': '对风角度(°)',
            'set_value_of_active_power': '有功功率设定值(kW)',
            'gearbox_oil_temperature': '齿轮箱油温(℃)',
            'generatordrive_end_bearing_temperature': '发电机驱动端轴承温度(℃)',
            'generatornon_drive_end_bearing_temperature': '发电机非驱动端轴承温度(℃)',
            'cabin_temperature': '机舱内温度(℃)',
            'twisted_cable_angle': '扭缆角度(°)',
            'outside_cabin_temperature': '环境温度(℃)',
            'main_bearing_temperature': '主轴承轴承温度(℃)',
            'main_bearing_temperature_2': '主轴承轴承温度2(℃)',
            'gearbox_high_speed_shaft_bearing_temperature': '齿轮箱高速轴轴承温度(℃)',
            'gearboxmedium_speed_shaftbearing_temperature': '齿轮箱中速轴轴承温度(℃)',
            'gearbox_low_speed_shaft_bearing_temperature': '齿轮箱低速轴轴承温度(℃)',
            'generator_winding1_temperature': '发电机绕组1温度(℃)',
            'generator_winding2_temperature': '发电机绕组2温度(℃)',
            'generator_winding3_temperature': '发电机绕组3温度(℃)',
            'grid_a_phase_current': '电网A相电流(A)',
            'grid_b_phase_current': '电网B相电流(A)',
            'grid_c_phase_current': '电网C相电流(A)'
        }
        table = f"{self.windCode}_minute"
        engine = get_engine(dataBase.DATA_DB)

        # Restrict to the requested turbines when a list was given; otherwise query the whole farm
        if self.windTurbineNumberList:
            turbines = ",".join(f"'{t}'" for t in self.windTurbineNumberList)
            turbine_cond = f"wind_turbine_number IN ({turbines}) AND "
        else:
            turbine_cond = ""

        sql = text(f"""
            SELECT *
            FROM {table}
            WHERE {turbine_cond}time_stamp BETWEEN
                  DATE_SUB(:timestamp, INTERVAL :minutes MINUTE)
                  AND DATE_ADD(:timestamp, INTERVAL :minutes MINUTE)
            ORDER BY time_stamp ASC
        """)

        df = pd.read_sql(sql, engine, params={
            "timestamp": timestamp,
            "minutes": minutes_around
        })

        # Report how many rows were fetched
        record_count = len(df)
        print(f"Fetched {record_count} rows")
        if df.empty:
            return {
                'record_count': 0,
                'records': [],
                'columns_mapping': {}
            }

        # Drop unused columns, then any columns that are entirely empty
        cols_to_drop = [
            'wind_turbine_number', 'reactive_power', 'lab', 'year', 'month', 'day', 'year_month',
            'front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin',
            'actual_torque', 'given_torque', 'clockwise_yaw_count', 'counterclockwise_yaw_count',
            'unusable', 'power_curve_available', 'required_gearbox_speed',
            'inverter_speed_master_control', 'wind_turbine_status', 'wind_turbine_status2',
            'turbulence_intensity'
        ]
        cols_to_drop = [col for col in cols_to_drop if col in df.columns]
        df = df.drop(columns=cols_to_drop)
        df = df.dropna(axis=1, how='all')

        # Serialise timestamps and rename columns to their display names
        df['time_stamp'] = df['time_stamp'].astype(str)
        records = df.rename(columns=cn_map).to_dict('records')

        return {
            'record_count': record_count,
            'records': records,
            'columns_mapping': cn_map
        }
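

# Usage sketch: offline training on a synthetic temperature matrix, then save,
# reload and score. The wind-farm code, turbine id, model path and the uniform
# feature weights are placeholder choices, not values defined by this module.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    train = rng.normal(loc=60.0, scale=2.0, size=(300, 4))  # 300 samples x 4 temperature features

    # Offline: build the memory matrix / healthy residuals, pick weights, save.
    model = MSET_Temp("WOF001", ["WT01"], "2024-01-01 00:00:00", "2024-01-07 00:00:00")
    assert model.genDLMatrix(train, dataSize4D=100, dataSize4L=50) == 0
    model.feature_weight = np.ones(train.shape[1]) / train.shape[1]  # example choice: uniform weights
    model.save_model("models/mset_temp_WOF001.pkl")

    # Online: reload the saved model and score a new window of states.
    new_states = rng.normal(loc=61.0, scale=2.0, size=(50, 4))
    loaded = MSET_Temp.load_model("models/mset_temp_WOF001.pkl")
    flags = loaded.predict_SPRT(new_states, decisionGroup=5)
    print(f"{len(flags)} SPRT flags, first three: {flags[:3]}")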