123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- # -*- coding: utf-8 -*-
- import traceback
- from drf_yasg import openapi
- from drf_yasg.utils import swagger_auto_schema
- from rest_framework.viewsets import ViewSet
- from django.http import HttpResponse
- from utils.rdbmsUtil.databaseUtil import DatabaseUtil
- # from common.appConfig import GetLogger, GetDbUtil, GetMinIOUtil
- # from dataContract.algorithmContract.const import *
- # from dataContract.algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
- # from dataContract.algorithmContract.confBusiness import *
- # from dataAnalysisBehavior.behavior.outputProcessor import OutputProcessor
- from appConfig import GetLogger, GetDbUtil, GetMinIOUtil
- from algorithmContract.const import *
- from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
- from algorithmContract.confBusiness import *
- from behavior.outputProcessor import OutputProcessor
- from dataAnalysisService.service.serviceOfDataAnalysis import ServiceOfDataAnalysis
- import threading
# Swagger tag list passed to swagger_auto_schema for the endpoints in this module.
tags = ['dataAnalysis']

# Module-level lock shared by every request handled in this process; the
# background analysis job acquires it so that only one analysis runs at a time.
lock = threading.Lock()
def getClientIP(request):
    """Return the client IP address recorded on a request.

    The first entry of the ``HTTP_X_FORWARDED_FOR`` value in ``request.META``
    is preferred; when that header is absent, empty, or its first entry is
    blank, ``REMOTE_ADDR`` is returned instead.

    Args:
        request: Any object exposing a ``META`` mapping (e.g. a Django
            ``HttpRequest``).

    Returns:
        The IP address as a string with surrounding whitespace removed, or
        ``None`` when neither key is present in ``request.META``.

    NOTE(review): the forwarded header is only as trustworthy as whatever sets
    it -- confirm a trusted proxy populates it before using this value for
    anything security-sensitive.
    """
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # The header is a comma-separated chain; the first entry is taken as
        # the client. Strip it so padding around the comma does not leak into
        # the returned address.
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop
    # No usable forwarded entry: fall back to the directly connected peer.
    return request.META.get('REMOTE_ADDR')
class DataAnalysis(ViewSet):
    """View set exposing the endpoint that starts a data-analysis run."""

    @swagger_auto_schema(
        operation_description="apiview post description override",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
        ),
        security=[], tags=tags, operation_summary='分析')
    def analysis(self, request):
        """Start an analysis run in a background thread and reply immediately.

        Steps performed:
          1. Build the logger, database utilities and MinIO client, and parse
             the request body into a contract with ``LoadAnalysisInput``.
          2. Mark the batch named by the contract as ``Analyzing`` (progress 5).
          3. Construct ``ServiceOfDataAnalysis``. If construction fails, the
             batch is marked ``Analyzed`` with an error code/message and the
             view still answers ``200 "call success."``.
          4. Run ``ServiceOfDataAnalysis.analysis()`` on a new thread, guarded
             by the module-level ``lock`` so only one analysis runs at a time.

        Args:
            request: Incoming HTTP request; ``request.body`` carries the
                analysis contract.

        Returns:
            HttpResponse: status 200 once the job is started (or the service
            construction failure has been recorded); status 500 with the
            traceback text when setup fails for any other reason.
        """
        # Pre-bind everything the outer error handler inspects, so a failure
        # early in setup cannot turn into an unbound-name error there.
        logger = None
        databasesUtil = None
        outputProcessor = None
        conf = None
        try:
            logger = GetLogger()
            databasesUtil = GetDbUtil()
            minioClient = GetMinIOUtil()
            # Parsed analysis contract (annotated as ``Contract`` in the
            # original code).
            conf = LoadAnalysisInput(request.body)
            outputProcessor = OutputProcessor(
                conf, logger, databasesUtil, minioClient)
            foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]

            # Record that the batch has entered the "analyzing" state.
            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession, conf.dataContract.dataFilter.dataBatchNum,
                    AnalysisState.Analyzing.value, analysisProgress=5)

            clientIP = getClientIP(request=request)
            logger.info(f"client IP: {clientIP} get_body {request.body}")

            try:
                dataAnalysisService = ServiceOfDataAnalysis(
                    logger, databasesUtil, minioClient, conf)
            except (CustomError, Exception) as exception:
                # Capture the full traceback and log it; previously it was
                # computed and then discarded.
                error_message = ''.join(traceback.format_exc())
                logger.error(f"捕获到异常: {error_message}")
                # Anything that is not a CustomError is reported with code -1.
                ex = exception if isinstance(exception, CustomError) else CustomError(-1)
                code = ex.code
                message = ex.message
                with foundationDB.session_scope() as session:
                    outputProcessor.analysisState(
                        session, conf.dataContract.dataFilter.dataBatchNum,
                        AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
                # The failure is reported through the recorded state, so the
                # HTTP call itself is still answered as successful.
                return HttpResponse("call success.", status=200)

            # Task that performs the actual analysis.
            def runAnalysis():
                # Holding the module-level lock serializes analysis runs.
                with lock:
                    try:
                        dataAnalysisService.analysis()
                    except Exception:
                        # Make background failures visible in the application
                        # log, then re-raise so the thread still terminates
                        # with the exception as before.
                        worker_error = ''.join(traceback.format_exc())
                        logger.error(f"捕获到异常: {worker_error}")
                        raise

            # Start a thread to run the task; the response does not wait for it.
            analysis_thread = threading.Thread(target=runAnalysis)
            analysis_thread.start()
            logger.info("Analysis started")
            return HttpResponse("call success.", status=200)
        except (CustomError, Exception) as e:
            # Capture the primary failure first so it is what gets reported
            # unless recording the error state fails as well.
            error_message = ''.join(traceback.format_exc())
            if logger is not None:
                logger.error(f"捕获到异常: {error_message}")

            # The error state can only be written once setup got far enough to
            # build the output processor (which implies conf and databasesUtil
            # are bound too).
            if outputProcessor is not None:
                try:
                    dbUtil = databasesUtil[DATABASE_BusinessFoundationDb]
                    ex = e if isinstance(e, CustomError) else CustomError(-1)
                    code = ex.code
                    message = ex.message
                    with dbUtil.session_scope() as session:
                        outputProcessor.analysisState(
                            session, conf.dataContract.dataFilter.dataBatchNum,
                            AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
                except Exception:
                    # Use the traceback module to get the complete exception
                    # text (this includes the original failure as context).
                    error_message = ''.join(traceback.format_exc())
                    if logger is not None:
                        logger.error(f"捕获到异常: {error_message}")

            # Always return an explicit response: status 500 (internal server
            # error) carrying the error text. Falling through would return
            # None, which is not a valid view result.
            return HttpResponse(f"内部服务器错误: {error_message}", status=500)
|