trans_mysql.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. # 建立数据库连接
  5. import pymysql
  6. from sqlalchemy import create_engine
  7. from utils.log.trans_log import trans_print
  8. user = 'admin'
  9. password = 'admin123456'
  10. host = '192.168.50.233'
  11. port = 3306
  12. database = 'energy_data'
  13. plt_conn = pymysql.connect(
  14. host=host,
  15. port=3306,
  16. user=user,
  17. password=password,
  18. database=database,
  19. charset='utf8mb4'
  20. )
  21. engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{database}', echo=True)
  22. susscess_sql = "update batch_status set status = 'success' where batch_no = '{batch_no}' and trans_type = '{trans_type}'";
  23. error_sql = "update batch_status set status = 'error',message='{message}' where batch_no = '{batch_no}' and trans_type = '{trans_type}'"
  24. # def __query(sql):
  25. # trans_print('开始执行SQL:',sql)
  26. # plt_conn.ping(reconnect=True)
  27. # with plt_conn.cursor() as cursor:
  28. # df = pd.read_sql(sql, cursor)
  29. # return df
  30. #
  31. #
  32. # def __ddl_sql(sql):
  33. # trans_print('开始执行SQL:',sql)
  34. # plt_conn.ping(reconnect=True)
  35. # with plt_conn.cursor() as cursor:
  36. # cursor.execute(sql)
  37. # plt_conn.commit()
  38. def update_transe_status(batch_no, trans_type, status, message):
  39. exec_sql = susscess_sql if status == 'success' else error_sql
  40. exec_sql = exec_sql.format(batch_no=batch_no, status=status, message=message, trans_type=trans_type)
  41. #
  42. # plt_conn.ping(reconnect=True)
  43. # with plt_conn.cursor() as cursor:
  44. # cursor.execute(exec_sql)
  45. # plt_conn.commit()
  46. trans_print(exec_sql)
  47. # def insert_data(batch_no, type, status, message):
  48. # exec_sql = insert_sql.format(batch_no=batch_no, type=type, status=status, message=message)
  49. # plt_conn.ping(reconnect=True)
  50. # with plt_conn.cursor() as cursor:
  51. # cursor.execute(exec_sql)
  52. #
  53. # plt_conn.commit()
  54. def create_table(batch_no, date_list=list(), fengji_list=list()):
  55. pass
  56. def get_exec_data():
  57. query_running_sql = "selecgt 1 from table where status = 'running"
  58. query_next_exdc_sql = "selecgt 1 from table where status = 'waiting' order by id "
  59. trans_print(query_next_exdc_sql)
  60. # df = __query(query_running_sql)
  61. # if df.empty:
  62. # df = __query(query_next_exdc_sql)
  63. # if df.empty:
  64. # return None
  65. # else:
  66. # return df.iloc[0]