|
@@ -1,221 +0,0 @@
|
|
|
-package com.energy.online.data.service.listener;
|
|
|
-
|
|
|
-import com.energy.online.data.dto.BaseConfig;
|
|
|
-import com.energy.online.data.dto.Measurepoint;
|
|
|
-import com.energy.online.data.dto.WindFarm;
|
|
|
-import com.energy.online.data.utils.MyConfigUtils;
|
|
|
-import org.openmuc.j60870.ASdu;
|
|
|
-import org.openmuc.j60870.ConnectionEventListener;
|
|
|
-import org.openmuc.j60870.ie.InformationElement;
|
|
|
-import org.openmuc.j60870.ie.InformationObject;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import java.io.FileWriter;
|
|
|
-import java.io.IOException;
|
|
|
-import java.nio.file.Files;
|
|
|
-import java.nio.file.Paths;
|
|
|
-import java.text.SimpleDateFormat;
|
|
|
-import java.time.Instant;
|
|
|
-import java.util.*;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
-
|
|
|
-public class J60870ClientListener implements ConnectionEventListener {
|
|
|
-
|
|
|
- private static final Logger log = LoggerFactory.getLogger(J60870ClientListener.class);
|
|
|
-
|
|
|
- private final Map<Long, Map<Integer, Object>> dataMap;
|
|
|
- private final Map<Integer, Measurepoint> measurepointMap;
|
|
|
- private final WindFarm windFarm;
|
|
|
- private final BaseConfig baseConfig;
|
|
|
- private final List<String> standardList = new ArrayList<>();
|
|
|
-
|
|
|
- private final SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
- private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
|
|
|
-
|
|
|
- private static final int BATCH_SIZE = 1000;
|
|
|
- private final Map<String, List<Map<String, Object>>> dateBuffers = new ConcurrentHashMap<>();
|
|
|
- private final ExecutorService csvExecutor = Executors.newSingleThreadExecutor();
|
|
|
-
|
|
|
- public J60870ClientListener(Map<Long, Map<Integer, Object>> dataMap, WindFarm windFarm, BaseConfig baseConfig) {
|
|
|
- this.dataMap = dataMap;
|
|
|
- this.windFarm = windFarm;
|
|
|
- this.baseConfig = baseConfig;
|
|
|
- this.measurepointMap = MyConfigUtils.getEnableMeasurepoints(baseConfig.getMeasurepoints());
|
|
|
-
|
|
|
- this.standardList.add("time_stamp");
|
|
|
- this.standardList.add("wind_turbine_name");
|
|
|
- for (Measurepoint measurepoint : measurepointMap.values()) {
|
|
|
- this.standardList.add(measurepoint.getStanderName());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public final String title = "104协议数据服务 - ";
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 监听从站发来的数据
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void newASdu(ASdu aSdu) {
|
|
|
- if (aSdu.getCommonAddress() == windFarm.getCoa()) {
|
|
|
- List<InformationObject> list = new ArrayList<>(Arrays.asList(aSdu.getInformationObjects()));
|
|
|
- // 获取当前时刻,验证是否是同一秒数据,数据进行合并
|
|
|
- Instant now = Instant.now();
|
|
|
- // 获取当前时刻的秒数
|
|
|
- long epochSecond = now.getEpochSecond();
|
|
|
- Long receiptTime = dataMap.keySet().isEmpty() ? null : dataMap.keySet().iterator().next();
|
|
|
- long receiptTimeLong = receiptTime == null ? 0 : receiptTime;
|
|
|
-
|
|
|
- // 当前秒时间大于数据库最新秒时间并且数据库不等于0
|
|
|
- if (epochSecond > receiptTimeLong && receiptTimeLong != 0) {
|
|
|
- // 保存临时表
|
|
|
- List<Map<String, Object>> resultDatas = transToStandarData(receiptTimeLong);
|
|
|
- asyncSaveToCsv(resultDatas, receiptTimeLong);
|
|
|
- }
|
|
|
- //添加新的数据
|
|
|
- saveMeasurementTemp(list, epochSecond);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<Map<String, Object>> transToStandarData(long receiptTimeLong) {
|
|
|
- List<Map<String, Object>> resultDatas = new ArrayList<>();
|
|
|
- Map<Integer, Object> integerObjectMap = dataMap.get(receiptTimeLong);
|
|
|
- if (null == integerObjectMap || integerObjectMap.isEmpty()) {
|
|
|
- return Collections.emptyList();
|
|
|
- }
|
|
|
-
|
|
|
- Map<Integer, Map<String, Object>> map = new HashMap<>();
|
|
|
- for (Map.Entry<Integer, Object> entry : integerObjectMap.entrySet()) {
|
|
|
- Integer ioa = entry.getKey();
|
|
|
- Integer windCode = ioa / baseConfig.getMeasurepointCount() + 1;
|
|
|
- String standardName = measurepointMap.get(ioa % baseConfig.getMeasurepointCount()).getStanderName();
|
|
|
-
|
|
|
- if (map.containsKey(windCode)) {
|
|
|
- map.get(windCode).put(standardName, entry.getValue());
|
|
|
- } else {
|
|
|
- Map<String, Object> windMap = new HashMap<>();
|
|
|
- windMap.put(standardName, entry.getValue());
|
|
|
- windMap.put("wind_turbine_name", windCode);
|
|
|
- windMap.put("time_stamp", dateTimeFormat.format(Date.from(Instant.ofEpochSecond(receiptTimeLong))));
|
|
|
- map.put(windCode, windMap);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- for (Map<String, Object> data : map.values()) {
|
|
|
- Set<String> keys = data.keySet();
|
|
|
-
|
|
|
- standardList.forEach(point -> {
|
|
|
- if (!keys.contains(point)) {
|
|
|
- data.put(point, null);
|
|
|
- }
|
|
|
- });
|
|
|
- resultDatas.add(data);
|
|
|
- }
|
|
|
- // 清理一秒的数据
|
|
|
- dataMap.clear();
|
|
|
- return resultDatas;
|
|
|
- }
|
|
|
-
|
|
|
- private void asyncSaveToCsv(List<Map<String, Object>> resultDatas, long receiptTimeLong) {
|
|
|
- csvExecutor.execute(() -> {
|
|
|
- synchronized (dateBuffers) {
|
|
|
- String dateKey = dateFormat.format(Date.from(Instant.ofEpochSecond(receiptTimeLong)));
|
|
|
- if (dateBuffers.containsKey(dateKey)) {
|
|
|
- dateBuffers.get(dateKey).addAll(resultDatas);
|
|
|
- } else {
|
|
|
- saveLessThenBatchSizeData();
|
|
|
- dateBuffers.put(dateKey, resultDatas);
|
|
|
- }
|
|
|
- List<Map<String, Object>> buffer = dateBuffers.get(dateKey);
|
|
|
-
|
|
|
- if (buffer.size() >= BATCH_SIZE) {
|
|
|
- List<Map<String, Object>> toSave = new ArrayList<>(buffer.subList(0, BATCH_SIZE));
|
|
|
- buffer.subList(0, BATCH_SIZE).clear();
|
|
|
- flushToCsv(toSave, dateKey);
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- private void saveLessThenBatchSizeData() {
|
|
|
- if (!dateBuffers.isEmpty()) {
|
|
|
- String key = dateBuffers.keySet().iterator().next();
|
|
|
- List<Map<String, Object>> buffer = new ArrayList<>(dateBuffers.get(key));
|
|
|
- flushToCsv(buffer, key);
|
|
|
- dateBuffers.clear();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void flushToCsv(List<Map<String, Object>> data, String dateStr) {
|
|
|
- log.info(windFarm.getWindFarmName() + "保存" + data.size() + "条数据,日期为:" + dateStr);
|
|
|
- String csvFile = String.format("%s_%s_%s.csv", windFarm.getWindFarmName(),
|
|
|
- windFarm.getWindFarmCode(), dateStr);
|
|
|
- boolean existsTitle = Files.exists(Paths.get(csvFile));
|
|
|
- synchronized (csvFile) {
|
|
|
- try (FileWriter writer = new FileWriter(csvFile, true)) {
|
|
|
- if (!data.isEmpty() && !existsTitle) {
|
|
|
- writer.append(String.join(",", standardList));
|
|
|
- writer.append('\n');
|
|
|
- }
|
|
|
-
|
|
|
- for (Map<String, Object> row : data) {
|
|
|
- List<String> values = new ArrayList<>();
|
|
|
- for (String standerName : standardList) {
|
|
|
- Object value = row.get(standerName);
|
|
|
- values.add(value != null ? value.toString() : "");
|
|
|
- }
|
|
|
- writer.append(String.join(",", values));
|
|
|
- writer.append('\n');
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- log.error(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private static final String split = "Short float value:";
|
|
|
-
|
|
|
- private void saveMeasurementTemp(List<InformationObject> informationObjects, long epochSecond) {
|
|
|
- if (!dataMap.containsKey(epochSecond)) {
|
|
|
- dataMap.put(epochSecond, new HashMap<>());
|
|
|
- }
|
|
|
-
|
|
|
- for (InformationObject item : informationObjects) {
|
|
|
- int ioa = item.getInformationObjectAddress();
|
|
|
-
|
|
|
- if (measurepointMap.containsKey(ioa % baseConfig.getMeasurepointCount())) {
|
|
|
- // 一秒中多条数据整理
|
|
|
- InformationElement[][] valData = item.getInformationElements();
|
|
|
- if (valData.length > 0) {
|
|
|
- InformationElement[] informationElementSet;
|
|
|
- for (InformationElement[] valDatum : valData) {
|
|
|
- informationElementSet = valDatum;
|
|
|
- for (InformationElement informationElement : informationElementSet) {
|
|
|
- String dataStr = informationElement.toString();
|
|
|
- if (dataStr.contains(split)) {
|
|
|
- dataMap.get(epochSecond).put(item.getInformationObjectAddress(), dataStr.split(split)[1].trim());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 监听连接关闭
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void connectionClosed(IOException cause) {
|
|
|
- log.info("connectionClosed: " + dateBuffers.get(dateBuffers.keySet().iterator().next()).size());
|
|
|
- saveLessThenBatchSizeData();
|
|
|
- log.info(this.title + "J60870ClientListener ConnectionClosed :" + cause.getMessage());
|
|
|
- log.info("Thread.activeCount :" + Thread.activeCount());
|
|
|
- }
|
|
|
-}
|
|
|
-
|