ConnectMysql.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import traceback
  2. from os import *
  3. import pandas as pd
  4. import pymysql
  5. from pymysql.cursors import DictCursor
  6. from sqlalchemy import create_engine
  7. from utils.conf.read_conf import yaml_conf
  8. from utils.log.trans_log import trans_print
  9. class ConnectMysql:
  10. def __init__(self, connet_name):
  11. self.yaml_data = yaml_conf(environ.get('ETL_CONF'))
  12. self.connet_name = connet_name
  13. self.config = self.yaml_data[self.connet_name]
  14. self.database = self.config['database']
  15. # 从连接池中获取一个连接
  16. def get_conn(self):
  17. return pymysql.connect(**self.config)
  18. # 使用连接执行sql
  19. def execute(self, sql, params=tuple()):
  20. with self.get_conn() as conn:
  21. with conn.cursor(cursor=DictCursor) as cursor:
  22. try:
  23. cursor.execute(sql, params)
  24. trans_print("开始执行SQL:", cursor._executed)
  25. conn.commit()
  26. result = cursor.fetchall()
  27. return result
  28. except Exception as e:
  29. trans_print(f"执行sql:{sql},报错:{e}")
  30. trans_print(traceback.format_exc())
  31. conn.rollback()
  32. raise e
  33. def get_engine(self):
  34. config = self.config
  35. username = config['user']
  36. password = config['password']
  37. host = config['host']
  38. port = config['port']
  39. dbname = config['database']
  40. return create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
  41. def execute_df_save(self, df, table_name, chunk_size=10000):
  42. df.to_sql(table_name, self.get_engine(), index=False, if_exists='append', chunksize=chunk_size)
  43. def read_sql_to_df(self, sql):
  44. df = pd.read_sql_query(sql, self.get_engine())
  45. return df