Jelajahi Sumber

添加导出为文件系统-parquet文件

wzl 2 bulan lalu
induk
melakukan
86c4dc5a8d
38 mengubah file dengan 1692 tambahan dan 589 penghapusan
  1. 2 5
      src/main/java/com/znzn/project/dc/dtdata/common/CommonData.java
  2. 3 173
      src/main/java/com/znzn/project/dc/dtdata/config/WindDataInitializer.java
  3. 78 0
      src/main/java/com/znzn/project/dc/dtdata/controller/WindTurbineDataInfoController.java
  4. 29 16
      src/main/java/com/znzn/project/dc/dtdata/crontab/CollectionDataService.java
  5. 37 2
      src/main/java/com/znzn/project/dc/dtdata/crontab/FillDataService.java
  6. 104 104
      src/main/java/com/znzn/project/dc/dtdata/crontab/WindTurbinePartitionAutoService.java
  7. 3 2
      src/main/java/com/znzn/project/dc/dtdata/entity/ExecDataDate.java
  8. 11 2
      src/main/java/com/znzn/project/dc/dtdata/entity/ExecError.java
  9. 2 5
      src/main/java/com/znzn/project/dc/dtdata/entity/FileStore.java
  10. 3 3
      src/main/java/com/znzn/project/dc/dtdata/entity/WindFarm.java
  11. 3 2
      src/main/java/com/znzn/project/dc/dtdata/entity/WindFarmTables.java
  12. 3 2
      src/main/java/com/znzn/project/dc/dtdata/entity/WindPoints.java
  13. 2 2
      src/main/java/com/znzn/project/dc/dtdata/entity/WindTurbine.java
  14. 161 6
      src/main/java/com/znzn/project/dc/dtdata/entity/WindTurbineData.java
  15. 58 0
      src/main/java/com/znzn/project/dc/dtdata/entity/WindTurbineDataInfo.java
  16. 7 2
      src/main/java/com/znzn/project/dc/dtdata/generate/CodeGenerateMain.java
  17. 21 4
      src/main/java/com/znzn/project/dc/dtdata/inOutEntity/CollectDataInData.java
  18. 37 0
      src/main/java/com/znzn/project/dc/dtdata/inOutEntity/CollectDataWithFileInData.java
  19. 18 0
      src/main/java/com/znzn/project/dc/dtdata/mapper/WindTurbineDataInfoMapper.java
  20. 2 2
      src/main/java/com/znzn/project/dc/dtdata/mapper/WindTurbineDataMapper.java
  21. 3 0
      src/main/java/com/znzn/project/dc/dtdata/service/IDataCollectionService.java
  22. 5 0
      src/main/java/com/znzn/project/dc/dtdata/service/IInitService.java
  23. 27 0
      src/main/java/com/znzn/project/dc/dtdata/service/IWindTurbineDataInfoService.java
  24. 1 1
      src/main/java/com/znzn/project/dc/dtdata/service/IWindTurbineDataService.java
  25. 175 144
      src/main/java/com/znzn/project/dc/dtdata/service/impl/DataCollectionService.java
  26. 398 0
      src/main/java/com/znzn/project/dc/dtdata/service/impl/DataCollectionService_bak.java
  27. 2 2
      src/main/java/com/znzn/project/dc/dtdata/service/impl/ExecErrorServiceImpl.java
  28. 190 0
      src/main/java/com/znzn/project/dc/dtdata/service/impl/InitServiceImpl.java
  29. 3 2
      src/main/java/com/znzn/project/dc/dtdata/service/impl/WindPointsServiceImpl.java
  30. 78 0
      src/main/java/com/znzn/project/dc/dtdata/service/impl/WindTurbineDataInfoServiceImpl.java
  31. 2 2
      src/main/java/com/znzn/project/dc/dtdata/service/impl/WindTurbineDataServiceImpl.java
  32. 26 0
      src/main/java/com/znzn/project/dc/dtdata/utils/enos/DateUtils.java
  33. 4 0
      src/main/java/com/znzn/project/dc/dtdata/utils/enos/EnosapiUtils.java
  34. 0 103
      src/main/java/com/znzn/project/dc/dtdata/utils/enos/JsonToParquetUtils.java
  35. 186 0
      src/main/java/com/znzn/project/dc/dtdata/utils/enos/ParquetUtils.java
  36. 2 2
      src/main/resources/application.properties
  37. 5 0
      src/main/resources/mapper/WindTurbineDataInfoMapper.xml
  38. 1 1
      src/test/java/com/znzn/project/dc/dtdata/DatangEnosDataApplicationTests.java

+ 2 - 5
src/main/java/com/znzn/project/dc/dtdata/common/CommonData.java

@@ -3,17 +3,14 @@ package com.znzn.project.dc.dtdata.common;
 import com.znzn.project.dc.dtdata.entity.FileStore;
 import com.znzn.project.dc.dtdata.entity.WindTurbine;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 public class CommonData {
 
     /**
      * 逻辑测点字段
      */
-    public static final Map<String, List<String>> pointIdsWithLogicMap = new HashMap<>();
+    public static final Map<String, HashSet<String>> pointIdsWithLogicMap = new HashMap<>();
 
     /**
      * 标准化映射字典

+ 3 - 173
src/main/java/com/znzn/project/dc/dtdata/config/WindDataInitializer.java

@@ -1,193 +1,23 @@
 package com.znzn.project.dc.dtdata.config;
 
-import com.znzn.project.dc.dtdata.common.CommonData;
-import com.znzn.project.dc.dtdata.entity.FileStore;
-import com.znzn.project.dc.dtdata.entity.WindFarmTables;
-import com.znzn.project.dc.dtdata.entity.WindPoints;
-import com.znzn.project.dc.dtdata.entity.WindTurbine;
-import com.znzn.project.dc.dtdata.service.*;
-import com.znzn.project.dc.dtdata.utils.enos.EnosapiUtils;
-import com.znzn.project.dc.dtdata.utils.enos.entity.AccessibleAsset;
+import com.znzn.project.dc.dtdata.service.IInitService;
 import jakarta.annotation.Resource;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.event.EventListener;
-import org.springframework.core.env.Environment;
 import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.stereotype.Component;
 
-import java.util.*;
-import java.util.stream.Collectors;
-
 @Component
 @Slf4j
 @EnableAsync
 public class WindDataInitializer {
 
-    private static final String FILE_STORE_ID_ENV_VAR = "FILE_STORE_ID";
-    @Resource
-    private IWindPointsService windPointsService;
-    @Resource
-    private IFileStoreService fileStoreService;
-    @Resource
-    private IWindTurbineService windTurbineService;
-    @Resource
-    private Environment environment;
     @Resource
-    private IWindFarmService windFarmService;
-    @Resource
-    private IWindFarmTablesService windFarmTablesService;
+    private IInitService iInitService;
 
     @EventListener(ApplicationReadyEvent.class)
     public void initializeWindData() throws Exception {
-        processFileStoreData();
-
-        if ("1".equals(environment.getProperty("NEED_RESTORE"))) {
-            //删除风机信息
-            log.info("开始删除风机数据");
-            windTurbineService.deleteByFileStoreId(CommonData.FILE_STORE.getId());
-            //重新写入风机信息
-            log.info("开始写入风机数据");
-            rereadWindTurbineTable();
-        }
-        log.info("写入风机数据完成");
-        // 应用启动时预加载所有数据
-        processWindPointsData();
-        log.info("风场测点数据初始化完成,需要执行的风机数为{}", CommonData.NEED_RUN_TURBINE.size());
-
-        log.info("开始查询风场机型表名映射");
-        processWindFarmTables();
-        log.info("查询风场机型表名映射完成,风场+机型数量为{}", CommonData.TABLE_MAP.size());
-    }
-
-    private void processWindFarmTables() {
-
-        List<WindFarmTables> tables = windFarmTablesService.list();
-        Map<String, String> tableMaps = tables.stream().collect(Collectors.toMap(data -> String.format("%s%s", data.getWindFarmId(), data.getType()), WindFarmTables::getTableName));
-        CommonData.TABLE_MAP.putAll(tableMaps);
-    }
-
-    @SneakyThrows
-    private void rereadWindTurbineTable() {
-        List<String> farmTypeList = windFarmService.getByFileStoreId(CommonData.FILE_STORE.getId()).stream().map(data -> String.join("--", data.getId(), data.getType())).toList();
-        log.info("本机风场+机型数量:{}", farmTypeList.size());
-        List<AccessibleAsset> assetList;
-        try {
-            assetList = EnosapiUtils.queryAccessibleAsset("EnOS_Wind_Turbine");
-        } catch (Exception e) {
-            Thread.sleep(200);
-            assetList = EnosapiUtils.queryAccessibleAsset("EnOS_Wind_Turbine");
-        }
-        if (assetList == null) {
-            System.exit(0);
-        }
-        List<WindTurbine> windTurbineList = new ArrayList<>();
-        for (AccessibleAsset asset : assetList) {
-            String key = String.join("--", asset.getAttributes().getParentId(), asset.getAttributes().getTurbineTypeID());
-            if (farmTypeList.contains(key)) {
-                windTurbineList.add(getWindTurbine(asset));
-            }
-        }
-        if (windTurbineList.isEmpty()) {
-            Thread.sleep(100);
-            rereadWindTurbineTable();
-        }
-        windTurbineService.saveBatch(windTurbineList, 5000);
-    }
-
-    private static WindTurbine getWindTurbine(AccessibleAsset asset) {
-        WindTurbine windTurbine = new WindTurbine();
-        windTurbine.setId(asset.getMdmId());
-        windTurbine.setCode(asset.getAttributes().getScadaName());
-        windTurbine.setName(asset.getAttributes().getName());
-        windTurbine.setType(asset.getAttributes().getTurbineTypeID());
-        windTurbine.setWindFarmId(asset.getAttributes().getParentId());
-        windTurbine.setPlcVersion(asset.getAttributes().getPLCVersion());
-        windTurbine.setTypeName(asset.getAttributes().getManufacturer());
-        return windTurbine;
-    }
-
-    private void processFileStoreData() throws Exception {
-        FileStore fileStore = fileStoreService.getById(getFileStoreIdFromEnv());
-        log.info("获取到本机的存储配置{}", fileStore.toString());
-        CommonData.FILE_STORE = fileStore;
+        iInitService.init();
     }
-
-    private void processWindPointsData() {
-        windTurbineService.getByFileStoreId(CommonData.FILE_STORE.getId()).forEach(windTurbine -> CommonData.NEED_RUN_TURBINE.put(windTurbine.getId(), windTurbine));
-        log.info("本机需要运行的风机数量{}", CommonData.NEED_RUN_TURBINE.size());
-        Map<String, List<WindPoints>> groupedByKey = new HashMap<>();
-        Map<String, List<String>> pointIdsMap = new HashMap<>();
-        Map<String, Map<String, Set<String>>> standerMaps = new HashMap<>();
-
-        List<WindPoints> windPointsList = windPointsService.list();
-        for (WindPoints data : windPointsList) {
-            // 生成分组key
-            String key = data.getWindFarmId() + data.getWindType();
-
-            if (!groupedByKey.containsKey(key)) {
-                groupedByKey.put(key, new ArrayList<>());
-            }
-            groupedByKey.get(key).add(data);
-
-            if (!pointIdsMap.containsKey(key)) {
-                pointIdsMap.put(key, new ArrayList<>());
-            }
-            String pointId = String.format("%s(%s)",
-                    data.getUseTimeAggMethods(),
-                    data.getPointNameEn());
-            pointIdsMap.get(key).add(pointId);
-
-            if (!standerMaps.containsKey(key)) {
-                standerMaps.put(key, new HashMap<>());
-                Set<String> set = new HashSet<>();
-                set.add("localtime");
-                standerMaps.get(key).put("localtime", set);
-            }
-
-            String pointKey = String.format("%s(%s)",
-                    data.getUseTimeAggMethods(),
-                    data.getPointNameEn());
-            Map<String, Set<String>> innerMap = standerMaps.get(key);
-
-            if (!innerMap.containsKey(pointKey)) {
-                Set<String> set = new HashSet<>();
-                set.add(data.getStanderNameEn());
-                innerMap.put(pointKey, set);
-            } else {
-                Set<String> set = innerMap.get(pointKey);
-                set.add(data.getStanderNameEn());
-            }
-        }
-
-        CommonData.pointIdsWithLogicMap.putAll(pointIdsMap);
-        CommonData.standerMap.putAll(standerMaps);
-    }
-
-    private String getFileStoreIdFromEnv() throws Exception {
-
-        log.info("从系统环境变量获取 FILE_STORE_ID: {}, 从Spring Environment获取 FILE_STORE_ID: {}", System.getenv(FILE_STORE_ID_ENV_VAR), environment.getProperty(FILE_STORE_ID_ENV_VAR));
-//
-//        if (1 == 1) {
-//            return "1";
-//        }
-
-        String fileStoreId = System.getenv(FILE_STORE_ID_ENV_VAR);
-        if (StringUtils.isNotBlank(fileStoreId)) {
-            log.info("从系统环境变量获取 FILE_STORE_ID: {}", fileStoreId);
-            return fileStoreId.trim();
-        }
-
-        fileStoreId = environment.getProperty(FILE_STORE_ID_ENV_VAR);
-        if (StringUtils.isNotBlank(fileStoreId)) {
-            log.info("从Spring Environment获取 FILE_STORE_ID: {}", fileStoreId);
-            return fileStoreId.trim();
-        }
-
-        log.warn("所有方式都未获取到有效的 FILE_STORE_ID");
-        throw new Exception("所有方式都未获取到有效的 FILE_STORE_ID");
-    }
-
 }

+ 78 - 0
src/main/java/com/znzn/project/dc/dtdata/controller/WindTurbineDataInfoController.java

@@ -0,0 +1,78 @@
+package com.znzn.project.dc.dtdata.controller;
+
+import com.znzn.project.dc.dtdata.common.R;
+import com.znzn.project.dc.dtdata.inOutEntity.CollectDataInData;
+import com.znzn.project.dc.dtdata.inOutEntity.CollectDataWithFileInData;
+import com.znzn.project.dc.dtdata.service.IDataCollectionService;
+import com.znzn.project.dc.dtdata.service.IWindTurbineDataInfoService;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import jakarta.annotation.Resource;
+import jakarta.validation.Valid;
+import org.springframework.core.io.ByteArrayResource;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * <p>
+ * 前端控制器
+ * </p>
+ *
+ * @author 魏志亮
+ * @since 2025-11-10
+ */
+@RestController
+@RequestMapping("/windTurbineDataInfo")
+@Tag(name = "风机数据信息", description = "风机数据相关接口")
+public class WindTurbineDataInfoController {
+
+    @Resource
+    private IWindTurbineDataInfoService windTurbineDataInfoService;
+    @Resource
+    private IDataCollectionService dataCollectionService;
+
+//    @PostMapping("getEveryWindAndTypeData")
+//    @Operation(summary = "查询每个风场+机型的风机数据", description = "查询每个风场+机型的风机数据11")
+//    public R<String> getEveryWindAndTypeData() {
+//        windTurbineDataInfoService.getEveryWindAndTypeData();
+//        return R.ok();
+//    }
+
+
+    @PostMapping("collectData")
+    @Operation(summary = "收集数据", description = "收集数据")
+    public R<String> collectData(@RequestBody CollectDataInData inData) {
+        dataCollectionService.collectData(inData);
+        return R.ok();
+    }
+
+    @PostMapping("collectDataWithFile")
+    @Operation(summary = "收集数据下载", description = "收集数据下载")
+    public ResponseEntity<org.springframework.core.io.Resource> collectDataWithFile(@RequestBody @Valid CollectDataWithFileInData inData) {
+        byte[] csvBytes = dataCollectionService.collectDataWithFile(inData);
+        ByteArrayResource resource = new ByteArrayResource(csvBytes);
+
+        // 4. 构建响应头(设置下载文件名、文件类型等)
+        HttpHeaders headers = new HttpHeaders();
+        String fileName = "数据导出_" + System.currentTimeMillis() + ".csv";
+        // 处理中文文件名编码,避免浏览器显示乱码
+        headers.add(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" +
+                java.net.URLEncoder.encode(fileName, StandardCharsets.UTF_8) + "\"");
+        headers.add(HttpHeaders.CONTENT_TYPE, "text/csv; charset=UTF-8");
+
+        // 5. 构建并返回响应实体
+        return ResponseEntity.ok()
+                .headers(headers)
+                .contentLength(csvBytes.length)
+                .contentType(MediaType.parseMediaType("text/csv; charset=UTF-8"))
+                .body(resource);
+
+    }
+}

+ 29 - 16
src/main/java/com/znzn/project/dc/dtdata/crontab/CollectionDataService.java

@@ -2,36 +2,50 @@ package com.znzn.project.dc.dtdata.crontab;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.znzn.project.dc.dtdata.entity.WindTurbineDataInfo;
+import com.znzn.project.dc.dtdata.inOutEntity.CollectDataInData;
+import com.znzn.project.dc.dtdata.service.IInitService;
 import com.znzn.project.dc.dtdata.service.IWindTurbineDataInfoService;
 import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
 
+import static com.znzn.project.dc.dtdata.utils.enos.DateUtils.DATE_FORMATTER_WITH_ZERO;
+
 @Service
+@Slf4j
 public class CollectionDataService {
 
     @Resource
     private IWindTurbineDataInfoService windTurbineDataInfoService;
 
+    @Resource
+    private IInitService iInitService;
+
     /**
-     * 每小时0分10秒执行
+     * 每小时0分5秒执行
      * cron表达式:10 0 * * * ? (秒 分 时 日 月 周)
      */
-    @Scheduled(cron = "10 0 * * * ?")
-    public void collectionDataEveryHour() {
-        LocalDateTime now = LocalDateTime.now();
-        // 重新动态获取风机
-        if (now.getHour() == 0) {
-
-        }
-
-        //获取前一小时数据
-
-
-        System.out.println(LocalDateTime.now());
-    }
+//    @Scheduled(cron = "5 0 0 * * ?")
+//    public void collectionDataEveryHour() {
+//        LocalDateTime now = LocalDateTime.now();
+//        // 重新动态获取风机
+//        if (now.getHour() == 0) {
+//            iInitService.init();
+//            log.info("每日重新获取风机成功");
+//        }
+//
+//        CollectDataInData inData = new CollectDataInData();
+//        LocalDateTime today = LocalDateTime.now();
+//        LocalDateTime lastDay = today.minusDays(1);
+//        inData.setBeginTime(lastDay.format(DATE_FORMATTER_WITH_ZERO));
+//        inData.setEndTime(today.format(DATE_FORMATTER_WITH_ZERO));
+//        inData.setMinute(1440);
+//
+//        log.info("每日执行{}的数据", lastDay.format(DATE_FORMATTER_WITH_ZERO));
+//    }
 
     /**
      * 每小时0分0秒执行
@@ -39,12 +53,11 @@ public class CollectionDataService {
      */
     @Scheduled(cron = "0 0 * * * ?")
     public void deleleOneYearAgeInfo() {
+        log.info("开始执行删除超过一年的数据");
         LocalDateTime now = LocalDateTime.now();
         LocalDateTime lastYear = now.minusYears(1);
-
         LambdaQueryWrapper<WindTurbineDataInfo> query = new LambdaQueryWrapper<>();
         query.lt(WindTurbineDataInfo::getDataTime, lastYear);
         windTurbineDataInfoService.remove(query);
-        System.out.println(LocalDateTime.now());
     }
 }

+ 37 - 2
src/main/java/com/znzn/project/dc/dtdata/crontab/FillDataService.java

@@ -1,26 +1,61 @@
 package com.znzn.project.dc.dtdata.crontab;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.znzn.project.dc.dtdata.common.CommonData;
 import com.znzn.project.dc.dtdata.entity.ExecError;
+import com.znzn.project.dc.dtdata.inOutEntity.CollectDataInData;
+import com.znzn.project.dc.dtdata.service.IDataCollectionService;
 import com.znzn.project.dc.dtdata.service.IExecErrorService;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import static com.znzn.project.dc.dtdata.utils.enos.DateUtils.DATE_FORMATTER;
+
 @Service
 @Slf4j
 public class FillDataService {
 
     @Resource
     private IExecErrorService execErrorService;
+    @Resource
+    private IDataCollectionService dataCollectionService;
+
 
     /**
+     * 重新获取失败的数据
      * 每天凌晨2点0分0秒执行
      * cron表达式:0 0 2 * * ? (秒 分 时 日 月 周)
      */
-    @Scheduled(cron = "0 0 2 * * ?")
-    public void fileData() {
+    @Scheduled(cron = "0 0 */1 * * ?")
+    public void reCollectErrorData() {
+        log.info("开始执行重新获取失败数据");
         LambdaQueryWrapper<ExecError> queryWrapper = new LambdaQueryWrapper<>();
+        queryWrapper.eq(ExecError::getErrorCode, 2)
+                .eq(ExecError::getStoreId, CommonData.FILE_STORE.getId())
+                .lt(ExecError::getRetryTimes, 3)
+                .gt(ExecError::getQueryBegin,
+                        new Date(System.currentTimeMillis() - 30 * 3 * 24 * 60 * 60 * 1000L));
+        queryWrapper.orderByAsc(ExecError::getQueryBegin);
+        queryWrapper.last(" limit 100");
+        List<ExecError> list = execErrorService.list(queryWrapper);
+
+        for (ExecError execError : list) {
+            log.info("开始执行重新获取失败数据:{}", execError);
+            CollectDataInData inData = new CollectDataInData();
+            inData.setBeginTime(execError.getQueryBegin().format(DATE_FORMATTER));
+            inData.setEndTime(execError.getQueryEnd().format(DATE_FORMATTER));
+            inData.setMinute((int) ChronoUnit.MINUTES.between(inData.getBeginTime(), inData.getEndTime()));
+            inData.setWindTurbineIdList(Collections.singletonList(execError.getWindTurbineId()));
+            inData.setExecErrorId(execError.getId());
+
+            dataCollectionService.collectData(inData);
+        }
     }
 }

+ 104 - 104
src/main/java/com/znzn/project/dc/dtdata/crontab/WindTurbinePartitionAutoService.java

@@ -1,104 +1,104 @@
-package com.znzn.project.dc.dtdata.crontab;
-
-import com.znzn.project.dc.dtdata.mapper.WindTurbineDataMapper;
-import jakarta.annotation.Resource;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Service;
-
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-
-/**
- * 风机数据分区自动管理服务
- */
-@Service
-@Slf4j
-public class WindTurbinePartitionAutoService {
-
-    @Resource
-    private WindTurbineDataMapper windTurbineDataMapper;
-
-    // 日期格式化器(线程安全,避免重复创建)
-    private static final SimpleDateFormat DATE_FORMAT_YYYYMM = new SimpleDateFormat("yyyyMM");
-    private static final SimpleDateFormat DATE_FORMAT_YYYY_MM_DD = new SimpleDateFormat("yyyy-MM-dd");
-
-    /**
-     * 定时执行分区管理:每月1日凌晨2点执行(创建未来2个月分区,删除12个月前过期分区)
-     * cron表达式:0 0 2 1 * ? (秒 分 时 日 月 周)
-     */
-    @Scheduled(cron = "0 0 2 1 * ?")
-//    @Scheduled(cron = "0 4 * * * ?")
-    public void autoManagePartition() {
-        // 1. 计算未来分区参数(创建下下个月、下下下个月,确保提前预留)
-        createFuturePartitions();
-
-        // 2. 计算过期分区参数(删除12个月前的分区,保留12个月数据)
-        dropExpiredPartitions();
-    }
-
-    /**
-     * 创建未来2个月的分区
-     */
-    private void createFuturePartitions() {
-        Calendar calendar = Calendar.getInstance();
-        // 循环创建:下1个月、下2个月(提前预留,避免写入失败)
-        for (int i = 1; i <= 2; i++) {
-            // 计算当前+i个月的日期
-            calendar.add(Calendar.MONTH, i);
-            Date targetMonth = calendar.getTime();
-
-            // 构造分区名称(如:p202611)
-            String partitionName = "p" + DATE_FORMAT_YYYYMM.format(targetMonth);
-
-            // ******** 核心新增:先校验分区是否存在 ********
-            int partitionExist = windTurbineDataMapper.checkPartitionExists(partitionName);
-            if (partitionExist == 1) {
-                log.info("分区已存在,无需重复创建:{}", partitionName);
-                calendar.add(Calendar.MONTH, -i);
-                continue;
-            }
-
-            // 构造分区截止日期(当前+i个月的下一个月1日,如:2026-11-01 → 2026-12-01)
-            Calendar endDateCalendar = (Calendar) calendar.clone();
-            endDateCalendar.add(Calendar.MONTH, 1);
-            endDateCalendar.set(Calendar.DAY_OF_MONTH, 1);
-            String partitionEndDate = DATE_FORMAT_YYYY_MM_DD.format(endDateCalendar.getTime());
-
-            try {
-                windTurbineDataMapper.addMonthlyPartition(partitionName, partitionEndDate);
-                log.info("分区创建成功:{},截止日期:{}", partitionName, partitionEndDate);
-            } catch (Exception e) {
-                log.error("分区创建失败:{},异常信息:{}", partitionName, e.getMessage());
-            }
-
-            // 重置calendar,避免循环叠加错误
-            calendar.add(Calendar.MONTH, -i);
-        }
-    }
-
-    /**
-     * 删除12个月前的过期分区
-     */
-    private void dropExpiredPartitions() {
-        Calendar calendar = Calendar.getInstance();
-        // 计算12个月前的日期
-        calendar.add(Calendar.MONTH, -12);
-        Date expiredMonth = calendar.getTime();
-
-        // 构造过期分区名称(如:p202510)
-        String expiredPartitionName = "p" + DATE_FORMAT_YYYYMM.format(expiredMonth);
-
-        try {
-            int partitionExist = windTurbineDataMapper.checkPartitionExists(expiredPartitionName);
-            if (partitionExist == 1) {
-                windTurbineDataMapper.dropExpiredPartition(expiredPartitionName);
-                log.info("过期分区删除成功:{}", expiredPartitionName);
-            }
-            log.info("过期分区不存在:{}", expiredPartitionName);
-        } catch (Exception e) {
-            log.error("过期分区删除失败:{},异常信息:{}", expiredPartitionName, e.getMessage());
-        }
-    }
-}
+//package com.znzn.project.dc.dtdata.crontab;
+//
+//import com.znzn.project.dc.dtdata.mapper.WindTurbineDataMapper;
+//import jakarta.annotation.Resource;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Service;
+//
+//import java.text.SimpleDateFormat;
+//import java.util.Calendar;
+//import java.util.Date;
+//
+///**
+// * 风机数据分区自动管理服务
+// */
+//@Service
+//@Slf4j
+//public class WindTurbinePartitionAutoService {
+//
+//    @Resource
+//    private WindTurbineDataMapper windTurbineDataMapper;
+//
+//    // 日期格式化器(线程安全,避免重复创建)
+//    private static final SimpleDateFormat DATE_FORMAT_YYYYMM = new SimpleDateFormat("yyyyMM");
+//    private static final SimpleDateFormat DATE_FORMAT_YYYY_MM_DD = new SimpleDateFormat("yyyy-MM-dd");
+//
+//    /**
+//     * 定时执行分区管理:每月1日凌晨2点执行(创建未来2个月分区,删除12个月前过期分区)
+//     * cron表达式:0 0 2 1 * ? (秒 分 时 日 月 周)
+//     */
+//    @Scheduled(cron = "0 0 2 1 * ?")
+////    @Scheduled(cron = "0 4 * * * ?")
+//    public void autoManagePartition() {
+//        // 1. 计算未来分区参数(创建下下个月、下下下个月,确保提前预留)
+//        createFuturePartitions();
+//
+//        // 2. 计算过期分区参数(删除12个月前的分区,保留12个月数据)
+//        dropExpiredPartitions();
+//    }
+//
+//    /**
+//     * 创建未来2个月的分区
+//     */
+//    private void createFuturePartitions() {
+//        Calendar calendar = Calendar.getInstance();
+//        // 循环创建:下1个月、下2个月(提前预留,避免写入失败)
+//        for (int i = 1; i <= 2; i++) {
+//            // 计算当前+i个月的日期
+//            calendar.add(Calendar.MONTH, i);
+//            Date targetMonth = calendar.getTime();
+//
+//            // 构造分区名称(如:p202611)
+//            String partitionName = "p" + DATE_FORMAT_YYYYMM.format(targetMonth);
+//
+//            // ******** 核心新增:先校验分区是否存在 ********
+//            int partitionExist = windTurbineDataMapper.checkPartitionExists(partitionName);
+//            if (partitionExist == 1) {
+//                log.info("分区已存在,无需重复创建:{}", partitionName);
+//                calendar.add(Calendar.MONTH, -i);
+//                continue;
+//            }
+//
+//            // 构造分区截止日期(当前+i个月的下一个月1日,如:2026-11-01 → 2026-12-01)
+//            Calendar endDateCalendar = (Calendar) calendar.clone();
+//            endDateCalendar.add(Calendar.MONTH, 1);
+//            endDateCalendar.set(Calendar.DAY_OF_MONTH, 1);
+//            String partitionEndDate = DATE_FORMAT_YYYY_MM_DD.format(endDateCalendar.getTime());
+//
+//            try {
+//                windTurbineDataMapper.addMonthlyPartition(partitionName, partitionEndDate);
+//                log.info("分区创建成功:{},截止日期:{}", partitionName, partitionEndDate);
+//            } catch (Exception e) {
+//                log.error("分区创建失败:{},异常信息:{}", partitionName, e.getMessage());
+//            }
+//
+//            // 重置calendar,避免循环叠加错误
+//            calendar.add(Calendar.MONTH, -i);
+//        }
+//    }
+//
+//    /**
+//     * 删除12个月前的过期分区
+//     */
+//    private void dropExpiredPartitions() {
+//        Calendar calendar = Calendar.getInstance();
+//        // 计算12个月前的日期
+//        calendar.add(Calendar.MONTH, -12);
+//        Date expiredMonth = calendar.getTime();
+//
+//        // 构造过期分区名称(如:p202510)
+//        String expiredPartitionName = "p" + DATE_FORMAT_YYYYMM.format(expiredMonth);
+//
+//        try {
+//            int partitionExist = windTurbineDataMapper.checkPartitionExists(expiredPartitionName);
+//            if (partitionExist == 1) {
+//                windTurbineDataMapper.dropExpiredPartition(expiredPartitionName);
+//                log.info("过期分区删除成功:{}", expiredPartitionName);
+//            }
+//            log.info("过期分区不存在:{}", expiredPartitionName);
+//        } catch (Exception e) {
+//            log.error("过期分区删除失败:{},异常信息:{}", expiredPartitionName, e.getMessage());
+//        }
+//    }
+//}

+ 3 - 2
src/main/java/com/znzn/project/dc/dtdata/entity/ExecDataDate.java

@@ -9,6 +9,7 @@ import lombok.Setter;
 
 import java.io.Serializable;
 import java.time.LocalDateTime;
+import java.util.Date;
 
 /**
  * <p>
@@ -33,8 +34,8 @@ public class ExecDataDate implements Serializable {
     private LocalDateTime execDateTime;
 
     @Schema(description = "创建时间")
-    private LocalDateTime createTime;
+    private Date createTime;
 
     @Schema(description = "最近更新时间")
-    private LocalDateTime updateTime;
+    private Date updateTime;
 }

+ 11 - 2
src/main/java/com/znzn/project/dc/dtdata/entity/ExecError.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -19,8 +20,7 @@ import java.time.LocalDateTime;
  * @author 魏志亮
  * @since 2025-11-25
  */
-@Getter
-@Setter
+@Data
 @TableName("exec_error")
 @Schema(name = "ExecError", description = "")
 public class ExecError implements Serializable {
@@ -36,15 +36,24 @@ public class ExecError implements Serializable {
     @Schema(description = "风机名称")
     private String windTurbineName;
 
+    @Schema(description = "风机保存服务器ID")
+    private Integer storeId;
+
     @Schema(description = "查询开始日期")
     private LocalDateTime queryBegin;
 
     @Schema(description = "查询结束日期")
     private LocalDateTime queryEnd;
 
+    @Schema(description = "0:成功 1:无数据(成功) 2:其他原因失败)")
+    private Integer errorCode;
+
     @Schema(description = "报错信息")
     private String errorMsg;
 
+    @Schema(description = "重试次数")
+    private Integer retryTimes;
+
     @Schema(description = "创建时间")
     private LocalDate createTime;
 

+ 2 - 5
src/main/java/com/znzn/project/dc/dtdata/entity/FileStore.java

@@ -5,11 +5,8 @@ import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
 
 import java.io.Serializable;
-import java.time.LocalDate;
 import java.util.Date;
 
 /**
@@ -41,8 +38,8 @@ public class FileStore implements Serializable {
     private String savaDir;
 
     @Schema(description = "创建时间")
-    private LocalDate createTime;
+    private Date createTime;
 
     @Schema(description = "最近更新时间")
-    private LocalDate updateTime;
+    private Date updateTime;
 }

+ 3 - 3
src/main/java/com/znzn/project/dc/dtdata/entity/WindFarm.java

@@ -6,7 +6,7 @@ import lombok.Getter;
 import lombok.Setter;
 
 import java.io.Serializable;
-import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.Date;
 
 /**
@@ -41,8 +41,8 @@ public class WindFarm implements Serializable {
     private Integer storeId;
 
     @Schema(description = "创建时间")
-    private LocalDate createTime;
+    private Date createTime;
 
     @Schema(description = "最近更新时间")
-    private LocalDate updateTime;
+    private Date updateTime;
 }

+ 3 - 2
src/main/java/com/znzn/project/dc/dtdata/entity/WindFarmTables.java

@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import java.io.Serializable;
 import java.time.LocalDate;
+import java.util.Date;
 
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Getter;
@@ -39,8 +40,8 @@ public class WindFarmTables implements Serializable {
     private String tableName;
 
     @Schema(description = "创建时间")
-    private LocalDate createTime;
+    private Date createTime;
 
     @Schema(description = "最近更新时间")
-    private LocalDate updateTime;
+    private Date updateTime;
 }

+ 3 - 2
src/main/java/com/znzn/project/dc/dtdata/entity/WindPoints.java

@@ -7,6 +7,7 @@ import lombok.Setter;
 
 import java.io.Serializable;
 import java.time.LocalDate;
+import java.util.Date;
 
 /**
  * <p>
@@ -58,9 +59,9 @@ public class WindPoints implements Serializable {
     private String useTimeAggMethods;
 
     @Schema(description = "创建时间")
-    private LocalDate createTime;
+    private Date createTime;
 
     @Schema(description = "最近更新时间")
-    private LocalDate updateTime;
+    private Date updateTime;
 
 }

+ 2 - 2
src/main/java/com/znzn/project/dc/dtdata/entity/WindTurbine.java

@@ -47,8 +47,8 @@ public class WindTurbine implements Serializable {
     private String plcVersion;
 
     @Schema(description = "创建时间")
-    private LocalDate createTime;
+    private Date createTime;
 
     @Schema(description = "最近更新时间")
-    private LocalDate updateTime;
+    private Date updateTime;
 }

+ 161 - 6
src/main/java/com/znzn/project/dc/dtdata/entity/WindTurbineData.java

@@ -1,452 +1,607 @@
 package com.znzn.project.dc.dtdata.entity;
 
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
+import java.io.Serializable;
+import java.util.Date;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Getter;
 import lombok.Setter;
 
-import java.io.Serializable;
-import java.util.Date;
-
 /**
  * <p>
  * 风机运行时序核心数据表(FLOAT类型/无风水机型/按月分区)
  * </p>
  *
  * @author 魏志亮
- * @since 2026-01-09
+ * @since 2026-01-13
  */
 @Getter
 @Setter
 @TableName("wind_turbine_data")
-@Schema(name = "WindTurbineData", description = "风机运行时序核心数据表")
+@Schema(name = "WindTurbineData", description = "风机运行时序核心数据表(FLOAT类型/无风水机型/按月分区)")
 public class WindTurbineData implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
     @Schema(description = "数据采集时间")
+    @TableField("data_time")
     private Date dataTime;
 
-    @Schema(description = "风机唯一ID")
+    @Schema(description = "风机唯一ID(字符串类型,长度10)")
+    @TableField("turbine_id")
     private String turbineId;
 
     @Schema(description = "数据入库创建时间(自动填充当前时间)")
+    @TableField("create_time")
     private Date createTime;
 
     @Schema(description = "偏航角度")
+    @TableField("yaw_ang")
     private Double yawAng;
 
     @Schema(description = "扭缆角度")
+    @TableField("twist_ang")
     private Double twistAng;
 
     @Schema(description = "偏航误差")
+    @TableField("yaw_err")
     private Double yawErr;
 
     @Schema(description = "风向")
+    @TableField("wind_dir")
     private Double windDir;
 
     @Schema(description = "对风角度")
+    @TableField("yaw_to_wind_ang")
     private Double yawToWindAng;
 
     @Schema(description = "总偏航次数")
+    @TableField("total_yaw_cnt")
     private Double totalYawCnt;
 
     @Schema(description = "顺时针偏航状态(0-正常,1-工作,2-异常)")
+    @TableField("cw_yaw_sts")
     private Double cwYawSts;
 
     @Schema(description = "逆时针偏航状态(0-正常,1-工作,2-异常)")
+    @TableField("ccw_yaw_sts")
     private Double ccwYawSts;
 
     @Schema(description = "偏航系统压力")
+    @TableField("yaw_sys_prs")
     private Double yawSysPrs;
 
     @Schema(description = "偏航刹车压力")
+    @TableField("yaw_brk_prs")
     private Double yawBrkPrs;
 
     @Schema(description = "桨角设定值1")
+    @TableField("pitch_ang_set_1")
     private Double pitchAngSet1;
 
     @Schema(description = "桨角设定值2")
+    @TableField("pitch_ang_set_2")
     private Double pitchAngSet2;
 
     @Schema(description = "桨角设定值3")
+    @TableField("pitch_ang_set_3")
     private Double pitchAngSet3;
 
     @Schema(description = "桨角实际值1")
+    @TableField("pitch_ang_act_1")
     private Double pitchAngAct1;
 
     @Schema(description = "桨角实际值2")
+    @TableField("pitch_ang_act_2")
     private Double pitchAngAct2;
 
     @Schema(description = "桨角实际值3")
+    @TableField("pitch_ang_act_3")
     private Double pitchAngAct3;
 
     @Schema(description = "变桨速度1")
+    @TableField("pitch_spd_1")
     private Double pitchSpd1;
 
     @Schema(description = "变桨速度2")
+    @TableField("pitch_spd_2")
     private Double pitchSpd2;
 
     @Schema(description = "变桨速度3")
+    @TableField("pitch_spd_3")
     private Double pitchSpd3;
 
     @Schema(description = "变桨电机电流1")
+    @TableField("pitch_motor_cur_1")
     private Double pitchMotorCur1;
 
     @Schema(description = "变桨电机电流2")
+    @TableField("pitch_motor_cur_2")
     private Double pitchMotorCur2;
 
     @Schema(description = "变桨电机电流3")
+    @TableField("pitch_motor_cur_3")
     private Double pitchMotorCur3;
 
     @Schema(description = "变桨电机温度1")
+    @TableField("pitch_motor_temp_1")
     private Double pitchMotorTemp1;
 
     @Schema(description = "变桨电机温度2")
+    @TableField("pitch_motor_temp_2")
     private Double pitchMotorTemp2;
 
     @Schema(description = "变桨电机温度3")
+    @TableField("pitch_motor_temp_3")
     private Double pitchMotorTemp3;
 
     @Schema(description = "变桨控制柜温度1")
+    @TableField("pitch_cab_temp_1")
     private Double pitchCabTemp1;
 
     @Schema(description = "变桨控制柜温度2")
+    @TableField("pitch_cab_temp_2")
     private Double pitchCabTemp2;
 
     @Schema(description = "变桨控制柜温度3")
+    @TableField("pitch_cab_temp_3")
     private Double pitchCabTemp3;
 
     @Schema(description = "变桨电池柜温度1")
+    @TableField("pitch_bat_temp_1")
     private Double pitchBatTemp1;
 
     @Schema(description = "变桨电池柜温度2")
+    @TableField("pitch_bat_temp_2")
     private Double pitchBatTemp2;
 
     @Schema(description = "变桨电池柜温度3")
+    @TableField("pitch_bat_temp_3")
     private Double pitchBatTemp3;
 
     @Schema(description = "主轴转速")
+    @TableField("main_shaft_spd")
     private Double mainShaftSpd;
 
     @Schema(description = "主轴承温度1")
+    @TableField("main_brg_temp_1")
     private Double mainBrgTemp1;
 
     @Schema(description = "主轴承温度2")
+    @TableField("main_brg_temp_2")
     private Double mainBrgTemp2;
 
     @Schema(description = "主轴承温度3")
+    @TableField("main_brg_temp_3")
     private Double mainBrgTemp3;
 
     @Schema(description = "主轴承温度4")
+    @TableField("main_brg_temp_4")
     private Double mainBrgTemp4;
 
     @Schema(description = "齿轮箱转速1")
+    @TableField("gearbox_spd_1")
     private Double gearboxSpd1;
 
     @Schema(description = "齿轮箱转速2")
+    @TableField("gearbox_spd_2")
     private Double gearboxSpd2;
 
     @Schema(description = "高速轴驱动端轴承温度")
+    @TableField("high_spd_shaft_de_temp")
     private Double highSpdShaftDeTemp;
 
     @Schema(description = "高速轴非驱动端轴承温度")
+    @TableField("high_spd_shaft_nde_temp")
     private Double highSpdShaftNdeTemp;
 
     @Schema(description = "高速轴轴承温度")
+    @TableField("high_spd_shaft_temp")
     private Double highSpdShaftTemp;
 
     @Schema(description = "中速轴驱动端轴承温度")
+    @TableField("mid_spd_shaft_de_temp")
     private Double midSpdShaftDeTemp;
 
     @Schema(description = "中速轴非驱动端轴承温度")
+    @TableField("mid_spd_shaft_nde_temp")
     private Double midSpdShaftNdeTemp;
 
     @Schema(description = "中速轴轴承温度")
+    @TableField("mid_spd_shaft_temp")
     private Double midSpdShaftTemp;
 
     @Schema(description = "低速轴驱动端轴承温度")
+    @TableField("low_spd_shaft_de_temp")
     private Double lowSpdShaftDeTemp;
 
     @Schema(description = "低速轴非驱动端轴承温度")
+    @TableField("low_spd_shaft_nde_temp")
     private Double lowSpdShaftNdeTemp;
 
     @Schema(description = "低速轴轴承温度")
+    @TableField("low_spd_shaft_temp")
     private Double lowSpdShaftTemp;
 
     @Schema(description = "齿轮箱油池温度")
+    @TableField("gearbox_oil_temp")
     private Double gearboxOilTemp;
 
     @Schema(description = "齿轮箱出口油压")
+    @TableField("gb_out_oil_prs")
     private Double gbOutOilPrs;
 
     @Schema(description = "齿轮箱入口油压")
+    @TableField("gb_in_oil_prs")
     private Double gbInOilPrs;
 
     @Schema(description = "齿轮箱冷却水温度")
+    @TableField("gb_cool_water_temp")
     private Double gbCoolWaterTemp;
 
     @Schema(description = "发电机转速")
+    @TableField("gen_spd")
     private Double genSpd;
 
     @Schema(description = "发电机驱动端轴承温度")
+    @TableField("gen_de_temp")
     private Double genDeTemp;
 
     @Schema(description = "发电机非驱动端轴承温度")
+    @TableField("gen_nde_temp")
     private Double genNdeTemp;
 
     @Schema(description = "发电机轴承温度1")
+    @TableField("gen_brg_temp_1")
     private Double genBrgTemp1;
 
     @Schema(description = "发电机轴承温度2")
+    @TableField("gen_brg_temp_2")
     private Double genBrgTemp2;
 
     @Schema(description = "发电机轴承温度3")
+    @TableField("gen_brg_temp_3")
     private Double genBrgTemp3;
 
     @Schema(description = "发电机轴承温度4")
+    @TableField("gen_brg_temp_4")
     private Double genBrgTemp4;
 
     @Schema(description = "转子温度")
+    @TableField("rotor_temp")
     private Double rotorTemp;
 
     @Schema(description = "定子绕组温度1")
+    @TableField("stator_wind_temp_1")
     private Double statorWindTemp1;
 
     @Schema(description = "定子绕组温度2")
+    @TableField("stator_wind_temp_2")
     private Double statorWindTemp2;
 
     @Schema(description = "定子绕组温度3")
+    @TableField("stator_wind_temp_3")
     private Double statorWindTemp3;
 
     @Schema(description = "定子绕组温度4")
+    @TableField("stator_wind_temp_4")
     private Double statorWindTemp4;
 
     @Schema(description = "定子绕组温度5")
+    @TableField("stator_wind_temp_5")
     private Double statorWindTemp5;
 
     @Schema(description = "定子绕组温度6")
+    @TableField("stator_wind_temp_6")
     private Double statorWindTemp6;
 
     @Schema(description = "发电机输入水温")
+    @TableField("gen_in_water_temp")
     private Double genInWaterTemp;
 
     @Schema(description = "发电机输出水温")
+    @TableField("gen_out_water_temp")
     private Double genOutWaterTemp;
 
     @Schema(description = "发电机进风口温度")
+    @TableField("gen_in_air_temp")
     private Double genInAirTemp;
 
     @Schema(description = "发电机出风口温度")
+    @TableField("gen_out_air_temp")
     private Double genOutAirTemp;
 
     @Schema(description = "电网A相电流")
+    @TableField("grid_ia")
     private Double gridIa;
 
     @Schema(description = "电网B相电流")
+    @TableField("grid_ib")
     private Double gridIb;
 
     @Schema(description = "电网C相电流")
+    @TableField("grid_ic")
     private Double gridIc;
 
     @Schema(description = "电网A相电压")
+    @TableField("grid_ua")
     private Double gridUa;
 
     @Schema(description = "电网B相电压")
+    @TableField("grid_ub")
     private Double gridUb;
 
     @Schema(description = "电网C相电压")
+    @TableField("grid_uc")
     private Double gridUc;
 
     @Schema(description = "AB相电压相位角")
+    @TableField("grid_uab_phase_ang")
     private Double gridUabPhaseAng;
 
     @Schema(description = "BC相电压相位角")
+    @TableField("grid_ubc_phase_ang")
     private Double gridUbcPhaseAng;
 
     @Schema(description = "CA相电压相位角")
+    @TableField("grid_uca_phase_ang")
     private Double gridUcaPhaseAng;
 
     @Schema(description = "A相电压电流相位角")
+    @TableField("grid_ua_ia_phase_ang")
     private Double gridUaIaPhaseAng;
 
     @Schema(description = "B相电压电流相位角")
+    @TableField("grid_ub_ib_phase_ang")
     private Double gridUbIbPhaseAng;
 
     @Schema(description = "C相电压电流相位角")
+    @TableField("grid_uc_ic_phase_ang")
     private Double gridUcIcPhaseAng;
 
     @Schema(description = "网侧频率")
+    @TableField("grid_freq")
     private Double gridFreq;
 
     @Schema(description = "变流器转速")
+    @TableField("conv_spd")
     private Double convSpd;
 
     @Schema(description = "变流器机侧L1IGBT温度")
+    @TableField("conv_mach_l1_igbt_temp")
     private Double convMachL1IgbtTemp;
 
     @Schema(description = "变流器机侧L2IGBT温度")
+    @TableField("conv_mach_l2_igbt_temp")
     private Double convMachL2IgbtTemp;
 
     @Schema(description = "变流器机侧L3IGBT温度")
+    @TableField("conv_mach_l3_igbt_temp")
     private Double convMachL3IgbtTemp;
 
     @Schema(description = "变流器机侧逆变器温度")
+    @TableField("conv_mach_inv_temp")
     private Double convMachInvTemp;
 
     @Schema(description = "变流器机侧控制器温度")
+    @TableField("conv_mach_cntlr_temp")
     private Double convMachCntlrTemp;
 
     @Schema(description = "变流器机侧电抗器温度")
+    @TableField("conv_mach_reactor_temp")
     private Double convMachReactorTemp;
 
     @Schema(description = "变流器网侧L1IGBT温度")
+    @TableField("conv_grid_l1_igbt_temp")
     private Double convGridL1IgbtTemp;
 
     @Schema(description = "变流器网侧L2IGBT温度")
+    @TableField("conv_grid_l2_igbt_temp")
     private Double convGridL2IgbtTemp;
 
     @Schema(description = "变流器网侧L3IGBT温度")
+    @TableField("conv_grid_l3_igbt_temp")
     private Double convGridL3IgbtTemp;
 
     @Schema(description = "变流器网侧逆变器温度")
+    @TableField("conv_grid_inv_temp")
     private Double convGridInvTemp;
 
     @Schema(description = "变流器网侧控制器温度")
+    @TableField("conv_grid_cntlr_temp")
     private Double convGridCntlrTemp;
 
     @Schema(description = "变流器网侧电抗器温度")
+    @TableField("conv_grid_reactor_temp")
     private Double convGridReactorTemp;
 
     @Schema(description = "变流器冷却液温度")
+    @TableField("conv_coolant_temp")
     private Double convCoolantTemp;
 
     @Schema(description = "变流器冷却水入口温度")
+    @TableField("conv_cool_in_temp")
     private Double convCoolInTemp;
 
     @Schema(description = "变流器冷却水出口温度")
+    @TableField("conv_cool_out_temp")
     private Double convCoolOutTemp;
 
     @Schema(description = "液压站压力")
+    @TableField("hyd_station_prs")
     private Double hydStationPrs;
 
     @Schema(description = "液压泵压力")
+    @TableField("hyd_pump_prs")
     private Double hydPumpPrs;
 
     @Schema(description = "液压泵启动次数")
+    @TableField("hyd_pump_start_cnt")
     private Double hydPumpStartCnt;
 
     @Schema(description = "液压液位")
+    @TableField("hyd_tank_level")
     private Double hydTankLevel;
 
     @Schema(description = "液压油温度")
+    @TableField("hyd_oil_temp")
     private Double hydOilTemp;
 
     @Schema(description = "液压油压力")
+    @TableField("hyd_oil_prs")
     private Double hydOilPrs;
 
     @Schema(description = "液压站液位状态")
+    @TableField("hyd_level_sts")
     private Double hydLevelSts;
 
     @Schema(description = "液压油温度状态")
+    @TableField("hyd_oil_temp_sts")
     private Double hydOilTempSts;
 
     @Schema(description = "风机状态")
+    @TableField("wtg_sts")
     private Double wtgSts;
 
     @Schema(description = "集控风机状态")
+    @TableField("scada_wtg_sts")
     private Double scadaWtgSts;
 
     @Schema(description = "风速")
+    @TableField("wind_spd")
     private Double windSpd;
 
     @Schema(description = "有功功率")
+    @TableField("p_active")
     private Double pActive;
 
     @Schema(description = "无功功率")
+    @TableField("p_reactive")
     private Double pReactive;
 
     @Schema(description = "风轮转速")
+    @TableField("rotor_spd")
     private Double rotorSpd;
 
     @Schema(description = "叶轮转速")
+    @TableField("impeller_spd")
     private Double impellerSpd;
 
     @Schema(description = "安全链状态")
+    @TableField("safety_chain_sts")
     private Double safetyChainSts;
 
     @Schema(description = "安全链急停")
+    @TableField("safety_chain_estop_sts")
     private Double safetyChainEstopSts;
 
     @Schema(description = "机舱内温度")
+    @TableField("nacelle_in_temp")
     private Double nacelleInTemp;
 
     @Schema(description = "机舱外温度")
+    @TableField("nacelle_out_temp")
     private Double nacelleOutTemp;
 
     @Schema(description = "塔筒前后振动值")
+    @TableField("tower_fb_vib")
     private Double towerFbVib;
 
     @Schema(description = "塔筒左右振动值")
+    @TableField("tower_lr_vib")
     private Double towerLrVib;
 
     @Schema(description = "塔筒前后振动加速度")
+    @TableField("tower_fb_vib_acc")
     private Double towerFbVibAcc;
 
     @Schema(description = "塔筒左右振动加速度")
+    @TableField("tower_lr_vib_acc")
     private Double towerLrVibAcc;
 
     @Schema(description = "塔底温度")
+    @TableField("tower_base_temp")
     private Double towerBaseTemp;
 
     @Schema(description = "塔底塔筒外温度")
+    @TableField("tower_base_out_temp")
     private Double towerBaseOutTemp;
 
     @Schema(description = "机舱控制柜温度")
+    @TableField("nacelle_cab_temp")
     private Double nacelleCabTemp;
 
     @Schema(description = "塔底控制柜温度")
+    @TableField("tower_base_cab_temp")
     private Double towerBaseCabTemp;
 
     @Schema(description = "机舱UPS蓄电池电量低告警状态")
+    @TableField("nacelle_ups_batt_low_sts")
     private Double nacelleUpsBattLowSts;
 
     @Schema(description = "机舱UPS供电正常")
+    @TableField("nacelle_ups_pwr_sts")
     private Double nacelleUpsPwrSts;
 
     @Schema(description = "机舱UPS电池正常")
+    @TableField("nacelle_ups_batt_sts")
     private Double nacelleUpsBattSts;
 
     @Schema(description = "塔底UPS蓄电池电量低告警状态")
+    @TableField("tower_ups_batt_low_sts")
     private Double towerUpsBattLowSts;
 
     @Schema(description = "塔底UPS供电正常")
+    @TableField("tower_ups_pwr_sts")
     private Double towerUpsPwrSts;
 
     @Schema(description = "塔底UPS电池正常")
+    @TableField("tower_ups_batt_sts")
     private Double towerUpsBattSts;
 
     @Schema(description = "机舱位置")
+    @TableField("nacelle_pos")
     private Double nacellePos;
 
     @Schema(description = "绝对风向")
+    @TableField("abs_wind_dir")
     private Double absWindDir;
 
     @Schema(description = "理论有功功率")
+    @TableField("theory_p_active")
     private Double theoryPActive;
 
     @Schema(description = "有功设定值")
+    @TableField("p_active_set")
     private Double pActiveSet;
 
     @Schema(description = "有功设定反馈")
+    @TableField("p_active_set_fbk")
     private Double pActiveSetFbk;
 
     @Schema(description = "实际力矩")
+    @TableField("actual_torque")
     private Double actualTorque;
 
     @Schema(description = "给定力矩")
+    @TableField("set_torque")
     private Double setTorque;
 
     @Schema(description = "湍流强度")
+    @TableField("turb_intensity")
     private Double turbIntensity;
+
+    @Schema(description = "厂家风机状态")
+    @TableField("vendor_wtg_sts")
+    private Integer vendorWtgSts;
+
+    @Schema(description = "厂家风机状态接入")
+    @TableField("vendor_access_sts")
+    private Integer vendorAccessSts;
+
+    @Schema(description = "故障")
+    @TableField("fault_sts")
+    private Integer faultSts;
 }

+ 58 - 0
src/main/java/com/znzn/project/dc/dtdata/entity/WindTurbineDataInfo.java

@@ -0,0 +1,58 @@
+package com.znzn.project.dc.dtdata.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * <p>
+ *
+ * </p>
+ *
+ * @author 魏志亮
+ * @since 2025-11-10
+ */
+@Getter
+@Setter
+@TableName("wind_turbine_data_info")
+@Schema(name = "WindTurbineDataInfo", description = "风机数据信息")
+public class WindTurbineDataInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    @Schema(description = "风机编号")
+    private String windTurbineId;
+
+    @Schema(description = "风机名称")
+    private String windTurbineName;
+
+    @Schema(description = "日期")
+    private Date dataTime;
+
+    @Schema(description = "文件存储配置ID")
+    private Integer fileStoreId;
+
+    @Schema(description = "读取url")
+    private String readUri;
+
+    @Schema(description = "最新一条数据")
+    private String lastDataJson;
+
+    @Schema(description = "当日数据数量")
+    private Integer dataCount;
+
+    @Schema(description = "创建时间")
+    private Date createTime;
+
+    @Schema(description = "最近更新时间")
+    private Date updateTime;
+}

+ 7 - 2
src/main/java/com/znzn/project/dc/dtdata/generate/CodeGenerateMain.java

@@ -18,9 +18,13 @@ import static com.baomidou.mybatisplus.core.enums.SqlLike.RIGHT;
  * @author 魏志亮
  */
 public class CodeGenerateMain {
+//
+//    static String host = "192.168.50.235";
+//    static Integer port = 30306;
+
+    static String host = "127.0.0.1";
+    static Integer port = 3306;
 
-    static String host = "192.168.50.235";
-    static Integer port = 30306;
     static String dbName = "datang";
     static String username = "root";
     static String password = "admin123456";
@@ -71,6 +75,7 @@ public class CodeGenerateMain {
                         builder.addInclude(includTables);
                     }
                     builder.entityBuilder().enableLombok();
+                    builder.entityBuilder().enableTableFieldAnnotation();
                     builder.entityBuilder().enableFileOverride();
                     builder.controllerBuilder().enableRestStyle();
                 })

+ 21 - 4
src/main/java/com/znzn/project/dc/dtdata/inOutEntity/CollectDataInData.java

@@ -4,17 +4,34 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
 
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
 
-@Data
 @Schema
+@Data
 public class CollectDataInData {
 
     @Schema(description = "开始时间")
-    private LocalDateTime beginTime;
+    private String beginTime;
 
     @Schema(description = "结束时间")
-    private LocalDateTime endTime = LocalDateTime.now();
+    private String endTime;
 
     @Schema(description = "时间间隔:分钟 30,60,1440", defaultValue = "1440")
-    private Integer minute = 1440;
+    private Integer minute;
+
+    @Schema(description = "风机ID数组")
+    private List<String> windTurbineIdList;
+
+    @Schema(description = "执行历史错误的ID")
+    private Integer execErrorId;
+
+    public LocalDateTime getBeginTime() {
+        return LocalDateTime.parse(beginTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+    }
+
+    public LocalDateTime getEndTime() {
+        return LocalDateTime.parse(endTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+    }
 }

+ 37 - 0
src/main/java/com/znzn/project/dc/dtdata/inOutEntity/CollectDataWithFileInData.java

@@ -0,0 +1,37 @@
+package com.znzn.project.dc.dtdata.inOutEntity;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.NotBlank;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+import java.util.List;
+
+@Data
+@Schema
+public class CollectDataWithFileInData {
+
+    @Schema(description = "开始时间")
+    @NotBlank
+    private String beginTime;
+
+    @Schema(description = "开始时间")
+    @NotBlank
+    private String endTime;
+
+    @Schema(description = "风机ID数组")
+    private List<String> windTurbineIdList;
+
+    @Schema(description = "逻辑测点")
+    private List<String> pointsList;
+
+
+    public LocalDateTime getBeginTime() {
+        return LocalDateTime.parse(beginTime);
+    }
+
+    public LocalDateTime getEndTime() {
+        return LocalDateTime.parse(endTime);
+    }
+}
+

+ 18 - 0
src/main/java/com/znzn/project/dc/dtdata/mapper/WindTurbineDataInfoMapper.java

@@ -0,0 +1,18 @@
+package com.znzn.project.dc.dtdata.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.znzn.project.dc.dtdata.entity.WindTurbineDataInfo;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * <p>
+ * Mapper 接口
+ * </p>
+ *
+ * @author 魏志亮
+ * @since 2025-11-10
+ */
+@Mapper
+public interface WindTurbineDataInfoMapper extends BaseMapper<WindTurbineDataInfo> {
+
+}

+ 2 - 2
src/main/java/com/znzn/project/dc/dtdata/mapper/WindTurbineDataMapper.java

@@ -18,8 +18,8 @@ import java.util.Map;
 @Mapper
 public interface WindTurbineDataMapper extends BaseMapper<WindTurbineData> {
 
-    @Select("select  * from  wind_turbine_data where wind_farm_id = #{windFarmId} AND turbine_id = #{turbineId} and data_time > #{dataTime} order by data_time desc limit 1 ")
-    Map<String, Object> getLastHourLatestData(String dataTime, String windFarmId, String turbineId);
+    @Select("select  * from  wind_turbine_data where turbine_id = #{turbineId} and data_time > #{dataTime} order by data_time desc limit 1 ")
+    Map<String, Object> getLastHourLatestData(String dataTime,String turbineId);
 
     void addMonthlyPartition(String partitionName, String partitionEndDate);
 

+ 3 - 0
src/main/java/com/znzn/project/dc/dtdata/service/IDataCollectionService.java

@@ -1,8 +1,11 @@
 package com.znzn.project.dc.dtdata.service;
 
 import com.znzn.project.dc.dtdata.inOutEntity.CollectDataInData;
+import com.znzn.project.dc.dtdata.inOutEntity.CollectDataWithFileInData;
 
 public interface IDataCollectionService {
 
     void collectData(CollectDataInData inData);
+
+    byte [] collectDataWithFile(CollectDataWithFileInData inData);
 }

+ 5 - 0
src/main/java/com/znzn/project/dc/dtdata/service/IInitService.java

@@ -0,0 +1,5 @@
+package com.znzn.project.dc.dtdata.service;
+
+public interface IInitService {
+    void init();
+}

+ 27 - 0
src/main/java/com/znzn/project/dc/dtdata/service/IWindTurbineDataInfoService.java

@@ -0,0 +1,27 @@
+package com.znzn.project.dc.dtdata.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.znzn.project.dc.dtdata.entity.WindTurbineDataInfo;
+
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * <p>
+ * 服务类
+ * </p>
+ *
+ * @author 魏志亮
+ * @since 2025-11-10
+ */
+public interface IWindTurbineDataInfoService extends IService<WindTurbineDataInfo> {
+
+    /**
+     * 查询每个风场+机型的风机数据
+     */
+    void getEveryWindAndTypeData();
+
+    List<WindTurbineDataInfo> getByFileStoreIdAndDate(Integer fileStoreId, LocalDateTime queryBeginDate);
+
+    WindTurbineDataInfo getLastJsonData(LocalDateTime dataTime, String turbineId);
+}

+ 1 - 1
src/main/java/com/znzn/project/dc/dtdata/service/IWindTurbineDataService.java

@@ -15,5 +15,5 @@ import java.util.Map;
  */
 public interface IWindTurbineDataService extends IService<WindTurbineData> {
 
-    Map<String, Object> getLastHourLatestData(String format, String windFarmId, String id);
+    Map<String, Object> getLastHourLatestData(String format, String id);
 }

+ 175 - 144
src/main/java/com/znzn/project/dc/dtdata/service/impl/DataCollectionService.java

@@ -1,26 +1,34 @@
 package com.znzn.project.dc.dtdata.service.impl;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.znzn.project.dc.dtdata.common.CommonData;
-import com.znzn.project.dc.dtdata.entity.*;
+import com.znzn.project.dc.dtdata.entity.ExecError;
+import com.znzn.project.dc.dtdata.entity.FileStore;
+import com.znzn.project.dc.dtdata.entity.WindTurbine;
+import com.znzn.project.dc.dtdata.entity.WindTurbineDataInfo;
 import com.znzn.project.dc.dtdata.inOutEntity.CollectDataInData;
-import com.znzn.project.dc.dtdata.mapper.StandardizedDataMapper;
+import com.znzn.project.dc.dtdata.inOutEntity.CollectDataWithFileInData;
 import com.znzn.project.dc.dtdata.service.*;
+import com.znzn.project.dc.dtdata.utils.enos.DateUtils;
 import com.znzn.project.dc.dtdata.utils.enos.EnosapiUtils;
+import com.znzn.project.dc.dtdata.utils.enos.ParquetUtils;
 import jakarta.annotation.Resource;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -33,7 +41,6 @@ public class DataCollectionService implements IDataCollectionService {
     private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     private static final DateTimeFormatter DATE_FORMATTER_WITH_NUMBER = DateTimeFormatter.ofPattern("yyyyMMdd");
 
-    private static final DateTimeFormatter DATE_FORMATTER_WITH_ZERO = DateTimeFormatter.ofPattern("yyyy-MM-dd 00:00:00");
     private final ExecutorService executorService;
 
     @Resource
@@ -43,13 +50,9 @@ public class DataCollectionService implements IDataCollectionService {
     @Resource
     private IWindTurbineDataInfoService windTurbineDataInfoService;
     @Resource
-    private StandardizedDataMapper standardizedDataMapper;
-
     private IWindTurbineDataService windTurbineDataService;
 
-//    private final Map<String, JSONObject> lastData = new HashMap<>();
-
-    private static final Boolean saveCsv = true;
+//    private static final Boolean saveCsv = true;
 
 
     public DataCollectionService() {
@@ -69,11 +72,11 @@ public class DataCollectionService implements IDataCollectionService {
 
         // 按日期循环处理
         LocalDateTime date = inData.getBeginTime();
-        while (!date.isAfter(end)) {
+        while (!date.isAfter(end) && !date.isEqual(end)) {
             LocalDateTime queryEndDate = date.plusMinutes(inData.getMinute());
             long begin = System.currentTimeMillis();
             log.info("{},开始时间{},结束时间{},时间跨度为{}分钟", fileStore.getName(), date, end, inData.getMinute());
-            processSingleData(date, queryEndDate);
+            processSingleData(date, queryEndDate, inData);
             execDataDateService.finishAndAddNextTime(id, queryEndDate);
             date = queryEndDate;
             log.info("{},开始时间{},结束时间{},时间跨度为{}分钟,耗时{}", fileStore.getName(), date, end, inData.getMinute(),
@@ -81,34 +84,32 @@ public class DataCollectionService implements IDataCollectionService {
         }
     }
 
-//    private LocalDateTime getStartDate(Integer id) {
-//        ExecDataDate execDataDate = execDataDateService.getById(id);
-//        LocalDateTime start;
-//        if (null == execDataDate) {
-//            LocalDateTime today = LocalDateTime.now();
-//            LocalDateTime twoMonthsAgoFirstDay = today
-//                    .minusMonths(2)
-//                    .with(TemporalAdjusters.firstDayOfMonth())
-//                    .toLocalDate()
-//                    .atStartOfDay();
-//            execDataDate = new ExecDataDate();
-//            execDataDate.setId(id);
-//            execDataDate.setExecDateTime(twoMonthsAgoFirstDay);
-//            execDataDateService.save(execDataDate);
-//            start = execDataDate.getExecDateTime();
-//        } else {
-//            start = execDataDate.getExecDateTime();
-//        }
-//        return start;
-//    }
+    @SneakyThrows
+    @Override
+    public byte[] collectDataWithFile(CollectDataWithFileInData inData) {
+        List<JSONObject> list = new ArrayList<>();
+        for (String turbineId : inData.getWindTurbineIdList()) {
+            List<JSONObject> datas = EnosapiUtils.queryHistoricalMeasurementPoint(turbineId,
+                    inData.getBeginTime().format(DATE_FORMATTER), inData.getEndTime().format(DATE_FORMATTER),
+                    inData.getPointsList(), "1m", true);
+
+            if (null != datas && !datas.isEmpty()) {
+                list.addAll(datas);
+            }
+        }
+        if (list.isEmpty()) {
+            return new byte[0];
+        }
+        return convertListToCsv(list, inData.getPointsList());
+    }
 
     /**
      * 处理单日数据
      */
-    private void processSingleData(LocalDateTime queryBeginDate, LocalDateTime queryEndDate) {
+    private void processSingleData(LocalDateTime queryBeginDate, LocalDateTime queryEndDate, CollectDataInData inData) {
         long logBegin = System.currentTimeMillis();
         log.info("开始处理日期: {} 的数据", queryBeginDate);
-
+        List<String> windTurbineIdList = inData.getWindTurbineIdList();
         FileStore fileStore = CommonData.FILE_STORE;
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -117,71 +118,88 @@ public class DataCollectionService implements IDataCollectionService {
         List<String> existsWindturbineList = windTurbineDataInfoService.getByFileStoreIdAndDate(fileStore.getId(), queryBeginDate)
                 .stream().map(WindTurbineDataInfo::getWindTurbineId).toList();
 
-        // 按风机分组处理
-        for (WindTurbine turbine : CommonData.NEED_RUN_TURBINE.values()) {
-            if (!existsWindturbineList.contains(turbine.getId())) {
-                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
-                    try {
-                        List<String> pointIdsWithLogic = CommonData.pointIdsWithLogicMap.get(String.format("%s%s", turbine.getWindFarmId(), turbine.getType()));
-                        if (!pointIdsWithLogic.isEmpty()) {
-                            processTurbineData(turbine, queryBeginDate, queryEndDate, pointIdsWithLogic, fileStore);
-                        } else {
-                            log.error("[{}]没有配置测点", turbine);
-                        }
-                    } catch (Exception e) {
-                        log.error("处理风机 {} 数据失败: {}", turbine.getId(), e.getMessage(), e);
+        List<WindTurbine> queryTurbineList = new ArrayList<>();
+        if (null != windTurbineIdList && !windTurbineIdList.isEmpty()) {
+            for (String windTurbineId : windTurbineIdList) {
+                for (WindTurbine turbine : CommonData.NEED_RUN_TURBINE.values()) {
+                    if (windTurbineId.equals(turbine.getId())) {
+                        queryTurbineList.add(turbine);
                     }
-                }, executorService);
-                futures.add(future);
+                }
             }
+
+        } else {
+            queryTurbineList.addAll(CommonData.NEED_RUN_TURBINE.values());
         }
 
-//        for (WindTurbine turbine : CommonData.NEED_RUN_TURBINE.values()) {
-//            if (!existsWindturbineList.contains(turbine.getId())) {
-//                String path = fileStore.getSavaDir() + File.separator + queryBeginDate.format(DATE_FORMATTER_WITH_NUMBER) + File.separator + turbine.getType() + File.separator + turbine.getWindFarmId() + File.separator + turbine.getId() + ".json";
-//                if (!Files.exists(new File(path).toPath())) {
-//                    log.debug(queryBeginDate.format(DATE_FORMATTER_WITH_ZERO) + "  " + path);
-//                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
-//                        try {
-//                            List<String> pointIdsWithLogic = CommonData.pointIdsWithLogicMap.get(String.format("%s%s", turbine.getWindFarmId(), turbine.getType()));
-//                            if (!pointIdsWithLogic.isEmpty()) {
-//                                processTurbineData(turbine, queryBeginDate, queryEndDate, pointIdsWithLogic, fileStore);
-//                            } else {
-//                                log.error("[{}]没有配置测点", turbine);
-//                            }
-//                        } catch (Exception e) {
-//                            log.error("处理风机 {} 数据失败: {}", turbine.getId(), e.getMessage(), e);
-//                        }
-//                    }, executorService);
-//                    futures.add(future);
-//                }
-//            }
-//        }
+        for (WindTurbine turbine : queryTurbineList) {
+            if (!existsWindturbineList.contains(turbine.getId())) {
+                String path = fileStore.getSavaDir() + File.separator + queryBeginDate.format(DATE_FORMATTER_WITH_NUMBER) + File.separator + turbine.getType() + File.separator + turbine.getWindFarmId() + File.separator + turbine.getId() + ".json";
+                if (!Files.exists(new File(path).toPath())) {
+                    log.debug(queryBeginDate.format(DATE_FORMATTER) + "  " + path);
+                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                        try {
+                            HashSet<String> pointIdsWithLogic = CommonData.pointIdsWithLogicMap.get(String.format("%s%s", turbine.getWindFarmId(), turbine.getType()));
+                            if (!pointIdsWithLogic.isEmpty()) {
+                                processTurbineData(turbine, queryBeginDate, queryEndDate, pointIdsWithLogic, inData.getExecErrorId());
+                            } else {
+                                log.error("[{}]没有配置测点", turbine);
+                            }
+                        } catch (Exception e) {
+                            log.error("处理风机 {} 数据失败: {}", turbine.getId(), e.getMessage(), e);
+                        }
+                    }, executorService);
+                    futures.add(future);
+                }
+            }
+        }
 
         // 等待所有任务完成
         CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
         log.info("日期: {} 的数据处理完成, 耗时:{} 秒", queryBeginDate, (System.currentTimeMillis() - logBegin) / 1000);
     }
 
+    private void saveExecError(WindTurbine turbine, LocalDateTime queryStartDate, LocalDateTime queryEndDate, Integer errorCode, String message, Integer execErrorId) {
+        if (null == execErrorId) {
+            ExecError execError = new ExecError();
+            execError.setWindTurbineId(turbine.getId());
+            execError.setWindTurbineName(turbine.getName());
+            execError.setStoreId(CommonData.FILE_STORE.getId());
+            execError.setQueryBegin(LocalDateTime.from(queryStartDate));
+            execError.setQueryEnd(LocalDateTime.from(queryEndDate));
+            execError.setErrorCode(errorCode);
+            execError.setErrorMsg(message);
+            execErrorService.save(execError);
+        } else {
+            ExecError execError = execErrorService.getById(execErrorId);
+            ExecError updateExecError = new ExecError();
+            updateExecError.setRetryTimes(execError.getRetryTimes() + 1);
+            updateExecError.setId(execErrorId);
+            updateExecError.setErrorCode(errorCode);
+            execErrorService.updateById(updateExecError);
+        }
+    }
+
     /**
      * 处理单个风机数据
      */
-    private void processTurbineData(WindTurbine turbine, LocalDateTime queryStartDate, LocalDateTime queryEndDate, List<String>
-            pointIdsWithLogic, FileStore fileStore) throws IOException {
+    private void processTurbineData(WindTurbine turbine, LocalDateTime queryStartDate, LocalDateTime queryEndDate, HashSet<String>
+            pointIdsWithLogic, Integer execErrorId) throws IOException {
 
         long begin = System.currentTimeMillis();
         // 查询历史数据
         List<JSONObject> historicalData = null;
         try {
-            historicalData = EnosapiUtils.queryHistoricalMeasurementPoint(turbine.getId(), queryStartDate.format(DATE_FORMATTER), queryEndDate.format(DATE_FORMATTER), pointIdsWithLogic, "1m", true);
+            historicalData = EnosapiUtils.queryHistoricalMeasurementPoint(turbine.getId(), queryStartDate.format(DATE_FORMATTER), queryEndDate.format(DATE_FORMATTER), new ArrayList<>(pointIdsWithLogic), "1m", true);
         } catch (Exception e) {
-            ExecError execError = new ExecError();
-            execError.setWindTurbineId(turbine.getId());
-            execError.setWindTurbineName(turbine.getName());
-            execError.setQueryBegin(LocalDateTime.from(queryStartDate));
-            execError.setQueryEnd(LocalDateTime.from(queryEndDate));
-            execError.setErrorMsg(e.getMessage());
-            execErrorService.save(execError);
+            String message = e.getMessage();
+            saveExecError(turbine, queryStartDate, queryEndDate, 2, message, execErrorId);
+        }
+
+        if (null != historicalData && historicalData.isEmpty()) {
+            String message = "没有获取到数据";
+            saveExecError(turbine, queryStartDate, queryEndDate, 1, message, execErrorId);
+            log.warn("风机 {} 在日期 {} 没有数据", turbine.getId(), queryStartDate);
         }
 
         if (historicalData != null && !historicalData.isEmpty()) {
@@ -191,80 +209,52 @@ public class DataCollectionService implements IDataCollectionService {
 
             // 查询上个时间段完整数据
             JSONObject existsData = null;
-            Map<String, Object> map = windTurbineDataService.getLastHourLatestData(queryStartDate.minusHours(1L).format(DATE_FORMATTER), turbine.getWindFarmId(), turbine.getId());
-            if (null != map) {
-                existsData = new JSONObject(map);
+            WindTurbineDataInfo windTurbineDataInfo = windTurbineDataInfoService.getLastJsonData(queryStartDate.minusDays(1L), turbine.getId());
+            if (null != windTurbineDataInfo && StringUtils.isNotBlank(windTurbineDataInfo.getLastDataJson())) {
+                existsData = JSON.parseObject(windTurbineDataInfo.getLastDataJson());
             }
 
-            List<WindTurbineData> windTurbineDataList = standardizeData(historicalData, standerList, turbine.getId(), existsData);
+            List<JSONObject> jsonDatas = standardizeData(historicalData, standerList, turbine.getId(), existsData);
+            JSONObject lastObject = null;
 
-            windTurbineDataService.saveBatch(windTurbineDataList);
+            if (!jsonDatas.isEmpty()) {
+                if (jsonDatas.get(jsonDatas.size() - 1).getString("localtime").contains(":59:00"))
+                    lastObject = jsonDatas.get(jsonDatas.size() - 1);
+            }
+
+            if (null != execErrorId) {
+//                saveExecError(turbine, queryStartDate, queryEndDate, 0, "", execErrorId);
+                execErrorService.removeById(execErrorId);
+            }
 
-//            if (StringUtils.isBlank(tableName)) {
-//                log.error("{}获取表名失败", turbine);
-//            } else {
-//                //TODO 保存到数据表
-//                standardizedDataMapper.batchInsert(tableName, standardizedData);
-//                updateWindTurbineDataTable(turbine, queryStartDate, standardizedData.size(),"");
-//            }
-//
 
             // 构建文件路径 保存为文件
-//            String filePath = buildFilePath(turbine, queryStartDate);
-//            createDirectories(filePath);
-//
-//            List<String> allStanderList = new ArrayList<>();
-//            for (Set<String> keys : standerList) {
-//                allStanderList.addAll(keys);
-//            }
-//            JsonToParquetUtils.saveAsParquet(standardizedData, allStanderList, filePath);
+            String filePath = buildFilePath(turbine, queryStartDate);
+            createDirectories(filePath);
+
+            List<String> allStanderList = new ArrayList<>();
+            for (Set<String> keys : standerList) {
+                allStanderList.addAll(keys);
+            }
+            ParquetUtils.saveAsParquet(jsonDatas, allStanderList, filePath);
+
 
-            updateWindTurbineDataTable(turbine, queryStartDate, windTurbineDataList.size());
+            updateWindTurbineDataInfoTable(turbine, queryStartDate, jsonDatas.size(), lastObject, filePath);
 
         }
-//        else {
-//            ExecError execError = new ExecError();
-//            execError.setWindTurbineId(turbine.getId());
-//            execError.setWindTurbineName(turbine.getName());
-//            execError.setQueryBegin(LocalDateTime.from(queryStartDate));
-//            execError.setQueryEnd(LocalDateTime.from(queryEndDate));
-//            execError.setErrorMsg("没有获取到数据");
-//            execErrorService.save(execError);
-//            log.warn("风机 {} 在日期 {} 没有数据", turbine.getId(), queryStartDate);
-//        }
-        log.info("风场{},风机{},开始时间{},结束时间{},耗时{}", turbine.getName(), turbine.getCode(),
+        log.info("风场:{},风机:{},风机ID:{},开始时间:{},结束时间:{},耗时:{}", turbine.getName(), turbine.getCode(), turbine.getId(),
                 queryStartDate.format(DATE_FORMATTER), queryEndDate.format(DATE_FORMATTER), (System.currentTimeMillis() - begin) / 1000);
     }
 
-    private void updateWindTurbineDataTable(WindTurbine turbine, LocalDateTime queryStartDate, Integer dataCount) {
-//        LambdaQueryWrapper<WindTurbineDataInfo> wrapper = Wrappers.lambdaQuery(WindTurbineDataInfo.class);
-//        wrapper.eq(WindTurbineDataInfo::getWindTurbineId, turbine.getId())
-//                .eq(WindTurbineDataInfo::getDataTime, queryStartDate);
-//
-//
-//        WindTurbineDataInfo existsData = windTurbineDataInfoService.getOne(wrapper);
-//        if (null == existsData) {
-//            WindTurbineDataInfo data = new WindTurbineDataInfo();
-//            data.setWindTurbineId(turbine.getId());
-//            data.setWindTurbineName(turbine.getName());
-//            data.setDataTime(queryStartDate);
-//            data.setFileStoreId(CommonData.FILE_STORE.getId());
-//            data.setReadUri("");
-//            data.setDataCount(dataCount);
-//            windTurbineDataInfoService.save(data);
-//        } else {
-//            WindTurbineDataInfo updateData = new WindTurbineDataInfo();
-//            updateData.setId(existsData.getId());
-//            updateData.setDataCount(existsData.getDataCount() + dataCount);
-//            windTurbineDataInfoService.updateById(updateData);
-//        }
+    private void updateWindTurbineDataInfoTable(WindTurbine turbine, LocalDateTime queryStartDate, Integer dataCount, JSONObject lastData, String filePath) {
 
         WindTurbineDataInfo data = new WindTurbineDataInfo();
         data.setWindTurbineId(turbine.getId());
         data.setWindTurbineName(turbine.getName());
-        data.setDataTime(queryStartDate);
+        data.setDataTime(DateUtils.toDate(queryStartDate));
         data.setFileStoreId(CommonData.FILE_STORE.getId());
-        data.setReadUri("");
+        data.setReadUri(filePath);
+        data.setLastDataJson(null == lastData ? "" : JSONObject.toJSONString(lastData));
         data.setDataCount(dataCount);
         windTurbineDataInfoService.save(data);
 
@@ -273,11 +263,11 @@ public class DataCollectionService implements IDataCollectionService {
     /**
      * 标准化数据字段映射
      */
-    private List<WindTurbineData> standardizeData(List<JSONObject> originalData, List<Set<String>> standerList, String windTurbineId, JSONObject existsData) {
+    private List<JSONObject> standardizeData(List<JSONObject> originalData, List<Set<String>> standerList, String windTurbineId, JSONObject existsData) {
         if (null == existsData) {
             existsData = new JSONObject();
         }
-        List<WindTurbineData> standardizedData = new ArrayList<>();
+        List<JSONObject> standardizedData = new ArrayList<>();
         for (JSONObject item : originalData) {
             JSONObject standardizedItem = new JSONObject();
 
@@ -313,8 +303,9 @@ public class DataCollectionService implements IDataCollectionService {
                     }
                 }
             }
-            standardizedItem.put("wind_turbine_id", windTurbineId);
-            standardizedData.add(JSONObject.toJavaObject(standardizedItem, WindTurbineData.class));
+            standardizedItem.put("turbine_id", windTurbineId);
+            standardizedItem.put("localtime", standardizedItem.getString("localtime"));
+            standardizedData.add(standardizedItem);
         }
 
         return standardizedData;
@@ -340,7 +331,7 @@ public class DataCollectionService implements IDataCollectionService {
     private String buildFilePath(WindTurbine turbine, LocalDateTime date) {
         String basePath = CommonData.FILE_STORE != null ? CommonData.FILE_STORE.getSavaDir() : "/data/wind-turbine";
         return String.format("%s/%s/%s/%s/%s.%s", basePath, turbine.getType(), turbine.getWindFarmId(),
-                date.format(DATE_FORMATTER_WITH_NUMBER), turbine.getId(), saveCsv ? "csv" : "parquet");
+                date.format(DATE_FORMATTER_WITH_NUMBER), turbine.getId(), "parquet");
     }
 
     /**
@@ -369,4 +360,44 @@ public class DataCollectionService implements IDataCollectionService {
             throw new RuntimeException("目录创建失败", e);
         }
     }
+
+    private byte[] convertListToCsv(List<JSONObject> dataList, List<String> pointsList) {
+        // 边界处理:空列表直接返回空字节数组
+        if (dataList == null || dataList.isEmpty()) {
+            return new byte[0];
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             OutputStreamWriter osw = new OutputStreamWriter(baos, StandardCharsets.UTF_8)) {
+            // 步骤1:提取CSV表头(取第一个JSONObject的所有key作为表头,保证字段顺序一致)
+            JSONObject firstObj = dataList.get(0);
+            Set<String> headerKeys = firstObj.keySet();
+            headerKeys.addAll(pointsList);
+
+            List<String> headers = new ArrayList<>(headerKeys);
+
+            // 步骤2:写入CSV表头(字段之间用逗号分隔,末尾换行)
+            osw.write(String.join(",", headers));
+            osw.write("\n");
+
+            // 步骤3:遍历数据列表,写入CSV数据行
+            for (JSONObject jsonObj : dataList) {
+                List<String> dataValues = new ArrayList<>();
+                for (String header : headers) {
+                    String value = jsonObj.getString(header);
+
+                    dataValues.add(null == value ? "" : value);
+                }
+                // 写入一行数据,末尾换行
+                osw.write(String.join(",", dataValues));
+                osw.write("\n");
+            }
+
+            osw.flush();
+
+            return baos.toByteArray();
+        } catch (Exception e) {
+            throw new RuntimeException("转换List<JSONObject>到CSV失败", e);
+        }
+    }
 }

+ 398 - 0
src/main/java/com/znzn/project/dc/dtdata/service/impl/DataCollectionService_bak.java

@@ -0,0 +1,398 @@
+//package com.znzn.project.dc.dtdata.service.impl;
+//
+//import com.alibaba.fastjson.JSONObject;
+//import com.znzn.project.dc.dtdata.common.CommonData;
+//import com.znzn.project.dc.dtdata.entity.*;
+//import com.znzn.project.dc.dtdata.inOutEntity.CollectDataInData;
+//import com.znzn.project.dc.dtdata.inOutEntity.CollectDataWithFileInData;
+//import com.znzn.project.dc.dtdata.service.*;
+//import com.znzn.project.dc.dtdata.utils.enos.DateUtils;
+//import com.znzn.project.dc.dtdata.utils.enos.EnosapiUtils;
+//import jakarta.annotation.Resource;
+//import lombok.SneakyThrows;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.scheduling.annotation.Async;
+//import org.springframework.stereotype.Service;
+//
+//import java.io.ByteArrayOutputStream;
+//import java.io.File;
+//import java.io.IOException;
+//import java.io.OutputStreamWriter;
+//import java.nio.charset.StandardCharsets;
+//import java.nio.file.Files;
+//import java.time.LocalDateTime;
+//import java.time.format.DateTimeFormatter;
+//import java.util.*;
+//import java.util.concurrent.CompletableFuture;
+//import java.util.concurrent.ExecutorService;
+//import java.util.concurrent.Executors;
+//
+//@Slf4j
+//@Service
+//public class DataCollectionService_bak implements IDataCollectionService {
+//
+//    // 日期格式化
+//    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+//    private static final DateTimeFormatter DATE_FORMATTER_WITH_NUMBER = DateTimeFormatter.ofPattern("yyyyMMdd");
+//
+//    private final ExecutorService executorService;
+//
+//    @Resource
+//    private IExecErrorService execErrorService;
+//    @Resource
+//    private IExecDataDateService execDataDateService;
+//    @Resource
+//    private IWindTurbineDataInfoService windTurbineDataInfoService;
+//    @Resource
+//    private IWindTurbineDataService windTurbineDataService;
+//
+//    private static final Boolean saveCsv = true;
+//
+//
+//    public DataCollectionService_bak() {
+//        int coreCount = Runtime.getRuntime().availableProcessors();
+//        int threadPoolCount = Math.min(coreCount * 2, 6);
+//        this.executorService = Executors.newFixedThreadPool(threadPoolCount);
+//        log.info("初始化数据收集服务,线程池大小: {}", threadPoolCount);
+//    }
+//
+//    @Override
+//    @Async
+//    public void collectData(CollectDataInData inData) {
+//
+//        LocalDateTime end = inData.getEndTime();
+//        FileStore fileStore = CommonData.FILE_STORE;
+//        Integer id = fileStore.getId();
+//
+//        // 按日期循环处理
+//        LocalDateTime date = inData.getBeginTime();
+//        while (!date.isAfter(end)) {
+//            LocalDateTime queryEndDate = date.plusMinutes(inData.getMinute());
+//            long begin = System.currentTimeMillis();
+//            log.info("{},开始时间{},结束时间{},时间跨度为{}分钟", fileStore.getName(), date, end, inData.getMinute());
+//            processSingleData(date, queryEndDate, inData);
+//            execDataDateService.finishAndAddNextTime(id, queryEndDate);
+//            date = queryEndDate;
+//            log.info("{},开始时间{},结束时间{},时间跨度为{}分钟,耗时{}", fileStore.getName(), date, end, inData.getMinute(),
+//                    (System.currentTimeMillis() - begin) / 1000);
+//        }
+//    }
+//
+//    @SneakyThrows
+//    @Override
+//    public byte[] collectDataWithFile(CollectDataWithFileInData inData) {
+//        List<JSONObject> list = new ArrayList<>();
+//        for (String turbineId : inData.getWindTurbineIdList()) {
+//            List<JSONObject> datas = EnosapiUtils.queryHistoricalMeasurementPoint(turbineId,
+//                    inData.getBeginTime().format(DATE_FORMATTER), inData.getEndTime().format(DATE_FORMATTER),
+//                    inData.getPointsList(), "1m", true);
+//
+//            if (null != datas && !datas.isEmpty()) {
+//                list.addAll(datas);
+//            }
+//        }
+//        if (list.isEmpty()) {
+//            return new byte[0];
+//        }
+//        return convertListToCsv(list, inData.getPointsList());
+//    }
+//
+//    /**
+//     * 处理单日数据
+//     */
+//    private void processSingleData(LocalDateTime queryBeginDate, LocalDateTime queryEndDate, CollectDataInData inData) {
+//        long logBegin = System.currentTimeMillis();
+//        log.info("开始处理日期: {} 的数据", queryBeginDate);
+//        List<String> windTurbineIdList = inData.getWindTurbineIdList();
+//        FileStore fileStore = CommonData.FILE_STORE;
+//
+//        List<CompletableFuture<Void>> futures = new ArrayList<>();
+//
+//        // 查询已执行的风机
+//        List<String> existsWindturbineList = windTurbineDataInfoService.getByFileStoreIdAndDate(fileStore.getId(), queryBeginDate)
+//                .stream().map(WindTurbineDataInfo::getWindTurbineId).toList();
+//
+//        List<WindTurbine> queryTurbineList = new ArrayList<>();
+//        if (null != windTurbineIdList && !windTurbineIdList.isEmpty()) {
+//            for (String windTurbineId : windTurbineIdList) {
+//                for (WindTurbine turbine : CommonData.NEED_RUN_TURBINE.values()) {
+//                    if (windTurbineId.equals(turbine.getId())) {
+//                        queryTurbineList.add(turbine);
+//                    }
+//                }
+//            }
+//
+//        } else {
+//            queryTurbineList.addAll(CommonData.NEED_RUN_TURBINE.values());
+//        }
+//
+//        // 按风机分组处理
+//        for (WindTurbine turbine : queryTurbineList) {
+//            if (!existsWindturbineList.contains(turbine.getId())) {
+//                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+//                    try {
+//                        HashSet<String> pointIdsWithLogic = CommonData.pointIdsWithLogicMap.get(String.format("%s%s", turbine.getWindFarmId(), turbine.getType()));
+//                        if (!pointIdsWithLogic.isEmpty()) {
+//                            processTurbineData(turbine, queryBeginDate, queryEndDate, pointIdsWithLogic, inData.getExecErrorId());
+//                        } else {
+//                            log.error("[{}]没有配置测点", turbine);
+//                        }
+//                    } catch (Exception e) {
+//                        log.error("处理风机 {} 数据失败: {}", turbine.getId(), e.getMessage(), e);
+//                    }
+//                }, executorService);
+//                futures.add(future);
+//            }
+//        }
+//
+//        // 等待所有任务完成
+//        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+//        log.info("日期: {} 的数据处理完成, 耗时:{} 秒", queryBeginDate, (System.currentTimeMillis() - logBegin) / 1000);
+//    }
+//
+//    private void saveExecError(WindTurbine turbine, LocalDateTime queryStartDate, LocalDateTime queryEndDate, Integer errorCode, String message, Integer execErrorId) {
+//        if (null == execErrorId) {
+//            ExecError execError = new ExecError();
+//            execError.setWindTurbineId(turbine.getId());
+//            execError.setWindTurbineName(turbine.getName());
+//            execError.setStoreId(CommonData.FILE_STORE.getId());
+//            execError.setQueryBegin(LocalDateTime.from(queryStartDate));
+//            execError.setQueryEnd(LocalDateTime.from(queryEndDate));
+//            execError.setErrorCode(errorCode);
+//            execError.setErrorMsg(message);
+//            execErrorService.save(execError);
+//        } else {
+//            ExecError execError = execErrorService.getById(execErrorId);
+//            ExecError updateExecError = new ExecError();
+//            updateExecError.setRetryTimes(execError.getRetryTimes() + 1);
+//            updateExecError.setId(execErrorId);
+//            updateExecError.setErrorCode(errorCode);
+//            execErrorService.updateById(updateExecError);
+//        }
+//    }
+//
+//    /**
+//     * 处理单个风机数据
+//     */
+//    private void processTurbineData(WindTurbine turbine, LocalDateTime queryStartDate, LocalDateTime queryEndDate, HashSet<String>
+//            pointIdsWithLogic, Integer execErrorId) {
+//
+//        long begin = System.currentTimeMillis();
+//        // 查询历史数据
+//        List<JSONObject> historicalData = null;
+//        try {
+//            historicalData = EnosapiUtils.queryHistoricalMeasurementPoint(turbine.getId(), queryStartDate.format(DATE_FORMATTER), queryEndDate.format(DATE_FORMATTER), new ArrayList<>(pointIdsWithLogic), "1m", true);
+//        } catch (Exception e) {
+//            String message = e.getMessage();
+//            saveExecError(turbine, queryStartDate, queryEndDate, 2, message, execErrorId);
+//        }
+//
+//        if (null != historicalData && historicalData.isEmpty()) {
+//            String message = "没有获取到数据";
+//            saveExecError(turbine, queryStartDate, queryEndDate, 1, message, execErrorId);
+//            log.warn("风机 {} 在日期 {} 没有数据", turbine.getId(), queryStartDate);
+//        }
+//
+//        if (historicalData != null && !historicalData.isEmpty()) {
+//            // 标准化字段映射
+//            String windFarmType = String.format("%s%s", turbine.getWindFarmId(), turbine.getType());
+//            List<Set<String>> standerList = CommonData.standerMap.get(windFarmType).values().stream().toList();
+//
+//            // 查询上个时间段完整数据
+//            JSONObject existsData = null;
+//            Map<String, Object> map = windTurbineDataService.getLastHourLatestData(queryStartDate.minusHours(1L).format(DATE_FORMATTER), turbine.getId());
+//            if (null != map) {
+//                existsData = new JSONObject(map);
+//            }
+//
+//            List<WindTurbineData> windTurbineDataList = standardizeData(historicalData, standerList, turbine.getId(), existsData);
+//
+//            windTurbineDataService.saveBatch(windTurbineDataList);
+//
+//            if (null != execErrorId) {
+//                saveExecError(turbine, queryStartDate, queryEndDate, 0, "", execErrorId);
+//                execErrorService.removeById(execErrorId);
+//            }
+//
+//            updateWindTurbineDataTable(turbine, queryStartDate, windTurbineDataList.size());
+//
+//        }
+//        log.info("风场{},风机{},开始时间{},结束时间{},耗时{}", turbine.getName(), turbine.getCode(),
+//                queryStartDate.format(DATE_FORMATTER), queryEndDate.format(DATE_FORMATTER), (System.currentTimeMillis() - begin) / 1000);
+//    }
+//
+//    private void updateWindTurbineDataTable(WindTurbine turbine, LocalDateTime queryStartDate, Integer dataCount) {
+////        LambdaQueryWrapper<WindTurbineDataInfo> wrapper = Wrappers.lambdaQuery(WindTurbineDataInfo.class);
+////        wrapper.eq(WindTurbineDataInfo::getWindTurbineId, turbine.getId())
+////                .eq(WindTurbineDataInfo::getDataTime, queryStartDate);
+////
+////
+////        WindTurbineDataInfo existsData = windTurbineDataInfoService.getOne(wrapper);
+////        if (null == existsData) {
+////            WindTurbineDataInfo data = new WindTurbineDataInfo();
+////            data.setWindTurbineId(turbine.getId());
+////            data.setWindTurbineName(turbine.getName());
+////            data.setDataTime(queryStartDate);
+////            data.setFileStoreId(CommonData.FILE_STORE.getId());
+////            data.setReadUri("");
+////            data.setDataCount(dataCount);
+////            windTurbineDataInfoService.save(data);
+////        } else {
+////            WindTurbineDataInfo updateData = new WindTurbineDataInfo();
+////            updateData.setId(existsData.getId());
+////            updateData.setDataCount(existsData.getDataCount() + dataCount);
+////            windTurbineDataInfoService.updateById(updateData);
+////        }
+//
+//        WindTurbineDataInfo data = new WindTurbineDataInfo();
+//        data.setWindTurbineId(turbine.getId());
+//        data.setWindTurbineName(turbine.getName());
+//        data.setDataTime(DateUtils.toDate(queryStartDate));
+//        data.setFileStoreId(CommonData.FILE_STORE.getId());
+//        data.setReadUri("");
+//        data.setDataCount(dataCount);
+//        windTurbineDataInfoService.save(data);
+//
+//    }
+//
+//    /**
+//     * 标准化数据字段映射
+//     */
+//    private List<WindTurbineData> standardizeData(List<JSONObject> originalData, List<Set<String>> standerList, String windTurbineId, JSONObject existsData) {
+//        if (null == existsData) {
+//            existsData = new JSONObject();
+//        }
+//        List<WindTurbineData> standardizedData = new ArrayList<>();
+//        for (JSONObject item : originalData) {
+//            JSONObject standardizedItem = new JSONObject();
+//
+//            // 遍历原始数据的每个字段
+//            for (Map.Entry<String, Object> entry : item.entrySet()) {
+//                String originalKey = entry.getKey();
+//                Object value = entry.getValue();
+//
+//                // 查找标准化映射
+//                Set<String> standardizedKeys = findStandardizedKey(originalKey);
+//                if (standardizedKeys != null) {
+//                    for (String standardizedKey : standardizedKeys) {
+//                        if (null == value) {
+//                            value = existsData.get(standardizedKey);
+//                        } else {
+//                            existsData.put(standardizedKey, value);
+//                        }
+//
+//                        standardizedItem.put(standardizedKey, value);
+//                    }
+//                }
+////                else {
+////                    // 如果没有映射关系,保留原字段名
+////                    standardizedItem.put(originalKey, value);
+////                    log.warn("{}获取到无法映射字段{}", item.getOrDefault("mdmId", "未知"), originalKey);
+////                }
+//            }
+//
+//            for (Set<String> keys : standerList) {
+//                for (String key : keys) {
+//                    if (!standardizedItem.containsKey(key)) {
+//                        standardizedItem.put(key, "");
+//                    }
+//                }
+//            }
+//            standardizedItem.put("turbine_id", windTurbineId);
+//            standardizedItem.put("data_time", standardizedItem.getString("localtime"));
+//            standardizedData.add(JSONObject.toJavaObject(standardizedItem, WindTurbineData.class));
+//        }
+//
+//        return standardizedData;
+//    }
+//
+//    /**
+//     * 查找标准化字段名
+//     */
+//    private Set<String> findStandardizedKey(String originalKey) {
+//        // 遍历所有标准化映射字典
+//        for (Map<String, Set<String>> mapping : CommonData.standerMap.values()) {
+//            if (mapping.containsKey(originalKey)) {
+//                return mapping.get(originalKey);
+//            }
+//        }
+//        return null;
+//    }
+//
+//    /**
+//     * 构建文件路径
+//     */
+//
+//    private String buildFilePath(WindTurbine turbine, LocalDateTime date) {
+//        String basePath = CommonData.FILE_STORE != null ? CommonData.FILE_STORE.getSavaDir() : "/data/wind-turbine";
+//        return String.format("%s/%s/%s/%s/%s.%s", basePath, turbine.getType(), turbine.getWindFarmId(),
+//                date.format(DATE_FORMATTER_WITH_NUMBER), turbine.getId(), saveCsv ? "csv" : "parquet");
+//    }
+//
+//    /**
+//     * 创建目录
+//     */
+//    private void createDirectories(String filePath) {
+//        createDirectories(filePath, 1);
+//    }
+//
+//    /**
+//     * 创建目录
+//     */
+//    private void createDirectories(String filePath, int retryTimes) {
+//        try {
+//            File file = new File(filePath);
+//            File parentDir = file.getParentFile();
+//            if (!parentDir.exists()) {
+//                Files.createDirectories(parentDir.toPath());
+//            }
+//        } catch (IOException e) {
+//            log.error("创建目录失败: {}, 次数: {}", filePath, retryTimes, e);
+//            if (retryTimes < 6) {
+//                createDirectories(filePath, retryTimes + 1);
+//                return;
+//            }
+//            throw new RuntimeException("目录创建失败", e);
+//        }
+//    }
+//
+//    private byte[] convertListToCsv(List<JSONObject> dataList, List<String> pointsList) {
+//        // 边界处理:空列表直接返回空字节数组
+//        if (dataList == null || dataList.isEmpty()) {
+//            return new byte[0];
+//        }
+//
+//        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+//             OutputStreamWriter osw = new OutputStreamWriter(baos, StandardCharsets.UTF_8)) {
+//            // 步骤1:提取CSV表头(取第一个JSONObject的所有key作为表头,保证字段顺序一致)
+//            JSONObject firstObj = dataList.get(0);
+//            Set<String> headerKeys = firstObj.keySet();
+//            headerKeys.addAll(pointsList);
+//
+//            List<String> headers = new ArrayList<>(headerKeys);
+//
+//            // 步骤2:写入CSV表头(字段之间用逗号分隔,末尾换行)
+//            osw.write(String.join(",", headers));
+//            osw.write("\n");
+//
+//            // 步骤3:遍历数据列表,写入CSV数据行
+//            for (JSONObject jsonObj : dataList) {
+//                List<String> dataValues = new ArrayList<>();
+//                for (String header : headers) {
+//                    String value = jsonObj.getString(header);
+//
+//                    dataValues.add(null == value ? "" : value);
+//                }
+//                // 写入一行数据,末尾换行
+//                osw.write(String.join(",", dataValues));
+//                osw.write("\n");
+//            }
+//
+//            osw.flush();
+//
+//            return baos.toByteArray();
+//        } catch (Exception e) {
+//            throw new RuntimeException("转换List<JSONObject>到CSV失败", e);
+//        }
+//    }
+//}

+ 2 - 2
src/main/java/com/znzn/project/dc/dtdata/service/impl/ExecErrorServiceImpl.java

@@ -1,14 +1,14 @@
 package com.znzn.project.dc.dtdata.service.impl;
 
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.znzn.project.dc.dtdata.entity.ExecError;
 import com.znzn.project.dc.dtdata.mapper.ExecErrorMapper;
 import com.znzn.project.dc.dtdata.service.IExecErrorService;
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import org.springframework.stereotype.Service;
 
 /**
  * <p>
- *  服务实现类
+ * 服务实现类
  * </p>
  *
  * @author 魏志亮

+ 190 - 0
src/main/java/com/znzn/project/dc/dtdata/service/impl/InitServiceImpl.java

@@ -0,0 +1,190 @@
+package com.znzn.project.dc.dtdata.service.impl;
+
+import com.znzn.project.dc.dtdata.common.CommonData;
+import com.znzn.project.dc.dtdata.entity.FileStore;
+import com.znzn.project.dc.dtdata.entity.WindFarmTables;
+import com.znzn.project.dc.dtdata.entity.WindPoints;
+import com.znzn.project.dc.dtdata.entity.WindTurbine;
+import com.znzn.project.dc.dtdata.service.*;
+import com.znzn.project.dc.dtdata.utils.enos.EnosapiUtils;
+import com.znzn.project.dc.dtdata.utils.enos.entity.AccessibleAsset;
+import jakarta.annotation.Resource;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Service
+@Slf4j
+public class InitServiceImpl implements IInitService {
+
+    private static final String FILE_STORE_ID_ENV_VAR = "FILE_STORE_ID";
+    @Resource
+    private IWindPointsService windPointsService;
+    @Resource
+    private IFileStoreService fileStoreService;
+    @Resource
+    private IWindTurbineService windTurbineService;
+    @Resource
+    private Environment environment;
+    @Resource
+    private IWindFarmService windFarmService;
+    @Resource
+    private IWindFarmTablesService windFarmTablesService;
+
+    @Override
+    @SneakyThrows
+    public void init() {
+        processFileStoreData();
+
+        if ("1".equals(environment.getProperty("NEED_RESTORE"))) {
+            //删除风机信息
+            log.info("开始删除风机数据");
+            windTurbineService.deleteByFileStoreId(CommonData.FILE_STORE.getId());
+            //重新写入风机信息
+            log.info("开始写入风机数据");
+            rereadWindTurbineTable();
+        }
+        log.info("写入风机数据完成");
+        // 应用启动时预加载所有数据
+        processWindPointsData();
+        log.info("风场测点数据初始化完成,需要执行的风机数为{}", CommonData.NEED_RUN_TURBINE.size());
+
+        log.info("开始查询风场机型表名映射");
+        processWindFarmTables();
+        log.info("查询风场机型表名映射完成,风场+机型数量为{}", CommonData.TABLE_MAP.size());
+    }
+
+    private void processWindFarmTables() {
+
+        List<WindFarmTables> tables = windFarmTablesService.list();
+        Map<String, String> tableMaps = tables.stream().collect(Collectors.toMap(data -> String.format("%s%s", data.getWindFarmId(), data.getType()), WindFarmTables::getTableName));
+        CommonData.TABLE_MAP.putAll(tableMaps);
+    }
+
+    @SneakyThrows
+    private void rereadWindTurbineTable() {
+        List<String> farmTypeList = windFarmService.getByFileStoreId(CommonData.FILE_STORE.getId()).stream().map(data -> String.join("--", data.getId(), data.getType())).toList();
+        log.info("本机风场+机型数量:{}", farmTypeList.size());
+        List<AccessibleAsset> assetList;
+        try {
+            assetList = EnosapiUtils.queryAccessibleAsset("EnOS_Wind_Turbine");
+        } catch (Exception e) {
+            Thread.sleep(200);
+            assetList = EnosapiUtils.queryAccessibleAsset("EnOS_Wind_Turbine");
+        }
+        if (assetList == null) {
+            System.exit(0);
+        }
+        List<WindTurbine> windTurbineList = new ArrayList<>();
+        for (AccessibleAsset asset : assetList) {
+            String key = String.join("--", asset.getAttributes().getParentId(), asset.getAttributes().getTurbineTypeID());
+            if (farmTypeList.contains(key)) {
+                windTurbineList.add(getWindTurbine(asset));
+            }
+        }
+        if (windTurbineList.isEmpty()) {
+            Thread.sleep(100);
+            rereadWindTurbineTable();
+        }
+        windTurbineService.saveBatch(windTurbineList, 5000);
+    }
+
+    private static WindTurbine getWindTurbine(AccessibleAsset asset) {
+        WindTurbine windTurbine = new WindTurbine();
+        windTurbine.setId(asset.getMdmId());
+        windTurbine.setCode(asset.getAttributes().getScadaName());
+        windTurbine.setName(asset.getAttributes().getName());
+        windTurbine.setType(asset.getAttributes().getTurbineTypeID());
+        windTurbine.setWindFarmId(asset.getAttributes().getParentId());
+        windTurbine.setPlcVersion(asset.getAttributes().getPLCVersion());
+        windTurbine.setTypeName(asset.getAttributes().getManufacturer());
+        return windTurbine;
+    }
+
+    private void processFileStoreData() throws Exception {
+        FileStore fileStore = fileStoreService.getById(getFileStoreIdFromEnv());
+        log.info("获取到本机的存储配置{}", fileStore.toString());
+        CommonData.FILE_STORE = fileStore;
+    }
+
+    private void processWindPointsData() {
+        windTurbineService.getByFileStoreId(CommonData.FILE_STORE.getId()).forEach(windTurbine -> CommonData.NEED_RUN_TURBINE.put(windTurbine.getId(), windTurbine));
+        log.info("本机需要运行的风机数量{}", CommonData.NEED_RUN_TURBINE.size());
+        Map<String, HashSet<WindPoints>> groupedByKey = new HashMap<>();
+        Map<String, HashSet<String>> pointIdsMap = new HashMap<>();
+        Map<String, Map<String, Set<String>>> standerMaps = new HashMap<>();
+
+        List<WindPoints> windPointsList = windPointsService.list();
+        for (WindPoints data : windPointsList) {
+            // 生成分组key
+            String key = data.getWindFarmId() + data.getWindType();
+
+            if (!groupedByKey.containsKey(key)) {
+                groupedByKey.put(key, new HashSet<>());
+            }
+            groupedByKey.get(key).add(data);
+
+            if (!pointIdsMap.containsKey(key)) {
+                pointIdsMap.put(key, new HashSet<>());
+            }
+            String pointId = String.format("%s(%s)",
+                    data.getUseTimeAggMethods(),
+                    data.getPointNameEn());
+            pointIdsMap.get(key).add(pointId);
+
+            if (!standerMaps.containsKey(key)) {
+                standerMaps.put(key, new HashMap<>());
+                Set<String> set = new HashSet<>();
+                set.add("localtime");
+                standerMaps.get(key).put("localtime", set);
+            }
+
+            String pointKey = String.format("%s(%s)",
+                    data.getUseTimeAggMethods(),
+                    data.getPointNameEn());
+            Map<String, Set<String>> innerMap = standerMaps.get(key);
+
+            if (!innerMap.containsKey(pointKey)) {
+                Set<String> set = new HashSet<>();
+                set.add(data.getStanderNameEn());
+                innerMap.put(pointKey, set);
+            } else {
+                Set<String> set = innerMap.get(pointKey);
+                set.add(data.getStanderNameEn());
+            }
+        }
+
+        CommonData.pointIdsWithLogicMap.putAll(pointIdsMap);
+        CommonData.standerMap.putAll(standerMaps);
+    }
+
+    private String getFileStoreIdFromEnv() throws Exception {
+
+        log.info("从系统环境变量获取 FILE_STORE_ID: {}, 从Spring Environment获取 FILE_STORE_ID: {}", System.getenv(FILE_STORE_ID_ENV_VAR), environment.getProperty(FILE_STORE_ID_ENV_VAR));
+//
+//        if (1 == 1) {
+//            return "1";
+//        }
+
+        String fileStoreId = System.getenv(FILE_STORE_ID_ENV_VAR);
+        if (StringUtils.isNotBlank(fileStoreId)) {
+            log.info("从系统环境变量获取 FILE_STORE_ID: {}", fileStoreId);
+            return fileStoreId.trim();
+        }
+
+        fileStoreId = environment.getProperty(FILE_STORE_ID_ENV_VAR);
+        if (StringUtils.isNotBlank(fileStoreId)) {
+            log.info("从Spring Environment获取 FILE_STORE_ID: {}", fileStoreId);
+            return fileStoreId.trim();
+        }
+
+        log.warn("所有方式都未获取到有效的 FILE_STORE_ID");
+        throw new Exception("所有方式都未获取到有效的 FILE_STORE_ID");
+    }
+
+}

+ 3 - 2
src/main/java/com/znzn/project/dc/dtdata/service/impl/WindPointsServiceImpl.java

@@ -11,6 +11,7 @@ import com.znzn.project.dc.dtdata.service.IWindPointsService;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -42,9 +43,9 @@ public class WindPointsServiceImpl extends ServiceImpl<WindPointsMapper, WindPoi
 
 
     @Override
-        public List<String> getPointIdsWithLogic(String windFarmId, String windType) {
+    public List<String> getPointIdsWithLogic(String windFarmId, String windType) {
         String key = windFarmId + windType;
-        return CommonData.pointIdsWithLogicMap.get(key);
+        return new ArrayList<>(CommonData.pointIdsWithLogicMap.get(key));
     }
 
 }

+ 78 - 0
src/main/java/com/znzn/project/dc/dtdata/service/impl/WindTurbineDataInfoServiceImpl.java

@@ -0,0 +1,78 @@
+package com.znzn.project.dc.dtdata.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.znzn.project.dc.dtdata.entity.WindTurbine;
+import com.znzn.project.dc.dtdata.entity.WindTurbineDataInfo;
+import com.znzn.project.dc.dtdata.mapper.WindTurbineDataInfoMapper;
+import com.znzn.project.dc.dtdata.service.IWindPointsService;
+import com.znzn.project.dc.dtdata.service.IWindTurbineDataInfoService;
+import com.znzn.project.dc.dtdata.service.IWindTurbineService;
+import com.znzn.project.dc.dtdata.utils.enos.EnosapiUtils;
+import jakarta.annotation.Resource;
+import lombok.SneakyThrows;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.file.Files;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * <p>
+ * 服务实现类
+ * </p>
+ *
+ * @author 魏志亮
+ * @since 2025-11-10
+ */
+@Service
+public class WindTurbineDataInfoServiceImpl extends ServiceImpl<WindTurbineDataInfoMapper, WindTurbineDataInfo> implements IWindTurbineDataInfoService {
+
+    @Resource
+    private IWindTurbineService windTurbineService;
+    @Resource
+    private IWindPointsService windPointsService;
+
+    @SneakyThrows
+    @Override
+    @Async
+    public void getEveryWindAndTypeData() {
+        List<WindTurbine> windTurbineList = windTurbineService.getWindTurbineByGroupData();
+        for (WindTurbine windTurbine : windTurbineList) {
+            String mdmId = windTurbine.getId();
+            List<String> logicPoints = windPointsService.getPointIdsWithLogic(windTurbine.getWindFarmId(), windTurbine.getType());
+            List<JSONObject> datas = EnosapiUtils.queryHistoricalMeasurementPoint(mdmId, "2025-11-01 00:00:00", "2025-11-02 00:00:00", logicPoints, "1m", true);
+            System.out.println(mdmId + "  " + datas.size());
+            File file = new File("接口返回数据20251101" + File.separator + mdmId + ".json");
+            if (!file.getParentFile().exists()) {
+                Files.createFile(file.getParentFile().toPath());
+            }
+            FileWriter fileWriter = new FileWriter(file);
+            fileWriter.write(JSON.toJSONString(datas));
+            fileWriter.flush();
+            fileWriter.close();
+        }
+    }
+
+    @Override
+    public List<WindTurbineDataInfo> getByFileStoreIdAndDate(Integer fileStoreId, LocalDateTime queryBeginDate) {
+        LambdaQueryWrapper<WindTurbineDataInfo> queryWrapper = Wrappers.lambdaQuery(WindTurbineDataInfo.class);
+        queryWrapper.eq(WindTurbineDataInfo::getFileStoreId, fileStoreId).eq(WindTurbineDataInfo::getDataTime, queryBeginDate);
+        return list(queryWrapper);
+    }
+
+    @Override
+    public WindTurbineDataInfo getLastJsonData(LocalDateTime dataTime, String turbineId) {
+        LambdaQueryWrapper<WindTurbineDataInfo> queryWrapper = Wrappers.lambdaQuery(WindTurbineDataInfo.class);
+        queryWrapper.eq(WindTurbineDataInfo::getWindTurbineId, turbineId)
+                .eq(WindTurbineDataInfo::getDataTime, dataTime);
+
+        return getOne(queryWrapper);
+    }
+}

+ 2 - 2
src/main/java/com/znzn/project/dc/dtdata/service/impl/WindTurbineDataServiceImpl.java

@@ -20,7 +20,7 @@ import java.util.Map;
 public class WindTurbineDataServiceImpl extends ServiceImpl<WindTurbineDataMapper, WindTurbineData> implements IWindTurbineDataService {
 
     @Override
-    public Map<String, Object> getLastHourLatestData(String format, String windFarmId, String id) {
-        return getBaseMapper().getLastHourLatestData(format, windFarmId, id);
+    public Map<String, Object> getLastHourLatestData(String format, String id) {
+        return getBaseMapper().getLastHourLatestData(format, id);
     }
 }

+ 26 - 0
src/main/java/com/znzn/project/dc/dtdata/utils/enos/DateUtils.java

@@ -0,0 +1,26 @@
+package com.znzn.project.dc.dtdata.utils.enos;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+public class DateUtils {
+
+
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    public static final DateTimeFormatter DATE_FORMATTER_WITH_ZERO = DateTimeFormatter.ofPattern("yyyy-MM-dd 00:00:00");
+
+    public static final DateTimeFormatter DATE_FORMATTER_WITH_NUMBER = DateTimeFormatter.ofPattern("yyyyMMdd");
+
+
+    public static Date toDate(LocalDateTime localDateTime) {
+        return Date.from(
+                localDateTime.atZone(ZoneId.of("Asia/Shanghai"))
+                        .toInstant());
+    }
+
+    public static LocalDateTime toLocalDateTime(Date date) {
+        return LocalDateTime.ofInstant(date.toInstant(), ZoneId.of("Asia/Shanghai"));
+    }
+}

+ 4 - 0
src/main/java/com/znzn/project/dc/dtdata/utils/enos/EnosapiUtils.java

@@ -230,6 +230,10 @@ public class EnosapiUtils {
 
             String message = getMessage(response);
 
+            if (response != null && null != response.getData() && null != response.getData().getItems() && response.getData().getItems().isEmpty()) {
+                return Collections.emptyList();
+            }
+
             try {
                 Thread.sleep(50);
             } catch (InterruptedException ignored) {

+ 0 - 103
src/main/java/com/znzn/project/dc/dtdata/utils/enos/JsonToParquetUtils.java

@@ -1,103 +0,0 @@
-package com.znzn.project.dc.dtdata.utils.enos;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-
-public class JsonToParquetUtils {
-
-    /**
-     * 将映射后的JSON数据保存为Parquet文件
-     */
-    public static void saveAsParquet(
-            List<JSONObject> mappedData,
-            List<String> standardKeys,
-            String outputPath) throws IOException {
-
-        if (mappedData == null || mappedData.isEmpty()) {
-            System.out.println("警告: 没有数据需要保存");
-            return;
-        }
-
-        // 如果文件存在则删除
-        File file = new File(outputPath);
-        Files.deleteIfExists(file.toPath());
-
-        // 创建Avro Schema
-        Schema schema = createAvroSchema(standardKeys);
-
-        // 创建Parquet写入器
-        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(outputPath))
-                .withSchema(schema)
-                .withCompressionCodec(CompressionCodecName.SNAPPY)
-                .build()) {
-
-            // 将JSON数据转换为Avro记录并写入
-            for (JSONObject jsonObj : mappedData) {
-                GenericRecord record = convertToAvroRecord(jsonObj, schema, standardKeys);
-                writer.write(record);
-            }
-
-            System.out.println("成功保存Parquet文件: " + outputPath);
-            System.out.println("总记录数: " + mappedData.size());
-        }
-    }
-
-    /**
-     * 根据标准key创建Avro Schema
-     */
-    private static Schema createAvroSchema(List<String> standardKeys) {
-        StringBuilder schemaBuilder = new StringBuilder();
-        schemaBuilder.append("{\n");
-        schemaBuilder.append("  \"type\": \"record\",\n");
-        schemaBuilder.append("  \"name\": \"StandardRecord\",\n");
-        schemaBuilder.append("  \"namespace\": \"com.example.parquet\",\n");
-        schemaBuilder.append("  \"fields\": [\n");
-
-        for (int i = 0; i < standardKeys.size(); i++) {
-            String fieldName = standardKeys.get(i);
-            schemaBuilder.append("    {\n");
-            schemaBuilder.append("      \"name\": \"").append(fieldName).append("\",\n");
-            schemaBuilder.append("      \"type\": [\"string\", \"null\"]\n"); // 所有字段都设为string类型,可为null
-            if (i < standardKeys.size() - 1) {
-                schemaBuilder.append("    },\n");
-            } else {
-                schemaBuilder.append("    }\n");
-            }
-        }
-
-        schemaBuilder.append("  ]\n");
-        schemaBuilder.append("}");
-
-        return new Schema.Parser().parse(schemaBuilder.toString());
-    }
-
-    /**
-     * 将JSONObject转换为Avro GenericRecord
-     */
-    private static GenericRecord convertToAvroRecord(JSONObject jsonObj, Schema schema, List<String> standardKeys) {
-        GenericRecord record = new GenericData.Record(schema);
-
-        for (String key : standardKeys) {
-            Object value = jsonObj.get(key);
-            if (value != null) {
-                // 将所有值转换为字符串
-                record.put(key, value.toString());
-            } else {
-                record.put(key, null);
-            }
-        }
-
-        return record;
-    }
-
-}

+ 186 - 0
src/main/java/com/znzn/project/dc/dtdata/utils/enos/ParquetUtils.java

@@ -0,0 +1,186 @@
+package com.znzn.project.dc.dtdata.utils.enos;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class ParquetUtils {
+
+    /**
+     * 将映射后的JSON数据保存为Parquet文件
+     */
+    public static void saveAsParquet(
+            List<JSONObject> mappedData,
+            List<String> standardKeys,
+            String outputPath) throws IOException {
+
+        if (mappedData == null || mappedData.isEmpty()) {
+            System.out.println("警告: 没有数据需要保存");
+            return;
+        }
+
+        // 如果文件存在则删除
+        File file = new File(outputPath);
+        Files.deleteIfExists(file.toPath());
+
+        // 创建Avro Schema
+        Schema schema = createAvroSchema(standardKeys);
+
+        // 创建Parquet写入器
+        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(outputPath))
+                .withSchema(schema)
+                .withCompressionCodec(CompressionCodecName.SNAPPY)
+                .build()) {
+
+            // 将JSON数据转换为Avro记录并写入
+            for (JSONObject jsonObj : mappedData) {
+                GenericRecord record = convertToAvroRecord(jsonObj, schema, standardKeys);
+                writer.write(record);
+            }
+
+            System.out.println("成功保存Parquet文件: " + outputPath);
+            System.out.println("总记录数: " + mappedData.size());
+        }
+    }
+
+    /**
+     * 根据标准key创建Avro Schema
+     */
+    private static Schema createAvroSchema(List<String> standardKeys) {
+        StringBuilder schemaBuilder = new StringBuilder();
+        schemaBuilder.append("{\n");
+        schemaBuilder.append("  \"type\": \"record\",\n");
+        schemaBuilder.append("  \"name\": \"StandardRecord\",\n");
+        schemaBuilder.append("  \"namespace\": \"com.example.parquet\",\n");
+        schemaBuilder.append("  \"fields\": [\n");
+
+        for (int i = 0; i < standardKeys.size(); i++) {
+            String fieldName = standardKeys.get(i);
+            schemaBuilder.append("    {\n");
+            schemaBuilder.append("      \"name\": \"").append(fieldName).append("\",\n");
+            schemaBuilder.append("      \"type\": [\"string\", \"null\"]\n"); // 所有字段都设为string类型,可为null
+            if (i < standardKeys.size() - 1) {
+                schemaBuilder.append("    },\n");
+            } else {
+                schemaBuilder.append("    }\n");
+            }
+        }
+
+        schemaBuilder.append("  ]\n");
+        schemaBuilder.append("}");
+
+        return new Schema.Parser().parse(schemaBuilder.toString());
+    }
+
+    /**
+     * 将JSONObject转换为Avro GenericRecord
+     */
+    private static GenericRecord convertToAvroRecord(JSONObject jsonObj, Schema schema, List<String> standardKeys) {
+        GenericRecord record = new GenericData.Record(schema);
+
+        for (String key : standardKeys) {
+            Object value = jsonObj.get(key);
+            if (value != null) {
+                // 将所有值转换为字符串
+                record.put(key, value.toString());
+            } else {
+                record.put(key, null);
+            }
+        }
+
+        return record;
+    }
+
+
+    // 全局Jackson对象映射器(可复用,提升性能)
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * 读取Parquet文件最后一行并转换为JSON字符串
+     *
+     * @param parquetFilePath Parquet文件本地路径(如:D:/data/test.parquet)
+     * @return 最后一行数据的JSON字符串
+     * @throws IOException 读取文件或转换JSON时的异常
+     */
+    public static JSONObject getParquetLastLineAsJson(String parquetFilePath) throws IOException {
+        // 1. 初始化Hadoop配置 - 适配本地文件系统(关键:支持file://协议)
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", "file:///");
+        Path parquetPath = new Path(parquetFilePath);
+        HadoopInputFile inputFile = HadoopInputFile.fromPath(parquetPath, conf);
+
+        // 2. 构建Avro Parquet读取器(Parquet文件多基于Avro序列化)
+        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
+                .withConf(conf)
+                .build()) {
+
+            // 3. 遍历Parquet文件,记录最后一行数据(无需缓存所有行,节省内存)
+            GenericRecord currentRecord;
+            GenericRecord lastRecord = null;
+            while ((currentRecord = reader.read()) != null) {
+                lastRecord = currentRecord; // 不断覆盖,最终保留最后一行
+            }
+
+            // 4. 校验是否存在有效数据
+            if (lastRecord == null) {
+                return new JSONObject(); // 无数据时返回空JSON对象
+            }
+
+            // 5. 将GenericRecord转换为Map(方便Jackson转换为JSON)
+            Map<String, Object> dataMap = convertGenericRecordToMap(lastRecord);
+
+            // 6. Jackson将Map转换为格式化的JSON字符串
+            return new JSONObject(dataMap);
+        }
+    }
+
+    /**
+     * 将Avro GenericRecord转换为Java Map
+     *
+     * @param genericRecord Avro记录对象
+     * @return 键值对Map
+     */
+    private static Map<String, Object> convertGenericRecordToMap(GenericRecord genericRecord) {
+        Map<String, Object> resultMap = new HashMap<>();
+        genericRecord.getSchema().getFields().forEach(field -> {
+            String fieldName = field.name();
+            Object fieldValue = genericRecord.get(fieldName);
+            resultMap.put(fieldName, fieldValue);
+        });
+        return resultMap;
+    }
+
+    // 测试主方法
+    public static void main(String[] args) {
+        try {
+            // 替换为你的Parquet文件本地路径
+            String parquetPath = "C:\\jupyter-lab\\大唐25年科研\\files\\大唐验证数据\\CCWE1500-82.DF\\内蒙塔林花风电场-20251015.parquet";
+            long begin = System.currentTimeMillis();
+            JSONObject lastLineJson = getParquetLastLineAsJson(parquetPath);
+            System.out.println(System.currentTimeMillis() - begin);
+            System.out.println("Parquet文件最后一行数据(JSON格式):");
+            System.out.println(lastLineJson);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 2 - 2
src/main/resources/application.properties

@@ -11,8 +11,8 @@ logging.logback.rollingpolicy.max-history=90
 logging.logback.rollingpolicy.max-file-size=30MB
 logging.logback.rollingpolicy.total-size-cap=3GB
 # MySQL
-#spring.datasource.url=jdbc:mysql://10.172.12.213:4000/datang?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
-spring.datasource.url=jdbc:mysql://192.168.50.235:30306/datang?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
+spring.datasource.url=jdbc:mysql://10.172.12.213:4000/datang?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
+#spring.datasource.url=jdbc:mysql://192.168.50.235:30306/datang?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
 spring.datasource.username=root
 spring.datasource.password=admin123456
 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

+ 5 - 0
src/main/resources/mapper/WindTurbineDataInfoMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.znzn.project.dc.dtdata.mapper.WindTurbineDataInfoMapper">
+
+</mapper>

+ 1 - 1
src/test/java/com/znzn/project/dc/dtdata/DatangEnosDataApplicationTests.java

@@ -34,7 +34,7 @@ class DatangEnosDataApplicationTests {
         String farmId = "123";
 
         // 执行查询
-        Object result = windTurbineDataMapper.getLastHourLatestData(timestamp, turbineId, farmId);
+        Object result = windTurbineDataMapper.getLastHourLatestData(timestamp, farmId);
 
         // 验证结果
         System.out.println("查询结果: " + result);