|
@@ -15,14 +15,11 @@ class ConnectMysql:
|
|
|
def __init__(self, connet_name):
|
|
|
self.yaml_data = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
|
|
|
self.connet_name = connet_name
|
|
|
- if 'env' in os.environ:
|
|
|
- self.env = os.environ['env']
|
|
|
- else:
|
|
|
- self.env = 'dev'
|
|
|
+ self.config = self.yaml_data[self.connet_name]
|
|
|
|
|
|
# 从连接池中获取一个连接
|
|
|
def get_conn(self):
|
|
|
- return pymysql.connect(**self.yaml_data[self.connet_name + "_" + self.env])
|
|
|
+ return pymysql.connect(**self.config)
|
|
|
|
|
|
# 使用连接执行sql
|
|
|
def execute(self, sql, params=tuple()):
|
|
@@ -41,23 +38,18 @@ class ConnectMysql:
|
|
|
conn.rollback()
|
|
|
raise e
|
|
|
|
|
|
- def execute_df_save(self, df, table_name):
|
|
|
- config = self.yaml_data[self.connet_name + "_" + self.env]
|
|
|
+ def get_engine(self):
|
|
|
+ config = self.config
|
|
|
username = config['user']
|
|
|
password = config['password']
|
|
|
host = config['host']
|
|
|
port = config['port']
|
|
|
dbname = config['database']
|
|
|
- engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
|
|
|
- df.to_sql(table_name, engine, index=False, if_exists='append')
|
|
|
+ return create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
|
|
|
+
|
|
|
+ def execute_df_save(self, df, table_name):
|
|
|
+ df.to_sql(table_name, self.get_engine(), index=False, if_exists='append')
|
|
|
|
|
|
def read_sql_to_df(self, sql):
|
|
|
- config = self.yaml_data[self.connet_name + "_" + self.env]
|
|
|
- username = config['user']
|
|
|
- password = config['password']
|
|
|
- host = config['host']
|
|
|
- port = config['port']
|
|
|
- dbname = config['database']
|
|
|
- engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
|
|
|
- df = pd.read_sql_query(sql, engine)
|
|
|
+ df = pd.read_sql_query(sql, self.get_engine())
|
|
|
return df
|