import traceback
from os import environ
from urllib.parse import quote_plus

import pandas as pd
import pymysql
from pymysql.cursors import DictCursor
from sqlalchemy import create_engine

from utils.conf.read_conf import yaml_conf
from utils.log.trans_log import trans_print


class ConnectMysql:
    """Helper for running SQL against one MySQL connection from the ETL config.

    The YAML config path is taken from the ``ETL_CONF`` environment variable;
    the section named by ``connet_name`` is passed verbatim as keyword
    arguments to ``pymysql.connect`` and is also used to build a SQLAlchemy
    engine for the pandas helpers.
    """

    def __init__(self, connet_name):
        """Load the connection settings for ``connet_name``.

        Args:
            connet_name: key of the connection section in the YAML config
                (attribute spelling kept for backward compatibility).

        Raises:
            KeyError: if the section, or its ``database`` entry, is missing.
        """
        self.yaml_data = yaml_conf(environ.get('ETL_CONF'))
        self.connet_name = connet_name
        self.config = self.yaml_data[self.connet_name]
        self.database = self.config['database']

    def get_conn(self):
        """Open and return a NEW pymysql connection.

        This is not pooled: every call opens a fresh connection, and the
        caller is responsible for closing it (e.g. by using it in ``with``).
        """
        return pymysql.connect(**self.config)

    def execute(self, sql, params=tuple()):
        """Execute ``sql`` on a fresh connection, commit, and return all rows.

        Args:
            sql: SQL statement; placeholders use pymysql's ``%s`` style.
            params: values bound to the placeholders.
                NOTE(review): because this is never None, pymysql presumably
                applies %-formatting even when empty, so a literal '%' in
                ``sql`` likely has to be written as '%%' — confirm.

        Returns:
            The fetched rows as a sequence of dicts (``DictCursor``).

        Raises:
            Any exception raised while executing/committing; it is logged
            and the transaction is rolled back before it is re-raised.
        """
        with self.get_conn() as conn:
            with conn.cursor(cursor=DictCursor) as cursor:
                try:
                    cursor.execute(sql, params)
                    # Log the final statement text as sent to the server.
                    trans_print("开始执行SQL:", cursor._executed)
                    conn.commit()
                    return cursor.fetchall()
                except Exception as e:
                    trans_print(f"执行sql:{sql},报错:{e}")
                    trans_print(traceback.format_exc())
                    conn.rollback()
                    raise

    def get_engine(self):
        """Build a new SQLAlchemy engine for this connection's settings.

        Every call returns a new engine with its own connection pool; callers
        that create one should ``dispose()`` it when finished.
        """
        config = self.config
        # URL-encode the credentials: a raw '@', ':', '/' or '%' in the
        # password (or user) would otherwise corrupt the connection URL.
        # str() covers YAML values parsed as numbers (e.g. password: 123456).
        username = quote_plus(str(config['user']))
        password = quote_plus(str(config['password']))
        host = config['host']
        port = config['port']
        dbname = config['database']
        return create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')

    def execute_df_save(self, df, table_name, chunk_size=10000):
        """Append ``df`` to ``table_name`` (index not written), in chunks of ``chunk_size`` rows."""
        engine = self.get_engine()
        try:
            df.to_sql(table_name, engine, index=False, if_exists='append', chunksize=chunk_size)
        finally:
            # The engine was created just for this call; dispose it so its
            # pooled connections are closed now instead of lingering until GC.
            engine.dispose()

    def read_sql_to_df(self, sql):
        """Run a query and return its result as a pandas DataFrame."""
        engine = self.get_engine()
        try:
            return pd.read_sql_query(sql, engine)
        finally:
            # Release the per-call engine's pooled connections immediately.
            engine.dispose()