| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327 |
- import pymysql
- import threading
- from typing import List, Dict, Any, Optional, Tuple
- import logging
- from datetime import datetime
- from collections import Counter
- import statistics
# Logging configuration (配置日志): root logger at INFO with a timestamped
# format; this module writes through its own named logger below.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class ConnectionPool:
    """Thread-safe MySQL/TiDB connection pool (MySQL数据库连接池).

    Connections are opened lazily, up to ``max_connections``. Every new
    connection sets the session variable ``tidb_mem_quota_query`` to
    ``mem_quota``. When all connections are checked out, ``get_connection``
    blocks until another caller hands one back via ``release_connection``.
    """

    def __init__(self, host, port, user, password, database, charset='utf8mb4',
                 max_connections=2, mem_quota=4 << 30):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        self.max_connections = max_connections
        self.mem_quota = mem_quota

        self._lock = threading.Lock()
        # Shares the pool lock; release_connection()/close_all() notify waiters.
        self._available = threading.Condition(self._lock)
        self._connections = []
        self._in_use = set()

    @staticmethod
    def _safe_close(conn):
        """Close ``conn``, logging (not raising) any error from the driver."""
        try:
            conn.close()
        except Exception as e:
            logger.warning(f"关闭数据库连接失败: {e}")

    def _create_connection(self):
        """Open a new connection and apply the session memory quota.

        Returns:
            A pymysql connection using ``DictCursor``.

        Raises:
            Exception: whatever pymysql raises while connecting or configuring
                the session (logged, then re-raised). A connection that was
                opened but could not be configured is closed first.
        """
        conn = None
        try:
            conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset=self.charset,
                cursorclass=pymysql.cursors.DictCursor
            )

            # Set the per-session memory quota. int() guarantees that only a
            # number is interpolated into the statement.
            with conn.cursor() as cursor:
                cursor.execute(f"SET SESSION tidb_mem_quota_query = {int(self.mem_quota)}")
            conn.commit()

            logger.debug(f"创建新数据库连接,设置内存配额为 {self.mem_quota}")
            return conn
        except Exception as e:
            logger.error(f"创建数据库连接失败: {e}")
            if conn is not None:
                # Do not leak a half-configured connection.
                self._safe_close(conn)
            raise

    def get_connection(self):
        """Check out a connection, waiting while the pool is exhausted.

        An idle pooled connection is reused first; otherwise a new one is
        created while fewer than ``max_connections`` exist. When the pool is
        full, the caller waits (re-checking at least once per second) until a
        connection is released.
        """
        with self._available:
            while True:
                # Reuse an idle connection if there is one.
                for conn in self._connections:
                    if conn not in self._in_use:
                        self._in_use.add(conn)
                        logger.debug("从连接池获取现有连接")
                        return conn

                # No idle connection, but the pool may still grow.
                if len(self._connections) < self.max_connections:
                    conn = self._create_connection()
                    self._connections.append(conn)
                    self._in_use.add(conn)
                    logger.debug(f"创建新连接,当前连接数: {len(self._connections)}")
                    return conn

                # Pool is full. wait() releases the lock so other threads can
                # return connections; sleeping while holding the non-reentrant
                # lock and recursing would deadlock.
                logger.warning("连接池已满,等待可用连接...")
                self._available.wait(timeout=1)

    def release_connection(self, conn):
        """Return ``conn`` to the pool and wake one waiting caller.

        Connections that are not currently checked out are ignored.
        """
        with self._available:
            if conn in self._in_use:
                self._in_use.remove(conn)
                logger.debug("释放连接回连接池")
                self._available.notify()

    def close_all(self):
        """Close every pooled connection and reset the pool.

        Close failures are logged per connection and do not stop the others
        from being closed.
        """
        with self._available:
            for conn in self._connections:
                self._safe_close(conn)
            self._connections.clear()
            self._in_use.clear()
            self._available.notify_all()
        logger.info("已关闭所有数据库连接")
class DatabaseConfig:
    """Plain holder for database connection and pool settings (数据库配置类).

    NOTE(review): the defaults embed a host address and a plaintext root
    password; real deployments should pass their own values.
    """

    def __init__(self, host='192.168.50.234', port=4000, user='root',
                 password='123456', database='wind_data', charset='utf8mb4',
                 max_connections=2, mem_quota=4 << 30):
        # Endpoint and credentials.
        self.host, self.port = host, port
        self.user, self.password = user, password
        # Target schema and client character set.
        self.database, self.charset = database, charset
        # Pool size, and the value ConnectionPool passes to tidb_mem_quota_query
        # (4 << 30 == 4 GiB).
        self.max_connections, self.mem_quota = max_connections, mem_quota
class ManufacturerInfo:
    """Reference tables of OEM drivetrain data used for plausibility checks
    (主机厂商信息库,用于合理性校验)."""

    # Drivetrain type and plausible gearbox-ratio range per OEM (the original
    # note says these are based on public data). Looked up by case-insensitive
    # substring match on the manufacturer name.
    TRANSMISSION_RATIOS = {
        # Direct drive: ratio close to 1
        "金风科技": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "远景能源": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "明阳智能": {"type": "半直驱", "ratio_range": (30.0, 50.0)},
        "运达股份": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "上海电气": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "东方电气": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "华锐风电": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "联合动力": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "海装风电": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "Vestas": {"type": "双馈", "ratio_range": (70.0, 100.0)},
        "GE": {"type": "直驱", "ratio_range": (0.95, 1.05)},
        "Siemens": {"type": "直驱", "ratio_range": (0.95, 1.05)},
    }

    # Typical rotor speed range (rpm) keyed by rated capacity in kW.
    ROTOR_SPEED_RANGES = {
        1500: (15.0, 20.0),  # 1.5MW
        2000: (14.0, 18.0),  # 2.0MW
        2500: (13.0, 17.0),  # 2.5MW
        3000: (12.0, 16.0),  # 3.0MW
        3500: (11.0, 15.0),  # 3.5MW
        4000: (10.0, 14.0),  # 4.0MW
        5000: (9.0, 13.0),   # 5.0MW
        6000: (8.0, 12.0),   # 6.0MW
    }

    # Typical generator speed range (rpm) per drivetrain type.
    GENERATOR_SPEED_RANGES = {
        "直驱": (10.0, 30.0),      # direct drive: low-speed generator
        "双馈": (1000.0, 1800.0),  # doubly-fed: near synchronous speed
        "半直驱": (300.0, 800.0),  # semi-direct: medium-speed generator
    }

    @classmethod
    def get_manufacturer_info(cls, manufacturer: str) -> Dict:
        """Return the TRANSMISSION_RATIOS entry whose key occurs in ``manufacturer``.

        Matching is case-insensitive and the first matching key (in table
        order) wins. Falsy or unmatched names get a permissive '未知' entry.
        """
        needle = str(manufacturer).strip().lower() if manufacturer else ""
        return next(
            (info for name, info in cls.TRANSMISSION_RATIOS.items() if name.lower() in needle),
            {"type": "未知", "ratio_range": (0.5, 200.0)},
        )

    @classmethod
    def get_rotor_speed_range(cls, rated_capacity: int) -> Tuple[float, float]:
        """Return the rotor speed range of the capacity class nearest to ``rated_capacity``.

        Ties go to the smaller capacity class; a falsy capacity yields (8.0, 25.0).
        """
        if not rated_capacity:
            return (8.0, 25.0)
        nearest = min(sorted(cls.ROTOR_SPEED_RANGES),
                      key=lambda capacity: abs(capacity - rated_capacity))
        return cls.ROTOR_SPEED_RANGES[nearest]

    @classmethod
    def get_generator_speed_range(cls, turbine_type: str) -> Tuple[float, float]:
        """Return the generator speed range for ``turbine_type``, or (0.0, 2000.0) if unknown."""
        fallback = (0.0, 2000.0)
        return cls.GENERATOR_SPEED_RANGES.get(turbine_type, fallback)
class SCADADataProcessor:
    """Derive rated rotor speed, rated generator speed, gearbox ratio and
    drivetrain type from SCADA samples (SCADA数据处理类,用于计算额定转速和传动比)."""

    # Inclusive upper ratio bounds used by _classify_by_ratio(). The
    # semi-direct / doubly-fed split lies between ManufacturerInfo's ranges
    # (semi-direct 30-50, doubly-fed 70-100), so the typical ratios substituted
    # during validation (40 for semi-direct, 90 for doubly-fed) classify as
    # their own type.
    DIRECT_DRIVE_MAX_RATIO = 1.2
    SEMI_DIRECT_MAX_RATIO = 60.0
    DOUBLY_FED_MAX_RATIO = 120.0

    @staticmethod
    def _classify_by_ratio(transmission_ratio: float) -> str:
        """Map a gearbox ratio to a drivetrain label (直驱/半直驱/双馈, else 未知)."""
        if transmission_ratio <= SCADADataProcessor.DIRECT_DRIVE_MAX_RATIO:
            return "直驱"
        if transmission_ratio <= SCADADataProcessor.SEMI_DIRECT_MAX_RATIO:
            return "半直驱"
        if transmission_ratio <= SCADADataProcessor.DOUBLY_FED_MAX_RATIO:
            return "双馈"
        return "未知"

    @staticmethod
    def calculate_robust_mode(values: List[float], decimal_places: int = 1) -> float:
        """
        Compute an outlier-resistant mode of ``values``.

        With at least 10 values, points outside the 1.5*IQR fences are dropped
        first (unless that would drop everything). Remaining values are
        rounded to ``decimal_places`` and the most frequent rounded value is
        returned; if several values tie, their mean is returned.

        Args:
            values: numeric samples.
            decimal_places: rounding applied before counting frequencies.

        Returns:
            The mode, or 0.0 for empty input.
        """
        if not values:
            return 0.0

        # 1. Outlier removal (IQR rule). statistics.quantiles with
        #    method='inclusive' interpolates linearly, matching the default of
        #    numpy.percentile, without needing numpy.
        if len(values) >= 10:
            q1, _, q3 = statistics.quantiles(values, n=4, method='inclusive')
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            filtered_values = [v for v in values if lower_bound <= v <= upper_bound]

            if filtered_values:
                values = filtered_values

        # 2. Round so that near-identical readings count as the same value.
        rounded_values = [round(v, decimal_places) for v in values]

        # 3. Frequency count.
        counter = Counter(rounded_values)

        if not counter:
            return 0.0

        # 4. Highest frequency.
        max_count = max(counter.values())

        # 5. Every value that reaches it.
        modes = [value for value, count in counter.items() if count == max_count]

        # 6. Average ties.
        if len(modes) > 1:
            mode_value = sum(modes) / len(modes)
            logger.debug(f"多个众数: {modes}, 使用平均值: {mode_value}")
        else:
            mode_value = modes[0]

        logger.debug(f"众数统计: 值={mode_value}, 频次={max_count}, 总数据点={len(values)}")
        return mode_value

    @staticmethod
    def calculate_rated_parameters_with_validation(
        rotor_speeds: List[float],
        gen_speeds: List[float],
        manufacturer: str,
        rated_capacity: int
    ) -> Tuple[float, float, float, str]:
        """
        Compute rated parameters and adjust them against reference ranges.

        Stage 1 takes robust modes of the rotor and generator speeds. Stage 2
        replaces out-of-range speeds with the clamped median and, if the ratio
        is outside the manufacturer's range, substitutes the typical ratio for
        the expected drivetrain type. Stage 3 classifies the final ratio; a
        known expected type overrides the computed one. Adjustments are logged.

        Args:
            rotor_speeds: rotor speed samples (rpm).
            gen_speeds: generator speed samples (rpm).
            manufacturer: manufacturer name, matched via ManufacturerInfo.
            rated_capacity: rated capacity (kW).

        Returns:
            Tuple[float, float, float, str]: (rated_rotor_spd, rated_gen_spd,
            transmission_ratio, turbine_type); rated_gen_spd is an int, and
            (0.0, 0.0, 0.0, "未知") is returned when either list is empty.
        """
        if not rotor_speeds or not gen_speeds:
            logger.warning(f"数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0, "未知"

        # Reference data for the manufacturer.
        manu_info = ManufacturerInfo.get_manufacturer_info(manufacturer)
        expected_type = manu_info["type"]
        expected_ratio_range = manu_info["ratio_range"]

        # Plausible speed ranges.
        rotor_range = ManufacturerInfo.get_rotor_speed_range(rated_capacity)
        gen_range = ManufacturerInfo.get_generator_speed_range(expected_type)

        logger.debug(f"厂商: {manufacturer}, 预期类型: {expected_type}, "
                     f"传动比范围: {expected_ratio_range}, "
                     f"叶轮转速范围: {rotor_range}, 发电机转速范围: {gen_range}")

        # Stage 1: initial estimates.
        # Rotor speed: 1 decimal place.
        rated_rotor_spd = round(SCADADataProcessor.calculate_robust_mode(rotor_speeds, 1), 1)

        # Generator speed: mode, then rounded to an integer.
        gen_mode = SCADADataProcessor.calculate_robust_mode(gen_speeds, 1)
        rated_gen_spd = int(round(gen_mode))

        # Ratio: 3 decimal places.
        if rated_rotor_spd > 0:
            transmission_ratio = round(rated_gen_spd / rated_rotor_spd, 3)
        else:
            transmission_ratio = 0.0

        # Stage 2: plausibility checks and adjustments.
        adjustments = []

        # 1. Rotor speed: fall back to the median, clamped into range.
        if not (rotor_range[0] <= rated_rotor_spd <= rotor_range[1]):
            adjustments.append(f"叶轮转速超出范围: {rated_rotor_spd:.1f} rpm, 合理范围: {rotor_range}")
            rated_rotor_spd = round(float(statistics.median(rotor_speeds)), 1)
            rated_rotor_spd = max(rotor_range[0], min(rotor_range[1], rated_rotor_spd))

        # 2. Generator speed: fall back to the median, clamped into range.
        if not (gen_range[0] <= rated_gen_spd <= gen_range[1]):
            adjustments.append(f"发电机转速超出范围: {rated_gen_spd} rpm, 合理范围: {gen_range}")
            gen_median = float(statistics.median(gen_speeds))
            rated_gen_spd = int(round(gen_median))
            rated_gen_spd = int(max(gen_range[0], min(gen_range[1], rated_gen_spd)))

        # 3. Recompute the ratio from the possibly adjusted speeds.
        if rated_rotor_spd > 0:
            transmission_ratio = round(rated_gen_spd / rated_rotor_spd, 3)
        else:
            transmission_ratio = 0.0

        # 4. Ratio check: substitute the typical ratio of the expected type.
        if not (expected_ratio_range[0] <= transmission_ratio <= expected_ratio_range[1]):
            adjustments.append(f"传动比超出范围: {transmission_ratio:.3f}, 合理范围: {expected_ratio_range}")

            if expected_type == "直驱":
                # Direct drive: ratio 1.
                transmission_ratio = 1.000
                rated_gen_spd = int(round(rated_rotor_spd))
            elif expected_type == "双馈":
                # Doubly-fed: typical 90:1.
                transmission_ratio = 90.000
                rated_gen_spd = int(round(rated_rotor_spd * transmission_ratio))
            elif expected_type == "半直驱":
                # Semi-direct: typical 40:1.
                transmission_ratio = 40.000
                rated_gen_spd = int(round(rated_rotor_spd * transmission_ratio))

        # Stage 3: classify from the final ratio.
        turbine_type = SCADADataProcessor._classify_by_ratio(transmission_ratio)

        # A known expected type takes precedence over the computed one.
        if expected_type != "未知" and turbine_type != expected_type:
            adjustments.append(f"类型不符: 计算={turbine_type}, 预期={expected_type}")
            turbine_type = expected_type

        if adjustments:
            logger.info(f"参数调整: {manufacturer} {rated_capacity}kW - " + "; ".join(adjustments))

        # Final formatting: rotor 1 dp, generator int, ratio 3 dp.
        rated_rotor_spd = round(rated_rotor_spd, 1)
        rated_gen_spd = int(rated_gen_spd)
        transmission_ratio = round(transmission_ratio, 3)

        logger.debug(f"最终结果: 转子转速={rated_rotor_spd:.1f} rpm, "
                     f"发电机转速={rated_gen_spd} rpm, "
                     f"传动比={transmission_ratio:.3f}, "
                     f"类型={turbine_type}")

        return rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type

    @staticmethod
    def calculate_rated_speeds_and_ratio(
        data: List[Dict[str, Any]],
        manufacturer: str = "",
        rated_capacity: int = 0
    ) -> Tuple[float, float, float, str]:
        """
        Extract speeds from SCADA rows and compute validated rated parameters.

        Rows are kept only when both 'rotor_spd' and 'gen_spd' are present,
        convertible to float, and satisfy 0 < rotor < 50 and 0 < gen < 2500.

        Args:
            data: SCADA rows (dicts with 'rotor_spd' and 'gen_spd').
            manufacturer: manufacturer name, used for plausibility checks.
            rated_capacity: rated capacity (kW), used for plausibility checks.

        Returns:
            Tuple[float, float, float, str]: (rated_rotor_spd, rated_gen_spd,
            transmission_ratio, turbine_type); (0.0, 0.0, 0.0, "未知") when
            there is no usable data.
        """
        if not data:
            return 0.0, 0.0, 0.0, "未知"

        rotor_speeds = []
        gen_speeds = []

        for record in data:
            rotor_spd = record.get('rotor_spd')
            gen_spd = record.get('gen_spd')

            if rotor_spd is not None and gen_spd is not None:
                try:
                    rotor_val = float(rotor_spd)
                    gen_val = float(gen_spd)

                    # Basic sanity filter.
                    if 0 < rotor_val < 50 and 0 < gen_val < 2500:
                        rotor_speeds.append(rotor_val)
                        gen_speeds.append(gen_val)
                except (ValueError, TypeError):
                    continue

        if not rotor_speeds or not gen_speeds:
            logger.warning(f"有效数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
            return 0.0, 0.0, 0.0, "未知"

        return SCADADataProcessor.calculate_rated_parameters_with_validation(
            rotor_speeds,
            gen_speeds,
            manufacturer,
            rated_capacity
        )
# ---------------------------------------------------------------------------
# SQL statements (SQL语句定义)
# ---------------------------------------------------------------------------

# DDL for the turbine-model table. no_model is the unique key that the UPDATE
# and the SCADA query below filter on.
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
id INT AUTO_INCREMENT PRIMARY KEY,
no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
model VARCHAR(100) COMMENT '机型',
manufacturer VARCHAR(100) COMMENT '制造商',
rated_capacity INT COMMENT '额定容量(kW)',
cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
rotor_diameter INT COMMENT '叶轮直径(m)',
hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY idx_no_model (no_model),
INDEX idx_model (model),
INDEX idx_manufacturer (manufacturer),
INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""
# Adds the derived-parameter and bookkeeping columns to an existing table.
# NOTE(review): ADD COLUMN IF NOT EXISTS is accepted by TiDB/MariaDB but not by
# stock MySQL — confirm the target server.
ALTER_MODEL_TURBINE_TABLE_SQL = """
ALTER TABLE info_model_turbine
ADD COLUMN IF NOT EXISTS rated_rotor_spd DECIMAL(10, 1) COMMENT '额定叶轮转速(rpm)',
ADD COLUMN IF NOT EXISTS rated_gen_spd INT COMMENT '额定发电机转速(rpm)',
ADD COLUMN IF NOT EXISTS transmission_ratio DECIMAL(10, 3) COMMENT '传动比',
ADD COLUMN IF NOT EXISTS turbine_type VARCHAR(20) COMMENT '风机类型(直驱/双馈/半直驱)',
ADD COLUMN IF NOT EXISTS calculation_time TIMESTAMP COMMENT '参数计算时间',
ADD COLUMN IF NOT EXISTS data_points INT DEFAULT 0 COMMENT '用于计算的数据点数',
ADD COLUMN IF NOT EXISTS calculation_status VARCHAR(20) DEFAULT 'pending' COMMENT '计算状态(pending/success/error/adjusted)',
ADD COLUMN IF NOT EXISTS validation_info TEXT COMMENT '合理性校验信息';
"""
# Stores the calculated parameters for one model. Placeholders, in order:
# rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type,
# calculation_time, data_points, calculation_status, validation_info, no_model.
UPDATE_MODEL_PARAMETERS_SQL = """
UPDATE info_model_turbine
SET rated_rotor_spd = %s,
rated_gen_spd = %s,
transmission_ratio = %s,
turbine_type = %s,
calculation_time = %s,
data_points = %s,
calculation_status = %s,
validation_info = %s,
updated_at = CURRENT_TIMESTAMP
WHERE no_model = %s
"""
# Other SQL statements (其他SQL语句)

# Groups info_turbine rows into distinct models and counts turbines per group.
# no_model is built from model, cut-in/cut-out wind speed and hub height only.
# NOTE(review): the GROUP BY also includes manufacturer, rated_capacity and
# rotor_diameter, so two groups can produce the same no_model and collide on
# the unique key when inserted.
SELECT_MODEL_DATA_SQL = """
SELECT
CONCAT(
IFNULL(model, ''),
'-',
IFNULL(cut_in_wind_speed, ''),
'-',
IFNULL(cut_out_wind_speed, ''),
'-',
IFNULL(hub_height, '')
) AS no_model,
model,
manufacturer,
rated_capacity,
cut_in_wind_speed,
cut_out_wind_speed,
rotor_diameter,
hub_height,
COUNT(*) AS turbine_count
FROM info_turbine
WHERE model IS NOT NULL
GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed, cut_out_wind_speed, rotor_diameter, hub_height
ORDER BY model, manufacturer, rated_capacity;
"""
# Distinct model identifiers already stored in info_model_turbine.
SELECT_NO_MODEL_LIST_SQL = """
SELECT DISTINCT no_model FROM info_model_turbine ORDER BY no_model;
"""
# SCADA samples for one no_model (single %s placeholder). Keeps samples at or
# above the power curve's rated wind speed (matched by farm and standard
# model), with active power within +/-5% of rated capacity and positive rotor
# and generator speeds.
# NOTE(review): the join also matches rotor_diameter, which no_model does not
# encode.
SELECT_SCADA_FOR_NO_MODEL_SQL = """
SELECT
it.wind_farm_id,
it.turbine_id,
it.model,
icpt.rated_wind_speed,
imt.no_model,
imt.rated_capacity,
imt.cut_in_wind_speed,
imt.cut_out_wind_speed,
imt.rotor_diameter,
imt.hub_height,
dst.data_time,
dst.wind_spd,
dst.rotor_spd,
dst.gen_spd,
dst.p_active
FROM info_turbine it,
info_curve_power_turbine icpt,
info_model_turbine imt,
data_scada_turbine dst
WHERE 1=1
AND it.wind_farm_id = dst.id_farm
AND it.turbine_id = dst.id_turbine
AND it.model = dst.no_model_turbine
AND it.model = imt.model
AND it.cut_in_wind_speed = imt.cut_in_wind_speed
AND it.cut_out_wind_speed = imt.cut_out_wind_speed
AND it.rotor_diameter = imt.rotor_diameter
AND it.hub_height = imt.hub_height
AND it.wind_farm_id = icpt.wind_farm_id
AND it.model = icpt.standard_model
AND imt.no_model = %s
AND dst.wind_spd >= icpt.rated_wind_speed
AND dst.p_active >= imt.rated_capacity * 0.95
AND dst.p_active <= imt.rated_capacity * 1.05
AND dst.rotor_spd IS NOT NULL
AND dst.gen_spd IS NOT NULL
AND dst.rotor_spd > 0
AND dst.gen_spd > 0
ORDER BY dst.data_time;
"""
# Drops the model table (all stored rows are lost).
DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"
# Counts tables named info_model_turbine in the schema passed as the only parameter.
CHECK_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = 'info_model_turbine'
"""
# Lists info_model_turbine's column names in the schema passed as the only parameter.
CHECK_TABLE_COLUMNS_SQL = """
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'info_model_turbine'
"""
- class ModelTurbineManager:
- """风机机型信息管理器"""
-
- def __init__(self, db_config: DatabaseConfig):
- self.db_config = db_config
- self.connection_pool = None
- self._initialize_connection_pool()
-
- def _initialize_connection_pool(self):
- """初始化数据库连接池"""
- self.connection_pool = ConnectionPool(
- host=self.db_config.host,
- port=self.db_config.port,
- user=self.db_config.user,
- password=self.db_config.password,
- database=self.db_config.database,
- charset=self.db_config.charset,
- max_connections=self.db_config.max_connections,
- mem_quota=self.db_config.mem_quota
- )
- logger.info(f"数据库连接池初始化完成,最大连接数: {self.db_config.max_connections}")
-
- def get_connection(self):
- """从连接池获取连接"""
- return self.connection_pool.get_connection()
-
- def release_connection(self, conn):
- """释放连接回连接池"""
- self.connection_pool.release_connection(conn)
-
- def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
- """
- 执行查询语句
-
- Args:
- sql: SQL查询语句
- params: SQL参数
-
- Returns:
- 查询结果列表
- """
- conn = self.get_connection()
- if not conn:
- raise Exception("无法从连接池获取数据库连接")
-
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql, params)
- result = cursor.fetchall()
- logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
- return result
- except Exception as e:
- logger.error(f"查询执行失败: {e}")
- logger.error(f"SQL: {sql}")
- if params:
- logger.error(f"参数: {params}")
- raise
- finally:
- self.release_connection(conn)
-
- def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
- """
- 执行更新语句(INSERT, UPDATE, DELETE)
-
- Args:
- sql: SQL更新语句
- params: SQL参数
-
- Returns:
- 影响的行数
- """
- conn = self.get_connection()
- if not conn:
- raise Exception("无法从连接池获取数据库连接")
-
- try:
- with conn.cursor() as cursor:
- affected_rows = cursor.execute(sql, params)
- conn.commit()
- logger.debug(f"更新执行成功,影响 {affected_rows} 行")
- return affected_rows
- except Exception as e:
- conn.rollback()
- logger.error(f"更新执行失败: {e}")
- logger.error(f"SQL: {sql}")
- if params:
- logger.error(f"参数: {params}")
- raise
- finally:
- self.release_connection(conn)
-
- def execute_batch_update(self, sql: str, params_list: List[tuple]) -> int:
- """
- 批量执行更新语句
-
- Args:
- sql: SQL更新语句
- params_list: SQL参数列表
-
- Returns:
- 影响的行数
- """
- conn = self.get_connection()
- if not conn:
- raise Exception("无法从连接池获取数据库连接")
-
- try:
- with conn.cursor() as cursor:
- affected_rows = cursor.executemany(sql, params_list)
- conn.commit()
- logger.debug(f"批量更新执行成功,影响 {affected_rows} 行")
- return affected_rows
- except Exception as e:
- conn.rollback()
- logger.error(f"批量更新执行失败: {e}")
- logger.error(f"SQL: {sql}")
- raise
- finally:
- self.release_connection(conn)
-
- def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
- """
- 检查表是否存在
-
- Args:
- table_name: 表名
-
- Returns:
- bool: 表是否存在
- """
- try:
- result = self.execute_query(CHECK_TABLE_EXISTS_SQL, (self.db_config.database,))
- return result[0]['table_exists'] > 0
- except Exception as e:
- logger.error(f"检查表存在性失败: {e}")
- return False
-
- def check_table_columns(self) -> List[str]:
- """
- 检查表的列
-
- Returns:
- List[str]: 列名列表
- """
- try:
- result = self.execute_query(CHECK_TABLE_COLUMNS_SQL, (self.db_config.database,))
- return [row['COLUMN_NAME'] for row in result]
- except Exception as e:
- logger.error(f"检查表列失败: {e}")
- return []
-
- def add_missing_columns(self):
- """添加缺失的列"""
- try:
- logger.info("检查并添加缺失的列...")
- self.execute_update(ALTER_MODEL_TURBINE_TABLE_SQL)
- logger.info("表结构更新完成")
- except Exception as e:
- logger.error(f"更新表结构失败: {e}")
-
- def create_model_turbine_table(self) -> bool:
- """
- 创建风机机型信息表
-
- Returns:
- bool: 是否成功创建表
- """
- try:
- logger.info("开始创建风机机型信息表...")
- self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
- logger.info("风机机型信息表创建成功")
- return True
- except Exception as e:
- logger.error(f"创建风机机型信息表失败: {e}")
- return False
-
- def drop_model_turbine_table(self) -> bool:
- """
- 删除风机机型信息表
-
- Returns:
- bool: 是否成功删除表
- """
- try:
- logger.info("开始删除风机机型信息表...")
- self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
- logger.info("风机机型信息表删除成功")
- return True
- except Exception as e:
- logger.error(f"删除风机机型信息表失败: {e}")
- return False
-
- def get_model_data(self) -> List[Dict[str, Any]]:
- """
- 从info_turbine表获取机型分组数据
-
- Returns:
- 机型数据列表
- """
- try:
- logger.info("开始从info_turbine表查询机型数据...")
- data = self.execute_query(SELECT_MODEL_DATA_SQL)
- logger.info(f"成功查询到 {len(data)} 条机型记录")
- return data
- except Exception as e:
- logger.error(f"查询机型数据失败: {e}")
- return []
-
- def get_no_model_list(self) -> List[str]:
- """
- 获取所有no_model列表
-
- Returns:
- no_model列表
- """
- try:
- logger.info("开始查询所有机型标识...")
- result = self.execute_query(SELECT_NO_MODEL_LIST_SQL)
- no_models = [row['no_model'] for row in result]
- logger.info(f"成功获取 {len(no_models)} 个机型标识")
- return no_models
- except Exception as e:
- logger.error(f"查询机型标识列表失败: {e}")
- return []
-
- def get_scada_data_for_no_model(self, no_model: str) -> List[Dict[str, Any]]:
- """
- 获取指定机型的SCADA数据用于参数计算
-
- Args:
- no_model: 机型标识
-
- Returns:
- SCADA数据列表
- """
- try:
- logger.debug(f"开始查询机型 {no_model} 的SCADA数据...")
- data = self.execute_query(SELECT_SCADA_FOR_NO_MODEL_SQL, (no_model,))
- logger.debug(f"机型 {no_model} 查询到 {len(data)} 条SCADA记录")
- return data
- except Exception as e:
- logger.error(f"查询机型 {no_model} 的SCADA数据失败: {e}")
- return []
-
- def calculate_and_update_parameters_for_model(self, no_model: str) -> Dict[str, Any]:
- """
- 计算并更新指定机型的额定参数
-
- Args:
- no_model: 机型标识
-
- Returns:
- 计算结果的字典
- """
- try:
- # 先获取机型基本信息
- model_info_sql = """
- SELECT manufacturer, rated_capacity
- FROM info_model_turbine
- WHERE no_model = %s
- """
-
- model_info = self.execute_query(model_info_sql, (no_model,))
- if not model_info:
- logger.warning(f"机型 {no_model}: 未找到基本信息")
- return {
- "no_model": no_model,
- "success": False,
- "reason": "未找到机型基本信息",
- "data_points": 0
- }
-
- manufacturer = model_info[0].get('manufacturer', '')
- rated_capacity = model_info[0].get('rated_capacity', 0)
-
- # 获取该机型的SCADA数据
- scada_data = self.get_scada_data_for_no_model(no_model)
-
- if not scada_data:
- logger.warning(f"机型 {no_model}: 没有可用于计算的SCADA数据")
- return {
- "no_model": no_model,
- "success": False,
- "reason": "无SCADA数据",
- "data_points": 0
- }
-
- # 计算额定参数(带合理性校验)
- rated_rotor_spd, rated_gen_spd, transmission_ratio, turbine_type = \
- SCADADataProcessor.calculate_rated_speeds_and_ratio(
- scada_data, manufacturer, rated_capacity
- )
-
- # 判断计算状态
- if rated_rotor_spd <= 0 or transmission_ratio <= 0:
- calculation_status = "error"
- validation_info = "参数无效"
- success = False
- else:
- # 检查是否需要调整
- manu_info = ManufacturerInfo.get_manufacturer_info(manufacturer)
- expected_ratio_range = manu_info["ratio_range"]
- rotor_range = ManufacturerInfo.get_rotor_speed_range(rated_capacity)
- gen_range = ManufacturerInfo.get_generator_speed_range(turbine_type)
-
- # 判断是否经过调整
- adjustments = []
-
- if not (rotor_range[0] <= rated_rotor_spd <= rotor_range[1]):
- adjustments.append(f"叶轮转速调整: {rated_rotor_spd:.1f} rpm")
-
- if not (gen_range[0] <= rated_gen_spd <= gen_range[1]):
- adjustments.append(f"发电机转速调整: {rated_gen_spd} rpm")
-
- if not (expected_ratio_range[0] <= transmission_ratio <= expected_ratio_range[1]):
- adjustments.append(f"传动比调整: {transmission_ratio:.3f}")
-
- if adjustments:
- calculation_status = "adjusted"
- validation_info = "; ".join(adjustments)
- else:
- calculation_status = "success"
- validation_info = "参数合理"
-
- success = True
-
- # 更新数据库
- update_result = self.execute_update(
- UPDATE_MODEL_PARAMETERS_SQL,
- (
- rated_rotor_spd,
- rated_gen_spd,
- transmission_ratio,
- turbine_type,
- datetime.now(),
- len(scada_data),
- calculation_status,
- validation_info,
- no_model
- )
- )
-
- if update_result > 0:
- log_msg = (f"机型 {no_model}: {calculation_status} - "
- f"叶轮转速={rated_rotor_spd:.1f} rpm, "
- f"发电机转速={rated_gen_spd} rpm, "
- f"传动比={transmission_ratio:.3f}, "
- f"类型={turbine_type}, "
- f"数据点={len(scada_data)}")
-
- if calculation_status == "adjusted":
- logger.info(log_msg + f", 调整: {validation_info}")
- else:
- logger.info(log_msg)
-
- return {
- "no_model": no_model,
- "success": success,
- "status": calculation_status,
- "rated_rotor_spd": rated_rotor_spd,
- "rated_gen_spd": rated_gen_spd,
- "transmission_ratio": transmission_ratio,
- "turbine_type": turbine_type,
- "data_points": len(scada_data),
- "validation_info": validation_info
- }
- else:
- logger.warning(f"机型 {no_model}: 数据库更新失败")
- return {
- "no_model": no_model,
- "success": False,
- "reason": "数据库更新失败",
- "status": "error",
- "rated_rotor_spd": rated_rotor_spd,
- "rated_gen_spd": rated_gen_spd,
- "transmission_ratio": transmission_ratio,
- "turbine_type": turbine_type,
- "data_points": len(scada_data)
- }
-
- except Exception as e:
- logger.error(f"计算机型 {no_model} 参数时出错: {e}")
- return {
- "no_model": no_model,
- "success": False,
- "reason": str(e),
- "status": "error",
- "data_points": 0
- }
-
- def calculate_and_update_all_parameters(self) -> Dict[str, Any]:
- """
- 计算并更新所有机型的额定参数
-
- Returns:
- 计算统计信息
- """
- try:
- logger.info("开始计算并更新所有机型的额定参数...")
-
- # 获取所有机型标识
- no_model_list = self.get_no_model_list()
-
- if not no_model_list:
- logger.warning("没有找到任何机型标识")
- return {
- "total_models": 0,
- "success_count": 0,
- "failed_count": 0,
- "results": []
- }
-
- logger.info(f"共发现 {len(no_model_list)} 个机型需要计算")
-
- results = []
- success_count = 0
- failed_count = 0
-
- # 遍历每个机型进行计算
- for i, no_model in enumerate(no_model_list, 1):
- logger.info(f"处理机型 {i}/{len(no_model_list)}: {no_model}")
-
- result = self.calculate_and_update_parameters_for_model(no_model)
- results.append(result)
-
- if result.get("success"):
- success_count += 1
- else:
- failed_count += 1
-
- # 汇总统计
- stats = {
- "total_models": len(no_model_list),
- "success_count": success_count,
- "failed_count": failed_count,
- "success_rate": success_count / len(no_model_list) * 100 if no_model_list else 0,
- "results": results
- }
-
- logger.info(f"参数计算完成: 成功={success_count}, 失败={failed_count}, "
- f"成功率={stats['success_rate']:.1f}%")
-
- return stats
-
- except Exception as e:
- logger.error(f"计算并更新所有参数失败: {e}")
- raise
-
- def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
- """
- 将机型数据插入到info_model_turbine表
-
- Args:
- model_data: 机型数据列表
-
- Returns:
- int: 成功插入的记录数
- """
- if not model_data:
- logger.warning("没有数据需要插入")
- return 0
-
- insert_sql = """
- INSERT INTO info_model_turbine
- (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
- cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- turbine_count = VALUES(turbine_count),
- updated_at = CURRENT_TIMESTAMP
- """
-
- try:
- logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")
-
- # 准备插入数据
- insert_data = []
- for item in model_data:
- insert_data.append((
- item['no_model'],
- item['model'],
- item['manufacturer'],
- item['rated_capacity'],
- item['cut_in_wind_speed'],
- item['cut_out_wind_speed'],
- item['rotor_diameter'],
- item['hub_height'],
- item.get('turbine_count', 1)
- ))
-
- # 批量插入数据
- affected_rows = self.execute_batch_update(insert_sql, insert_data)
-
- logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
- return affected_rows
-
- except Exception as e:
- logger.error(f"插入机型数据失败: {e}")
- raise
-
- def get_model_turbine_stats(self) -> Dict[str, Any]:
- """
- 获取info_model_turbine表的统计信息
-
- Returns:
- 统计信息字典
- """
- try:
- # 查询统计信息
- stats_sql = """
- SELECT
- COUNT(*) as total_models,
- COUNT(DISTINCT manufacturer) as manufacturer_count,
- COUNT(DISTINCT model) as model_count,
- MIN(rated_capacity) as min_capacity,
- MAX(rated_capacity) as max_capacity,
- AVG(rated_capacity) as avg_capacity,
- SUM(turbine_count) as total_turbines,
- COUNT(CASE WHEN rated_rotor_spd IS NOT NULL AND rated_rotor_spd > 0 THEN 1 END) as calculated_models,
- COUNT(DISTINCT turbine_type) as turbine_type_count,
- SUM(data_points) as total_data_points,
- COUNT(CASE WHEN calculation_status = 'success' THEN 1 END) as success_count,
- COUNT(CASE WHEN calculation_status = 'adjusted' THEN 1 END) as adjusted_count,
- COUNT(CASE WHEN calculation_status = 'error' THEN 1 END) as error_count
- FROM info_model_turbine
- """
-
- result = self.execute_query(stats_sql)
- return result[0] if result else {}
-
- except Exception as e:
- logger.error(f"获取统计信息失败: {e}")
- return {}
-
def get_turbine_type_distribution(self, include_unknown: bool = False) -> List[Dict[str, Any]]:
    """
    Return per-turbine-type aggregates from info_model_turbine.

    Each row has turbine_type, model_count, turbine_count (summed), and the
    averages avg_ratio, avg_rotor_spd and avg_gen_spd. Rows are ordered by
    model_count descending, with turbine_type as a tie-break so the output
    order is deterministic.

    Args:
        include_unknown: When True, models whose turbine_type is NULL are kept
            and reported under the label '未知'. Defaults to False, which
            excludes them (the previous and still-default behaviour).

    Returns:
        List of row dicts, or an empty list if the query raises (the error is
        logged, not re-raised).
    """
    # Previously the WHERE clause was unconditional, which made the IFNULL
    # label below unreachable. The fragment is a fixed constant, never user
    # input, so interpolating it into the SQL text is safe.
    where_clause = "" if include_unknown else "WHERE turbine_type IS NOT NULL"

    try:
        type_sql = f"""
            SELECT
                IFNULL(turbine_type, '未知') as turbine_type,
                COUNT(*) as model_count,
                SUM(turbine_count) as turbine_count,
                AVG(transmission_ratio) as avg_ratio,
                AVG(rated_rotor_spd) as avg_rotor_spd,
                AVG(rated_gen_spd) as avg_gen_spd
            FROM info_model_turbine
            {where_clause}
            GROUP BY turbine_type
            ORDER BY model_count DESC, turbine_type
        """

        return self.execute_query(type_sql)

    except Exception as e:
        logger.error(f"获取风机类型分布失败: {e}")
        return []
-
def print_model_summary(self, model_data: List[Dict[str, Any]]) -> None:
    """
    Print a console summary of extracted turbine models.

    Prints the total number of models, the number of models per manufacturer
    (most frequent first; ties keep first-seen order) and the number of models
    per rated capacity (ascending numerically). Models with a falsy
    rated_capacity (missing, None, 0) are left out of the capacity section.

    Args:
        model_data: Model rows as dicts. 'manufacturer' and 'rated_capacity'
            are read with .get(), so either key may be absent.
    """
    from collections import Counter

    if not model_data:
        logger.info("没有机型数据")
        return

    print("\n" + "="*80)
    print("风机机型数据摘要")
    print("="*80)
    print(f"总机型数: {len(model_data)}")

    # A missing, None or empty manufacturer is reported as '未知' rather than
    # printing "None".
    manufacturers = Counter(item.get('manufacturer') or '未知' for item in model_data)

    print(f"制造商数: {len(manufacturers)}")
    print("\n制造商分布:")
    # most_common() orders by count descending, keeping first-seen order for ties.
    for manufacturer, count in manufacturers.most_common():
        print(f" {manufacturer}: {count} 种机型")

    # Key on the numeric capacity itself. The old "<value>kW" string key was
    # parsed back with int(), which raised ValueError for float or Decimal
    # capacities such as 2500.0 or Decimal('1500.00'). Numerically equal
    # capacities are counted together.
    capacities = Counter(
        item['rated_capacity'] for item in model_data if item.get('rated_capacity')
    )

    print("\n额定容量分布:")
    for capacity, count in sorted(capacities.items(), key=lambda kv: float(kv[0])):
        print(f" {capacity}kW: {count} 种机型")

    print("="*80)
-
def print_calculation_summary(self, stats: Dict[str, Any]):
    """
    Print a console report for a parameter-calculation run.

    Args:
        stats: Dict read only via .get(): 'total_models', 'success_count',
            'failed_count', 'success_rate' (printed as a percentage with one
            decimal) and 'results', a list of per-model dicts. Entries with a
            truthy 'success' flag are listed as successes and the rest as
            failures; at most five of each are printed.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("参数计算统计")
    print(rule)
    for label, key in (
        ("总机型数", 'total_models'),
        ("成功计算的机型数", 'success_count'),
        ("失败的机型数", 'failed_count'),
    ):
        print(f"{label}: {stats.get(key, 0)}")
    print(f"成功率: {stats.get('success_rate', 0):.1f}%")

    all_results = stats.get('results', [])

    # Show a sample of successfully calculated models
    succeeded = [entry for entry in all_results if entry.get('success')]
    if succeeded:
        print("\n成功计算的机型示例 (前5个):")
        for position, entry in enumerate(succeeded[:5], start=1):
            print(f" {position}. {entry['no_model']}:")
            print(f" 叶轮转速: {entry['rated_rotor_spd']:.1f} rpm")
            print(f" 发电机转速: {entry['rated_gen_spd']} rpm")
            print(f" 传动比: {entry['transmission_ratio']:.3f}")
            print(f" 类型: {entry['turbine_type']}")
            print(f" 数据点数: {entry['data_points']}")
            note = entry.get('validation_info')
            if note:
                print(f" 校验信息: {note}")

    # Show a sample of failures together with their reasons
    failed = [entry for entry in all_results if not entry.get('success')]
    if failed:
        print("\n失败的机型示例 (前5个):")
        for position, entry in enumerate(failed[:5], start=1):
            print(f" {position}. {entry['no_model']}: {entry.get('reason', '未知原因')}")

    print(rule)
-
def run_model_extraction_pipeline(self, recreate_table: bool = False, calculate_params: bool = True) -> bool:
    """
    Run the full model-extraction and rated-parameter calculation workflow.

    Steps:
        1. Ensure info_model_turbine exists, optionally dropping and recreating it.
        2. Read model rows via get_model_data().
        3. Print a summary of the rows.
        4. Upsert the rows via insert_model_data().
        5. Optionally run calculate_and_update_all_parameters() and print its summary.
        6. Print database statistics and the turbine-type distribution.

    The connection pool is closed on every exit path, so the manager's pool
    should not be relied on after this call.

    Args:
        recreate_table: Drop and recreate the table before loading data.
        calculate_params: Whether to calculate rated parameters after loading.

    Returns:
        bool: True if every step completed; False if no model data was found
        or any step raised an exception.
    """
    try:
        logger.info("开始执行风机机型数据提取和参数计算流程...")

        # Step 1: check or create the table
        if recreate_table:
            self.drop_model_turbine_table()
            self.create_model_turbine_table()
        elif not self.check_table_exists():
            self.create_model_turbine_table()
        else:
            logger.info("info_model_turbine表已存在,将追加数据")
            # An existing table may predate newer columns. Presumably a freshly
            # created table already has them; confirm against
            # create_model_turbine_table.
            self.add_missing_columns()

        # Step 2: read model data from info_turbine
        model_data = self.get_model_data()
        if not model_data:
            logger.error("未获取到机型数据,流程终止")
            return False

        # Step 3: print a summary of the extracted models
        self.print_model_summary(model_data)

        # Step 4: upsert into info_model_turbine (the callee logs the row count)
        self.insert_model_data(model_data)

        # Step 5: calculate rated parameters
        if calculate_params:
            calculation_stats = self.calculate_and_update_all_parameters()
            self.print_calculation_summary(calculation_stats)

        # Step 6: show the final statistics
        self._print_final_stats()

        logger.info("风机机型数据提取和参数计算流程执行完成!")
        return True

    except Exception as e:
        logger.error(f"流程执行失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Always release the pool. A failure while closing must not mask the
        # pipeline's own result or exception.
        if self.connection_pool:
            try:
                self.connection_pool.close_all()
            except Exception as close_error:
                logger.warning(f"关闭连接池失败: {close_error}")


def _print_final_stats(self) -> None:
    """Print database-level statistics and the turbine-type distribution."""

    def _fmt(value, spec):
        # AVG() over only-NULL values yields None; print a placeholder instead
        # of letting format() raise and fail an otherwise successful run.
        return "N/A" if value is None else format(value, spec)

    stats = self.get_model_turbine_stats()
    if stats:
        # AVG(rated_capacity) is NULL when no capacity is set; treat it like a
        # missing key (0) instead of crashing in round().
        avg_capacity = stats.get('avg_capacity')
        if avg_capacity is None:
            avg_capacity = 0

        print("\n数据库统计信息:")
        print(f" 总机型数: {stats.get('total_models', 0)}")
        print(f" 制造商数: {stats.get('manufacturer_count', 0)}")
        print(f" 总风机数: {stats.get('total_turbines', 0)}")
        print(f" 额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
        print(f" 平均额定容量: {round(avg_capacity, 1)}kW")
        print(f" 已计算参数的机型数: {stats.get('calculated_models', 0)}")
        print(f" 成功计算: {stats.get('success_count', 0)}, 调整后计算: {stats.get('adjusted_count', 0)}, 错误: {stats.get('error_count', 0)}")
        print(f" 使用的总数据点数: {stats.get('total_data_points', 0)}")

    # Turbine-type distribution
    type_dist = self.get_turbine_type_distribution()
    if type_dist:
        print("\n风机类型分布:")
        for item in type_dist:
            print(f" {item['turbine_type']}: {item['model_count']} 种机型, "
                  f"{item['turbine_count']} 台风机")
            if item['avg_ratio']:
                print(f" 平均传动比: {item['avg_ratio']:.3f}, "
                      f"平均叶轮转速: {_fmt(item['avg_rotor_spd'], '.1f')} rpm, "
                      f"平均发电机转速: {_fmt(item['avg_gen_spd'], '.0f')} rpm")
def main():
    """
    Script entry point: build the database config, run the model extraction
    pipeline and log the outcome.

    Connection settings can be overridden with the environment variables
    WIND_DB_HOST, WIND_DB_PORT, WIND_DB_USER, WIND_DB_PASSWORD and
    WIND_DB_NAME. Unset variables fall back to the previously hard-coded values.

    Returns:
        bool: True if the pipeline succeeded, False otherwise.
    """
    import os

    # NOTE(review): the fallback credentials are kept only for backward
    # compatibility; prefer supplying them via the environment.
    db_config = DatabaseConfig(
        host=os.environ.get("WIND_DB_HOST", "192.168.50.234"),
        port=int(os.environ.get("WIND_DB_PORT", "4000")),
        user=os.environ.get("WIND_DB_USER", "root"),
        password=os.environ.get("WIND_DB_PASSWORD", "123456"),
        database=os.environ.get("WIND_DB_NAME", "wind_data"),
        charset='utf8mb4',
        max_connections=2,
        mem_quota=4 << 30  # 4 GiB
    )

    # Create the manager instance
    manager = ModelTurbineManager(db_config)

    # Run model extraction and rated-parameter calculation
    success = manager.run_model_extraction_pipeline(
        recreate_table=False,  # whether to drop and recreate the table
        calculate_params=True  # whether to calculate rated parameters
    )

    if success:
        logger.info("风机机型数据提取和参数计算成功完成!")
    else:
        logger.error("风机机型数据提取和参数计算失败!")
    return success
def simple_percentile(data, percentiles):
    """
    Minimal stand-in for numpy.percentile, used when numpy is unavailable.

    Uses linear interpolation between the closest ranks, which is numpy's
    default method, so results match numpy for plain numeric input.

    Args:
        data: Iterable of numbers (anything float() accepts).
        percentiles: A single percentile, or a sequence of percentiles, each
            in [0, 100].

    Returns:
        A float for a scalar ``percentiles``, otherwise a list of floats in
        the same order. For empty ``data`` this returns zeros (one per
        requested percentile) as a best-effort fallback, where numpy would
        raise instead.

    Raises:
        ValueError: if any percentile lies outside [0, 100].
    """
    import math
    import numbers

    scalar = isinstance(percentiles, numbers.Real)
    qs = [percentiles] if scalar else list(percentiles)
    for q in qs:
        if not 0 <= q <= 100:
            raise ValueError("Percentiles must be in the range [0, 100]")

    ordered = sorted(data)
    if not ordered:
        # Previously always [0, 0], regardless of how many percentiles were asked for.
        values = [0] * len(qs)
    else:
        last = len(ordered) - 1
        values = []
        for q in qs:
            rank = last * q / 100
            lo = math.floor(rank)
            hi = min(lo + 1, last)
            low_val = float(ordered[lo])
            values.append(low_val + (float(ordered[hi]) - low_val) * (rank - lo))
    return values[0] if scalar else values


if __name__ == "__main__":
    # numpy is optional: fall back to a minimal stand-in for np.percentile.
    try:
        import numpy as np
    except ImportError:
        logger.warning("未找到numpy库,使用简单统计方法")
        # Module-level name kept for compatibility with the previous fallback.
        percentile = simple_percentile

        class NumpyStub:
            """Exposes only the numpy API subset this script falls back on."""

            @staticmethod
            def percentile(data, percentiles):
                return simple_percentile(data, percentiles)

        np = NumpyStub()

    main()
|