ConnectMysql_tidb_fix.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import time
  2. import traceback
  3. from os import *
  4. import pandas as pd
  5. import pymysql
  6. from pymysql.cursors import DictCursor
  7. from sqlalchemy import create_engine
  8. from utils.conf.read_conf import yaml_conf
  9. from utils.log.trans_log import trans_print
  class ConnectMysql:
      """Helper bound to one named database profile from the ETL YAML config.

      Raw SQL goes through short-lived pymysql connections (``get_conn`` /
      ``execute``); DataFrame I/O goes through a SQLAlchemy engine that is
      created per call (``get_engine``) and disposed once that call is done.
      """

      def __init__(self, connet_name):
          """Load the connection profile named *connet_name*.

          The YAML path comes from the ``ETL_CONF`` environment variable and is
          parsed by the project helper ``yaml_conf``. ``self.config`` is the
          section stored under *connet_name*; it is passed as keyword
          arguments to ``pymysql.connect`` and must also provide the ``user``,
          ``password``, ``host``, ``port`` and ``database`` keys used to build
          the SQLAlchemy URL.

          NOTE(review): if ``ETL_CONF`` is unset, ``yaml_conf`` receives None;
          what it does then is not visible here -- confirm.
          """
          self.yaml_data = yaml_conf(environ.get('ETL_CONF'))
          self.connet_name = connet_name
          self.config = self.yaml_data[self.connet_name]
          self.database = self.config['database']

      def get_conn(self):
          """Open and return a NEW pymysql connection with autocommit enabled.

          No pooling happens here: every call opens a fresh connection and the
          caller is responsible for closing it (``execute`` does so via ``with``).
          """
          # Merge rather than passing autocommit as an extra keyword, so a
          # profile that already contains an ``autocommit`` key does not make
          # the call fail with "got multiple values for keyword argument".
          return pymysql.connect(**{**self.config, 'autocommit': True})

      def execute(self, sql, params=tuple()):
          """Execute one statement on a fresh connection and return its rows.

          Args:
              sql: SQL text, with placeholders in pymysql's parameter style.
              params: values pymysql binds to the placeholders.

          Returns:
              The result of ``DictCursor.fetchall()``: one dict per result row,
              keyed by column name (empty when the statement yields no rows).

          Raises:
              Exception: any error from executing/fetching is logged, the
                  transaction is rolled back, and the error is re-raised.
          """
          with self.get_conn() as conn, conn.cursor(cursor=DictCursor) as cursor:
              try:
                  cursor.execute(sql, params)
                  # ``_executed`` (pymysql-internal) is filled in by
                  # cursor.execute, so this log has to follow the call.
                  trans_print("开始执行SQL:", cursor._executed)
                  # Redundant with autocommit=True (see get_conn) but harmless.
                  conn.commit()
                  return cursor.fetchall()
              except Exception as e:
                  trans_print(f"执行sql:{sql},报错:{e}")
                  trans_print(traceback.format_exc())
                  conn.rollback()
                  # Bare raise re-raises the original exception unchanged.
                  raise

      def get_engine(self):
          """Create a new SQLAlchemy engine for this profile.

          Each call builds a separate engine with its own connection pool;
          callers should ``dispose()`` it when finished, as the DataFrame
          helpers in this class do.
          """
          return create_engine(
              self._engine_url(),
              # Check pooled connections before use so stale ones are replaced.
              pool_pre_ping=True,
              isolation_level="READ COMMITTED",
              # Forwarded to pymysql.connect(); values are in seconds.
              # NOTE(review): read_timeout=120 may abort long-running SELECTs
              # issued through read_sql_to_df -- confirm this is intended.
              connect_args={
                  'connect_timeout': 30,
                  'read_timeout': 120,
                  'write_timeout': 7200,
              },
          )

      def _engine_url(self):
          """Build the ``mysql+pymysql://`` URL for this profile.

          User name and password are percent-encoded so characters such as
          ``@ : / # %`` are not taken as URL delimiters; SQLAlchemy decodes
          them again when it parses the URL. Values go through ``str`` first
          because YAML may load, e.g., an all-digit password as an int.
          """
          from urllib.parse import quote

          config = self.config
          username = quote(str(config['user']), safe='')
          password = quote(str(config['password']), safe='')
          host = config['host']
          port = config['port']
          dbname = config['database']
          return f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}'

      def execute_df_save(self, df, table_name, chunksize=10000):
          """Append *df* to *table_name*, retrying failed attempts.

          Up to 3 attempts are made, sleeping 5 s and then 10 s between them
          (linear backoff). The engine is disposed whether the write succeeds
          or fails.

          Args:
              df: DataFrame to write; its index is not stored (``index=False``).
              table_name: target table; rows are appended (``if_exists='append'``).
              chunksize: rows per batch passed to ``DataFrame.to_sql``.

          Raises:
              Exception: the error from the last attempt once all attempts fail.
          """
          max_retries = 3
          engine = self.get_engine()
          try:
              for attempt in range(1, max_retries + 1):
                  try:
                      df.to_sql(table_name, engine, if_exists='append', index=False,
                                chunksize=chunksize)
                      # Done: returning here is what keeps the same rows from
                      # being appended again on the next loop iteration.
                      return
                  except Exception as e:
                      trans_print(f" 第 {attempt} 次重试, 错误: {str(e)}")
                      if attempt == max_retries:
                          trans_print(f"处理失败: {str(e)}")
                          raise
                      # NOTE(review): if a failed attempt had already committed
                      # some chunks, the retry may append them again -- confirm.
                      time.sleep(5 * attempt)
          finally:
              engine.dispose()

      def read_sql_to_df(self, sql, params=None):
          """Run a query and return its result as a DataFrame.

          Args:
              sql: query text passed to ``pandas.read_sql_query``.
              params: optional query parameters, forwarded unchanged to
                  ``pandas.read_sql_query`` (placeholder syntax is driver-specific).

          Returns:
              The complete result set as a DataFrame.
          """
          engine = self.get_engine()
          try:
              return pd.read_sql_query(sql, engine, params=params)
          finally:
              # The engine is created per call; dispose it so its pooled
              # connections are not left open after the result is read.
              engine.dispose()