trans_pg.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import pandas as pd
  5. # 建立数据库连接
  6. from sqlalchemy import create_engine, Engine, text
  7. from utils.log.trans_log import trans_print
  8. trans_user = 'postgres'
  9. trans_password = 'admin123456'
  10. trans_host = '192.168.50.235'
  11. trans_port = 5432
  12. trans_database = 'postgres'
  13. trans_engine = create_engine(
  14. f'postgresql://{trans_user}:{trans_password}@{trans_host}:{trans_port}/{trans_database}', echo=True)
  15. def __query(engine: Engine, sql):
  16. trans_print('开始执行SQL:', sql)
  17. with engine.connect() as conn:
  18. df = pd.read_sql(text(sql), conn)
  19. return df
  20. def __ddl_sql(engine: Engine, sql):
  21. trans_print('开始执行SQL:', sql)
  22. with engine.connect() as conn:
  23. conn.execute(text(sql))
  24. conn.commit()
  25. def creat_table_and_add_partition(table_name, partition):
  26. lower_partition = partition.lower()
  27. query_table = f"""
  28. select c.relname
  29. from pg_class c
  30. join pg_inherits pi on pi.inhrelid = c. oid
  31. join pg_class c2 on c2.oid = pi.inhparent
  32. where
  33. c2.relname = '{table_name}' and c.relname = '{table_name}_{lower_partition}'
  34. """
  35. df = __query(trans_engine, query_table)
  36. if df.empty:
  37. add_partition_sql = f"""
  38. create table {table_name}_{partition} PARTITION OF {table_name} FOR VALUES IN ('{partition}');
  39. """
  40. __ddl_sql(trans_engine, add_partition_sql)
  41. if __name__ == '__main__':
  42. # creat_table_and_add_partition("test_11", "123_002")
  43. df = __query(trans_engine, """
  44. select c.relname
  45. from pg_class c
  46. join pg_inherits pi on pi.inhrelid = c. oid
  47. join pg_class c2 on c2.oid = pi.inhparent
  48. where
  49. c2.relname = 'energy_data_second' and c.relname = 'energy_data_second_202405201453_a006';
  50. """)
  51. print(df)
  52. print(df.empty)