import socket import struct import threading import time from datetime import datetime class IEC104Client: def __init__(self, host, port): self.host = host self.port = port self.socket = None self.connected = False self.running = False # IEC 104 协议常量 self.START_BYTE = 0x68 self.TESTFR_ACT = b'\x68\x04\x43\x00\x00\x00' self.TESTFR_CON = b'\x68\x04\x83\x00\x00\x00' self.STARTDT_ACT = b'\x68\x04\x07\x00\x00\x00' self.STARTDT_CON = b'\x68\x04\x0B\x00\x00\x00' self.STOPDT_ACT = b'\x68\x04\x13\x00\x00\x00' self.send_seq = 0 self.recv_seq = 0 def connect(self): """连接到IEC 104服务器""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(10) self.socket.connect((self.host, self.port)) self.connected = True print(f"成功连接到 {self.host}:{self.port}") return True except Exception as e: print(f"连接失败: {e}") return False def disconnect(self): """断开连接""" self.running = False if self.socket: try: # 发送停止数据传输命令 self.socket.send(self.STOPDT_ACT) self.socket.close() except: pass self.connected = False print("连接已断开") def send_startdt(self): """发送启动数据传输命令""" try: self.socket.send(self.STARTDT_ACT) print("发送启动数据传输命令") except Exception as e: print(f"发送启动命令失败: {e}") def send_testfr(self): """发送测试帧""" try: self.socket.send(self.TESTFR_ACT) print("发送测试帧") except Exception as e: print(f"发送测试帧失败: {e}") def send_s_frame(self): """发送S格式监视帧确认""" try: # S格式帧:68 04 01 00 + 接收序号 s_frame = b'\x68\x04\x01\x00' + struct.pack('= 6: ctrl1 = data[2] ctrl2 = data[3] ctrl3 = data[4] ctrl4 = data[5] if ctrl1 == 0x07 and ctrl2 == 0x00: return {"type": "STARTDT_ACT", "description": "启动数据传输激活"} elif ctrl1 == 0x0B and ctrl2 == 0x00: return {"type": "STARTDT_CON", "description": "启动数据传输确认"} elif ctrl1 == 0x43 and ctrl2 == 0x00: return {"type": "TESTFR_ACT", "description": "测试帧激活"} elif ctrl1 == 0x83 and ctrl2 == 0x00: return {"type": "TESTFR_CON", "description": "测试帧确认"} # I格式帧(信息传输) elif length > 4: return self.parse_i_frame(data) return {"type": "UNKNOWN", "data": data.hex()} def parse_i_frame(self, data): """解析I格式帧""" if len(data) < 8: return None # 提取序列号 send_seq = struct.unpack('> 1 recv_seq = struct.unpack('> 1 # ASDU(应用服务数据单元) if len(data) > 6: asdu = data[6:] print('报文:', data.hex()) return self.parse_asdu(asdu, send_seq, recv_seq) return { "type": "I_FRAME", "send_seq": send_seq, "recv_seq": recv_seq, "data": data[6:].hex() if len(data) > 6 else "" } def parse_asdu(self, asdu_data, send_seq, recv_seq): """解析ASDU""" if len(asdu_data) < 6: return {"type": "I_FRAME", "error": "ASDU too short"} type_id = asdu_data[0] vsq = asdu_data[1] # 变量结构限定词 cot = struct.unpack('> 7 # 序列标志 result = { "type": "I_FRAME", "send_seq": send_seq, "recv_seq": recv_seq, "type_id": type_id, "type_description": self.get_type_description(type_id), "vsq": vsq, "num_objects": num_objects, "sq": sq, "cot": cot, "cot_description": self.get_cot_description(cot & 0x3F), # 只取低6位 "common_addr": common_addr, "data_values": [], # "raw_data": asdu_data[6:].hex() if len(asdu_data) > 6 else "", "raw_data": asdu_data.hex() if len(asdu_data) > 6 else "" } # 解析数据值 if len(asdu_data) > 6: data_part = asdu_data[6:] # 根据类型ID解析具体数据 if type_id == 1: # 单点信息 result["data_values"] = self.parse_single_point_info(data_part, num_objects, sq) elif type_id == 3: # 双点信息 result["data_values"] = self.parse_double_point_info(data_part, num_objects, sq) elif type_id == 9: # 测量值,标准化值 result["data_values"] = self.parse_normalized_value(data_part, num_objects, sq) elif type_id == 11: # 测量值,标度化值 result["data_values"] = self.parse_scaled_value(data_part, num_objects, sq) elif type_id == 13: # 测量值,浮点数 result["data_values"] = self.parse_float_value(data_part, num_objects, sq) return result def get_type_description(self, type_id): """获取类型标识描述""" type_descriptions = { 1: "单点信息", 2: "带时标的单点信息", 3: "双点信息", 4: "带时标的双点信息", 5: "步位置信息", 6: "带时标的步位置信息", 7: "32位串信息", 8: "带时标的32位串信息", 9: "测量值,标准化值", 10: "带时标的测量值,标准化值", 11: "测量值,标度化值", 12: "带时标的测量值,标度化值", 13: "测量值,浮点数", 14: "带时标的测量值,浮点数", 30: "带时标的单点信息", 36: "测量值,浮点数带时标", 100: "总召唤命令", 101: "计数量召唤命令" } return type_descriptions.get(type_id, f"未知类型 ({type_id})") def get_cot_description(self, cot): """获取传送原因描述""" cot_descriptions = { 1: "周期循环", 2: "背景扫描", 3: "自发", 4: "初始化", 5: "请求", 6: "激活", 7: "激活确认", 8: "停止激活", 9: "停止激活确认", 10: "激活结束", 20: "响应站召唤" } return cot_descriptions.get(cot, f"未知原因 ({cot})") def parse_single_point_info(self, data, num_objects, sq): """解析单点信息""" values = [] pos = 0 if sq == 0: # 非连续地址 for i in range(num_objects): if pos + 3 <= len(data): addr = struct.unpack('= 2: base_addr = struct.unpack('= 2: base_addr = struct.unpack('= 2: base_addr = struct.unpack('> 7 # 取高8位作为显示值 values.append({ "address": addr, "value": scaled_value, "quality": quality & 0xFF # 只取低8位质量位 }) pos += 6 else: # 连续地址 if len(data) >= 2: base_addr = struct.unpack('> 8 values.append({ "address": base_addr + i, "value": scaled_value, "quality": quality & 0xFF }) pos += 4 return values def parse_float_value(self, data, num_objects, sq): """解析浮点数值""" values = [] pos = 0 if sq == 0: # 非连续地址 # 每个信息对象:3字节地址 + 4字节浮点数 + 1字节质量 for i in range(num_objects): if pos + 8 <= len(data): # 3字节地址(小端序) addr = struct.unpack('= 3: # 第一个信息对象地址 base_addr = struct.unpack('= 2: if buffer[0] != self.START_BYTE: buffer = buffer[1:] continue if len(buffer) < 2: break frame_length = buffer[1] + 2 # 长度字段不包括起始字节和长度字节本身 if len(buffer) < frame_length: break frame = buffer[:frame_length] buffer = buffer[frame_length:] # 解析并显示帧 parsed = self.parse_apdu(frame) if parsed: self.display_data(parsed) # 发送S格式确认帧(监视功能) if parsed.get("type") == "I_FRAME": self.send_s_frame() # 更新接收序号 if parsed.get("type") == "I_FRAME" and "send_seq" in parsed: self.recv_seq = parsed["send_seq"] + 1 # 自动回复确认帧 if parsed and parsed.get("type") == "TESTFR_ACT": self.socket.send(self.TESTFR_CON) except socket.timeout: continue except Exception as e: print(f"接收数据错误: {e}") break def send_general_interrogation(self): """发送总召命令(C_IC_NA_1)""" try: # 构造总召命令帧 # ASDU: 类型标识=100, VSQ=1, COT=6(激活), 公共地址, 信息对象地址=0, QOI=20(站召唤) asdu = struct.pack(' 10: print(f" ... 还有 {len(valid_values) - 10} 个有效数据点") if len(parsed_data['data_values']) > len(valid_values): print(f" (过滤了 {len(parsed_data['data_values']) - len(valid_values)} 个异常值)") # 显示原始数据用于调试 if 'raw_data' in parsed_data and parsed_data['raw_data']: print(f"原始数据: {parsed_data['raw_data']}") # 记录公共地址用于总召 if 'common_addr' in parsed_data: self.common_addr = parsed_data['common_addr'] def get_quality_description(self, quality): """获取质量描述符""" if quality == 0: return "(良好)" elif quality & 0x80: return "(无效)" elif quality & 0x40: return "(未更新)" elif quality & 0x20: return "(被取代)" elif quality & 0x10: return "(被阻塞)" else: return f"(质量: {quality})" print("-" * 50) def start(self): """启动客户端""" max_retries = 5 retry_count = 0 while retry_count < max_retries: if not self.connect(): retry_count += 1 if retry_count < max_retries: print(f"连接失败,{5}秒后重试 ({retry_count}/{max_retries})") time.sleep(5) continue else: print("达到最大重试次数,退出程序") return self.running = True retry_count = 0 # 重置重试计数 # 启动接收线程 receive_thread = threading.Thread(target=self.receive_data) receive_thread.daemon = True receive_thread.start() # 发送启动数据传输命令 time.sleep(1) self.send_startdt() # 等待启动确认后发送总召 time.sleep(2) self.send_general_interrogation() try: # 主循环 - 定期发送测试帧和总召 last_gi_time = time.time() while self.running and self.connected: current_time = time.time() # 每5分钟发送一次总召 if current_time - last_gi_time >= 300: # 300秒 = 5分钟 self.send_general_interrogation() last_gi_time = current_time # 每30秒发送一次测试帧 time.sleep(30) if self.connected: self.send_testfr() # 检查接收线程是否还活着 if not receive_thread.is_alive(): print("接收线程已停止,准备重连") break except KeyboardInterrupt: print("\n程序被用户中断") break finally: self.disconnect() # 如果不是用户中断,则尝试重连 if self.running: print("连接断开,5秒后尝试重连...") time.sleep(5) def main(): # 配置参数 HOST = "192.168.50.242" PORT = 2404 print(f"启动IEC 104客户端") print(f"目标服务器: {HOST}:{PORT}") print("按 Ctrl+C 退出程序\n") client = IEC104Client(HOST, PORT) client.start() if __name__ == "__main__": main()