|
@@ -17,13 +17,13 @@ tags = ['dataAnalysis']
|
|
|
# 创建锁
|
|
|
lock = threading.Lock()
|
|
|
|
|
|
-def getClientIP(request):
|
|
|
- x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
|
- if x_forwarded_for:
|
|
|
- ip = x_forwarded_for.split(',')[0]
|
|
|
- else:
|
|
|
- ip = request.META.get('REMOTE_ADDR')
|
|
|
- return ip
|
|
|
+def getClientIP(request):
|
|
|
+ x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
|
+ if x_forwarded_for:
|
|
|
+ ip = x_forwarded_for.split(',')[0]
|
|
|
+ else:
|
|
|
+ ip = request.META.get('REMOTE_ADDR')
|
|
|
+ return ip
|
|
|
|
|
|
class DataAnalysis(ViewSet):
|
|
|
|
|
@@ -38,7 +38,7 @@ class DataAnalysis(ViewSet):
|
|
|
logger = GetLogger()
|
|
|
databasesUtil = GetDbUtil()
|
|
|
minioClient = GetMinIOUtil()
|
|
|
-
|
|
|
+
|
|
|
conf: Contract = LoadAnalysisInput(request.body)
|
|
|
|
|
|
outputProcessor = OutputProcessor(
|
|
@@ -46,13 +46,24 @@ class DataAnalysis(ViewSet):
|
|
|
foundationDB: DatabaseUtil = databasesUtil[DATABASE_BusinessFoundationDb]
|
|
|
with foundationDB.session_scope() as foundationSession:
|
|
|
outputProcessor.analysisState(
|
|
|
- foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.RequstQueue.value, analysisProgress=5)
|
|
|
+ foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.Analyzing.value, analysisProgress=5)
|
|
|
|
|
|
clientIP=getClientIP(request=request)
|
|
|
logger.info(f"client IP: {clientIP} get_body {request.body}")
|
|
|
+ try:
|
|
|
+ dataAnalysisService = ServiceOfDataAnalysis(logger, databasesUtil, minioClient, conf)
|
|
|
+ except Exception as exception:
|
|
|
+ # 使用 traceback 模块获取完整的异常信息
|
|
|
+ error_message = ''.join(traceback.format_exception(
|
|
|
+ etype=type(exception), value=exception, tb=exception.__traceback__))
|
|
|
+ self.logger.error(f"捕获到异常: {error_message}")
|
|
|
+ code = exception.code
|
|
|
+ message = exception.message
|
|
|
+ with foundationDB.session_scope() as session:
|
|
|
+ outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
|
|
|
+ AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
|
|
|
+ return HttpResponse("call success.", status=200)
|
|
|
|
|
|
- dataAnalysisService = ServiceOfDataAnalysis(
|
|
|
- logger, databasesUtil, minioClient, conf)
|
|
|
|
|
|
# 定义一个任务来执行analysis
|
|
|
def runAnalysis():
|
|
@@ -75,7 +86,7 @@ class DataAnalysis(ViewSet):
|
|
|
|
|
|
ex=e if isinstance(e, CustomError) else CustomError(-1)
|
|
|
code = ex.code
|
|
|
- message = ex.message
|
|
|
+ message = ex.message
|
|
|
with dbUtil.session_scope() as session:
|
|
|
outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
|
|
|
AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
|