# -*- coding: utf-8 -*-
# viewDataAnalysis.py
import threading
import traceback

from django.http import HttpResponse
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework.viewsets import ViewSet

from utils.rdbmsUtil.databaseUtil import DatabaseUtil
from common.appConfig import GetLogger, GetDbUtil, GetMinIOUtil
from algorithmContract.const import *
from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
from algorithmContract.confBusiness import *
from behavior.outputProcessor import OutputProcessor
from service.serviceOfDataAnalysis import ServiceOfDataAnalysis
  15. tags = ['dataAnalysis']
  16. # 创建锁
  17. lock = threading.Lock()
  18. def getClientIP(request):
  19. x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
  20. if x_forwarded_for:
  21. ip = x_forwarded_for.split(',')[0]
  22. else:
  23. ip = request.META.get('REMOTE_ADDR')
  24. return ip
  25. class DataAnalysis(ViewSet):
  26. @swagger_auto_schema(
  27. operation_description="apiview post description override",
  28. request_body=openapi.Schema(
  29. type=openapi.TYPE_OBJECT,
  30. ),
  31. security=[], tags=tags, operation_summary='分析')
  32. def analysis(self, request):
  33. try:
  34. logger = GetLogger()
  35. databasesUtil = GetDbUtil()
  36. minioClient = GetMinIOUtil()
  37. conf: Contract = LoadAnalysisInput(request.body)
  38. outputProcessor = OutputProcessor(
  39. conf, logger, databasesUtil, minioClient)
  40. foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  41. with foundationDB.session_scope() as foundationSession:
  42. outputProcessor.analysisState(
  43. foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.Analyzing.value, analysisProgress=5)
  44. clientIP=getClientIP(request=request)
  45. logger.info(f"client IP: {clientIP} get_body {request.body}")
  46. try:
  47. dataAnalysisService = ServiceOfDataAnalysis(logger, databasesUtil, minioClient, conf)
  48. except Exception as exception:
  49. # 使用 traceback 模块获取完整的异常信息
  50. error_message =''.join(traceback.format_exception(exception))
  51. code = exception.code
  52. message = exception.message
  53. with foundationDB.session_scope() as session:
  54. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  55. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  56. return HttpResponse("call success.", status=200)
  57. # 定义一个任务来执行analysis
  58. def runAnalysis():
  59. # with semaphore:
  60. with lock:
  61. try:
  62. dataAnalysisService.analysis()
  63. except Exception as e:
  64. raise e
  65. # 启动一个线程来执行任务
  66. analysis_thread = threading.Thread(target=runAnalysis)
  67. analysis_thread.start()
  68. logger.info("Analysis started")
  69. return HttpResponse("call success.", status=200)
  70. except (CustomError, Exception) as e:
  71. try:
  72. dbUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  73. ex=e if isinstance(e, CustomError) else CustomError(-1)
  74. code = ex.code
  75. message = ex.message
  76. with dbUtil.session_scope() as session:
  77. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  78. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  79. except Exception as e1:
  80. # 使用 traceback 模块获取完整的异常信息
  81. error_message = ''.join(traceback.format_exception(e1))
  82. # 记录异常信息
  83. self.logger.error(f"捕获到异常: {error_message}")
  84. # 返回包含错误信息的 HTTP 响应,状态码设为 500(内部服务器错误)
  85. return HttpResponse(f"内部服务器错误: {error_message}", status=500)