# viewDataAnalysis.py
# -*- coding: utf-8 -*-
import threading
import traceback

from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework.viewsets import ViewSet
from django.http import HttpResponse

from utils.rdbmsUtil.databaseUtil import DatabaseUtil
from common.appConfig import GetLogger, GetDbUtil, GetMinIOUtil
from algorithmContract.const import *
from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
from algorithmContract.confBusiness import *
from behavior.outputProcessor import OutputProcessor
from service.serviceOfDataAnalysis import ServiceOfDataAnalysis
  15. tags = ['dataAnalysis']
  16. # 创建锁
  17. lock = threading.Lock()
  18. def getClientIP(request):
  19. x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
  20. if x_forwarded_for:
  21. ip = x_forwarded_for.split(',')[0]
  22. else:
  23. ip = request.META.get('REMOTE_ADDR')
  24. return ip
  25. class DataAnalysis(ViewSet):
  26. @swagger_auto_schema(
  27. operation_description="apiview post description override",
  28. request_body=openapi.Schema(
  29. type=openapi.TYPE_OBJECT,
  30. ),
  31. security=[], tags=tags, operation_summary='分析')
  32. def analysis(self, request):
  33. try:
  34. logger = GetLogger()
  35. databasesUtil = GetDbUtil()
  36. minioClient = GetMinIOUtil()
  37. conf: Contract = LoadAnalysisInput(request.body)
  38. outputProcessor = OutputProcessor(
  39. conf, logger, databasesUtil, minioClient)
  40. foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  41. with foundationDB.session_scope() as foundationSession:
  42. outputProcessor.analysisState(
  43. foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.Analyzing.value, analysisProgress=5)
  44. clientIP=getClientIP(request=request)
  45. logger.info(f"client IP: {clientIP} get_body {request.body}")
  46. try:
  47. dataAnalysisService = ServiceOfDataAnalysis(logger, databasesUtil, minioClient, conf)
  48. except Exception as exception:
  49. # 使用 traceback 模块获取完整的异常信息
  50. error_message = ''.join(traceback.format_exception(
  51. etype=type(exception), value=exception, tb=exception.__traceback__))
  52. self.logger.error(f"捕获到异常: {error_message}")
  53. code = exception.code
  54. message = exception.message
  55. with foundationDB.session_scope() as session:
  56. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  57. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  58. return HttpResponse("call success.", status=200)
  59. # 定义一个任务来执行analysis
  60. def runAnalysis():
  61. # with semaphore:
  62. with lock:
  63. try:
  64. dataAnalysisService.analysis()
  65. except Exception as e:
  66. raise e
  67. # 启动一个线程来执行任务
  68. analysis_thread = threading.Thread(target=runAnalysis)
  69. analysis_thread.start()
  70. logger.info("Analysis started")
  71. return HttpResponse("call success.", status=200)
  72. except (CustomError, Exception) as e:
  73. try:
  74. dbUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  75. ex=e if isinstance(e, CustomError) else CustomError(-1)
  76. code = ex.code
  77. message = ex.message
  78. with dbUtil.session_scope() as session:
  79. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  80. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  81. except Exception as e1:
  82. # 使用 traceback 模块获取完整的异常信息
  83. error_message = ''.join(traceback.format_exception(
  84. etype=type(e1), value=e1, tb=e1.__traceback__))
  85. # 记录异常信息
  86. self.logger.error(f"捕获到异常: {error_message}")
  87. finally:
  88. # 使用 traceback 模块获取完整的异常信息
  89. error_message = ''.join(traceback.format_exception(
  90. etype=type(e), value=e, tb=e.__traceback__))
  91. # 记录异常信息
  92. logger.error(f"捕获到异常: {error_message}")
  93. # 返回包含错误信息的 HTTP 响应,状态码设为 500(内部服务器错误)
  94. return HttpResponse(f"内部服务器错误: {error_message}", status=500)