ConnectMysql.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import traceback
  2. from os import *
  3. import traceback
  4. from os import *
  5. import pandas as pd
  6. import pymysql
  7. from pymysql.cursors import DictCursor
  8. from sqlalchemy import create_engine
  9. from utils.common_util import get_project_conf_path_file
  10. from utils.conf.read_conf import yaml_conf
  11. from utils.log.import_data_log import log_print
  12. class ConnectMysql:
  13. def __init__(self, connet_name):
  14. conf_path = get_project_conf_path_file('dev')
  15. self.yaml_data = yaml_conf(environ.get('IMPORT_CONF', conf_path))
  16. self.connet_name = connet_name
  17. self.config = self.yaml_data[self.connet_name]
  18. # 从连接池中获取一个连接
  19. def get_conn(self):
  20. return pymysql.connect(**self.config)
  21. # 使用连接执行sql
  22. def execute(self, sql, params=tuple()):
  23. with self.get_conn() as conn:
  24. with conn.cursor(cursor=DictCursor) as cursor:
  25. try:
  26. cursor.execute(sql, params)
  27. log_print("开始执行SQL:", cursor._executed)
  28. conn.commit()
  29. result = cursor.fetchall()
  30. return result
  31. except Exception as e:
  32. log_print(f"执行sql:{sql},报错:{e}")
  33. log_print(traceback.format_exc())
  34. conn.rollback()
  35. raise e
  36. def get_engine(self):
  37. config = self.config
  38. username = config['user']
  39. password = config['password']
  40. host = config['host']
  41. port = config['port']
  42. dbname = config['database']
  43. return create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
  44. def execute_df_save(self, df, table_name):
  45. df.to_sql(table_name, self.get_engine(), index=False, if_exists='append')
  46. def read_sql_to_df(self, sql):
  47. df = pd.read_sql_query(sql, self.get_engine())
  48. return df