import random
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

from sqlalchemy import create_engine, Column, Integer, Float, DateTime, PrimaryKeyConstraint, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Database connection string; adjust it to match the target environment.
# NOTE(review): credentials are hard-coded in source -- consider loading them
# from the environment or a config file before sharing this script.
DATABASE_URI = 'mysql+pymysql://admin:admin123456@192.168.50.233:3306/test'

# Engine with a connection pool large enough for the worker threads below.
engine = create_engine(DATABASE_URI, pool_size=10, max_overflow=20)

# Declarative base class for the ORM models.
Base = declarative_base()


class TurbineData(Base):
    """One generated sample for a turbine, keyed by (batchNO, turbineNO, time)."""

    __tablename__ = 'turbine_data'

    # Batch and turbine identifiers are strings such as 'B001' / 'WT001'
    # (see batchNOs / turbineNOs below), so both columns must be string
    # typed. MySQL cannot emit VARCHAR without a length, hence String(16).
    batchNO = Column(String(16), nullable=False)
    turbineNO = Column(String(16), nullable=False)
    time = Column(DateTime, nullable=False)
    active_power = Column(Float, nullable=False)
    wind_speed = Column(Float, nullable=False)

    __table_args__ = (
        PrimaryKeyConstraint('batchNO', 'turbineNO', 'time'),
    )


# Create the table if it does not exist yet.
Base.metadata.create_all(engine)

# Session factory bound to the engine; each worker task opens its own session.
Session = sessionmaker(bind=engine)

# Generation parameters: num_records rows are produced for every
# (batch, turbine) pair and inserted in chunks of batch_size rows.
num_records = 8000000
batch_size = 200000
batchNOs = ['B001', 'B002', 'B003']
turbineNOs = ['WT001', 'WT002', 'WT003', 'WT004', 'WT005', 'WT006',
              'WT007', 'WT008', 'WT009', 'WT010', 'WT011']
# For a quick smoke test, shrink the lists above, e.g.
# ['B001', 'B002'] and ['WT001', 'WT002'].


def generate_data(num_records, batchNO, turbineNO, time):
    """Build random rows with consecutive one-second timestamps.

    Args:
        num_records: Number of rows to generate.
        batchNO: Batch identifier stored on every row.
        turbineNO: Turbine identifier stored on every row.
        time: Timestamp of the first row; each following row is one second
            later, which keeps the primary key unique within the chunk.

    Returns:
        A tuple ``(rows, next_time)`` where ``rows`` is the list of
        ``TurbineData`` objects and ``next_time`` is the timestamp one second
        after the last generated row (``time`` itself when no row is built).
    """
    rows = [
        TurbineData(
            batchNO=batchNO,
            turbineNO=turbineNO,
            time=time + timedelta(seconds=offset),
            active_power=round(random.uniform(0, 5000), 2),
            wind_speed=round(random.uniform(0, 25), 2),
        )
        for offset in range(num_records)
    ]
    return rows, time + timedelta(seconds=len(rows))


def insert_data(batchNO, turbineNO, start_index, end_index, dateTime):
    """Generate and insert one chunk of rows in its own session.

    Args:
        batchNO: Batch identifier for the chunk.
        turbineNO: Turbine identifier for the chunk.
        start_index: Index of the first record of the chunk.
        end_index: Index one past the last record of the chunk.
        dateTime: Timestamp of the first record of the chunk.

    Returns:
        The timestamp immediately after the last inserted record.

    Raises:
        Exception: Any error raised while saving or committing is re-raised
            after the transaction has been rolled back.
    """
    data_batch, next_time = generate_data(
        end_index - start_index, batchNO, turbineNO, dateTime)
    session = Session()
    try:
        session.bulk_save_objects(data_batch)
        session.commit()
    except Exception:
        # Leave the pooled connection in a clean state before propagating.
        session.rollback()
        raise
    finally:
        # Always hand the connection back to the pool, even on failure.
        session.close()
    return next_time


# Fan the chunks out over a thread pool; the work is dominated by database I/O.
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for batchNO in batchNOs:
        # Each batch starts its time series at the current wall-clock time.
        base_time = datetime.now()
        for turbineNO in turbineNOs:
            for i in range(0, num_records, batch_size):
                # Clamp the final chunk so exactly num_records rows are made
                # even when batch_size does not divide num_records evenly.
                end = min(i + batch_size, num_records)
                # Record i sits i seconds after the batch start, so chunks
                # of one turbine form a contiguous, non-overlapping series.
                chunk_start = base_time + timedelta(seconds=i)
                print(f'batchNO: {batchNO} turbineNO: {turbineNO} Scheduling records {i} to {end}')
                futures.append(executor.submit(
                    insert_data, batchNO, turbineNO, i, end, chunk_start))

    # Report failures as chunks finish; one failed chunk does not stop the rest.
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as exc:
            print(f'Generated an exception: {exc}')

print('Data insertion complete.')