import socket import struct import time import traceback from enum import Enum from tmp.iec104.ASDU import ASDU class IEC104Type(Enum): M_SP_NA_1 = 1 # 单点遥信 M_DP_NA_1 = 3 # 双点遥信 M_ME_NA_1 = 9 # 测量值,规一化值 M_ME_NB_1 = 11 # 测量值,标度化值 M_ME_NC_1 = 13 # 测量值,短浮点数 M_SP_TB_1 = 30 # 带时标单点遥信 M_DP_TB_1 = 31 # 带时标双点遥信 CUSTOM_151 = 151 # 自定义遥信类型 @classmethod def get_description(cls, type_id): descriptions = { 1: "单点遥信", 3: "双点遥信", 9: "测量值(规一化)", 11: "测量值(标度化)", 13: "测量值(短浮点)", 30: "带时标单点遥信", 31: "带时标双点遥信", 151: "自定义遥信" } return descriptions.get(type_id, f"未知类型({type_id})") class IEC104Client: def __init__(self, host, port): self.host = host self.port = port self.socket = None self.send_seq = 0 self.recv_seq = 0 self.max_retries = 3 self.retry_interval = 5 # 重试间隔(秒) def connect(self): retry_count = 0 while retry_count < self.max_retries: try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(10) # 设置超时时间 self.socket.connect((self.host, self.port)) self.send_seq = 0 self.recv_seq = 0 print(f"已连接到 {self.host}:{self.port}") return True except Exception as e: retry_count += 1 print(f"连接失败 (尝试 {retry_count}/{self.max_retries}): {e}") if retry_count < self.max_retries: time.sleep(self.retry_interval) return False def disconnect(self): if self.socket: try: self.socket.close() except: pass self.socket = None print("连接已断开") def send_startdt(self): try: # 发送STARTDT ACT apci = APCI(4, self.send_seq, self.recv_seq) startdt_act = apci.pack() + b'\x07\x00\x00\x00' # U格式帧,STARTDT ACT self.socket.send(startdt_act) self.send_seq += 1 print("已发送STARTDT激活命令") # 设置超时等待STARTDT CON self.socket.settimeout(5) # 5秒超时 try: data = self.socket.recv(1024) if len(data) >= 6 and data[6:10] == b'\x0B\x00\x00\x00': # STARTDT CON print("收到STARTDT确认,连接已激活") return True # 如果收到其他响应,可能是服务器拒绝了连接 if len(data) > 0: print(f"收到非预期响应: {data.hex()}") else: print("未收到STARTDT确认响应") return False except socket.timeout: print("等待STARTDT确认超时") return False except ConnectionResetError: print("发送STARTDT时连接被重置,服务器可能拒绝了连接") print(traceback.format_exc()) return False except Exception as e: print(f"发送STARTDT时发生错误: {e}") print(traceback.format_exc()) return False def send_general_interrogation(self): try: apci = APCI(14, self.send_seq, self.recv_seq) gi_asdu = struct.pack('!BBHHHBBH', 100, # 类型ID 100 = 总召 0x01, # VSQ (1个元素) 0x06, # COT (激活) 0x00, # 公共地址 0x00, # 信息对象地址 0x00, # 限定词 (QOI) 0x14, # 总召限定词 (20) 0x00) # 无时标 gi_frame = apci.pack() + gi_asdu self.socket.send(gi_frame) self.send_seq += 1 print("发送总召命令") return True except Exception as e: print(f"发送总召命令错误: {e}") return False def receive_data(self): while True: try: data = self.socket.recv(1024) if not data: print("连接被远程主机关闭") break apci = APCI.unpack(data[:6]) if not apci: continue self.recv_seq = apci.send_seq if apci.length == 4: if data[6:10] == b'\x0B\x00\x00\x00': print("收到STARTDT CON") elif data[6:10] == b'\x13\x00\x00\x00': print("收到STOPDT CON") continue asdu = ASDU.unpack(data[6:6 + apci.length - 4]) if not asdu: continue self._process_asdu(asdu) if asdu.cot in [1, 3, 5, 7, 9, 11, 20]: self._send_ack() except socket.timeout: print("接收数据超时,发送测试帧保持连接...") self._send_test_frame() continue except ConnectionResetError: print("连接被远程主机重置") break except Exception as e: print(f"接收数据错误: {e}") break def _process_asdu(self, asdu): type_desc = IEC104Type.get_description(asdu.type_id) cot_desc = self._get_cot_description(asdu.cot) print(f"\n收到ASDU: 类型={type_desc}, 原因={cot_desc}({asdu.cot}), 公共地址={asdu.common_addr}") for io in asdu.io_elements: print(f" 点地址: {io['address']}, 数据: {self._format_io_data(asdu.type_id, io['data'])}") if 'time' in io['data']: time_data = io['data']['time'] print(f" 时间: {time_data['year']}-{time_data['month']:02d}-{time_data['day']:02d} " f"{time_data['hour']:02d}:{time_data['minute']:02d}:{time_data['milliseconds'] / 1000:.3f}") def _format_io_data(self, type_id, data): if type_id in [1, 30, 151]: return f"值={data['value']}, 质量={data['quality']}" elif type_id in [3, 31]: states = ["中间状态", "分", "合", "不确定"] state = states[data['value']] if data['value'] < len(states) else "未知" return f"状态={state}({data['value']}), 质量={data['quality']}" elif type_id in [9, 11, 13]: return f"值={data['value']}, 质量={data['quality']}" else: return str(data) def _get_cot_description(self, cot): descriptions = { 1: "周期/循环", 2: "背景扫描", 3: "突发", 5: "请求或被请求", 6: "激活", 7: "激活确认", 8: "停止激活", 9: "停止激活确认", 20: "响应总召" } return descriptions.get(cot, f"未知原因({cot})") def _send_ack(self): try: apci = APCI(4, self.send_seq, self.recv_seq) ack_frame = apci.pack() + b'\x00\x00\x00\x00' self.socket.send(ack_frame) self.send_seq += 1 except Exception as e: print(f"发送确认帧错误: {e}") def _send_test_frame(self): try: apci = APCI(4, self.send_seq, self.recv_seq) test_frame = apci.pack() + b'\x43\x00\x00\x00' # 测试帧 self.socket.send(test_frame) self.send_seq += 1 except Exception as e: print(f"发送测试帧错误: {e}") def main(): client = IEC104Client("192.168.50.242", 2404) try: while True: if client.connect(): if client.send_startdt(): if client.send_general_interrogation(): client.receive_data() print(f"将在 {client.retry_interval} 秒后尝试重新连接...") time.sleep(client.retry_interval) except KeyboardInterrupt: print("\n用户中断") except Exception as e: print(f"发生错误: {e}") print(traceback.format_exc()) finally: client.disconnect() if __name__ == "__main__": main()