12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
import os
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

from sqlalchemy import (
    Column,
    DateTime,
    Float,
    Integer,
    PrimaryKeyConstraint,
    String,
    create_engine,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Database connection string. The literal below is only a fallback; set the
# DATABASE_URI environment variable to point at another server (and to keep
# real credentials out of source control) without editing this file.
DATABASE_URI = os.environ.get(
    'DATABASE_URI',
    'mysql+pymysql://admin:admin123456@192.168.50.233:3306/test',
)
# Shared engine: pool_size persistent connections plus up to max_overflow
# extra ones bound how many sessions can talk to the database at once.
engine = create_engine(DATABASE_URI, pool_size=10, max_overflow=20)
# Declarative base class that the ORM table classes derive from.
Base = declarative_base()
# Table definition.
class TurbineData(Base):
    """ORM mapping for one sampled row of the ``turbine_data`` table.

    Rows are keyed by the composite primary key (batchNO, turbineNO, time),
    so a turbine can have at most one row per timestamp within a batch.

    Note: ``Base.metadata.create_all`` only creates missing tables; it does
    not alter an existing ``turbine_data`` table to match this definition.
    """

    __tablename__ = 'turbine_data'

    # Batch identifiers such as 'B001' are strings; an Integer column would
    # make MySQL reject them (strict mode) or silently store 0 (non-strict).
    batchNO = Column(String(32), nullable=False)
    # Turbine identifiers such as 'WT001'. MySQL requires an explicit VARCHAR
    # length (a bare String fails to compile on that dialect), and the column
    # is part of the primary key, so it must be length-bounded anyway.
    turbineNO = Column(String(32), nullable=False)
    time = Column(DateTime, nullable=False)
    active_power = Column(Float, nullable=False)
    wind_speed = Column(Float, nullable=False)

    __table_args__ = (
        PrimaryKeyConstraint('batchNO', 'turbineNO', 'time'),
    )
# Session factory bound to the shared engine; every Session() call returns a
# new, independent session.
Session = sessionmaker(engine)
# Create the tables declared on Base unless they already exist (checkfirst
# is SQLAlchemy's default, spelled out here for clarity).
Base.metadata.create_all(bind=engine, checkfirst=True)
# Workload parameters for data generation and insertion.
# num_records: rows generated per (batch, turbine) pair.
# batch_size: rows generated and committed by each insert task.
num_records = 8_000_000
batch_size = 200_000
# Batch IDs B001..B003 and turbine IDs WT001..WT011.
batchNOs = [f'B{n:03d}' for n in range(1, 4)]
turbineNOs = [f'WT{n:03d}' for n in range(1, 12)]
# For a quick smoke test, shrink the workload, e.g.:
# batchNOs = ['B001','B002']
# turbineNOs = ['WT001','WT002']
# Helper that builds random TurbineData rows.
def generate_data(num_records, batchNO, turbineNO, time):
    """Build ``num_records`` random TurbineData rows at one-second spacing.

    The row at 0-based position ``k`` is stamped ``time + k seconds``, so
    rows from one call never share a timestamp. ``active_power`` is drawn
    uniformly from [0, 5000] and ``wind_speed`` from [0, 25], each rounded
    to 2 decimal places.

    Returns a ``(rows, next_time)`` tuple, where ``next_time`` is one second
    after the last row's timestamp (``time`` itself if no rows were built).
    """
    rows = [
        TurbineData(
            batchNO=batchNO,
            turbineNO=turbineNO,
            time=time + timedelta(seconds=offset),
            # Keyword arguments are evaluated left to right, so each row
            # draws its active_power value before its wind_speed value.
            active_power=round(random.uniform(0, 5000), 2),
            wind_speed=round(random.uniform(0, 25), 2),
        )
        for offset in range(num_records)
    ]
    return rows, time + timedelta(seconds=len(rows))
# Worker task: generate one chunk of rows and insert it in its own session.
def insert_data(batchNO, turbineNO, start_index, end_index, dateTime):
    """Generate ``end_index - start_index`` rows for one turbine and insert them.

    Rows are timestamped from ``dateTime`` in one-second steps and written
    with ``bulk_save_objects`` in a single transaction. The transaction uses
    a session created for this call only, so no session is shared between
    the pool's worker threads.

    Returns the timestamp one second after the last generated row.
    Any exception from generation or the database is re-raised after the
    transaction has been rolled back and the session closed.
    """
    data_batch, dateTime = generate_data(end_index - start_index, batchNO, turbineNO, dateTime)
    session = Session()
    try:
        session.bulk_save_objects(data_batch)
        session.commit()
    except Exception:
        # Discard the failed transaction so the connection is returned to
        # the pool in a clean state, then surface the error to the caller.
        session.rollback()
        raise
    finally:
        # Release the session's connection on every path, including errors.
        session.close()
    return dateTime
# Insert all chunks concurrently through a thread pool.
failed = 0
with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future to the chunk it covers so failures can be reported.
    futures = {}
    for batchNO in batchNOs:
        # All turbines in a batch share one time axis, anchored at the
        # (naive, local) time at which the batch is scheduled.
        base_time = datetime.now()
        for turbineNO in turbineNOs:
            for i in range(0, num_records, batch_size):
                # Clamp so the final chunk never runs past num_records when
                # num_records is not a multiple of batch_size.
                end = min(i + batch_size, num_records)
                # Record i is stamped base_time + i seconds. Deriving each
                # chunk's start from base_time, rather than accumulating
                # offsets, makes consecutive chunks tile one gap-free series.
                start_time = base_time + timedelta(seconds=i)
                print(f'batchNO: {batchNO} turbineNO: {turbineNO} Scheduling records {i} to {end}')
                future = executor.submit(insert_data, batchNO, turbineNO, i, end, start_time)
                futures[future] = (batchNO, turbineNO, i, end)
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as exc:
            failed += 1
            f_batch, f_turbine, f_start, f_end = futures[future]
            print(f'Generated an exception: {exc} '
                  f'(batchNO: {f_batch} turbineNO: {f_turbine} records {f_start} to {f_end})')
if failed:
    print(f'{failed} of {len(futures)} chunks failed.')
print('Data insertion complete.')
|