viewDataAnalysis.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # -*- coding: utf-8 -*-
  2. import traceback
  3. from drf_yasg import openapi
  4. from drf_yasg.utils import swagger_auto_schema
  5. from rest_framework.viewsets import ViewSet
  6. from django.http import HttpResponse
  7. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  8. # from common.appConfig import GetLogger, GetDbUtil, GetMinIOUtil
  9. # from dataContract.algorithmContract.const import *
  10. # from dataContract.algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
  11. # from dataContract.algorithmContract.confBusiness import *
  12. # from dataAnalysisBehavior.behavior.outputProcessor import OutputProcessor
  13. from appConfig import GetLogger, GetDbUtil, GetMinIOUtil
  14. from algorithmContract.const import *
  15. from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
  16. from algorithmContract.confBusiness import *
  17. from behavior.outputProcessor import OutputProcessor
  18. from dataAnalysisService.service.serviceOfDataAnalysis import ServiceOfDataAnalysis
  19. import threading
  20. tags = ['dataAnalysis']
  21. # 创建锁
  22. lock = threading.Lock()
  23. def getClientIP(request):
  24. x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
  25. if x_forwarded_for:
  26. ip = x_forwarded_for.split(',')[0]
  27. else:
  28. ip = request.META.get('REMOTE_ADDR')
  29. return ip
  30. class DataAnalysis(ViewSet):
  31. @swagger_auto_schema(
  32. operation_description="apiview post description override",
  33. request_body=openapi.Schema(
  34. type=openapi.TYPE_OBJECT,
  35. ),
  36. security=[], tags=tags, operation_summary='分析')
  37. def analysis(self, request):
  38. try:
  39. logger = GetLogger()
  40. databasesUtil = GetDbUtil()
  41. minioClient = GetMinIOUtil()
  42. conf: Contract = LoadAnalysisInput(request.body)
  43. outputProcessor = OutputProcessor(
  44. conf, logger, databasesUtil, minioClient)
  45. foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  46. with foundationDB.session_scope() as foundationSession:
  47. outputProcessor.analysisState(
  48. foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.Analyzing.value, analysisProgress=5)
  49. clientIP=getClientIP(request=request)
  50. logger.info(f"client IP: {clientIP} get_body {request.body}")
  51. try:
  52. dataAnalysisService = ServiceOfDataAnalysis(logger, databasesUtil, minioClient, conf)
  53. except (CustomError, Exception) as exception:
  54. # 使用 traceback 模块获取完整的异常信息
  55. error_message =''.join(traceback.format_exception(exception))
  56. ex = exception if isinstance(exception, CustomError) else CustomError(-1)
  57. code = ex.code
  58. message = ex.message
  59. with foundationDB.session_scope() as session:
  60. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  61. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  62. return HttpResponse("call success.", status=200)
  63. # 定义一个任务来执行analysis
  64. def runAnalysis():
  65. # with semaphore:
  66. with lock:
  67. try:
  68. dataAnalysisService.analysis()
  69. except Exception as e:
  70. raise e
  71. # 启动一个线程来执行任务
  72. analysis_thread = threading.Thread(target=runAnalysis)
  73. analysis_thread.start()
  74. logger.info("Analysis started")
  75. return HttpResponse("call success.", status=200)
  76. except (CustomError, Exception) as e:
  77. try:
  78. dbUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  79. ex=e if isinstance(e, CustomError) else CustomError(-1)
  80. code = ex.code
  81. message = ex.message
  82. with dbUtil.session_scope() as session:
  83. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  84. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  85. except Exception as e1:
  86. # 使用 traceback 模块获取完整的异常信息
  87. error_message = ''.join(traceback.format_exception(e1))
  88. # 记录异常信息
  89. logger.error(f"捕获到异常: {error_message}")
  90. # 返回包含错误信息的 HTTP 响应,状态码设为 500(内部服务器错误)
  91. return HttpResponse(f"内部服务器错误: {error_message}", status=500)