# -*- coding: utf-8 -*-
import traceback
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework.viewsets import ViewSet
from django.http import HttpResponse
from utils.rdbmsUtil.databaseUtil import DatabaseUtil
# from common.appConfig import GetLogger, GetDbUtil, GetMinIOUtil
# from dataContract.algorithmContract.const import *
# from dataContract.algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
# from dataContract.algorithmContract.confBusiness import *
# from dataAnalysisBehavior.behavior.outputProcessor import OutputProcessor
from appConfig import GetLogger, GetDbUtil, GetMinIOUtil
from algorithmContract.const import *
from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
from algorithmContract.confBusiness import *
from behavior.outputProcessor import OutputProcessor
from dataAnalysisService.service.serviceOfDataAnalysis import ServiceOfDataAnalysis
import threading

# Swagger tag under which the endpoints of this module are grouped.
tags = ['dataAnalysis']

# Module-level lock: background analysis runs acquire it, so only one
# `ServiceOfDataAnalysis.analysis()` call executes at a time in this process.
lock = threading.Lock()


def getClientIP(request):
    """Return the client IP address for *request*.

    The first entry of the ``X-Forwarded-For`` header is used when the header
    is present and non-empty; otherwise ``REMOTE_ADDR`` is returned (which may
    be ``None`` if the server did not provide it).
    """
    x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if x_forwarded_for:
        # The header is a comma-separated list; entries may carry whitespace.
        return x_forwarded_for.split(',')[0].strip()
    return request.META.get('REMOTE_ADDR')


def _recordAnalysisError(outputProcessor, databaseUtil, conf, exception):
    """Write an "analyzed with error" state for the batch named in *conf*.

    Exceptions that are not ``CustomError`` are reported as ``CustomError(-1)``
    so that a code and a message are always available.
    """
    ex = exception if isinstance(exception, CustomError) else CustomError(-1)
    with databaseUtil.session_scope() as session:
        outputProcessor.analysisState(
            session,
            conf.dataContract.dataFilter.dataBatchNum,
            AnalysisState.Analyzed.value,
            ErrorState.Err.value,
            ex.code,
            ex.message,
        )


class DataAnalysis(ViewSet):
    """View set exposing the data-analysis entry point."""

    @swagger_auto_schema(
        operation_description="apiview post description override",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
        ),
        security=[],
        tags=tags,
        operation_summary='分析')
    def analysis(self, request):
        """Start an analysis run described by the request body.

        Steps:
          1. Build the logger, database utilities and MinIO client, and parse
             ``request.body`` into a contract via ``LoadAnalysisInput``.
          2. Mark the batch as "analyzing" (progress 5) in the foundation DB.
          3. Construct ``ServiceOfDataAnalysis``; if that fails, record the
             error state and still answer with HTTP 200.
          4. Run ``ServiceOfDataAnalysis.analysis()`` on a background thread
             (serialized by the module-level ``lock``) and answer HTTP 200
             immediately.

        Any other failure is logged, recorded as an error state on a
        best-effort basis, and answered with HTTP 500 whose body contains the
        traceback of the original failure.

        Returns:
            HttpResponse: status 200 once the run has been started (or the
            service could not be constructed), status 500 otherwise.
        """
        # Pre-bind everything the error handler inspects, so that a failure
        # early in setup cannot turn into a NameError inside the handler.
        logger = None
        databasesUtil = None
        outputProcessor = None
        conf = None
        try:
            logger = GetLogger()
            databasesUtil = GetDbUtil()
            minioClient = GetMinIOUtil()

            conf = LoadAnalysisInput(request.body)

            outputProcessor = OutputProcessor(
                conf, logger, databasesUtil, minioClient)
            foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]
            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession,
                    conf.dataContract.dataFilter.dataBatchNum,
                    AnalysisState.Analyzing.value,
                    analysisProgress=5)

            clientIP = getClientIP(request=request)
            logger.info(f"client IP: {clientIP} get_body {request.body}")

            try:
                dataAnalysisService = ServiceOfDataAnalysis(
                    logger, databasesUtil, minioClient, conf)
            except (CustomError, Exception) as exception:
                # Keep the full traceback in the log; the stored state only
                # carries a code and a message.
                logger.error(f"捕获到异常: {traceback.format_exc()}")
                _recordAnalysisError(
                    outputProcessor, foundationDB, conf, exception)
                # NOTE(review): 200 is returned even though the service could
                # not be built; presumably callers read the outcome from the
                # recorded analysis state -- confirm before changing.
                return HttpResponse("call success.", status=200)

            def runAnalysis():
                """Run the analysis while holding the module-level lock."""
                with lock:
                    try:
                        dataAnalysisService.analysis()
                    except Exception:
                        # Nobody joins this thread, so log here before the
                        # exception reaches the thread's default handler.
                        logger.error(f"捕获到异常: {traceback.format_exc()}")
                        raise

            # Run the analysis on its own thread so the request returns
            # without waiting for it.
            analysis_thread = threading.Thread(target=runAnalysis)
            analysis_thread.start()
            logger.info("Analysis started")
            return HttpResponse("call success.", status=200)
        except (CustomError, Exception) as e:
            # Capture the ORIGINAL failure first; a nested try/except below
            # would otherwise replace what format_exc() reports.
            error_message = traceback.format_exc()
            if logger is not None:
                logger.error(f"捕获到异常: {error_message}")

            # Best effort: the error state can only be recorded when setup got
            # far enough to build the output processor (which implies that
            # conf and databasesUtil exist as well).
            if outputProcessor is not None:
                try:
                    _recordAnalysisError(
                        outputProcessor,
                        databasesUtil[DATABASE_BusinessFoundationDb],
                        conf,
                        e)
                except Exception:
                    if logger is not None:
                        logger.error(f"捕获到异常: {traceback.format_exc()}")

            # Always answer with an explicit 500 response.
            # NOTE(review): the body exposes the traceback to the client, as
            # the original code did -- consider a generic message instead.
            return HttpResponse(f"内部服务器错误: {error_message}", status=500)