import time
import traceback
from os import environ

import pandas as pd
import pymysql
from pymysql.cursors import DictCursor
from sqlalchemy import create_engine

from utils.conf.read_conf import yaml_conf
from utils.log.trans_log import trans_print


class ConnectMysql:

    def __init__(self, connet_name):
        self.yaml_data = yaml_conf(environ.get('ETL_CONF'))
        self.connet_name = connet_name
        self.config = self.yaml_data[self.connet_name]
        self.database = self.config['database']

    # Open a new pymysql connection for each call (no pooling is involved)
    def get_conn(self):
        return pymysql.connect(**self.config, autocommit=True)

    # Execute a SQL statement on a short-lived connection and return all rows
    def execute(self, sql, params=tuple()):
        with self.get_conn() as conn:
            with conn.cursor(cursor=DictCursor) as cursor:
                try:
                    cursor.execute(sql, params)
                    trans_print("Executing SQL:", cursor._executed)
                    conn.commit()
                    result = cursor.fetchall()
                    return result
                except Exception as e:
                    trans_print(f"SQL failed: {sql}, error: {e}")
                    trans_print(traceback.format_exc())
                    conn.rollback()
                    raise e

    def get_engine(self):
        config = self.config
        username = config['user']
        password = config['password']
        host = config['host']
        port = config['port']
        dbname = config['database']
        return create_engine(
            f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}',
            pool_pre_ping=True,
            isolation_level="READ COMMITTED",
            connect_args={
                'connect_timeout': 30,
                'read_timeout': 120,
                'write_timeout': 7200
            })

    def execute_df_save(self, df, table_name, chunksize=10000):
        engine = self.get_engine()
        try:
            retry_count = 0
            max_retries = 3
            while retry_count < max_retries:
                try:
                    df.to_sql(table_name, engine, if_exists='append', index=False, chunksize=chunksize)
                    break  # stop retrying once the write succeeds
                except Exception as e:
                    retry_count += 1
                    trans_print(f"Retry {retry_count}, error: {str(e)}")
                    time.sleep(5 * retry_count)  # linear backoff between retries
                    if retry_count == max_retries:
                        trans_print(f"Write failed: {str(e)}")
                        raise
        except Exception as e:
            engine.dispose()
            raise e

    def read_sql_to_df(self, sql):
        df = pd.read_sql_query(sql, self.get_engine())
        return df
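

# Minimal usage sketch of ConnectMysql. Assumptions: the ETL_CONF YAML file
# contains a connection entry named "trans", and tables "wind_data" and
# "wind_data_copy" exist with matching columns; these names are hypothetical
# and only illustrate the call pattern.
if __name__ == '__main__':
    db = ConnectMysql("trans")

    # Read query results straight into a DataFrame via the SQLAlchemy engine.
    df = db.read_sql_to_df("SELECT * FROM wind_data LIMIT 10")

    # Append the DataFrame to a table using chunked, retried writes.
    db.execute_df_save(df, "wind_data_copy")

    # Run a parameterized statement through a short-lived pymysql connection.
    rows = db.execute("SELECT COUNT(*) AS cnt FROM wind_data WHERE wind_turbine_number = %s", ("WT001",))
    trans_print(rows)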