testDatabase.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
import os
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

from sqlalchemy import create_engine, Column, Integer, Float, DateTime, PrimaryKeyConstraint,String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
  7. # 数据库连接字符串,根据实际情况进行修改
  8. DATABASE_URI = 'mysql+pymysql://admin:admin123456@192.168.50.233:3306/test'
  9. # 创建数据库引擎
  10. engine = create_engine(DATABASE_URI, pool_size=10, max_overflow=20)
  11. # 创建基类
  12. Base = declarative_base()
  13. # 定义表结构
  14. class TurbineData(Base):
  15. __tablename__ = 'turbine_data'
  16. batchNO = Column(Integer, nullable=False)
  17. turbineNO = Column(String, nullable=False)
  18. time = Column(DateTime, nullable=False)
  19. active_power = Column(Float, nullable=False)
  20. wind_speed = Column(Float, nullable=False)
  21. __table_args__ = (
  22. PrimaryKeyConstraint('batchNO', 'turbineNO', 'time'),
  23. )
  24. # 创建表(如果不存在)
  25. Base.metadata.create_all(engine)
  26. # 创建Session
  27. Session = sessionmaker(bind=engine)
  28. # 生成并插入数据
  29. num_records = 8000000
  30. batch_size = 200000
  31. batchNOs = ['B001','B002','B003']
  32. turbineNOs = ['WT001','WT002','WT003','WT004','WT005','WT006','WT007','WT008','WT009','WT010','WT011']
  33. # batchNOs = ['B001','B002']
  34. # turbineNOs = ['WT001','WT002']
  35. # 辅助函数生成随机数据
  36. def generate_data(num_records,batchNO,turbineNO,time):
  37. data = []
  38. dateTime=time
  39. for _ in range(num_records):
  40. active_power = round(random.uniform(0, 5000), 2)
  41. wind_speed = round(random.uniform(0, 25), 2)
  42. data.append(TurbineData(batchNO=batchNO, turbineNO=turbineNO, time=dateTime, active_power=active_power, wind_speed=wind_speed))
  43. dateTime += timedelta(seconds=1) # 保证时间字段不重复
  44. return data,dateTime
  45. # 插入数据的任务
  46. def insert_data(batchNO, turbineNO, start_index, end_index, dateTime):
  47. session = Session()
  48. data_batch,dateTime = generate_data(end_index - start_index, batchNO, turbineNO, dateTime)
  49. session.bulk_save_objects(data_batch)
  50. session.commit()
  51. session.close()
  52. return dateTime
  53. # 使用线程池批量插入数据
  54. with ThreadPoolExecutor(max_workers=5) as executor:
  55. futures = []
  56. for batchNO in batchNOs:
  57. dateTime = datetime.now() # 随机生成时间
  58. for turbineNO in turbineNOs:
  59. for i in range(0, num_records, batch_size):
  60. dateTime += timedelta(seconds=i)
  61. print(f'batchNO: {batchNO} turbineNO: {turbineNO} Scheduling records {i} to {i + batch_size}')
  62. futures.append(executor.submit(insert_data, batchNO, turbineNO, i, i + batch_size, dateTime))
  63. for future in as_completed(futures):
  64. try:
  65. dateTime = future.result()
  66. except Exception as exc:
  67. print(f'Generated an exception: {exc}')
  68. print('Data insertion complete.')