| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- import pandas as pd
- import pymysql
- from sqlalchemy import create_engine, text
- import warnings
- from typing import Optional, Dict, Any, List
- import logging
# Logging setup: configure the root logger with INFO level and a timestamped
# record layout, then create the module-level logger used throughout this file.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Maps each Chinese CSV header to the English column name used in the
# info_turbine table. Entries keep the order in which they were declared.
HEADER_MAPPING = {
    # Identity and naming
    "风机id": "turbine_id",
    "风场id": "wind_farm_id",
    "风机名称": "turbine_name",
    "风机英文名称": "turbine_name_en",
    "USCADA别名": "uscada_alias",
    # Location
    "海拔": "altitude",
    "纬度": "latitude",
    "经度": "longitude",
    # Control, topology and dates
    "主控接口版本": "main_control_version",
    "所属馈线ID": "feeder_id",
    "所属期ID": "phase_id",
    "轮毂高度": "hub_height",
    "指标统计开始日期": "stat_start_date",
    "投产运行日期": "operation_date",
    # Power curves and energy settings
    "合同功率曲线(标准空气密度)": "contract_power_curve_std",
    "合同功率曲线(当地空气密度)": "contract_power_curve_local",
    "电量倍率": "power_multiplier",
    "电量跳变斜率": "power_jump_slope",
    "项目阶段": "project_phase",
    "标杆风机标志(0或1)": "benchmark_flag",
    "遥测辅助状态判断启用": "telemetry_aux_status_enable",
    # SCADA settings
    "scada单风机UR": "scada_ur",
    "scada风机图标": "scada_icon",
    "scada数据冻结时间阈值": "scada_freeze_threshold",
    "scada应用配置导入模板": "scada_template",
    # Miscellaneous descriptive fields
    "预留字段": "reserved_field",
    "风机图片文件名": "turbine_image",
    "时区": "timezone",
    "描述": "description",
    # Machine characteristics
    "额定容量": "rated_capacity",
    "机型": "model",
    "外部厂家风机ID": "external_turbine_id",
    "风机制造商": "manufacturer",
    "切入风速": "cut_in_wind_speed",
    "切出风速": "cut_out_wind_speed",
    "修改时间": "update_time",
    # Electrical associations
    "关联进线间隔id": "incoming_line_id",
    "关联出线间隔id": "outgoing_line_id",
    "关联主变压器id": "main_transformer_id",
    # Status flags and phases
    "投产状态": "operation_status",
    "质保状态": "warranty_status",
    "有功功率积分算电量": "active_power_integral",
    "开启故障检修判断": "fault_maintenance_enable",
    "实施阶段": "implementation_phase",
    "次级分组": "secondary_group",
    "风机编号": "turbine_number",
    "风机阶段": "turbine_phase",
    "首次并网时间": "first_grid_time",
    "数据湖源": "data_lake_source",
    # Already an English header; mapped to itself.
    "wsfd_local": "wsfd_local",
    "叶轮直径": "rotor_diameter",
}
# SQL statements. All three operate on the same table, so the table name is
# written once and interpolated into each statement.
_TABLE_NAME = "info_turbine"

# DDL for the turbine table: one column per HEADER_MAPPING value plus a
# surrogate key, two bookkeeping timestamps and four secondary indexes.
CREATE_TABLE_SQL = f"""
CREATE TABLE IF NOT EXISTS {_TABLE_NAME} (
    id INT AUTO_INCREMENT PRIMARY KEY,
    turbine_id VARCHAR(50),
    wind_farm_id VARCHAR(50),
    turbine_name VARCHAR(200),
    turbine_name_en VARCHAR(200),
    uscada_alias VARCHAR(100),
    altitude DECIMAL(10, 4),
    latitude DECIMAL(12, 8),
    longitude DECIMAL(12, 8),
    main_control_version VARCHAR(100),
    feeder_id VARCHAR(50),
    phase_id VARCHAR(50),
    hub_height DECIMAL(10, 2),
    stat_start_date DATE,
    operation_date DATE,
    contract_power_curve_std VARCHAR(100),
    contract_power_curve_local VARCHAR(100),
    power_multiplier DECIMAL(10, 2),
    power_jump_slope DECIMAL(10, 2),
    project_phase INT,
    benchmark_flag TINYINT,
    telemetry_aux_status_enable VARCHAR(10),
    scada_ur VARCHAR(100),
    scada_icon VARCHAR(100),
    scada_freeze_threshold VARCHAR(100),
    scada_template VARCHAR(100),
    reserved_field VARCHAR(100),
    turbine_image VARCHAR(200),
    timezone VARCHAR(50),
    description TEXT,
    rated_capacity INT,
    model VARCHAR(100),
    external_turbine_id VARCHAR(100),
    manufacturer VARCHAR(100),
    cut_in_wind_speed DECIMAL(5, 2),
    cut_out_wind_speed DECIMAL(5, 2),
    update_time DATETIME,
    incoming_line_id VARCHAR(50),
    outgoing_line_id VARCHAR(50),
    main_transformer_id VARCHAR(50),
    operation_status VARCHAR(50),
    warranty_status VARCHAR(50),
    active_power_integral TINYINT,
    fault_maintenance_enable TINYINT,
    implementation_phase INT,
    secondary_group VARCHAR(100),
    turbine_number VARCHAR(50),
    turbine_phase VARCHAR(50),
    first_grid_time DATE,
    data_lake_source VARCHAR(100),
    wsfd_local VARCHAR(100),
    rotor_diameter INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_turbine_id (turbine_id),
    INDEX idx_wind_farm_id (wind_farm_id),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_model (model)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""

# Row count of the turbine table.
COUNT_TABLE_SQL = f"SELECT COUNT(*) FROM {_TABLE_NAME}"

# Removes the turbine table entirely (used before a clean re-creation).
DROP_TABLE_SQL = f"DROP TABLE IF EXISTS {_TABLE_NAME}"
class DatabaseConfig:
    """Connection settings for the target database.

    The same settings are exposed in two shapes: a keyword dictionary for
    ``pymysql.connect`` and a connection URL for SQLAlchemy.

    NOTE(review): the defaults embed a concrete host and password; callers
    should pass explicit values instead of relying on them.
    """

    def __init__(self, host='192.168.50.234', port=4000, user='root',
                 password='123456', database='wind_data', charset='utf8mb4'):
        """Store the connection parameters.

        Args:
            host: Database server host name or address.
            port: Database server TCP port.
            user: Login user name.
            password: Login password.
            database: Name of the database (schema) to use.
            charset: Connection character set.
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset

    def get_pymysql_config(self) -> Dict[str, Any]:
        """Return the settings as keyword arguments for ``pymysql.connect``.

        Values are passed through verbatim (no URL escaping is applied here).
        """
        return {
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'password': self.password,
            'database': self.database,
            'charset': self.charset,
        }

    def get_sqlalchemy_url(self) -> str:
        """Return a ``mysql+pymysql://`` connection URL for SQLAlchemy.

        The user name and password are percent-encoded so that characters
        with a meaning inside a URL (``@``, ``/``, ``:``, ``%`` ...) cannot
        corrupt the URL structure. Values made only of letters and digits
        are left unchanged by the encoding.
        """
        from urllib.parse import quote

        # safe='' also encodes '/', which quote() would otherwise keep.
        user = quote(str(self.user), safe='')
        password = quote(str(self.password), safe='')
        return (
            f"mysql+pymysql://{user}:{password}"
            f"@{self.host}:{self.port}/{self.database}?charset={self.charset}"
        )
class TurbineDataImporter:
    """Import wind-turbine configuration records from a CSV file into MySQL.

    DDL statements and read-back queries run on a raw pymysql connection;
    the bulk insert runs through a SQLAlchemy engine via
    ``DataFrame.to_sql``. Both handles are created on demand and released by
    ``close_connections``.
    """

    def __init__(self, db_config: "DatabaseConfig"):
        """Create an importer that is not yet connected.

        Args:
            db_config: Database configuration object providing
                ``get_pymysql_config()`` and ``get_sqlalchemy_url()``.
        """
        self.db_config = db_config
        self.engine = None
        self.pymysql_conn = None

    def connect_pymysql(self) -> "pymysql.connections.Connection":
        """Open a pymysql connection, store it on the instance and return it.

        Raises:
            Exception: Whatever ``pymysql.connect`` raised, after logging it.
        """
        try:
            self.pymysql_conn = pymysql.connect(**self.db_config.get_pymysql_config())
            logger.info("pymysql连接成功")
            return self.pymysql_conn
        except Exception as e:
            logger.error(f"pymysql连接失败: {e}")
            raise

    def connect_sqlalchemy(self) -> "sqlalchemy.engine.Engine":
        """Create a SQLAlchemy engine, store it on the instance and return it.

        Raises:
            Exception: Whatever ``create_engine`` raised, after logging it.
        """
        try:
            self.engine = create_engine(self.db_config.get_sqlalchemy_url())
            logger.info("SQLAlchemy引擎创建成功")
            return self.engine
        except Exception as e:
            logger.error(f"SQLAlchemy引擎创建失败: {e}")
            raise

    def close_connections(self):
        """Release the pymysql connection and the SQLAlchemy engine.

        Each handle is reset to ``None`` afterwards so that a later call
        reconnects instead of reusing a closed handle, and so that calling
        this method twice is harmless. A failure while closing one handle is
        logged and does not prevent the other from being released.
        """
        if self.pymysql_conn is not None:
            try:
                self.pymysql_conn.close()
                logger.info("pymysql连接已关闭")
            except Exception as e:
                logger.warning(f"关闭pymysql连接时发生错误: {e}")
            finally:
                self.pymysql_conn = None

        if self.engine is not None:
            try:
                self.engine.dispose()
                logger.info("SQLAlchemy引擎已关闭")
            except Exception as e:
                logger.warning(f"关闭SQLAlchemy引擎时发生错误: {e}")
            finally:
                self.engine = None

    def _rollback_quietly(self) -> None:
        """Roll back the pymysql transaction if a connection exists.

        Failures are logged instead of raised so that the error which
        triggered the rollback remains the one that gets reported.
        """
        if self.pymysql_conn is None:
            return
        try:
            self.pymysql_conn.rollback()
        except Exception as e:
            logger.warning(f"回滚事务时发生错误: {e}")

    def _run_ddl(self, sql: str, success_message: str, failure_label: str) -> bool:
        """Execute one DDL statement on the pymysql connection and commit.

        Args:
            sql: Statement to execute.
            success_message: Message logged at INFO level on success.
            failure_label: Prefix of the ERROR message logged on failure.

        Returns:
            bool: True on success, False if connecting or executing failed.
        """
        try:
            if self.pymysql_conn is None:
                self.connect_pymysql()

            # The context manager closes the cursor even if execute() raises.
            with self.pymysql_conn.cursor() as cursor:
                cursor.execute(sql)
            self.pymysql_conn.commit()

            logger.info(success_message)
            return True
        except Exception as e:
            logger.error(f"{failure_label}: {e}")
            # The connection may be missing when connecting itself failed.
            self._rollback_quietly()
            return False

    def create_table(self) -> bool:
        """Create the ``info_turbine`` table if it does not exist yet.

        Returns:
            bool: True if the statement ran and was committed, else False.
        """
        return self._run_ddl(
            CREATE_TABLE_SQL,
            "表 info_turbine 创建成功或已存在",
            "创建表时发生错误",
        )

    def drop_table(self) -> bool:
        """Drop the ``info_turbine`` table (used before re-creating it).

        Returns:
            bool: True if the statement ran and was committed, else False.
        """
        return self._run_ddl(
            DROP_TABLE_SQL,
            "表 info_turbine 删除成功",
            "删除表时发生错误",
        )

    def load_csv_data(self, file_path: str) -> Optional[pd.DataFrame]:
        """Read the CSV file and return a cleaned, typed DataFrame.

        The file is read as UTF-8, Chinese headers are renamed through
        ``HEADER_MAPPING``, placeholder values are nulled and known columns
        are converted to dates, numbers and integers.

        Args:
            file_path: Path of the CSV file.

        Returns:
            Optional[pd.DataFrame]: The prepared frame, or None if the file
            is missing or any step failed (the error is logged).
        """
        try:
            logger.info(f"正在读取文件: {file_path}")

            df = pd.read_csv(file_path, encoding='utf-8')
            logger.info(f"成功读取CSV文件,共 {len(df)} 条记录")

            # Headers without a mapping entry keep their original name.
            df = df.rename(columns=HEADER_MAPPING)
            logger.info("表头已转换为英文")

            df = self._clean_data(df)
            df = self._convert_data_types(df)
            return df

        except FileNotFoundError:
            logger.error(f"错误:找不到文件 {file_path}")
            return None
        except Exception as e:
            logger.error(f"读取CSV文件时发生错误: {e}")
            return None

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Null out placeholder strings and drop columns with no data.

        Returns a new frame; the argument is not modified.
        """
        # Replace with an explicit NaN rather than None: with a list as
        # to_replace, value=None makes older pandas versions forward-fill
        # the previous row's value instead of inserting a missing value.
        df = df.replace(['', ' ', 'null', 'NULL'], float('nan'))

        # Columns that hold no value at all carry nothing to import.
        df = df.dropna(axis=1, how='all')

        return df

    def _convert_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert known columns to date, datetime, numeric and integer types.

        Columns absent from the frame are skipped. Values that cannot be
        parsed become missing; for the integer and flag columns missing
        values are then replaced by 0. Returns a new frame; the argument is
        not modified.
        """
        # Work on a copy so the caller's frame is left untouched.
        df = df.copy()

        # Date-only columns: keep just the calendar date.
        for col in ('stat_start_date', 'operation_date', 'first_grid_time'):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce').dt.date

        # update_time keeps its time-of-day component.
        if 'update_time' in df.columns:
            df['update_time'] = pd.to_datetime(df['update_time'], errors='coerce')

        numeric_columns = (
            'altitude', 'latitude', 'longitude', 'hub_height',
            'rated_capacity', 'cut_in_wind_speed', 'cut_out_wind_speed',
            'rotor_diameter', 'power_multiplier', 'power_jump_slope',
        )
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # Integer and 0/1 flag columns share one treatment: unparseable or
        # missing values become 0 and the column uses the nullable Int64 dtype.
        integer_columns = (
            'project_phase', 'benchmark_flag', 'implementation_phase',
            'active_power_integral', 'fault_maintenance_enable',
        )
        for col in integer_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('Int64')

        return df

    def import_to_mysql(self, df: pd.DataFrame, if_exists: str = 'append') -> bool:
        """Insert the frame into the ``info_turbine`` table.

        Args:
            df: Data to insert; column names must match table columns.
            if_exists: What ``to_sql`` does when the table already exists
                ('fail', 'replace' or 'append').

        Returns:
            bool: True if the insert succeeded, False otherwise (logged).
        """
        try:
            if self.engine is None:
                self.connect_sqlalchemy()

            logger.info("正在导入数据到MySQL数据库...")

            df.to_sql(
                name='info_turbine',
                con=self.engine,
                if_exists=if_exists,
                index=False,
                chunksize=1000,
                method='multi',  # multi-row INSERT statements for throughput
            )

            logger.info(f"成功导入 {len(df)} 条记录到数据库表 info_turbine")
            return True

        except Exception as e:
            logger.error(f"导入数据到MySQL时发生错误: {e}")
            return False

    def get_record_count(self) -> Optional[int]:
        """Return the number of rows in ``info_turbine``.

        Returns:
            Optional[int]: The row count, or None if the query failed.
        """
        try:
            if self.pymysql_conn is None:
                self.connect_pymysql()

            # The context manager closes the cursor even if the query raises.
            with self.pymysql_conn.cursor() as cursor:
                cursor.execute(COUNT_TABLE_SQL)
                count = cursor.fetchone()[0]

            logger.info(f"数据库表 info_turbine 中现有 {count} 条记录")
            return count

        except Exception as e:
            logger.error(f"获取记录数时发生错误: {e}")
            return None

    def get_table_info(self) -> Optional[Dict[str, Any]]:
        """Return the column and index descriptions of ``info_turbine``.

        Returns:
            Optional[Dict[str, Any]]: ``{'columns': ..., 'indexes': ...}``
            holding the rows returned by ``DESCRIBE`` and ``SHOW INDEX``, or
            None if a query failed.
        """
        try:
            if self.pymysql_conn is None:
                self.connect_pymysql()

            with self.pymysql_conn.cursor() as cursor:
                cursor.execute("DESCRIBE info_turbine")
                columns = cursor.fetchall()

                cursor.execute("SHOW INDEX FROM info_turbine")
                indexes = cursor.fetchall()

            return {
                'columns': columns,
                'indexes': indexes,
            }

        except Exception as e:
            logger.error(f"获取表信息时发生错误: {e}")
            return None

    def run_import_pipeline(self, csv_file_path: str, drop_if_exists: bool = False) -> bool:
        """Run the whole import: connect, prepare table, load, insert, verify.

        Connections are always released before returning, whether the run
        succeeded or not.

        Args:
            csv_file_path: Path of the CSV file to import.
            drop_if_exists: Drop the table before creating it again.

        Returns:
            bool: True if every mandatory step succeeded, False otherwise.
        """
        try:
            logger.info("开始执行风机数据导入程序...")

            # Step 1: open both handles, reusing any that are already open so
            # an existing connection is not overwritten and leaked.
            if self.pymysql_conn is None:
                self.connect_pymysql()
            if self.engine is None:
                self.connect_sqlalchemy()

            # Step 2: optionally start from a clean table.
            if drop_if_exists and not self.drop_table():
                return False

            # Step 3: make sure the table exists.
            if not self.create_table():
                return False

            # Step 4: load and prepare the CSV data.
            df = self.load_csv_data(csv_file_path)
            if df is None:
                return False

            # Step 5: insert the rows.
            if not self.import_to_mysql(df, if_exists='append'):
                return False

            # Step 6: read back the row count (informational only).
            count = self.get_record_count()
            if count is not None:
                logger.info(f"数据导入验证成功,共导入 {len(df)} 条记录,数据库中现有 {count} 条记录")

            # Step 7: fetch the table description (informational only).
            table_info = self.get_table_info()
            if table_info:
                logger.info(f"表结构信息已获取,共有 {len(table_info['columns'])} 列")

            logger.info("风机数据导入程序执行完成!")
            return True

        except Exception as e:
            logger.error(f"数据导入流程执行失败: {e}")
            return False
        finally:
            self.close_connections()
def main() -> bool:
    """Run the turbine CSV import with the script's built-in settings.

    Returns:
        bool: True if the import pipeline succeeded, False otherwise. The
        entry guard below turns this into the process exit status.
    """
    # Silence all Python warnings for the duration of the script.
    warnings.filterwarnings('ignore')

    # Database connection settings.
    db_config = DatabaseConfig(
        host='192.168.50.234',
        port=4000,
        user='root',
        password='123456',
        database='wind_data',
        charset='utf8mb4'
    )

    # CSV file to import.
    csv_file_path = './data/风机应用配置表_带叶轮直径.csv'

    importer = TurbineDataImporter(db_config)

    # Set drop_if_exists=True to drop an existing table (and its old data)
    # and re-create it before importing.
    success = importer.run_import_pipeline(
        csv_file_path=csv_file_path,
        drop_if_exists=False
    )

    if success:
        logger.info("数据导入成功完成!")
    else:
        logger.error("数据导入失败!")

    return success


if __name__ == "__main__":
    # Exit non-zero on failure so schedulers and shell scripts can detect it.
    raise SystemExit(0 if main() else 1)
|