1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import time
- import traceback
- from os import *
- import pandas as pd
- import pymysql
- from pymysql.cursors import DictCursor
- from sqlalchemy import create_engine
- from utils.conf.read_conf import yaml_conf
- from utils.log.trans_log import trans_print
class ConnectMysql:
    """Helper around one MySQL connection profile from the ETL YAML config.

    The YAML file path is read from the ``ETL_CONF`` environment variable and
    the section named ``connet_name`` is used both as keyword arguments to
    ``pymysql.connect`` and to build a SQLAlchemy engine URL.
    """

    def __init__(self, connet_name):
        """Load the connection profile ``connet_name`` from the ETL config.

        Args:
            connet_name: Top-level key in the YAML config whose mapping holds
                the connection settings (``user``, ``password``, ``host``,
                ``port``, ``database``, plus anything else accepted by
                ``pymysql.connect``).

        NOTE(review): presumably raises KeyError when the profile or its
        ``database`` entry is missing (assuming ``yaml_conf`` returns a dict);
        if ``ETL_CONF`` is unset, ``yaml_conf`` receives None — confirm how it
        reacts.
        """
        self.yaml_data = yaml_conf(environ.get('ETL_CONF'))
        self.connet_name = connet_name
        self.config = self.yaml_data[self.connet_name]
        self.database = self.config['database']

    def get_conn(self):
        """Open and return a NEW pymysql connection in autocommit mode.

        This is not a connection pool: every call opens a fresh connection, and
        the caller must close it (e.g. ``with self.get_conn() as conn:``, which
        closes the connection on exit in pymysql >= 1.0).
        """
        return pymysql.connect(**self.config, autocommit=True)

    def execute(self, sql, params=tuple()):
        """Run one SQL statement on a fresh connection and return all rows.

        Args:
            sql: SQL text, optionally with pymysql ``%s`` / ``%(name)s``
                placeholders.
            params: Values bound to the placeholders (sequence or mapping).

        Returns:
            The rows from ``cursor.fetchall()``, each a dict because a
            ``DictCursor`` is used; empty for statements without a result set.

        Raises:
            Exception: any error is logged (SQL plus traceback) and re-raised.
        """
        with self.get_conn() as conn:
            with conn.cursor(cursor=DictCursor) as cursor:
                try:
                    cursor.execute(sql, params)
                    # ``_executed`` holds the final SQL after parameter binding;
                    # it is only populated once execute() has run.
                    trans_print("开始执行SQL:", cursor._executed)
                    conn.commit()
                    result = cursor.fetchall()
                    return result
                except Exception as e:
                    trans_print(f"执行sql:{sql},报错:{e}")
                    trans_print(traceback.format_exc())
                    # The connection is in autocommit mode, so this cannot undo
                    # a statement the server has already committed.
                    conn.rollback()
                    raise

    def get_engine(self):
        """Build a new SQLAlchemy engine for this profile.

        A new engine (with its own connection pool) is created on every call;
        callers should ``dispose()`` it when finished.

        The password is percent-encoded so reserved characters such as ``@``,
        ``:``, ``/`` or ``%`` do not corrupt the URL; SQLAlchemy's URL parser
        decodes it back to the original value.
        """
        from urllib.parse import quote

        config = self.config
        username = config['user']
        # str(): YAML may load an all-digit password as an int.
        password = quote(str(config['password']), safe="")
        host = config['host']
        port = config['port']
        dbname = config['database']
        return create_engine(
            f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}',
            pool_pre_ping=True,
            isolation_level="READ COMMITTED",
            connect_args={
                'connect_timeout': 30,
                'read_timeout': 120,
                'write_timeout': 7200,
            },
        )

    def execute_df_save(self, df, table_name, chunksize=10000,
                        max_retries=3, retry_delay=5):
        """Append ``df`` to ``table_name``, retrying on failure.

        Args:
            df: DataFrame to write (``index`` is not written).
            table_name: Target table; rows are appended (``if_exists='append'``).
            chunksize: Rows per batch passed to ``DataFrame.to_sql``.
            max_retries: Total number of attempts (must be >= 1).
            retry_delay: Base delay in seconds; the wait after failed attempt
                ``k`` is ``retry_delay * k`` (linear backoff).

        Raises:
            ValueError: if ``max_retries`` < 1.
            Exception: the last ``to_sql`` error once all attempts fail.

        NOTE(review): if a failed attempt left rows committed (depends on the
        pandas/driver transaction behaviour), a retry may append duplicates.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be >= 1")
        engine = self.get_engine()
        try:
            for attempt in range(1, max_retries + 1):
                try:
                    df.to_sql(table_name, engine, if_exists='append',
                              index=False, chunksize=chunksize)
                    # Success: stop here (the previous loop never exited and
                    # kept re-inserting the same rows).
                    return
                except Exception as e:
                    trans_print(f" 第 {attempt} 次重试, 错误: {str(e)}")
                    if attempt == max_retries:
                        trans_print(f"处理失败: {str(e)}")
                        raise
                    # Linear backoff; no sleep after the final attempt.
                    time.sleep(retry_delay * attempt)
        finally:
            # Release the engine's pool on success as well as failure.
            engine.dispose()

    def read_sql_to_df(self, sql):
        """Run a query and return its result as a pandas DataFrame.

        A dedicated engine is created for the query and disposed afterwards so
        its connection pool is not leaked.
        """
        engine = self.get_engine()
        try:
            return pd.read_sql_query(sql, engine)
        finally:
            engine.dispose()
|