Forráskód Böngészése

中广核104解析初始化

wzl 4 hónapja
szülő
commit
91d9c4ca56
10 módosított fájl, 1253 hozzáadás és 6 törlés
  1. 3 1
      .gitignore
  2. 62 1
      README.MD
  3. 1 3
      conf/config.yaml
  4. 255 0
      tmp/104_parse.py
  5. 22 0
      tmp/iec104/ACPI.py
  6. 143 0
      tmp/iec104/ASDU.py
  7. 0 0
      tmp/iec104/__init__.py
  8. 671 0
      tmp/iec104_client.py
  9. 1 1
      utils/db/ConnectMysql.py
  10. 95 0
      说明书.md

+ 3 - 1
.gitignore

@@ -1,2 +1,4 @@
 /.idea/
-*.iml
+*.iml
+__pycache__
+/tmp/

+ 62 - 1
README.MD

@@ -1 +1,62 @@
-### 初始化执行  init_table.py 文件
+# 104规约数据获取
+
+## 项目概述
+本项目用于处理和分析风电场SCADA数据、故障报警数据等。
+
+## 依赖环境
+- Python 3.x
+- 依赖包:见requirements.txt
+
+## 安装
+```bash
+pip install -r requirements.txt
+```
+
+## 项目结构
+```
+conf: 数据库连接配置文件以及,104数据字段映射关系xlsx
+
+data: 数据转化核心功能
+
+service: 数据库相关操作
+
+utils: 自定义的工具类
+```
+
+## 由于正向隔离,需要定时获取文件数据,使用crontab定时任务
+### 创建启动文件
+start.sh
+
+```
+#!/bin/bash
+cd /home/trans/project/energy-online-data
+python3 $1
+```
+
+### 添加自行权限
+```
+chmod +x start.sh
+```
+
+## 定时任务
+### 1.每十分钟执行SCADA数据(路径自己修改)
+```
+0/10 * * * * /home/trans/script/start.sh parse_scada_data.py
+```
+
+### 2.每十分钟执行故障报警数据(路径自己修改)
+```
+0/10 * * * * /home/trans/script/start.sh parse_warn_fault_data.py
+```
+
+### 3.每天一点执行数据转化(路径自己修改)
+```
+0 1 * * * /home/trans/script/start.sh app_run.py
+```
+
+### 4.每月一号一点执行增删分区(按照保存时间,删除历史数据,新增新月份分区)(路径自己修改)
+```
+0 1 1 * * sh /home/trans/script/start.sh add_or_remove_partition.py
+```
+
+

+ 1 - 3
conf/config.yaml

@@ -24,6 +24,4 @@ trans:
 # 日志保存路径
 log_path_dir: /home/trans/project/logs/104_parse
 
-#data_base_dir: /home/trans/data
-
-data_base_dir: C:\Users\wzl\Desktop\中广核104测点\0415新数据
+data_base_dir: /home/trans/data

+ 255 - 0
tmp/104_parse.py

@@ -0,0 +1,255 @@
+import socket
+import struct
+import time
+import traceback
+from enum import Enum
+
+from tmp.iec104.ASDU import ASDU
+
+
+class IEC104Type(Enum):
+    M_SP_NA_1 = 1  # 单点遥信
+    M_DP_NA_1 = 3  # 双点遥信
+    M_ME_NA_1 = 9  # 测量值,规一化值
+    M_ME_NB_1 = 11  # 测量值,标度化值
+    M_ME_NC_1 = 13  # 测量值,短浮点数
+    M_SP_TB_1 = 30  # 带时标单点遥信
+    M_DP_TB_1 = 31  # 带时标双点遥信
+    CUSTOM_151 = 151  # 自定义遥信类型
+
+    @classmethod
+    def get_description(cls, type_id):
+        descriptions = {
+            1: "单点遥信",
+            3: "双点遥信",
+            9: "测量值(规一化)",
+            11: "测量值(标度化)",
+            13: "测量值(短浮点)",
+            30: "带时标单点遥信",
+            31: "带时标双点遥信",
+            151: "自定义遥信"
+        }
+        return descriptions.get(type_id, f"未知类型({type_id})")
+
+
+
+
+class IEC104Client:
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.socket = None
+        self.send_seq = 0
+        self.recv_seq = 0
+        self.max_retries = 3
+        self.retry_interval = 5  # 重试间隔(秒)
+
+    def connect(self):
+        retry_count = 0
+        while retry_count < self.max_retries:
+            try:
+                self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                self.socket.settimeout(10)  # 设置超时时间
+                self.socket.connect((self.host, self.port))
+                self.send_seq = 0
+                self.recv_seq = 0
+                print(f"已连接到 {self.host}:{self.port}")
+                return True
+            except Exception as e:
+                retry_count += 1
+                print(f"连接失败 (尝试 {retry_count}/{self.max_retries}): {e}")
+                if retry_count < self.max_retries:
+                    time.sleep(self.retry_interval)
+        return False
+
+    def disconnect(self):
+        if self.socket:
+            try:
+                self.socket.close()
+            except:
+                pass
+            self.socket = None
+        print("连接已断开")
+
+    def send_startdt(self):
+        try:
+            # 发送STARTDT ACT
+            apci = APCI(4, self.send_seq, self.recv_seq)
+            startdt_act = apci.pack() + b'\x07\x00\x00\x00'  # U格式帧,STARTDT ACT
+            self.socket.send(startdt_act)
+            self.send_seq += 1
+            print("已发送STARTDT激活命令")
+
+            # 设置超时等待STARTDT CON
+            self.socket.settimeout(5)  # 5秒超时
+            try:
+                data = self.socket.recv(1024)
+                if len(data) >= 6 and data[6:10] == b'\x0B\x00\x00\x00':  # STARTDT CON
+                    print("收到STARTDT确认,连接已激活")
+                    return True
+
+                # 如果收到其他响应,可能是服务器拒绝了连接
+                if len(data) > 0:
+                    print(f"收到非预期响应: {data.hex()}")
+                else:
+                    print("未收到STARTDT确认响应")
+                return False
+
+            except socket.timeout:
+                print("等待STARTDT确认超时")
+                return False
+
+        except ConnectionResetError:
+            print("发送STARTDT时连接被重置,服务器可能拒绝了连接")
+            print(traceback.format_exc())
+            return False
+        except Exception as e:
+            print(f"发送STARTDT时发生错误: {e}")
+            print(traceback.format_exc())
+            return False
+
+    def send_general_interrogation(self):
+        try:
+            apci = APCI(14, self.send_seq, self.recv_seq)
+            gi_asdu = struct.pack('!BBHHHBBH',
+                                  100,  # 类型ID 100 = 总召
+                                  0x01,  # VSQ (1个元素)
+                                  0x06,  # COT (激活)
+                                  0x00,  # 公共地址
+                                  0x00,  # 信息对象地址
+                                  0x00,  # 限定词 (QOI)
+                                  0x14,  # 总召限定词 (20)
+                                  0x00)  # 无时标
+            gi_frame = apci.pack() + gi_asdu
+            self.socket.send(gi_frame)
+            self.send_seq += 1
+            print("发送总召命令")
+            return True
+        except Exception as e:
+            print(f"发送总召命令错误: {e}")
+            return False
+
+    def receive_data(self):
+        while True:
+            try:
+                data = self.socket.recv(1024)
+                if not data:
+                    print("连接被远程主机关闭")
+                    break
+
+                apci = APCI.unpack(data[:6])
+                if not apci:
+                    continue
+
+                self.recv_seq = apci.send_seq
+
+                if apci.length == 4:
+                    if data[6:10] == b'\x0B\x00\x00\x00':
+                        print("收到STARTDT CON")
+                    elif data[6:10] == b'\x13\x00\x00\x00':
+                        print("收到STOPDT CON")
+                    continue
+
+                asdu = ASDU.unpack(data[6:6 + apci.length - 4])
+                if not asdu:
+                    continue
+
+                self._process_asdu(asdu)
+
+                if asdu.cot in [1, 3, 5, 7, 9, 11, 20]:
+                    self._send_ack()
+
+            except socket.timeout:
+                print("接收数据超时,发送测试帧保持连接...")
+                self._send_test_frame()
+                continue
+            except ConnectionResetError:
+                print("连接被远程主机重置")
+                break
+            except Exception as e:
+                print(f"接收数据错误: {e}")
+                break
+
+    def _process_asdu(self, asdu):
+        type_desc = IEC104Type.get_description(asdu.type_id)
+        cot_desc = self._get_cot_description(asdu.cot)
+
+        print(f"\n收到ASDU: 类型={type_desc}, 原因={cot_desc}({asdu.cot}), 公共地址={asdu.common_addr}")
+
+        for io in asdu.io_elements:
+            print(f"  点地址: {io['address']}, 数据: {self._format_io_data(asdu.type_id, io['data'])}")
+
+            if 'time' in io['data']:
+                time_data = io['data']['time']
+                print(f"    时间: {time_data['year']}-{time_data['month']:02d}-{time_data['day']:02d} "
+                      f"{time_data['hour']:02d}:{time_data['minute']:02d}:{time_data['milliseconds'] / 1000:.3f}")
+
+    def _format_io_data(self, type_id, data):
+        if type_id in [1, 30, 151]:
+            return f"值={data['value']}, 质量={data['quality']}"
+        elif type_id in [3, 31]:
+            states = ["中间状态", "分", "合", "不确定"]
+            state = states[data['value']] if data['value'] < len(states) else "未知"
+            return f"状态={state}({data['value']}), 质量={data['quality']}"
+        elif type_id in [9, 11, 13]:
+            return f"值={data['value']}, 质量={data['quality']}"
+        else:
+            return str(data)
+
+    def _get_cot_description(self, cot):
+        descriptions = {
+            1: "周期/循环",
+            2: "背景扫描",
+            3: "突发",
+            5: "请求或被请求",
+            6: "激活",
+            7: "激活确认",
+            8: "停止激活",
+            9: "停止激活确认",
+            20: "响应总召"
+        }
+        return descriptions.get(cot, f"未知原因({cot})")
+
+    def _send_ack(self):
+        try:
+            apci = APCI(4, self.send_seq, self.recv_seq)
+            ack_frame = apci.pack() + b'\x00\x00\x00\x00'
+            self.socket.send(ack_frame)
+            self.send_seq += 1
+        except Exception as e:
+            print(f"发送确认帧错误: {e}")
+
+    def _send_test_frame(self):
+        try:
+            apci = APCI(4, self.send_seq, self.recv_seq)
+            test_frame = apci.pack() + b'\x43\x00\x00\x00'  # 测试帧
+            self.socket.send(test_frame)
+            self.send_seq += 1
+        except Exception as e:
+            print(f"发送测试帧错误: {e}")
+
+
+def main():
+    client = IEC104Client("192.168.50.242", 2404)
+
+    try:
+        while True:
+            if client.connect():
+                if client.send_startdt():
+                    if client.send_general_interrogation():
+                        client.receive_data()
+
+            print(f"将在 {client.retry_interval} 秒后尝试重新连接...")
+            time.sleep(client.retry_interval)
+
+    except KeyboardInterrupt:
+        print("\n用户中断")
+    except Exception as e:
+        print(f"发生错误: {e}")
+        print(traceback.format_exc())
+    finally:
+        client.disconnect()
+
+
+if __name__ == "__main__":
+    main()

+ 22 - 0
tmp/iec104/ACPI.py

@@ -0,0 +1,22 @@
+import struct
+
+
+class APCI:
+    START = 0x68
+
+    def __init__(self, length=0, send_seq=0, recv_seq=0):
+        self.length = length
+        self.send_seq = send_seq
+        self.recv_seq = recv_seq
+
+    def pack(self):
+        return struct.pack('!BBHH', self.START, self.length, self.send_seq << 1, self.recv_seq << 1)
+
+    @classmethod
+    def unpack(cls, data):
+        if len(data) < 6 or data[0] != cls.START:
+            return None
+        length = data[1]
+        send_seq = (struct.unpack('!H', data[2:4])[0]) >> 1
+        recv_seq = (struct.unpack('!H', data[4:6])[0]) >> 1
+        return cls(length, send_seq, recv_seq)

+ 143 - 0
tmp/iec104/ASDU.py

@@ -0,0 +1,143 @@
+import struct
+
+
+class ASDU:
+    def __init__(self, type_id, vsq, cot, common_addr, io_elements):
+        self.type_id = type_id
+        self.vsq = vsq
+        self.cot = cot
+        self.common_addr = common_addr
+        self.io_elements = io_elements
+
+    @classmethod
+    def unpack(cls, data):
+        if len(data) < 6:
+            return None
+
+        type_id = data[0]
+        vsq = data[1]
+        cot = struct.unpack('!H', data[2:4])[0]
+        common_addr = struct.unpack('!H', data[4:6])[0]
+
+        io_elements = []
+        data = data[6:]
+
+        num_elements = vsq & 0x7F
+        is_sequence = (vsq & 0x80) != 0
+
+        if is_sequence:
+            if len(data) < 3:
+                return None
+            io_addr = struct.unpack('!I', b'\x00' + data[:3])[0]
+            data = data[3:]
+
+            for i in range(num_elements):
+                element_data, data = cls._parse_io_element(type_id, data)
+                io_elements.append({
+                    'address': io_addr + i,
+                    'value': element_data['value'],
+                    'quality': element_data['quality']
+                })
+        else:
+            for _ in range(num_elements):
+                if len(data) < 3:
+                    return None
+                io_addr = struct.unpack('!I', b'\x00' + data[:3])[0]
+                data = data[3:]
+
+                element_data, data = cls._parse_io_element(type_id, data)
+                io_elements.append({
+                    'address': io_addr,
+                    'value': element_data['value'],
+                    'quality': element_data['quality']
+                })
+
+        return cls(type_id, vsq, cot, common_addr, io_elements)
+
+    @staticmethod
+    def _parse_io_element(type_id, data):
+        if type_id in [1, 30]:
+            value = data[0] & 0x01
+            quality = data[0] >> 7
+            element_data = {
+                'value': value,
+                'quality': quality
+            }
+            if type_id == 30:
+                time_data = data[1:8]
+                element_data['time'] = cls._parse_time(time_data)
+                return element_data, data[8:]
+            return element_data, data[1:]
+
+        elif type_id in [3, 31]:
+            value = data[0] & 0x03
+            quality = data[0] >> 7
+            element_data = {
+                'value': value,
+                'quality': quality
+            }
+            if type_id == 31:
+                time_data = data[1:8]
+                element_data['time'] = cls._parse_time(time_data)
+                return element_data, data[8:]
+            return element_data, data[1:]
+
+        elif type_id == 9:
+            value = struct.unpack('!h', data[:2])[0]
+            quality = data[2] >> 7
+            element_data = {
+                'value': value,
+                'quality': quality
+            }
+            return element_data, data[3:]
+
+        elif type_id == 11:
+            value = struct.unpack('!h', data[:2])[0]
+            quality = data[2] >> 7
+            element_data = {
+                'value': value,
+                'quality': quality
+            }
+            return element_data, data[3:]
+
+        elif type_id == 13:
+            value = struct.unpack('!f', data[:4])[0]
+            quality = data[4] >> 7
+            element_data = {
+                'value': value,
+                'quality': quality
+            }
+            return element_data, data[5:]
+
+        elif type_id == 151:
+            value = data[0] & 0x0F
+            quality = data[0] >> 4
+            element_data = {
+                'value': value,
+                'quality': quality
+            }
+            return element_data, data[1:]
+
+        else:
+            return {'raw_data': data}, b''
+
+    @staticmethod
+    def _parse_time(time_data):
+        if len(time_data) < 7:
+            return None
+
+        milliseconds = struct.unpack('!H', time_data[:2])[0]
+        minute = time_data[2] & 0x3F
+        hour = time_data[3] & 0x1F
+        day = time_data[4] & 0x1F
+        month = time_data[5] & 0x0F
+        year = (time_data[6] & 0x7F) + 2000
+
+        return {
+            'year': year,
+            'month': month,
+            'day': day,
+            'hour': hour,
+            'minute': minute,
+            'milliseconds': milliseconds
+        }

+ 0 - 0
tmp/iec104/__init__.py


+ 671 - 0
tmp/iec104_client.py

@@ -0,0 +1,671 @@
+import socket
+import struct
+import threading
+import time
+from datetime import datetime
+
+
+class IEC104Client:
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.socket = None
+        self.connected = False
+        self.running = False
+
+        # IEC 104 协议常量
+        self.START_BYTE = 0x68
+        self.TESTFR_ACT = b'\x68\x04\x43\x00\x00\x00'
+        self.TESTFR_CON = b'\x68\x04\x83\x00\x00\x00'
+        self.STARTDT_ACT = b'\x68\x04\x07\x00\x00\x00'
+        self.STARTDT_CON = b'\x68\x04\x0B\x00\x00\x00'
+        self.STOPDT_ACT = b'\x68\x04\x13\x00\x00\x00'
+
+        self.send_seq = 0
+        self.recv_seq = 0
+
+
+
+    def connect(self):
+        """连接到IEC 104服务器"""
+        try:
+            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.socket.settimeout(10)
+            self.socket.connect((self.host, self.port))
+            self.connected = True
+            print(f"成功连接到 {self.host}:{self.port}")
+            return True
+        except Exception as e:
+            print(f"连接失败: {e}")
+            return False
+
+    def disconnect(self):
+        """断开连接"""
+        self.running = False
+        if self.socket:
+            try:
+                # 发送停止数据传输命令
+                self.socket.send(self.STOPDT_ACT)
+                self.socket.close()
+            except:
+                pass
+        self.connected = False
+        print("连接已断开")
+
+    def send_startdt(self):
+        """发送启动数据传输命令"""
+        try:
+            self.socket.send(self.STARTDT_ACT)
+            print("发送启动数据传输命令")
+        except Exception as e:
+            print(f"发送启动命令失败: {e}")
+
+    def send_testfr(self):
+        """发送测试帧"""
+        try:
+            self.socket.send(self.TESTFR_ACT)
+            print("发送测试帧")
+        except Exception as e:
+            print(f"发送测试帧失败: {e}")
+
+    def send_s_frame(self):
+        """发送S格式监视帧确认"""
+        try:
+            # S格式帧:68 04 01 00 + 接收序号
+            s_frame = b'\x68\x04\x01\x00' + struct.pack('<H', (self.recv_seq << 1))
+            self.socket.send(s_frame)
+        except Exception as e:
+            print(f"发送S帧失败: {e}")
+
+    def parse_apdu(self, data):
+        """解析APDU(应用协议数据单元)"""
+        if len(data) < 6:
+            return None
+
+        # 检查起始字节
+        if data[0] != self.START_BYTE:
+            return None
+
+        length = data[1]
+
+        # U格式帧(无编号控制功能)
+        if length == 4 and len(data) >= 6:
+            ctrl1 = data[2]
+            ctrl2 = data[3]
+            ctrl3 = data[4]
+            ctrl4 = data[5]
+
+            if ctrl1 == 0x07 and ctrl2 == 0x00:
+                return {"type": "STARTDT_ACT", "description": "启动数据传输激活"}
+            elif ctrl1 == 0x0B and ctrl2 == 0x00:
+                return {"type": "STARTDT_CON", "description": "启动数据传输确认"}
+            elif ctrl1 == 0x43 and ctrl2 == 0x00:
+                return {"type": "TESTFR_ACT", "description": "测试帧激活"}
+            elif ctrl1 == 0x83 and ctrl2 == 0x00:
+                return {"type": "TESTFR_CON", "description": "测试帧确认"}
+
+        # I格式帧(信息传输)
+        elif length > 4:
+            return self.parse_i_frame(data)
+
+        return {"type": "UNKNOWN", "data": data.hex()}
+
+    def parse_i_frame(self, data):
+        """解析I格式帧"""
+        if len(data) < 8:
+            return None
+
+        # 提取序列号
+        send_seq = struct.unpack('<H', data[2:4])[0] >> 1
+        recv_seq = struct.unpack('<H', data[4:6])[0] >> 1
+
+        # ASDU(应用服务数据单元)
+        if len(data) > 6:
+            asdu = data[6:]
+            print('报文:', data.hex())
+            return self.parse_asdu(asdu, send_seq, recv_seq)
+
+        return {
+            "type": "I_FRAME",
+            "send_seq": send_seq,
+            "recv_seq": recv_seq,
+            "data": data[6:].hex() if len(data) > 6 else ""
+        }
+
+    def parse_asdu(self, asdu_data, send_seq, recv_seq):
+        """解析ASDU"""
+        if len(asdu_data) < 6:
+            return {"type": "I_FRAME", "error": "ASDU too short"}
+
+        type_id = asdu_data[0]
+        vsq = asdu_data[1]  # 变量结构限定词
+        cot = struct.unpack('<H', asdu_data[2:4])[0]  # 传送原因(2字节)
+        common_addr = struct.unpack('<H', asdu_data[4:6])[0]  # 公共地址(2字节)
+
+        # 信息对象地址根据VSQ确定
+        num_objects = vsq & 0x7F
+        sq = (vsq & 0x80) >> 7  # 序列标志
+
+        result = {
+            "type": "I_FRAME",
+            "send_seq": send_seq,
+            "recv_seq": recv_seq,
+            "type_id": type_id,
+            "type_description": self.get_type_description(type_id),
+            "vsq": vsq,
+            "num_objects": num_objects,
+            "sq": sq,
+            "cot": cot,
+            "cot_description": self.get_cot_description(cot & 0x3F),  # 只取低6位
+            "common_addr": common_addr,
+            "data_values": [],
+            # "raw_data": asdu_data[6:].hex() if len(asdu_data) > 6 else "",
+            "raw_data": asdu_data.hex() if len(asdu_data) > 6 else ""
+        }
+        # 解析数据值
+        if len(asdu_data) > 6:
+            data_part = asdu_data[6:]
+
+            # 根据类型ID解析具体数据
+            if type_id == 1:  # 单点信息
+                result["data_values"] = self.parse_single_point_info(data_part, num_objects, sq)
+            elif type_id == 3:  # 双点信息
+                result["data_values"] = self.parse_double_point_info(data_part, num_objects, sq)
+            elif type_id == 9:  # 测量值,标准化值
+                result["data_values"] = self.parse_normalized_value(data_part, num_objects, sq)
+            elif type_id == 11:  # 测量值,标度化值
+                result["data_values"] = self.parse_scaled_value(data_part, num_objects, sq)
+            elif type_id == 13:  # 测量值,浮点数
+                result["data_values"] = self.parse_float_value(data_part, num_objects, sq)
+
+        return result
+
+    def get_type_description(self, type_id):
+        """获取类型标识描述"""
+        type_descriptions = {
+            1: "单点信息",
+            2: "带时标的单点信息",
+            3: "双点信息",
+            4: "带时标的双点信息",
+            5: "步位置信息",
+            6: "带时标的步位置信息",
+            7: "32位串信息",
+            8: "带时标的32位串信息",
+            9: "测量值,标准化值",
+            10: "带时标的测量值,标准化值",
+            11: "测量值,标度化值",
+            12: "带时标的测量值,标度化值",
+            13: "测量值,浮点数",
+            14: "带时标的测量值,浮点数",
+            30: "带时标的单点信息",
+            36: "测量值,浮点数带时标",
+            100: "总召唤命令",
+            101: "计数量召唤命令"
+        }
+        return type_descriptions.get(type_id, f"未知类型 ({type_id})")
+
+    def get_cot_description(self, cot):
+        """获取传送原因描述"""
+        cot_descriptions = {
+            1: "周期循环",
+            2: "背景扫描",
+            3: "自发",
+            4: "初始化",
+            5: "请求",
+            6: "激活",
+            7: "激活确认",
+            8: "停止激活",
+            9: "停止激活确认",
+            10: "激活结束",
+            20: "响应站召唤"
+        }
+        return cot_descriptions.get(cot, f"未知原因 ({cot})")
+
+    def parse_single_point_info(self, data, num_objects, sq):
+        """解析单点信息"""
+        values = []
+        pos = 0
+
+        if sq == 0:  # 非连续地址
+            for i in range(num_objects):
+                if pos + 3 <= len(data):
+                    addr = struct.unpack('<H', data[pos:pos + 2])[0]
+                    value = data[pos + 2] & 0x01
+                    quality = data[pos + 3]
+                    values.append({
+                        "address": addr,
+                        "value": bool(value),
+                        "quality": quality
+                    })
+                    pos += 4
+        else:  # 连续地址
+            if len(data) >= 2:
+                base_addr = struct.unpack('<H', data[0:2])[0]
+                pos = 2
+                for i in range(num_objects):
+                    if pos + 1 <= len(data):
+                        value = data[pos] & 0x01
+                        quality = data[pos + 1]
+                        values.append({
+                            "address": base_addr + i,
+                            "value": bool(value),
+                            "quality": quality
+                        })
+                        pos += 2
+
+        return values
+
+    def parse_double_point_info(self, data, num_objects, sq):
+        """解析双点信息"""
+        values = []
+        pos = 0
+
+        if sq == 0:  # 非连续地址
+            for i in range(num_objects):
+                if pos + 3 <= len(data):
+                    addr = struct.unpack('<H', data[pos:pos + 2])[0]
+                    value = data[pos + 2] & 0x03
+                    quality = data[pos + 3]
+                    values.append({
+                        "address": addr,
+                        "value": value,
+                        "quality": quality
+                    })
+                    pos += 4
+        else:  # 连续地址
+            if len(data) >= 2:
+                base_addr = struct.unpack('<H', data[0:2])[0]
+                pos = 2
+                for i in range(num_objects):
+                    if pos + 1 <= len(data):
+                        value = data[pos] & 0x03
+                        quality = data[pos + 1]
+                        values.append({
+                            "address": base_addr + i,
+                            "value": value,
+                            "quality": quality
+                        })
+                        pos += 2
+
+        return values
+
+    def parse_normalized_value(self, data, num_objects, sq):
+        """解析标准化值"""
+        values = []
+        pos = 0
+
+        if sq == 0:  # 非连续地址
+            for i in range(num_objects):
+                if pos + 6 <= len(data):
+                    addr = struct.unpack('<H', data[pos:pos + 2])[0]
+                    # 标准化值范围是-1.0到1.0,对应-32768到32767
+                    raw_value = struct.unpack('<h', data[pos + 2:pos + 4])[0]
+                    value = raw_value / 32768.0  # 转换为-1.0到1.0范围
+                    quality = data[pos + 4]
+                    values.append({
+                        "address": addr,
+                        "value": round(value, 6),  # 保留6位小数
+                        "quality": quality,
+                        "raw_value": raw_value  # 保留原始值用于调试
+                    })
+                    pos += 6
+        else:  # 连续地址
+            if len(data) >= 2:
+                base_addr = struct.unpack('<H', data[0:2])[0]
+                pos = 2
+                for i in range(num_objects):
+                    if pos + 3 <= len(data):
+                        raw_value = struct.unpack('<h', data[pos:pos + 2])[0]
+                        value = raw_value / 32768.0
+                        quality = data[pos + 2]
+                        values.append({
+                            "address": base_addr + i,
+                            "value": round(value, 6),
+                            "quality": quality,
+                            "raw_value": raw_value
+                        })
+                        pos += 3
+
+        return values
+
+    def parse_scaled_value(self, data, num_objects, sq):
+        """解析标度化值"""
+        values = []
+        pos = 0
+
+        if sq == 0:  # 非连续地址
+            for i in range(num_objects):
+                if pos + 6 <= len(data):
+                    addr = struct.unpack('<H', data[pos:pos + 2])[0]
+                    # 标度化值实际上是4字节,包含2字节值和2字节质量描述
+                    value = struct.unpack('<h', data[pos + 2:pos + 4])[0]
+                    quality = struct.unpack('<H', data[pos + 4:pos + 6])[0]
+                    # 将值转换为模拟器显示的格式
+                    scaled_value = value >> 7  # 取高8位作为显示值
+                    values.append({
+                        "address": addr,
+                        "value": scaled_value,
+                        "quality": quality & 0xFF  # 只取低8位质量位
+                    })
+                    pos += 6
+        else:  # 连续地址
+            if len(data) >= 2:
+                base_addr = struct.unpack('<H', data[0:2])[0]
+                pos = 2
+                for i in range(num_objects):
+                    if pos + 4 <= len(data):
+                        value = struct.unpack('<h', data[pos:pos + 2])[0]
+                        quality = struct.unpack('<H', data[pos + 2:pos + 4])[0]
+                        scaled_value = value >> 8
+                        values.append({
+                            "address": base_addr + i,
+                            "value": scaled_value,
+                            "quality": quality & 0xFF
+                        })
+                        pos += 4
+
+        return values
+
+    def parse_float_value(self, data, num_objects, sq):
+        """解析浮点数值"""
+        values = []
+        pos = 0
+
+        if sq == 0:  # 非连续地址
+            # 每个信息对象:3字节地址 + 4字节浮点数 + 1字节质量
+            for i in range(num_objects):
+                if pos + 8 <= len(data):
+                    # 3字节地址(小端序)
+                    addr = struct.unpack('<I', data[pos:pos + 3] + b'\x00')[0]
+                    # 4字节IEEE754浮点数(小端序)
+                    value_bytes = data[pos + 3:pos + 7]
+                    value = struct.unpack('<f', value_bytes)[0]
+                    quality = data[pos + 7]
+
+                    # 检查数值有效性
+                    if not (value != value):  # 检查NaN
+                        values.append({
+                            "address": addr,
+                            "value": round(value, 6),
+                            "quality": quality,
+                            "raw_bytes": value_bytes.hex()
+                        })
+                    pos += 8
+        else:  # 连续地址
+            if len(data) >= 3:
+                # 第一个信息对象地址
+                base_addr = struct.unpack('<I', data[0:3] + b'\x00')[0]
+                pos = 3
+
+                # 后续对象只有数值和质量
+                for i in range(num_objects):
+                    if pos + 5 <= len(data):
+                        value_bytes = data[pos:pos + 4]
+                        value = struct.unpack('<f', value_bytes)[0]
+                        quality = data[pos + 4]
+
+                        if not (value != value):  # 检查NaN
+                            values.append({
+                                "address": base_addr + i,
+                                "value": round(value, 6),
+                                "quality": quality,
+                                "raw_bytes": value_bytes.hex()
+                            })
+                        pos += 5
+
+        return values
+
+    def receive_data(self):
+        """接收数据的主循环"""
+        buffer = b''
+
+        while self.running:
+            try:
+                data = self.socket.recv(1024)
+                if not data:
+                    print("连接被服务器关闭")
+                    break
+
+                buffer += data
+
+                # 处理缓冲区中的完整帧
+                while len(buffer) >= 2:
+                    if buffer[0] != self.START_BYTE:
+                        buffer = buffer[1:]
+                        continue
+
+                    if len(buffer) < 2:
+                        break
+
+                    frame_length = buffer[1] + 2  # 长度字段不包括起始字节和长度字节本身
+
+                    if len(buffer) < frame_length:
+                        break
+
+                    frame = buffer[:frame_length]
+                    buffer = buffer[frame_length:]
+
+                    # 解析并显示帧
+                    parsed = self.parse_apdu(frame)
+                    if parsed:
+                        self.display_data(parsed)
+
+                        # 发送S格式确认帧(监视功能)
+                        if parsed.get("type") == "I_FRAME":
+                            self.send_s_frame()
+
+                        # 更新接收序号
+                        if parsed.get("type") == "I_FRAME" and "send_seq" in parsed:
+                            self.recv_seq = parsed["send_seq"] + 1
+
+                    # 自动回复确认帧
+                    if parsed and parsed.get("type") == "TESTFR_ACT":
+                        self.socket.send(self.TESTFR_CON)
+
+            except socket.timeout:
+                continue
+            except Exception as e:
+                print(f"接收数据错误: {e}")
+                break
+
+    def send_general_interrogation(self):
+        """发送总召命令(C_IC_NA_1)"""
+        try:
+            # 构造总召命令帧
+            # ASDU: 类型标识=100, VSQ=1, COT=6(激活), 公共地址, 信息对象地址=0, QOI=20(站召唤)
+            asdu = struct.pack('<BBHHIB',
+                               100,  # 类型标识:总召命令
+                               1,  # VSQ:1个信息对象
+                               6,  # COT:激活
+                               self.common_addr if hasattr(self, 'common_addr') else 1,  # 公共地址
+                               0,  # 信息对象地址
+                               20  # QOI:站召唤
+                               )
+
+            # 构造I格式帧
+            frame_length = len(asdu) + 4
+            i_frame = struct.pack('<BBH',
+                                  self.START_BYTE,
+                                  frame_length,
+                                  (self.send_seq << 1)  # 发送序号
+                                  ) + struct.pack('<H', (self.recv_seq << 1)) + asdu  # 接收序号 + ASDU
+
+            self.socket.send(i_frame)
+            self.send_seq += 1
+            print(f"发送总召命令 (发送序号: {self.send_seq - 1})")
+
+        except Exception as e:
+            print(f"发送总召命令失败: {e}")
+
+    def send_clock_sync(self):
+        """发送时钟同步命令"""
+        try:
+            # 获取当前时间
+            now = datetime.now()
+            # CP56Time2a格式:毫秒(2) + 分钟(1) + 小时(1) + 日期(3)
+            ms = (now.second * 1000 + now.microsecond // 1000) & 0xFFFF
+            minute = now.minute & 0x3F
+            hour = now.hour & 0x1F
+            day = now.day & 0x1F
+            month = now.month & 0x0F
+            year = (now.year - 2000) & 0x7F
+
+            time_bytes = struct.pack('<HBBBB', ms, minute, hour,
+                                     (month << 5) | day, year)
+
+            # 构造时钟同步ASDU
+            asdu = struct.pack('<BBHHI',
+                               103,  # 类型标识:时钟同步命令
+                               1,  # VSQ
+                               6,  # COT:激活
+                               self.common_addr if hasattr(self, 'common_addr') else 1,
+                               0  # 信息对象地址
+                               ) + time_bytes
+
+            frame_length = len(asdu) + 4
+            i_frame = struct.pack('<BBH',
+                                  self.START_BYTE,
+                                  frame_length,
+                                  (self.send_seq << 1)
+                                  ) + struct.pack('<H', (self.recv_seq << 1)) + asdu
+
+            self.socket.send(i_frame)
+            self.send_seq += 1
+            print(f"发送时钟同步命令")
+
+        except Exception as e:
+            print(f"发送时钟同步命令失败: {e}")
+
+    def display_data(self, parsed_data):
+        """显示解析后的数据"""
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        print(f"\n[{timestamp}] 接收到数据:")
+        print(f"类型: {parsed_data.get('type', 'UNKNOWN')}")
+
+        if 'description' in parsed_data:
+            print(f"描述: {parsed_data['description']}")
+
+        if 'type_description' in parsed_data:
+            print(f"ASDU类型: {parsed_data['type_description']} (ID: {parsed_data['type_id']})")
+            print(f"传送原因: {parsed_data['cot_description']} ({parsed_data['cot']})")
+            print(f"公共地址: {parsed_data['common_addr']}")
+            if 'num_objects' in parsed_data:
+                print(f"信息对象数量: {parsed_data['num_objects']}, SQ: {parsed_data.get('sq', 0)}")
+
+            if parsed_data.get('data_values'):
+                print("数据值:")
+                valid_values = [v for v in parsed_data['data_values'] if v.get('value') is not None]
+                for i, value in enumerate(valid_values[:10]):  # 显示前10个有效值
+                    quality_desc = self.get_quality_description(value.get('quality', 0))
+                    raw_info = f" [原始: {value.get('raw_bytes', '')}]" if 'raw_bytes' in value else ""
+                    print(f"  地址 {value.get('address', 'N/A')}: {value.get('value', 'N/A')} {quality_desc}{raw_info}")
+                if len(valid_values) > 10:
+                    print(f"  ... 还有 {len(valid_values) - 10} 个有效数据点")
+                if len(parsed_data['data_values']) > len(valid_values):
+                    print(f"  (过滤了 {len(parsed_data['data_values']) - len(valid_values)} 个异常值)")
+
+            # 显示原始数据用于调试
+            if 'raw_data' in parsed_data and parsed_data['raw_data']:
+                print(f"原始数据: {parsed_data['raw_data']}")
+
+        # 记录公共地址用于总召
+        if 'common_addr' in parsed_data:
+            self.common_addr = parsed_data['common_addr']
+
+    def get_quality_description(self, quality):
+        """获取质量描述符"""
+        if quality == 0:
+            return "(良好)"
+        elif quality & 0x80:
+            return "(无效)"
+        elif quality & 0x40:
+            return "(未更新)"
+        elif quality & 0x20:
+            return "(被取代)"
+        elif quality & 0x10:
+            return "(被阻塞)"
+        else:
+            return f"(质量: {quality})"
+
+        print("-" * 50)
+
+    def start(self):
+        """启动客户端"""
+        max_retries = 5
+        retry_count = 0
+
+        while retry_count < max_retries:
+            if not self.connect():
+                retry_count += 1
+                if retry_count < max_retries:
+                    print(f"连接失败,{5}秒后重试 ({retry_count}/{max_retries})")
+                    time.sleep(5)
+                    continue
+                else:
+                    print("达到最大重试次数,退出程序")
+                    return
+
+            self.running = True
+            retry_count = 0  # 重置重试计数
+
+            # 启动接收线程
+            receive_thread = threading.Thread(target=self.receive_data)
+            receive_thread.daemon = True
+            receive_thread.start()
+
+            # 发送启动数据传输命令
+            time.sleep(1)
+            self.send_startdt()
+
+            # 等待启动确认后发送总召
+            time.sleep(2)
+            self.send_general_interrogation()
+
+            try:
+                # 主循环 - 定期发送测试帧和总召
+                last_gi_time = time.time()
+                while self.running and self.connected:
+                    current_time = time.time()
+
+                    # 每5分钟发送一次总召
+                    if current_time - last_gi_time >= 300:  # 300秒 = 5分钟
+                        self.send_general_interrogation()
+                        last_gi_time = current_time
+
+                    # 每30秒发送一次测试帧
+                    time.sleep(30)
+                    if self.connected:
+                        self.send_testfr()
+
+                    # 检查接收线程是否还活着
+                    if not receive_thread.is_alive():
+                        print("接收线程已停止,准备重连")
+                        break
+
+            except KeyboardInterrupt:
+                print("\n程序被用户中断")
+                break
+            finally:
+                self.disconnect()
+
+            # 如果不是用户中断,则尝试重连
+            if self.running:
+                print("连接断开,5秒后尝试重连...")
+                time.sleep(5)
+
+
+def main():
+    # 配置参数
+    HOST = "192.168.50.242"
+    PORT = 2404
+
+    print(f"启动IEC 104客户端")
+    print(f"目标服务器: {HOST}:{PORT}")
+    print("按 Ctrl+C 退出程序\n")
+
+    client = IEC104Client(HOST, PORT)
+    client.start()
+
+
+if __name__ == "__main__":
+    main()

+ 1 - 1
utils/db/ConnectMysql.py

@@ -76,7 +76,7 @@ class ConnectMysql:
 
             while retry_count < max_retries:
                 try:
-                    with tempfile.NamedTemporaryFile(mode='w') as tmp:
+                        with tempfile.NamedTemporaryFile(mode='w') as tmp:
                         batch.to_csv(tmp, index=False, header=False, sep='\t')
                         tmp.flush()
 

+ 95 - 0
说明书.md

@@ -0,0 +1,95 @@
+   # 项目说明书:104规约数据获取系统
+
+## 一、项目概述
+本项目用于处理和分析风电场SCADA数据、故障报警数据等,通过定时任务实现数据的自动获取、解析与存储,为风电场运行状态监控和数据分析提供支持。
+
+## 二、项目结构
+项目根目录位于<mcfolder name="energy-online-data" path="d:\project\energy-online-data"></mcfolder>,主要包含以下文件和目录:
+
+### 核心文件
+- <mcfile name="app_run.py" path="d:\project\energy-online-data\app_run.py"></mcfile>:项目主程序入口,负责协调数据处理流程
+- <mcfile name="parse_scada_data.py" path="d:\project\energy-online-data\parse_scada_data.py"></mcfile>:SCADA数据解析脚本
+- <mcfile name="parse_warn_fault_data.py" path="d:\project\energy-online-data\parse_warn_fault_data.py"></mcfile>:故障报警数据解析脚本
+- <mcfile name="add_or_remove_partition.py" path="d:\project\energy-online-data\add_or_remove_partition.py"></mcfile>:数据库分区管理脚本
+- <mcfile name="create_table.py" path="d:\project\energy-online-data\create_table.py"></mcfile>:数据库表创建脚本
+
+### 主要目录
+1. **conf/**:配置文件目录
+   - <mcfile name="config.yaml" path="d:\project\energy-online-data\conf\config.yaml"></mcfile>:系统配置文件
+   - 多个Excel文件:包含测点表、故障代码表等业务配置数据
+
+2. **data/**:数据处理核心模块
+   - <mcfile name="ReadAndSaveDb.py" path="d:\project\energy-online-data\data\ReadAndSaveDb.py"></mcfile>:数据读取与存储管理
+   - <mcfile name="ClassIdentifier.py" path="d:\project\energy-online-data\data\ClassIdentifier.py"></mcfile>:数据分类标识
+   - <mcfile name="WindFarmDayCount.py" path="d:\project\energy-online-data\data\WindFarmDayCount.py"></mcfile>:风电场日数据统计
+
+3. **service/**:服务层模块
+   - <mcfile name="common_connect.py" path="d:\project\energy-online-data\service\common_connect.py"></mcfile>:数据库连接管理
+   - <mcfile name="trans_service.py" path="d:\project\energy-online-data\service\trans_service.py"></mcfile>:数据传输服务
+
+4. **utils/**:工具类模块
+   - <mcfile name="read_conf.py" path="d:\project\energy-online-data\utils\conf\read_conf.py"></mcfile>:配置文件读取工具
+   - <mcfile name="ConnectMysql.py" path="d:\project\energy-online-data\utils\db\ConnectMysql.py"></mcfile>:MySQL数据库连接工具
+   - <mcfile name="trans_log.py" path="d:\project\energy-online-data\utils\log\trans_log.py"></mcfile>:日志管理工具
+
+## 三、核心功能
+
+### 1. 数据处理流程
+项目核心数据处理逻辑在<mcsymbol name="ReadAndSaveDb" filename="ReadAndSaveDb.py" path="d:\project\energy-online-data\data\ReadAndSaveDb.py" startline="15" type="class"></mcsymbol>类中实现,主要流程包括:
+- 读取历史数据表信息
+- 多线程处理风机数据
+- 数据清洗与转换
+- 数据分类标识
+- 结果存储与临时表清理
+
+### 2. 定时任务调度
+系统通过crontab配置定时任务,实现自动化数据处理:
+- 每10分钟执行SCADA数据解析:`0/10 * * * * /home/trans/script/start.sh parse_scada_data.py`
+- 每10分钟执行故障报警数据解析:`0/10 * * * * /home/trans/script/start.sh parse_warn_fault_data.py`
+- 每天1点执行数据转化:`0 1 * * * /home/trans/script/start.sh app_run.py`
+- 每月1号1点执行分区管理:`0 1 1 * * sh /home/trans/script/start.sh add_or_remove_partition.py`
+
+### 3. 数据库交互
+通过<mcsymbol name="ConnectMysql" filename="ConnectMysql.py" path="d:\project\energy-online-data\utils\db\ConnectMysql.py" startline="1" type="class"></mcsymbol>实现数据库连接,在<mcfile name="common_connect.py" path="d:\project\energy-online-data\service\common_connect.py"></mcfile>中定义了全局数据库连接实例。
+
+## 四、环境要求与安装
+
+### 环境要求
+- Python 3.x
+- 依赖包:详见<mcfile name="requirements.txt" path="d:\project\energy-online-data\requirements.txt"></mcfile>
+
+### 安装步骤
+1. 克隆项目到本地
+2. 安装依赖包:
+```bash
+pip install -r requirements.txt
+```
+3. 配置<mcfile name="config.yaml" path="d:\project\energy-online-data\conf\config.yaml"></mcfile>文件
+4. 设置定时任务(参考README中的定时任务配置)
+
+## 五、配置说明
+系统配置主要通过<mcfile name="config.yaml" path="d:\project\energy-online-data\conf\config.yaml"></mcfile>文件进行,包括数据库连接参数、数据处理配置等。可通过<mcsymbol name="yaml_conf" filename="read_conf.py" path="d:\project\energy-online-data\utils\conf\read_conf.py" startline="8" type="function"></mcsymbol>函数读取配置内容。
+
+## 六、关键代码说明
+
+### 数据处理核心方法
+<mcsymbol name="run" filename="ReadAndSaveDb.py" path="d:\project\energy-online-data\data\ReadAndSaveDb.py" startline="100" type="function"></mcsymbol>方法实现了多进程处理数据的逻辑:
+```python:/d:/project/energy-online-data/data/ReadAndSaveDb.py
+def run(self):
+    with multiprocessing.Pool(2) as pool:
+        pool.map(self.read_and_save_db, self.yesterday_tables)
+```
+
+### 配置文件读取
+<mcsymbol name="read_conf" filename="read_conf.py" path="d:\project\energy-online-data\utils\conf\read_conf.py" startline="16" type="function"></mcsymbol>函数提供了统一的配置读取接口:
+```python:/d:/project/energy-online-data/utils/conf/read_conf.py
+def read_conf(dict_conf, col, default_value=None):
+    if col in dict_conf:
+        res = dict_conf[col]
+        if res is None and default_value is not None:
+            return default_value
+        return res
+    else:
+        return default_value
+```
+