ConnectMysql.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import os
  2. import traceback
  3. import pandas as pd
  4. import pymysql
  5. from pymysql.cursors import DictCursor
  6. from sqlalchemy import create_engine
  7. from utils.conf.read_conf import yaml_conf
  8. from utils.log.trans_log import trans_print
  9. class ConnectMysql:
  10. def __init__(self, connet_name):
  11. self.yaml_data = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
  12. self.connet_name = connet_name
  13. if 'env' in os.environ:
  14. self.env = os.environ['env']
  15. else:
  16. self.env = 'dev'
  17. # 从连接池中获取一个连接
  18. def get_conn(self):
  19. return pymysql.connect(**self.yaml_data[self.connet_name + "_" + self.env])
  20. # 使用连接执行sql
  21. def execute(self, sql, params=tuple()):
  22. with self.get_conn() as conn:
  23. with conn.cursor(cursor=DictCursor) as cursor:
  24. try:
  25. cursor.execute(sql, params)
  26. trans_print("开始执行SQL:", cursor._executed)
  27. conn.commit()
  28. result = cursor.fetchall()
  29. return result
  30. except Exception as e:
  31. trans_print(f"执行sql:{sql},报错:{e}")
  32. trans_print(traceback.format_exc())
  33. conn.rollback()
  34. raise e
  35. def execute_df_save(self, df, table_name):
  36. config = self.yaml_data[self.connet_name + "_" + self.env]
  37. username = config['user']
  38. password = config['password']
  39. host = config['host']
  40. port = config['port']
  41. dbname = config['database']
  42. engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
  43. df.to_sql(table_name, engine, index=False, if_exists='append')
  44. def read_sql_to_df(self, sql):
  45. config = self.yaml_data[self.connet_name + "_" + self.env]
  46. username = config['user']
  47. password = config['password']
  48. host = config['host']
  49. port = config['port']
  50. dbname = config['database']
  51. engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
  52. df = pd.read_sql_query(sql, engine)
  53. return df