# info_turbine_data.py
import logging
import warnings
from typing import Any, Dict, List, Optional

import pandas as pd
import pymysql
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
  7. # 配置日志
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  11. )
  12. logger = logging.getLogger(__name__)
  13. # 映射中文表头到英文列名
  14. HEADER_MAPPING = {
  15. '风机id': 'turbine_id',
  16. '风场id': 'wind_farm_id',
  17. '风机名称': 'turbine_name',
  18. '风机英文名称': 'turbine_name_en',
  19. 'USCADA别名': 'uscada_alias',
  20. '海拔': 'altitude',
  21. '纬度': 'latitude',
  22. '经度': 'longitude',
  23. '主控接口版本': 'main_control_version',
  24. '所属馈线ID': 'feeder_id',
  25. '所属期ID': 'phase_id',
  26. '轮毂高度': 'hub_height',
  27. '指标统计开始日期': 'stat_start_date',
  28. '投产运行日期': 'operation_date',
  29. '合同功率曲线(标准空气密度)': 'contract_power_curve_std',
  30. '合同功率曲线(当地空气密度)': 'contract_power_curve_local',
  31. '电量倍率': 'power_multiplier',
  32. '电量跳变斜率': 'power_jump_slope',
  33. '项目阶段': 'project_phase',
  34. '标杆风机标志(0或1)': 'benchmark_flag',
  35. '遥测辅助状态判断启用': 'telemetry_aux_status_enable',
  36. 'scada单风机UR': 'scada_ur',
  37. 'scada风机图标': 'scada_icon',
  38. 'scada数据冻结时间阈值': 'scada_freeze_threshold',
  39. 'scada应用配置导入模板': 'scada_template',
  40. '预留字段': 'reserved_field',
  41. '风机图片文件名': 'turbine_image',
  42. '时区': 'timezone',
  43. '描述': 'description',
  44. '额定容量': 'rated_capacity',
  45. '机型': 'model',
  46. '外部厂家风机ID': 'external_turbine_id',
  47. '风机制造商': 'manufacturer',
  48. '切入风速': 'cut_in_wind_speed',
  49. '切出风速': 'cut_out_wind_speed',
  50. '修改时间': 'update_time',
  51. '关联进线间隔id': 'incoming_line_id',
  52. '关联出线间隔id': 'outgoing_line_id',
  53. '关联主变压器id': 'main_transformer_id',
  54. '投产状态': 'operation_status',
  55. '质保状态': 'warranty_status',
  56. '有功功率积分算电量': 'active_power_integral',
  57. '开启故障检修判断': 'fault_maintenance_enable',
  58. '实施阶段': 'implementation_phase',
  59. '次级分组': 'secondary_group',
  60. '风机编号': 'turbine_number',
  61. '风机阶段': 'turbine_phase',
  62. '首次并网时间': 'first_grid_time',
  63. '数据湖源': 'data_lake_source',
  64. 'wsfd_local': 'wsfd_local',
  65. '叶轮直径': 'rotor_diameter'
  66. }
  67. # SQL语句定义
  68. CREATE_TABLE_SQL = """
  69. CREATE TABLE IF NOT EXISTS info_turbine (
  70. id INT AUTO_INCREMENT PRIMARY KEY,
  71. turbine_id VARCHAR(50),
  72. wind_farm_id VARCHAR(50),
  73. turbine_name VARCHAR(200),
  74. turbine_name_en VARCHAR(200),
  75. uscada_alias VARCHAR(100),
  76. altitude DECIMAL(10, 4),
  77. latitude DECIMAL(12, 8),
  78. longitude DECIMAL(12, 8),
  79. main_control_version VARCHAR(100),
  80. feeder_id VARCHAR(50),
  81. phase_id VARCHAR(50),
  82. hub_height DECIMAL(10, 2),
  83. stat_start_date DATE,
  84. operation_date DATE,
  85. contract_power_curve_std VARCHAR(100),
  86. contract_power_curve_local VARCHAR(100),
  87. power_multiplier DECIMAL(10, 2),
  88. power_jump_slope DECIMAL(10, 2),
  89. project_phase INT,
  90. benchmark_flag TINYINT,
  91. telemetry_aux_status_enable VARCHAR(10),
  92. scada_ur VARCHAR(100),
  93. scada_icon VARCHAR(100),
  94. scada_freeze_threshold VARCHAR(100),
  95. scada_template VARCHAR(100),
  96. reserved_field VARCHAR(100),
  97. turbine_image VARCHAR(200),
  98. timezone VARCHAR(50),
  99. description TEXT,
  100. rated_capacity INT,
  101. model VARCHAR(100),
  102. external_turbine_id VARCHAR(100),
  103. manufacturer VARCHAR(100),
  104. cut_in_wind_speed DECIMAL(5, 2),
  105. cut_out_wind_speed DECIMAL(5, 2),
  106. update_time DATETIME,
  107. incoming_line_id VARCHAR(50),
  108. outgoing_line_id VARCHAR(50),
  109. main_transformer_id VARCHAR(50),
  110. operation_status VARCHAR(50),
  111. warranty_status VARCHAR(50),
  112. active_power_integral TINYINT,
  113. fault_maintenance_enable TINYINT,
  114. implementation_phase INT,
  115. secondary_group VARCHAR(100),
  116. turbine_number VARCHAR(50),
  117. turbine_phase VARCHAR(50),
  118. first_grid_time DATE,
  119. data_lake_source VARCHAR(100),
  120. wsfd_local VARCHAR(100),
  121. rotor_diameter INT,
  122. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  123. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  124. INDEX idx_turbine_id (turbine_id),
  125. INDEX idx_wind_farm_id (wind_farm_id),
  126. INDEX idx_manufacturer (manufacturer),
  127. INDEX idx_model (model)
  128. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
  129. """
  130. COUNT_TABLE_SQL = "SELECT COUNT(*) FROM info_turbine"
  131. DROP_TABLE_SQL = "DROP TABLE IF EXISTS info_turbine"
  132. class DatabaseConfig:
  133. """数据库配置类"""
  134. def __init__(self, host='192.168.50.234', port=24001, user='root',
  135. password='admin123456', database='wind_data', charset='utf8mb4'):
  136. self.host = host
  137. self.port = port
  138. self.user = user
  139. self.password = password
  140. self.database = database
  141. self.charset = charset
  142. def get_pymysql_config(self) -> Dict[str, Any]:
  143. """获取pymysql连接配置"""
  144. return {
  145. 'host': self.host,
  146. 'port': self.port,
  147. 'user': self.user,
  148. 'password': self.password,
  149. 'database': self.database,
  150. 'charset': self.charset
  151. }
  152. def get_sqlalchemy_url(self) -> str:
  153. """获取SQLAlchemy连接URL"""
  154. return f"mysql+pymysql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}?charset={self.charset}"
  155. class TurbineDataImporter:
  156. """风机数据导入器"""
  157. def __init__(self, db_config: DatabaseConfig):
  158. """
  159. 初始化导入器
  160. Args:
  161. db_config: 数据库配置对象
  162. """
  163. self.db_config = db_config
  164. self.engine = None
  165. self.pymysql_conn = None
  166. def connect_pymysql(self) -> pymysql.connections.Connection:
  167. """创建pymysql连接"""
  168. try:
  169. self.pymysql_conn = pymysql.connect(**self.db_config.get_pymysql_config())
  170. logger.info("pymysql连接成功")
  171. return self.pymysql_conn
  172. except Exception as e:
  173. logger.error(f"pymysql连接失败: {e}")
  174. raise
  175. def connect_sqlalchemy(self) -> create_engine:
  176. """创建SQLAlchemy引擎"""
  177. try:
  178. self.engine = create_engine(self.db_config.get_sqlalchemy_url())
  179. logger.info("SQLAlchemy引擎创建成功")
  180. return self.engine
  181. except Exception as e:
  182. logger.error(f"SQLAlchemy引擎创建失败: {e}")
  183. raise
  184. def close_connections(self):
  185. """关闭所有数据库连接"""
  186. if self.pymysql_conn:
  187. self.pymysql_conn.close()
  188. logger.info("pymysql连接已关闭")
  189. if self.engine:
  190. self.engine.dispose()
  191. logger.info("SQLAlchemy引擎已关闭")
  192. def create_table(self) -> bool:
  193. """
  194. 创建风机信息表
  195. Returns:
  196. bool: 是否成功创建表
  197. """
  198. try:
  199. if not self.pymysql_conn:
  200. self.connect_pymysql()
  201. cursor = self.pymysql_conn.cursor()
  202. cursor.execute(CREATE_TABLE_SQL)
  203. self.pymysql_conn.commit()
  204. logger.info("表 info_turbine 创建成功或已存在")
  205. return True
  206. except Exception as e:
  207. logger.error(f"创建表时发生错误: {e}")
  208. self.pymysql_conn.rollback()
  209. return False
  210. finally:
  211. if 'cursor' in locals():
  212. cursor.close()
  213. def drop_table(self) -> bool:
  214. """
  215. 删除风机信息表(用于重新创建)
  216. Returns:
  217. bool: 是否成功删除表
  218. """
  219. try:
  220. if not self.pymysql_conn:
  221. self.connect_pymysql()
  222. cursor = self.pymysql_conn.cursor()
  223. cursor.execute(DROP_TABLE_SQL)
  224. self.pymysql_conn.commit()
  225. logger.info("表 info_turbine 删除成功")
  226. return True
  227. except Exception as e:
  228. logger.error(f"删除表时发生错误: {e}")
  229. self.pymysql_conn.rollback()
  230. return False
  231. finally:
  232. if 'cursor' in locals():
  233. cursor.close()
  234. def load_csv_data(self, file_path: str) -> Optional[pd.DataFrame]:
  235. """
  236. 加载CSV文件数据
  237. Args:
  238. file_path: CSV文件路径
  239. Returns:
  240. Optional[pd.DataFrame]: 加载的数据框,失败时返回None
  241. """
  242. try:
  243. logger.info(f"正在读取文件: {file_path}")
  244. # 读取CSV文件,指定编码
  245. df = pd.read_csv(file_path, encoding='utf-8')
  246. logger.info(f"成功读取CSV文件,共 {len(df)} 条记录")
  247. # 重命名列(中文表头转英文)
  248. df = df.rename(columns=HEADER_MAPPING)
  249. logger.info("表头已转换为英文")
  250. # 处理空值和异常值
  251. df = self._clean_data(df)
  252. # 数据类型转换
  253. df = self._convert_data_types(df)
  254. return df
  255. except FileNotFoundError:
  256. logger.error(f"错误:找不到文件 {file_path}")
  257. return None
  258. except Exception as e:
  259. logger.error(f"读取CSV文件时发生错误: {e}")
  260. return None
  261. def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
  262. """清洗数据"""
  263. # 处理空字符串,转换为None
  264. df = df.replace(['', ' ', 'null', 'NULL'], None)
  265. # 删除全为空的列
  266. df = df.dropna(axis=1, how='all')
  267. return df
  268. def _convert_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
  269. """转换数据类型"""
  270. # 处理日期和时间字段
  271. date_columns = ['stat_start_date', 'operation_date', 'first_grid_time']
  272. for col in date_columns:
  273. if col in df.columns:
  274. df[col] = pd.to_datetime(df[col], errors='coerce').dt.date
  275. # 处理update_time字段
  276. if 'update_time' in df.columns:
  277. df['update_time'] = pd.to_datetime(df['update_time'], errors='coerce')
  278. # 处理数值字段
  279. numeric_columns = [
  280. 'altitude', 'latitude', 'longitude', 'hub_height',
  281. 'rated_capacity', 'cut_in_wind_speed', 'cut_out_wind_speed',
  282. 'rotor_diameter', 'power_multiplier', 'power_jump_slope'
  283. ]
  284. for col in numeric_columns:
  285. if col in df.columns:
  286. df[col] = pd.to_numeric(df[col], errors='coerce')
  287. # 处理整数字段
  288. int_columns = ['project_phase', 'benchmark_flag', 'implementation_phase']
  289. for col in int_columns:
  290. if col in df.columns:
  291. df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('Int64')
  292. # 处理布尔/标志字段
  293. flag_columns = ['active_power_integral', 'fault_maintenance_enable']
  294. for col in flag_columns:
  295. if col in df.columns:
  296. df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('Int64')
  297. return df
  298. def import_to_mysql(self, df: pd.DataFrame, if_exists: str = 'append') -> bool:
  299. """
  300. 导入数据到MySQL数据库
  301. Args:
  302. df: 数据框
  303. if_exists: 表存在时的处理方式 ('fail', 'replace', 'append')
  304. Returns:
  305. bool: 是否成功导入
  306. """
  307. try:
  308. if not self.engine:
  309. self.connect_sqlalchemy()
  310. logger.info("正在导入数据到MySQL数据库...")
  311. # 使用to_sql方法导入数据
  312. df.to_sql(
  313. name='info_turbine',
  314. con=self.engine,
  315. if_exists=if_exists,
  316. index=False,
  317. chunksize=1000,
  318. method='multi' # 使用多值插入提高性能
  319. )
  320. logger.info(f"成功导入 {len(df)} 条记录到数据库表 info_turbine")
  321. return True
  322. except Exception as e:
  323. logger.error(f"导入数据到MySQL时发生错误: {e}")
  324. return False
  325. def get_record_count(self) -> Optional[int]:
  326. """
  327. 获取表中的记录数
  328. Returns:
  329. Optional[int]: 记录数,失败时返回None
  330. """
  331. try:
  332. if not self.pymysql_conn:
  333. self.connect_pymysql()
  334. cursor = self.pymysql_conn.cursor()
  335. cursor.execute(COUNT_TABLE_SQL)
  336. count = cursor.fetchone()[0]
  337. cursor.close()
  338. logger.info(f"数据库表 info_turbine 中现有 {count} 条记录")
  339. return count
  340. except Exception as e:
  341. logger.error(f"获取记录数时发生错误: {e}")
  342. return None
  343. def get_table_info(self) -> Optional[Dict[str, Any]]:
  344. """
  345. 获取表信息
  346. Returns:
  347. Optional[Dict[str, Any]]: 表信息字典
  348. """
  349. try:
  350. if not self.pymysql_conn:
  351. self.connect_pymysql()
  352. cursor = self.pymysql_conn.cursor()
  353. # 获取表结构
  354. cursor.execute("DESCRIBE info_turbine")
  355. columns = cursor.fetchall()
  356. # 获取索引信息
  357. cursor.execute("SHOW INDEX FROM info_turbine")
  358. indexes = cursor.fetchall()
  359. cursor.close()
  360. return {
  361. 'columns': columns,
  362. 'indexes': indexes
  363. }
  364. except Exception as e:
  365. logger.error(f"获取表信息时发生错误: {e}")
  366. return None
  367. def run_import_pipeline(self, csv_file_path: str, drop_if_exists: bool = False) -> bool:
  368. """
  369. 运行完整的数据导入流程
  370. Args:
  371. csv_file_path: CSV文件路径
  372. drop_if_exists: 是否删除已存在的表
  373. Returns:
  374. bool: 整个流程是否成功
  375. """
  376. try:
  377. logger.info("开始执行风机数据导入程序...")
  378. # 步骤1: 创建数据库连接
  379. self.connect_pymysql()
  380. self.connect_sqlalchemy()
  381. # 步骤2: 处理表(如果需要删除则先删除)
  382. if drop_if_exists:
  383. if not self.drop_table():
  384. return False
  385. # 步骤3: 创建表
  386. if not self.create_table():
  387. return False
  388. # 步骤4: 加载CSV数据
  389. df = self.load_csv_data(csv_file_path)
  390. if df is None:
  391. return False
  392. # 步骤5: 导入数据
  393. if not self.import_to_mysql(df, if_exists='append'):
  394. return False
  395. # 步骤6: 验证数据
  396. count = self.get_record_count()
  397. if count is not None:
  398. logger.info(f"数据导入验证成功,共导入 {len(df)} 条记录,数据库中现有 {count} 条记录")
  399. # 步骤7: 获取表信息(可选)
  400. table_info = self.get_table_info()
  401. if table_info:
  402. logger.info(f"表结构信息已获取,共有 {len(table_info['columns'])} 列")
  403. logger.info("风机数据导入程序执行完成!")
  404. return True
  405. except Exception as e:
  406. logger.error(f"数据导入流程执行失败: {e}")
  407. return False
  408. finally:
  409. # 清理连接
  410. self.close_connections()
  411. def main():
  412. """主函数"""
  413. # 忽略警告
  414. warnings.filterwarnings('ignore')
  415. # 数据库配置
  416. db_config = DatabaseConfig(
  417. host='192.168.50.234',
  418. port=24001,
  419. user='root',
  420. password='admin123456',
  421. database='wind_data',
  422. charset='utf8mb4'
  423. )
  424. # CSV文件路径
  425. csv_file_path = './data/风机应用配置表_带叶轮直径.csv'
  426. # 创建导入器实例
  427. importer = TurbineDataImporter(db_config)
  428. # 运行导入流程
  429. # 如果表已经存在并且包含旧数据,可以设置 drop_if_exists=True 来重新创建表
  430. success = importer.run_import_pipeline(
  431. csv_file_path=csv_file_path,
  432. drop_if_exists=False # 设置为True会删除已存在的表
  433. )
  434. if success:
  435. logger.info("数据导入成功完成!")
  436. else:
  437. logger.error("数据导入失败!")
  438. if __name__ == "__main__":
  439. main()