# info_curve_power_turbine.py (12 KB)

import logging
from typing import Any, Dict, List

import pandas as pd
import pymysql
import pymysql.cursors
from pymysql import Error
  6. # 设置日志
  7. logging.basicConfig(level=logging.INFO)
  8. logger = logging.getLogger(__name__)
  9. # 数据库配置(从config.py导入,这里直接定义)
  10. class DatabaseConfig:
  11. HOST = '192.168.50.234'
  12. PORT = 4000
  13. DATABASE = 'wind_data'
  14. USERNAME = 'root'
  15. PASSWORD = '123456'
  16. class WindTurbinePowerCurve:
  17. """风力发电机功率曲线数据处理器"""
  18. def __init__(self, csv_file_path: str):
  19. """
  20. 初始化功率曲线处理器
  21. Args:
  22. csv_file_path: CSV文件路径
  23. """
  24. self.csv_file_path = csv_file_path
  25. self.dataframe = None
  26. self.connection = None
  27. def load_csv_data(self) -> pd.DataFrame:
  28. """加载CSV数据到DataFrame"""
  29. try:
  30. # 读取CSV文件,注意文件开头的BOM字符
  31. self.dataframe = pd.read_csv(self.csv_file_path, encoding='utf-8-sig')
  32. logger.info(f"成功加载CSV数据,共{len(self.dataframe)}行记录")
  33. return self.dataframe
  34. except Exception as e:
  35. logger.error(f"加载CSV文件失败: {e}")
  36. raise
  37. def clean_data(self) -> None:
  38. """清理和验证数据"""
  39. if self.dataframe is None:
  40. raise ValueError("数据未加载,请先调用load_csv_data()")
  41. # 检查必需列
  42. required_columns = [
  43. '风场id', '风场名称', '标准机型', '原始机型', '风速', '有功功率'
  44. ]
  45. for col in required_columns:
  46. if col not in self.dataframe.columns:
  47. raise ValueError(f"缺少必需列: {col}")
  48. # 处理缺失值
  49. original_count = len(self.dataframe)
  50. self.dataframe.dropna(subset=['风速', '有功功率'], inplace=True)
  51. cleaned_count = len(self.dataframe)
  52. if original_count > cleaned_count:
  53. logger.warning(f"删除了{original_count - cleaned_count}行包含缺失值的记录")
  54. # 确保数据类型正确
  55. self.dataframe['风速'] = pd.to_numeric(self.dataframe['风速'], errors='coerce')
  56. self.dataframe['有功功率'] = pd.to_numeric(self.dataframe['有功功率'], errors='coerce')
  57. self.dataframe['空气密度'] = pd.to_numeric(self.dataframe['空气密度'], errors='coerce')
  58. self.dataframe['叶轮直径(m)'] = pd.to_numeric(self.dataframe['叶轮直径(m)'], errors='coerce')
  59. self.dataframe['额定功率(kW)'] = pd.to_numeric(self.dataframe['额定功率(kW)'], errors='coerce')
  60. self.dataframe['额定风速(m/s)'] = pd.to_numeric(self.dataframe['额定风速(m/s)'], errors='coerce')
  61. logger.info(f"数据清理完成,剩余{len(self.dataframe)}行有效记录")
  62. class DatabaseManager:
  63. """数据库管理器"""
  64. def __init__(self, config: DatabaseConfig):
  65. """
  66. 初始化数据库管理器
  67. Args:
  68. config: 数据库配置
  69. """
  70. self.config = config
  71. self.connection = None
  72. def connect(self) -> pymysql.Connection:
  73. """连接到MySQL数据库"""
  74. try:
  75. self.connection = pymysql.connect(
  76. host=self.config.HOST,
  77. port=self.config.PORT,
  78. user=self.config.USERNAME,
  79. password=self.config.PASSWORD,
  80. database=self.config.DATABASE,
  81. charset='utf8mb4',
  82. cursorclass=pymysql.cursors.DictCursor
  83. )
  84. logger.info("成功连接到数据库")
  85. return self.connection
  86. except Error as e:
  87. logger.error(f"数据库连接失败: {e}")
  88. raise
  89. def disconnect(self) -> None:
  90. """断开数据库连接"""
  91. if self.connection and self.connection.open:
  92. self.connection.close()
  93. logger.info("数据库连接已关闭")
  94. def create_table(self) -> None:
  95. """创建风力发电机功率曲线表"""
  96. create_table_sql = """
  97. CREATE TABLE IF NOT EXISTS info_curve_power_turbine (
  98. id INT AUTO_INCREMENT PRIMARY KEY,
  99. wind_farm_id VARCHAR(50) NOT NULL COMMENT '风场ID',
  100. wind_farm_name VARCHAR(100) NOT NULL COMMENT '风场名称',
  101. standard_model VARCHAR(50) NOT NULL COMMENT '标准机型',
  102. original_model VARCHAR(50) NOT NULL COMMENT '原始机型',
  103. wind_speed DECIMAL(6,2) NOT NULL COMMENT '风速(m/s)',
  104. active_power DECIMAL(10,2) NOT NULL COMMENT '有功功率(kW)',
  105. description TEXT COMMENT '描述',
  106. air_density DECIMAL(8,6) COMMENT '空气密度(kg/m³)',
  107. rotor_diameter DECIMAL(8,2) COMMENT '叶轮直径(m)',
  108. rated_power DECIMAL(10,2) COMMENT '额定功率(kW)',
  109. sweep_area DECIMAL(12,2) COMMENT '扫风面积(m²)',
  110. rated_wind_speed DECIMAL(6,2) COMMENT '额定风速(m/s)',
  111. power_density DECIMAL(15,12) COMMENT '功率密度(W/m²)',
  112. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  113. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  114. INDEX idx_wind_farm (wind_farm_id),
  115. INDEX idx_model (standard_model),
  116. INDEX idx_wind_speed (wind_speed),
  117. INDEX idx_original_model (original_model)
  118. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风力发电机功率曲线数据表';
  119. """
  120. try:
  121. with self.connection.cursor() as cursor:
  122. cursor.execute(create_table_sql)
  123. self.connection.commit()
  124. logger.info("表 'info_curve_power_turbine' 创建成功或已存在")
  125. except Error as e:
  126. logger.error(f"创建表失败: {e}")
  127. self.connection.rollback()
  128. raise
  129. def insert_data(self, data: pd.DataFrame) -> int:
  130. """
  131. 批量插入数据到数据库
  132. Args:
  133. data: 包含要插入数据的DataFrame
  134. Returns:
  135. 插入的行数
  136. """
  137. if data.empty:
  138. logger.warning("没有数据需要插入")
  139. return 0
  140. # 准备插入SQL
  141. insert_sql = """
  142. INSERT INTO info_curve_power_turbine (
  143. wind_farm_id, wind_farm_name, standard_model, original_model,
  144. wind_speed, active_power, description, air_density,
  145. rotor_diameter, rated_power, sweep_area, rated_wind_speed,
  146. power_density
  147. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  148. """
  149. # 准备数据
  150. records = []
  151. for _, row in data.iterrows():
  152. record = (
  153. str(row.get('风场id', '')).strip(),
  154. str(row.get('风场名称', '')).strip(),
  155. str(row.get('标准机型', '')).strip(),
  156. str(row.get('原始机型', '')).strip(),
  157. float(row.get('风速', 0.0)),
  158. float(row.get('有功功率', 0.0)),
  159. str(row.get('描述', '')).strip(),
  160. float(row.get('空气密度', 0.0)) if pd.notna(row.get('空气密度')) else None,
  161. float(row.get('叶轮直径(m)', 0.0)) if pd.notna(row.get('叶轮直径(m)')) else None,
  162. float(row.get('额定功率(kW)', 0.0)) if pd.notna(row.get('额定功率(kW)')) else None,
  163. float(row.get('扫风面积(㎡)', 0.0)) if pd.notna(row.get('扫风面积(㎡)')) else None,
  164. float(row.get('额定风速(m/s)', 0.0)) if pd.notna(row.get('额定风速(m/s)')) else None,
  165. float(row.get('功率密度(W/㎡)', 0.0)) if pd.notna(row.get('功率密度(W/㎡)')) else None
  166. )
  167. records.append(record)
  168. try:
  169. with self.connection.cursor() as cursor:
  170. # 批量插入
  171. cursor.executemany(insert_sql, records)
  172. self.connection.commit()
  173. inserted_count = cursor.rowcount
  174. logger.info(f"成功插入 {inserted_count} 条记录到数据库")
  175. return inserted_count
  176. except Error as e:
  177. logger.error(f"插入数据失败: {e}")
  178. self.connection.rollback()
  179. raise
  180. def count_records(self) -> int:
  181. """统计表中的记录数"""
  182. try:
  183. with self.connection.cursor() as cursor:
  184. cursor.execute("SELECT COUNT(*) as count FROM info_curve_power_turbine")
  185. result = cursor.fetchone()
  186. return result['count'] if result else 0
  187. except Error as e:
  188. logger.error(f"统计记录失败: {e}")
  189. return 0
  190. class WindDataImporter:
  191. """风力数据导入器"""
  192. def __init__(self, csv_file_path: str):
  193. """
  194. 初始化数据导入器
  195. Args:
  196. csv_file_path: CSV文件路径
  197. """
  198. self.csv_file_path = csv_file_path
  199. self.data_processor = WindTurbinePowerCurve(csv_file_path)
  200. self.db_config = DatabaseConfig()
  201. self.db_manager = None
  202. def import_data(self) -> bool:
  203. """
  204. 主导入方法
  205. Returns:
  206. 导入是否成功
  207. """
  208. try:
  209. logger.info("开始导入风力发电机功率曲线数据...")
  210. # 1. 加载和清理CSV数据
  211. self.data_processor.load_csv_data()
  212. self.data_processor.clean_data()
  213. # 2. 连接数据库
  214. self.db_manager = DatabaseManager(self.db_config)
  215. self.db_manager.connect()
  216. # 3. 创建表
  217. self.db_manager.create_table()
  218. # 4. 插入数据
  219. inserted_count = self.db_manager.insert_data(self.data_processor.dataframe)
  220. # 5. 验证数据
  221. total_count = self.db_manager.count_records()
  222. logger.info(f"数据库中的总记录数: {total_count}")
  223. # 6. 断开连接
  224. self.db_manager.disconnect()
  225. logger.info("数据导入完成!")
  226. return True
  227. except Exception as e:
  228. logger.error(f"数据导入过程中出错: {e}")
  229. # 确保在出错时断开连接
  230. if self.db_manager:
  231. try:
  232. self.db_manager.disconnect()
  233. except:
  234. pass
  235. return False
  236. def get_summary(self) -> Dict[str, Any]:
  237. """获取数据摘要"""
  238. if self.data_processor.dataframe is None:
  239. return {}
  240. df = self.data_processor.dataframe
  241. summary = {
  242. '总记录数': len(df),
  243. '风场数量': df['风场id'].nunique(),
  244. '标准机型数量': df['标准机型'].nunique(),
  245. '原始机型数量': df['原始机型'].nunique(),
  246. '风速范围': f"{df['风速'].min():.1f} - {df['风速'].max():.1f} m/s",
  247. '有功功率范围': f"{df['有功功率'].min():.1f} - {df['有功功率'].max():.1f} kW",
  248. '空气密度范围': f"{df['空气密度'].min():.3f} - {df['空气密度'].max():.3f} kg/m³"
  249. }
  250. return summary
  251. def main():
  252. """主函数"""
  253. # CSV文件路径(根据实际情况修改)
  254. csv_file_path = f"./data/全部机型功率曲线_含标准类型_解析结果_详细.csv"
  255. # 创建导入器并执行导入
  256. importer = WindDataImporter(csv_file_path)
  257. # 显示数据摘要
  258. try:
  259. importer.data_processor.load_csv_data()
  260. importer.data_processor.clean_data()
  261. summary = importer.get_summary()
  262. print("数据摘要:")
  263. for key, value in summary.items():
  264. print(f" {key}: {value}")
  265. print()
  266. except Exception as e:
  267. print(f"读取CSV文件时出错: {e}")
  268. return
  269. # 执行导入
  270. success = importer.import_data()
  271. if success:
  272. print("数据导入成功!")
  273. else:
  274. print("数据导入失败!")
  275. if __name__ == "__main__":
  276. main()