viewDataAnalysis.py 4.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # -*- coding: utf-8 -*-
  2. import traceback
  3. from drf_yasg import openapi
  4. from drf_yasg.utils import swagger_auto_schema
  5. from rest_framework.viewsets import ViewSet
  6. from django.http import HttpResponse
  7. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  8. from common.appConfig import GetLogger, GetDbUtil, GetMinIOUtil
  9. from algorithmContract.const import *
  10. from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
  11. from algorithmContract.confBusiness import *
  12. from behavior.outputProcessor import OutputProcessor
  13. from service.serviceOfDataAnalysis import ServiceOfDataAnalysis
  14. import threading
  15. tags = ['dataAnalysis']
  16. # 创建锁
  17. lock = threading.Lock()
  18. def getClientIP(request):
  19. x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
  20. if x_forwarded_for:
  21. ip = x_forwarded_for.split(',')[0]
  22. else:
  23. ip = request.META.get('REMOTE_ADDR')
  24. return ip
  25. class DataAnalysis(ViewSet):
  26. @swagger_auto_schema(
  27. operation_description="apiview post description override",
  28. request_body=openapi.Schema(
  29. type=openapi.TYPE_OBJECT,
  30. ),
  31. security=[], tags=tags, operation_summary='分析')
  32. def analysis(self, request):
  33. try:
  34. logger = GetLogger()
  35. databasesUtil = GetDbUtil()
  36. minioClient = GetMinIOUtil()
  37. conf: Contract = LoadAnalysisInput(request.body)
  38. outputProcessor = OutputProcessor(
  39. conf, logger, databasesUtil, minioClient)
  40. foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  41. with foundationDB.session_scope() as foundationSession:
  42. outputProcessor.analysisState(
  43. foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.RequstQueue.value, analysisProgress=5)
  44. clientIP=getClientIP(request=request)
  45. logger.info(f"client IP: {clientIP} get_body {request.body}")
  46. dataAnalysisService = ServiceOfDataAnalysis(
  47. logger, databasesUtil, minioClient, conf)
  48. # 定义一个任务来执行analysis
  49. def runAnalysis():
  50. # with semaphore:
  51. with lock:
  52. try:
  53. dataAnalysisService.analysis()
  54. except Exception as e:
  55. raise e
  56. # 启动一个线程来执行任务
  57. analysis_thread = threading.Thread(target=runAnalysis)
  58. analysis_thread.start()
  59. logger.info("Analysis started")
  60. return HttpResponse("call success.", status=200)
  61. except (CustomError, Exception) as e:
  62. try:
  63. dbUtil = databasesUtil[DATABASE_BusinessFoundationDb]
  64. ex=e if isinstance(e, CustomError) else CustomError(-1)
  65. code = ex.code
  66. message = ex.message
  67. with dbUtil.session_scope() as session:
  68. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  69. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  70. except Exception as e1:
  71. # 使用 traceback 模块获取完整的异常信息
  72. error_message = ''.join(traceback.format_exception(
  73. etype=type(e1), value=e1, tb=e1.__traceback__))
  74. # 记录异常信息
  75. self.logger.error(f"捕获到异常: {error_message}")
  76. finally:
  77. # 使用 traceback 模块获取完整的异常信息
  78. error_message = ''.join(traceback.format_exception(
  79. etype=type(e), value=e, tb=e.__traceback__))
  80. # 记录异常信息
  81. logger.error(f"捕获到异常: {error_message}")
  82. # 返回包含错误信息的 HTTP 响应,状态码设为 500(内部服务器错误)
  83. return HttpResponse(f"内部服务器错误: {error_message}", status=500)