import os
import traceback
from urllib.parse import quote_plus

import pymysql
from pymysql.cursors import DictCursor
from sqlalchemy import create_engine

from utils.conf.read_conf import yaml_conf
from utils.log.trans_log import trans_print


class ConnectMysql:
    """Thin MySQL helper driven by the ETL YAML configuration.

    The configuration is loaded from the path in the ``ETL_CONF`` environment
    variable (default ``/data/config/etl_config.yaml``). Connection settings
    are looked up under the key ``"<connet_name>_<env>"``, where ``env`` comes
    from the ``env`` environment variable and defaults to ``"dev"``.
    """

    def __init__(self, connet_name):
        """Load the configuration and remember which connection entry to use.

        Args:
            connet_name: Prefix of the configuration entry; the active
                environment name is appended to it to form the lookup key.
        """
        self.yaml_data = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
        self.connet_name = connet_name
        self.env = os.environ.get('env', 'dev')

    def _conn_config(self):
        """Return the configuration mapping for the selected connection and environment."""
        return self.yaml_data[self.connet_name + "_" + self.env]

    def get_conn(self):
        """Open and return a new pymysql connection.

        The configuration entry is passed to ``pymysql.connect`` as keyword
        arguments. No pooling happens here: every call opens a fresh
        connection.
        """
        return pymysql.connect(**self._conn_config())

    def execute(self, sql, params=tuple()):
        """Execute one SQL statement and return all rows it produced.

        Args:
            sql: SQL text, optionally containing parameter placeholders.
            params: Values bound to the placeholders by the driver.

        Returns:
            Whatever ``cursor.fetchall()`` returns for a ``DictCursor``.

        Raises:
            Exception: Any error raised while executing, committing or
                fetching is logged, the transaction is rolled back, and the
                original exception is re-raised unchanged.
        """
        # NOTE(review): relies on the connection's context manager to release
        # the connection on exit (pymysql >= 1.0 behaviour) - confirm the
        # pinned pymysql version.
        with self.get_conn() as conn:
            with conn.cursor(cursor=DictCursor) as cursor:
                try:
                    cursor.execute(sql, params)
                    # `_executed` is read after execute(); presumably it holds
                    # the statement as sent to the server, parameters included.
                    trans_print("开始执行SQL:", cursor._executed)
                    conn.commit()
                    return cursor.fetchall()
                except Exception as e:
                    trans_print(f"执行sql:{sql},报错:{e}")
                    trans_print(traceback.format_exc())
                    conn.rollback()
                    # Bare raise keeps the original exception and traceback.
                    raise

    def execute_df_save(self, df, table_name):
        """Append a DataFrame to a table through a short-lived SQLAlchemy engine.

        Args:
            df: Object exposing ``to_sql`` (a pandas DataFrame is expected).
            table_name: Target table; rows are appended (``if_exists='append'``)
                and the index is not written.
        """
        config = self._conn_config()
        # URL-encode the credentials so characters such as '@', ':', '/' or
        # '%' in a user name or password cannot corrupt the connection URL.
        # str() keeps non-string YAML values (e.g. an all-digit password)
        # working as they did with plain f-string interpolation.
        username = quote_plus(str(config['user']))
        password = quote_plus(str(config['password']))
        host = config['host']
        port = config['port']
        dbname = config['database']
        engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
        try:
            df.to_sql(table_name, engine, index=False, if_exists='append')
        finally:
            # A new engine (and pool) is created per call; dispose it so its
            # pooled connections are closed instead of leaking.
            engine.dispose()