12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- import os
- import traceback
- import pymysql
- from pymysql.cursors import DictCursor
- from sqlalchemy import create_engine
- from utils.conf.read_conf import yaml_conf
- from utils.log.trans_log import trans_print
- class ConnectMysql:
- def __init__(self, connet_name):
- #self.yaml_data = yaml_conf("/data/config/etl_config.yaml")
- self.yaml_data = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
- self.connet_name = connet_name
- if 'env' in os.environ:
- self.env = os.environ['env']
- else:
- self.env = 'dev'
- # 从连接池中获取一个连接
- def get_conn(self):
- return pymysql.connect(**self.yaml_data[self.connet_name + "_" + self.env])
- # 使用连接执行sql
- def execute(self, sql, params=tuple()):
- with self.get_conn() as conn:
- with conn.cursor(cursor=DictCursor) as cursor:
- try:
- cursor.execute(sql, params)
- trans_print("开始执行SQL:", cursor._executed)
- conn.commit()
- result = cursor.fetchall()
- return result
- except Exception as e:
- trans_print(f"执行sql:{sql},报错:{e}")
- trans_print(traceback.format_exc())
- conn.rollback()
- raise e
- def execute_df_save(self, df, table_name):
- config = self.yaml_data[self.connet_name + "_" + self.env]
- username = config['user']
- password = config['password']
- host = config['host']
- port = config['port']
- dbname = config['database']
- engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
- df.to_sql(table_name, engine, index=False, if_exists='append')
|