viewDataAnalysis.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. # -*- coding: utf-8 -*-
  2. import traceback
  3. from drf_yasg import openapi
  4. from drf_yasg.utils import swagger_auto_schema
  5. from rest_framework.viewsets import ViewSet
  6. from django.http import HttpResponse
  7. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  8. from common.appConfig import GetLogger, GetDbUtil, GetMinIOUtil
  9. from algorithmContract.const import *
  10. from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
  11. from algorithmContract.confBusiness import *
  12. from behavior.outputProcessor import OutputProcessor
  13. from service.serviceOfDataAnalysis import ServiceOfDataAnalysis
  14. import threading
  15. tags = ['dataAnalysis']
  16. # 创建锁
  17. lock = threading.Lock()
  18. def getClientIP(request):
  19. x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
  20. if x_forwarded_for:
  21. ip = x_forwarded_for.split(',')[0]
  22. else:
  23. ip = request.META.get('REMOTE_ADDR')
  24. return ip
  25. class DataAnalysis(ViewSet):
  26. @swagger_auto_schema(
  27. operation_description="apiview post description override",
  28. request_body=openapi.Schema(
  29. type=openapi.TYPE_OBJECT,
  30. ),
  31. security=[], tags=tags, operation_summary='分析')
  32. def analysis(self, request):
  33. try:
  34. logger = GetLogger()
  35. databasesUtil = GetDbUtil()
  36. minioClient = GetMinIOUtil()
  37. conf: Contract = LoadAnalysisInput(request.body)
  38. outputProcessor = OutputProcessor(
  39. conf, logger, databasesUtil, minioClient)
  40. foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  41. with foundationDB.session_scope() as foundationSession:
  42. outputProcessor.analysisState(
  43. foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.Analyzing.value, analysisProgress=5)
  44. clientIP=getClientIP(request=request)
  45. logger.info(f"client IP: {clientIP} get_body {request.body}")
  46. try:
  47. dataAnalysisService = ServiceOfDataAnalysis(logger, databasesUtil, minioClient, conf)
  48. except (CustomError, Exception) as exception:
  49. # 使用 traceback 模块获取完整的异常信息
  50. error_message =''.join(traceback.format_exception(exception))
  51. ex = exception if isinstance(exception, CustomError) else CustomError(-1)
  52. code = ex.code
  53. message = ex.message
  54. with foundationDB.session_scope() as session:
  55. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  56. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  57. return HttpResponse("call success.", status=200)
  58. # 定义一个任务来执行analysis
  59. def runAnalysis():
  60. # with semaphore:
  61. with lock:
  62. try:
  63. dataAnalysisService.analysis()
  64. except Exception as e:
  65. raise e
  66. # 启动一个线程来执行任务
  67. analysis_thread = threading.Thread(target=runAnalysis)
  68. analysis_thread.start()
  69. logger.info("Analysis started")
  70. return HttpResponse("call success.", status=200)
  71. except (CustomError, Exception) as e:
  72. try:
  73. dbUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  74. ex=e if isinstance(e, CustomError) else CustomError(-1)
  75. code = ex.code
  76. message = ex.message
  77. with dbUtil.session_scope() as session:
  78. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  79. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  80. except Exception as e1:
  81. # 使用 traceback 模块获取完整的异常信息
  82. error_message = ''.join(traceback.format_exception(e1))
  83. # 记录异常信息
  84. logger.error(f"捕获到异常: {error_message}")
  85. # 返回包含错误信息的 HTTP 响应,状态码设为 500(内部服务器错误)
  86. return HttpResponse(f"内部服务器错误: {error_message}", status=500)