|
@@ -46,21 +46,20 @@ class OutputProcessor:
|
|
|
directory = f"output/{powerFarmID}/{dataBatchNum}"
|
|
|
shutil.rmtree(directory)
|
|
|
|
|
|
- def analysisState(self, session: Session, batchNO: str, analysisState: int, errorState: int = ErrorState.NotErr.value, errorCode: str = None, errorInfo: str = None, timestamp: datetime = datetime.now(timezone.utc)+timedelta(hours=8), analysisProgress: float = 0,analysis_finish_time: datetime = None):
|
|
|
+ def analysisState(self, session: Session, batchNO: str, analysisState: int, errorState: int = ErrorState.NotErr.value, errorCode: str = None, errorInfo: str = None, analysisProgress: float = 0,analysis_finish_time: datetime=datetime.now(timezone.utc) + timedelta(hours=8)):
|
|
|
"""
|
|
|
写处理状态 至主表 analysis_result
|
|
|
写总图(多机组一张图表)上传文件 至子表 analysis_general_file
|
|
|
写单机组图(单机组一张图表)上传文件 至子表 analysis_diagram_relation
|
|
|
"""
|
|
|
- sql = text(f"INSERT INTO analysis_result(batch_code, analysis_state, err_state, err_code, err_info, create_time, analysis_progress, analysis_finish_time, update_time) \
|
|
|
- VALUES(:batch_code, :analysis_state, :err_state, :err_code, :err_info, :create_time, :analysis_progress, :analysis_finish_time, :update_time) \
|
|
|
+ sql = text(f"INSERT INTO analysis_result(batch_code, analysis_state, err_state, err_code, err_info, analysis_progress, analysis_finish_time) \
|
|
|
+ VALUES(:batch_code, :analysis_state, :err_state, :err_code, :err_info, :analysis_progress, :analysis_finish_time) \
|
|
|
ON DUPLICATE KEY \
|
|
|
UPDATE \
|
|
|
analysis_state=VALUES(analysis_state), \
|
|
|
err_state=VALUES(err_state), \
|
|
|
err_code=VALUES(err_code), \
|
|
|
err_info=VALUES(err_info), \
|
|
|
- update_time=VALUES(update_time), \
|
|
|
analysis_progress=VALUES(analysis_progress), \
|
|
|
analysis_finish_time=VALUES(analysis_finish_time);")
|
|
|
|
|
@@ -70,23 +69,21 @@ class OutputProcessor:
|
|
|
"err_state": None if self.common.isNone(analysisState) else errorState,
|
|
|
"err_code": None if self.common.isNone(errorCode) else errorCode,
|
|
|
"err_info": None if self.common.isNone(errorInfo) else errorInfo,
|
|
|
- "create_time": timestamp,
|
|
|
- "update_time": timestamp,
|
|
|
"analysis_progress": analysisProgress,
|
|
|
"analysis_finish_time":analysis_finish_time if analysis_finish_time is not None else None
|
|
|
}
|
|
|
|
|
|
session.execute(sql, params)
|
|
|
|
|
|
- def analysisResultForTurbine(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
|
|
|
+ def analysisResultForTurbine(self, session: Session, returnDataFrame: pd.DataFrame):
|
|
|
dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] != 'total') & (
|
|
|
returnDataFrame[Field_Return_IsSaveDatabase])]
|
|
|
|
|
|
for index, row in dataFrame.iterrows():
|
|
|
sql = text(f"""
|
|
|
INSERT INTO analysis_diagram_relation
|
|
|
- (batch_code, field_engine_code, analysis_type_code, file_addr, auto_analysis, create_time)
|
|
|
- VALUES (:batch_code, :field_engine_code, :analysis_type_code, :file_addr, :auto_analysis, :create_time)
|
|
|
+ (batch_code, field_engine_code, analysis_type_code, file_addr, auto_analysis)
|
|
|
+ VALUES (:batch_code, :field_engine_code, :analysis_type_code, :file_addr, :auto_analysis)
|
|
|
ON DUPLICATE KEY UPDATE
|
|
|
field_engine_code=VALUES(field_engine_code),
|
|
|
analysis_type_code=VALUES(analysis_type_code),
|
|
@@ -99,21 +96,20 @@ class OutputProcessor:
|
|
|
"field_engine_code": row[Field_CodeOfTurbine],
|
|
|
"analysis_type_code": row[Field_Return_TypeAnalyst],
|
|
|
"file_addr": row[Const_FileURL],
|
|
|
- "auto_analysis": self.autoOrManual,
|
|
|
- "create_time": timestamp
|
|
|
+ "auto_analysis": self.autoOrManual
|
|
|
}
|
|
|
|
|
|
session.execute(sql, params)
|
|
|
|
|
|
- def analysisResultForTotal(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
|
|
|
+ def analysisResultForTotal(self, session: Session, returnDataFrame: pd.DataFrame):
|
|
|
dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] == 'total') & (
|
|
|
returnDataFrame[Field_Return_IsSaveDatabase])]
|
|
|
|
|
|
for index, row in dataFrame.iterrows():
|
|
|
sql = text(f"""
|
|
|
INSERT INTO analysis_general_file
|
|
|
- (batch_code, analysis_type_code, engine_type_code, file_addr, auto_analysis, create_time)
|
|
|
- VALUES (:batch_code, :analysis_type_code, :engine_type_code, :file_addr, :auto_analysis, :create_time)
|
|
|
+ (batch_code, analysis_type_code, engine_type_code, file_addr, auto_analysis)
|
|
|
+ VALUES (:batch_code, :analysis_type_code, :engine_type_code, :file_addr, :auto_analysis)
|
|
|
ON DUPLICATE KEY UPDATE
|
|
|
analysis_type_code=VALUES(analysis_type_code),
|
|
|
engine_type_code=VALUES(engine_type_code),
|
|
@@ -126,8 +122,7 @@ class OutputProcessor:
|
|
|
"analysis_type_code": row[Field_Return_TypeAnalyst],
|
|
|
"engine_type_code": row[Field_MillTypeCode],
|
|
|
"file_addr": row[Const_FileURL],
|
|
|
- "auto_analysis": self.autoOrManual,
|
|
|
- "create_time": timestamp
|
|
|
+ "auto_analysis": self.autoOrManual
|
|
|
}
|
|
|
|
|
|
session.execute(sql, params)
|
|
@@ -164,17 +159,17 @@ class OutputProcessor:
|
|
|
with foundationDB.session_scope() as session:
|
|
|
# 第一次调用,设置进度为50%
|
|
|
self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
|
|
|
- ErrorState.NotErr.value, None, None, timestamp, 50)
|
|
|
+ ErrorState.NotErr.value, None, None, 50)
|
|
|
self.analysisResultForTotal(
|
|
|
- session, returnDataFrame, timestamp)
|
|
|
+ session, returnDataFrame)
|
|
|
self.analysisResultForTurbine(
|
|
|
- session, returnDataFrame, timestamp)
|
|
|
+ session, returnDataFrame)
|
|
|
# 获取分析完成的时间
|
|
|
finish_time = datetime.now(timezone.utc) + timedelta(hours=8)
|
|
|
|
|
|
# 第二次调用:更新分析状态为已完成,进度为100%,并记录完成时间
|
|
|
self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
|
|
|
- ErrorState.NotErr.value, None, None, timestamp, 100, analysis_finish_time=finish_time)
|
|
|
+ ErrorState.NotErr.value, None, None, 100, analysis_finish_time=finish_time)
|
|
|
|
|
|
self.removeLocalFiles(powerFarmID, dataBatchNum)
|
|
|
except Exception as e:
|