|
@@ -1,36 +1,55 @@
|
|
|
package com.energy.online.data.service.listener;
|
|
|
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
-import com.beust.jcommander.internal.Lists;
|
|
|
-import com.energy.online.data.service.po.MeasurementTempPO;
|
|
|
-import com.energy.online.data.service.service.J104ClinetService;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
|
+import com.energy.online.data.dto.BaseConfig;
|
|
|
+import com.energy.online.data.dto.Measurepoint;
|
|
|
+import com.energy.online.data.dto.WindFarm;
|
|
|
+import com.energy.online.data.utils.MyConfigUtils;
|
|
|
import org.openmuc.j60870.ASdu;
|
|
|
import org.openmuc.j60870.ConnectionEventListener;
|
|
|
import org.openmuc.j60870.ie.InformationElement;
|
|
|
import org.openmuc.j60870.ie.InformationObject;
|
|
|
-import org.springframework.http.*;
|
|
|
-import org.springframework.util.MultiValueMap;
|
|
|
-import org.springframework.web.client.RestTemplate;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.io.FileWriter;
|
|
|
import java.io.IOException;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Paths;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
import java.time.Instant;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
|
|
-@Slf4j
|
|
|
public class J60870ClientListener implements ConnectionEventListener {
|
|
|
|
|
|
- private J104ClinetService j104ClinetService;
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(J60870ClientListener.class);
|
|
|
|
|
|
- private String addrerss;
|
|
|
+ private final Map<Long, Map<Integer, Object>> dataMap;
|
|
|
+ private final Map<Integer, Measurepoint> measurepointMap;
|
|
|
+ private final WindFarm windFarm;
|
|
|
+ private final BaseConfig baseConfig;
|
|
|
+ private final List<String> standardList = new ArrayList<>();
|
|
|
|
|
|
- private Integer confId;
|
|
|
+ private final SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
|
|
|
|
|
|
- public J60870ClientListener(J104ClinetService j104ClinetService, String addrerss, Integer confId) {
|
|
|
- this.j104ClinetService = j104ClinetService;
|
|
|
- this.addrerss = addrerss;
|
|
|
- this.confId = confId;
|
|
|
+ private static final int BATCH_SIZE = 1000;
|
|
|
+ private final Map<String, List<Map<String, Object>>> dateBuffers = new ConcurrentHashMap<>();
|
|
|
+ private final ExecutorService csvExecutor = Executors.newSingleThreadExecutor();
|
|
|
+
|
|
|
+ public J60870ClientListener(Map<Long, Map<Integer, Object>> dataMap, WindFarm windFarm, BaseConfig baseConfig) {
|
|
|
+ this.dataMap = dataMap;
|
|
|
+ this.windFarm = windFarm;
|
|
|
+ this.baseConfig = baseConfig;
|
|
|
+ this.measurepointMap = MyConfigUtils.getEnableMeasurepoints(baseConfig.getMeasurepoints());
|
|
|
+
|
|
|
+ this.standardList.add("time_stamp");
|
|
|
+ this.standardList.add("wind_turbine_name");
|
|
|
+ for (Measurepoint measurepoint : measurepointMap.values()) {
|
|
|
+ this.standardList.add(measurepoint.getStanderName());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public final String title = "104协议数据服务 - ";
|
|
@@ -38,146 +57,165 @@ public class J60870ClientListener implements ConnectionEventListener {
|
|
|
|
|
|
/**
|
|
|
* 监听从站发来的数据
|
|
|
- *
|
|
|
- * @param aSdu
|
|
|
*/
|
|
|
@Override
|
|
|
public void newASdu(ASdu aSdu) {
|
|
|
-// log.info(this.title + "J60870ClientListener NewASdu Get :" + aSdu.toString());
|
|
|
- List<InformationObject> list = new ArrayList<>(Arrays.asList(aSdu.getInformationObjects()));
|
|
|
- // 获取当前时刻,验证是否是同一秒数据,数据进行合并
|
|
|
- Instant now = Instant.now();
|
|
|
- // 获取当前时刻的秒数
|
|
|
- long epochSecond = now.getEpochSecond();
|
|
|
- Long receiptTime = j104ClinetService.getMeasurementTime(confId);
|
|
|
- long receiptTimeLong = receiptTime == null ? 0 : receiptTime.longValue();
|
|
|
- log.info(this.title + "J60870ClientListener NewASdu Get time ====> " + epochSecond+",接收数据大小:"+list.size());
|
|
|
- // 数据库最新数据秒时间等于0或者与当前秒时间相等
|
|
|
- if (receiptTimeLong == 0 || epochSecond == receiptTimeLong) {
|
|
|
- // 保存临时表
|
|
|
- saveMeasurementTemp(list, confId, epochSecond);
|
|
|
- }
|
|
|
- // 当前秒时间大于数据库最新秒时间并且数据库不等于0
|
|
|
- if (epochSecond > receiptTimeLong && receiptTimeLong != 0) {
|
|
|
- int commonAddress = aSdu.getCommonAddress();
|
|
|
- List<MeasurementTempPO> measurementTempPOList = j104ClinetService.getMeasurementTemp(receiptTimeLong, confId);
|
|
|
- log.info(JSON.toJSONString("时间秒数" + receiptTimeLong + ", 数据长度:" + measurementTempPOList.size()));
|
|
|
-
|
|
|
- Map<String, String> dataMap = new HashMap<>();
|
|
|
- //动态创建表并入库
|
|
|
- for (MeasurementTempPO po : measurementTempPOList) {
|
|
|
- Integer informationObjectAddress = po.getDataPoin();
|
|
|
- String informationObjectToString = po.getReceiptData();
|
|
|
- dataMap.put(String.valueOf(informationObjectAddress), informationObjectToString);
|
|
|
+ if (aSdu.getCommonAddress() == windFarm.getCoa()) {
|
|
|
+ List<InformationObject> list = new ArrayList<>(Arrays.asList(aSdu.getInformationObjects()));
|
|
|
+ // 获取当前时刻,验证是否是同一秒数据,数据进行合并
|
|
|
+ Instant now = Instant.now();
|
|
|
+ // 获取当前时刻的秒数
|
|
|
+ long epochSecond = now.getEpochSecond();
|
|
|
+ Long receiptTime = dataMap.keySet().isEmpty() ? null : dataMap.keySet().iterator().next();
|
|
|
+ long receiptTimeLong = receiptTime == null ? 0 : receiptTime;
|
|
|
+
|
|
|
+ // 当前秒时间大于数据库最新秒时间并且数据库不等于0
|
|
|
+ if (epochSecond > receiptTimeLong && receiptTimeLong != 0) {
|
|
|
+ // 保存临时表
|
|
|
+ List<Map<String, Object>> resultDatas = transToStandarData(receiptTimeLong);
|
|
|
+ asyncSaveToCsv(resultDatas, receiptTimeLong);
|
|
|
}
|
|
|
- j104ClinetService.processingDat(receiptTimeLong, dataMap, confId, commonAddress, addrerss);
|
|
|
- // 删除上一秒数据
|
|
|
- j104ClinetService.initDelMeasurementTemp();
|
|
|
- // 保存临时表
|
|
|
- saveMeasurementTemp(list, confId, epochSecond);
|
|
|
+ //添加新的数据
|
|
|
+ saveMeasurementTemp(list, epochSecond);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private List<Map<String, Object>> transToStandarData(long receiptTimeLong) {
|
|
|
+ List<Map<String, Object>> resultDatas = new ArrayList<>();
|
|
|
+ Map<Integer, Object> integerObjectMap = dataMap.get(receiptTimeLong);
|
|
|
+ if (null == integerObjectMap || integerObjectMap.isEmpty()) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<Integer, Map<String, Object>> map = new HashMap<>();
|
|
|
+ for (Map.Entry<Integer, Object> entry : integerObjectMap.entrySet()) {
|
|
|
+ Integer ioa = entry.getKey();
|
|
|
+ Integer windCode = ioa / baseConfig.getMeasurepointCount() + 1;
|
|
|
+ String standardName = measurepointMap.get(ioa % baseConfig.getMeasurepointCount()).getStanderName();
|
|
|
+
|
|
|
+ if (map.containsKey(windCode)) {
|
|
|
+ map.get(windCode).put(standardName, entry.getValue());
|
|
|
+ } else {
|
|
|
+ Map<String, Object> windMap = new HashMap<>();
|
|
|
+ windMap.put(standardName, entry.getValue());
|
|
|
+ windMap.put("wind_turbine_name", windCode);
|
|
|
+ windMap.put("time_stamp", dateTimeFormat.format(Date.from(Instant.ofEpochSecond(receiptTimeLong))));
|
|
|
+ map.put(windCode, windMap);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- private void saveMeasurementTemp(List<InformationObject> informationObjects, Integer confId, long epochSecond) {
|
|
|
- MeasurementTempPO measurementTempPO = null;
|
|
|
- List<MeasurementTempPO> list = Lists.newArrayList();
|
|
|
- for (InformationObject item : informationObjects) {
|
|
|
- // 秒数据入库
|
|
|
- measurementTempPO = new MeasurementTempPO();
|
|
|
- measurementTempPO.setConfId(confId);
|
|
|
- measurementTempPO.setReceiptDate(new Date());
|
|
|
- measurementTempPO.setReceiptTime(epochSecond);
|
|
|
- measurementTempPO.setDataPoin(item.getInformationObjectAddress());
|
|
|
- measurementTempPO.setReceiptData(toString(item));
|
|
|
- list.add(measurementTempPO);
|
|
|
+ for (Map<String, Object> data : map.values()) {
|
|
|
+ Set<String> keys = data.keySet();
|
|
|
+
|
|
|
+ standardList.forEach(point -> {
|
|
|
+ if (!keys.contains(point)) {
|
|
|
+ data.put(point, null);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ resultDatas.add(data);
|
|
|
}
|
|
|
- j104ClinetService.saveMeasurementTemp(list);
|
|
|
+ // 清理一秒的数据
|
|
|
+ dataMap.clear();
|
|
|
+ return resultDatas;
|
|
|
}
|
|
|
|
|
|
+ private void asyncSaveToCsv(List<Map<String, Object>> resultDatas, long receiptTimeLong) {
|
|
|
+ csvExecutor.execute(() -> {
|
|
|
+ synchronized (dateBuffers) {
|
|
|
+ String dateKey = dateFormat.format(Date.from(Instant.ofEpochSecond(receiptTimeLong)));
|
|
|
+ if (dateBuffers.containsKey(dateKey)) {
|
|
|
+ dateBuffers.get(dateKey).addAll(resultDatas);
|
|
|
+ } else {
|
|
|
+ saveLessThenBatchSizeData();
|
|
|
+ dateBuffers.put(dateKey, resultDatas);
|
|
|
+ }
|
|
|
+ List<Map<String, Object>> buffer = dateBuffers.get(dateKey);
|
|
|
|
|
|
- /**
|
|
|
- * 监听连接关闭
|
|
|
- *
|
|
|
- * @param cause
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void connectionClosed(IOException cause) {
|
|
|
- log.debug(this.title + "J60870ClientListener ConnectionClosed :" + cause.getMessage());
|
|
|
+ if (buffer.size() >= BATCH_SIZE) {
|
|
|
+ List<Map<String, Object>> toSave = new ArrayList<>(buffer.subList(0, BATCH_SIZE));
|
|
|
+ buffer.subList(0, BATCH_SIZE).clear();
|
|
|
+ flushToCsv(toSave, dateKey);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * ASdu类方法重写
|
|
|
- *
|
|
|
- * @param aSdu
|
|
|
- * @return
|
|
|
- */
|
|
|
- public String toString(ASdu aSdu) {
|
|
|
- List<String> strList = new ArrayList<>();
|
|
|
- if (aSdu.getInformationObjects() != null) {
|
|
|
- InformationObject[] informationObjects = aSdu.getInformationObjects();
|
|
|
- String str;
|
|
|
- for (int i = 0; i < informationObjects.length; ++i) {
|
|
|
- str = toString(informationObjects[i]);
|
|
|
- if (str != null) {
|
|
|
- strList.add(str);
|
|
|
+ private void saveLessThenBatchSizeData() {
|
|
|
+ if (!dateBuffers.isEmpty()) {
|
|
|
+ String key = dateBuffers.keySet().iterator().next();
|
|
|
+ List<Map<String, Object>> buffer = new ArrayList<>(dateBuffers.get(key));
|
|
|
+ flushToCsv(buffer, key);
|
|
|
+ dateBuffers.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void flushToCsv(List<Map<String, Object>> data, String dateStr) {
|
|
|
+ log.info(windFarm.getWindFarmName() + "保存" + data.size() + "条数据,日期为:" + dateStr);
|
|
|
+ String csvFile = String.format("%s_%s_%s.csv", windFarm.getWindFarmName(),
|
|
|
+ windFarm.getWindFarmCode(), dateStr);
|
|
|
+ boolean existsTitle = Files.exists(Paths.get(csvFile));
|
|
|
+ synchronized (csvFile) {
|
|
|
+ try (FileWriter writer = new FileWriter(csvFile, true)) {
|
|
|
+ if (!data.isEmpty() && !existsTitle) {
|
|
|
+ writer.append(String.join(",", standardList));
|
|
|
+ writer.append('\n');
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Map<String, Object> row : data) {
|
|
|
+ List<String> values = new ArrayList<>();
|
|
|
+ for (String standerName : standardList) {
|
|
|
+ Object value = row.get(standerName);
|
|
|
+ values.add(value != null ? value.toString() : "");
|
|
|
+ }
|
|
|
+ writer.append(String.join(",", values));
|
|
|
+ writer.append('\n');
|
|
|
}
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ log.error(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
- return strList.size() > 0 ? StringUtils.join(strList, "***") : null;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * InformationObject类方法重写
|
|
|
- *
|
|
|
- * @param informationObjectRecord
|
|
|
- * @return
|
|
|
- */
|
|
|
- public String toString(InformationObject informationObjectRecord) {
|
|
|
- String str;
|
|
|
- InformationElement[] informationElementSet;
|
|
|
- List<String> strList = new ArrayList<>();
|
|
|
- String split = "Short float value:";
|
|
|
- InformationElement[][] valData = informationObjectRecord.getInformationElements();
|
|
|
- if (valData.length > 0) {
|
|
|
- for (int i = 0; i < valData.length; ++i) {
|
|
|
- informationElementSet = valData[i];
|
|
|
- for (int ii = 0; ii < informationElementSet.length; ++ii) {
|
|
|
- /**
|
|
|
- * 目测可能出现的情况:
|
|
|
- * Short float value: 18.2
|
|
|
- * Quality, overflow: false, blocked: false, substituted: false, not topical: false, invalid: false
|
|
|
- */
|
|
|
- str = informationElementSet[ii].toString();
|
|
|
- if (str.contains(split)) {
|
|
|
- strList.add(str.split(split)[1].trim());
|
|
|
+ private static final String split = "Short float value:";
|
|
|
+
|
|
|
+ private void saveMeasurementTemp(List<InformationObject> informationObjects, long epochSecond) {
|
|
|
+ if (!dataMap.containsKey(epochSecond)) {
|
|
|
+ dataMap.put(epochSecond, new HashMap<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ for (InformationObject item : informationObjects) {
|
|
|
+ int ioa = item.getInformationObjectAddress();
|
|
|
+
|
|
|
+ if (measurepointMap.containsKey(ioa % baseConfig.getMeasurepointCount())) {
|
|
|
+ // 一秒中多条数据整理
|
|
|
+ InformationElement[][] valData = item.getInformationElements();
|
|
|
+ if (valData.length > 0) {
|
|
|
+ InformationElement[] informationElementSet;
|
|
|
+ for (InformationElement[] valDatum : valData) {
|
|
|
+ informationElementSet = valDatum;
|
|
|
+ for (InformationElement informationElement : informationElementSet) {
|
|
|
+ String dataStr = informationElement.toString();
|
|
|
+ if (dataStr.contains(split)) {
|
|
|
+ dataMap.get(epochSecond).put(item.getInformationObjectAddress(), dataStr.split(split)[1].trim());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return strList.size() > 0 ? StringUtils.join(strList, ",") : null;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
- * 发送POST请求
|
|
|
- *
|
|
|
- * @param url
|
|
|
- * @param params
|
|
|
- * @return
|
|
|
+ * 监听连接关闭
|
|
|
*/
|
|
|
- public String sendPOSTRequest(String url, MultiValueMap<String, String> params) {
|
|
|
- RestTemplate client = new RestTemplate();
|
|
|
- HttpHeaders headers = new HttpHeaders();
|
|
|
- HttpMethod method = HttpMethod.POST;
|
|
|
- // 以表单的方式提交
|
|
|
- headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
|
|
|
- // 将请求头部和参数合成一个请求
|
|
|
- HttpEntity<MultiValueMap<String, String>> requestEntity = new HttpEntity<>(params, headers);
|
|
|
- // 执行HTTP请求,将返回的结构使用String类格式化
|
|
|
- ResponseEntity<String> response = client.exchange(url, method, requestEntity, String.class);
|
|
|
- return response.getBody();
|
|
|
+ @Override
|
|
|
+ public void connectionClosed(IOException cause) {
|
|
|
+ log.info("connectionClosed: " + dateBuffers.get(dateBuffers.keySet().iterator().next()).size());
|
|
|
+ saveLessThenBatchSizeData();
|
|
|
+ log.info(this.title + "J60870ClientListener ConnectionClosed :" + cause.getMessage());
|
|
|
+ log.info("Thread.activeCount :" + Thread.activeCount());
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
|