||
- import socket
- import struct
- import threading
- import time
- from datetime import datetime
- class IEC104Client:
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.socket = None
- self.connected = False
- self.running = False
- # IEC 104 协议常量
- self.START_BYTE = 0x68
- self.TESTFR_ACT = b'\x68\x04\x43\x00\x00\x00'
- self.TESTFR_CON = b'\x68\x04\x83\x00\x00\x00'
- self.STARTDT_ACT = b'\x68\x04\x07\x00\x00\x00'
- self.STARTDT_CON = b'\x68\x04\x0B\x00\x00\x00'
- self.STOPDT_ACT = b'\x68\x04\x13\x00\x00\x00'
- self.send_seq = 0
- self.recv_seq = 0
- def connect(self):
- """连接到IEC 104服务器"""
- try:
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.settimeout(10)
- self.socket.connect((self.host, self.port))
- self.connected = True
- print(f"成功连接到 {self.host}:{self.port}")
- return True
- except Exception as e:
- print(f"连接失败: {e}")
- return False
- def disconnect(self):
- """断开连接"""
- self.running = False
- if self.socket:
- try:
- # 发送停止数据传输命令
- self.socket.send(self.STOPDT_ACT)
- self.socket.close()
- except:
- pass
- self.connected = False
- print("连接已断开")
- def send_startdt(self):
- """发送启动数据传输命令"""
- try:
- self.socket.send(self.STARTDT_ACT)
- print("发送启动数据传输命令")
- except Exception as e:
- print(f"发送启动命令失败: {e}")
- def send_testfr(self):
- """发送测试帧"""
- try:
- self.socket.send(self.TESTFR_ACT)
- print("发送测试帧")
- except Exception as e:
- print(f"发送测试帧失败: {e}")
- def send_s_frame(self):
- """发送S格式监视帧确认"""
- try:
- # S格式帧:68 04 01 00 + 接收序号
- s_frame = b'\x68\x04\x01\x00' + struct.pack('<H', (self.recv_seq << 1))
- self.socket.send(s_frame)
- except Exception as e:
- print(f"发送S帧失败: {e}")
- def parse_apdu(self, data):
- """解析APDU(应用协议数据单元)"""
- if len(data) < 6:
- return None
- # 检查起始字节
- if data[0] != self.START_BYTE:
- return None
- length = data[1]
- # U格式帧(无编号控制功能)
- if length == 4 and len(data) >= 6:
- ctrl1 = data[2]
- ctrl2 = data[3]
- ctrl3 = data[4]
- ctrl4 = data[5]
- if ctrl1 == 0x07 and ctrl2 == 0x00:
- return {"type": "STARTDT_ACT", "description": "启动数据传输激活"}
- elif ctrl1 == 0x0B and ctrl2 == 0x00:
- return {"type": "STARTDT_CON", "description": "启动数据传输确认"}
- elif ctrl1 == 0x43 and ctrl2 == 0x00:
- return {"type": "TESTFR_ACT", "description": "测试帧激活"}
- elif ctrl1 == 0x83 and ctrl2 == 0x00:
- return {"type": "TESTFR_CON", "description": "测试帧确认"}
- # I格式帧(信息传输)
- elif length > 4:
- return self.parse_i_frame(data)
- return {"type": "UNKNOWN", "data": data.hex()}
- def parse_i_frame(self, data):
- """解析I格式帧"""
- if len(data) < 8:
- return None
- # 提取序列号
- send_seq = struct.unpack('<H', data[2:4])[0] >> 1
- recv_seq = struct.unpack('<H', data[4:6])[0] >> 1
- # ASDU(应用服务数据单元)
- if len(data) > 6:
- asdu = data[6:]
- print('报文:', data.hex())
- return self.parse_asdu(asdu, send_seq, recv_seq)
- return {
- "type": "I_FRAME",
- "send_seq": send_seq,
- "recv_seq": recv_seq,
- "data": data[6:].hex() if len(data) > 6 else ""
- }
- def parse_asdu(self, asdu_data, send_seq, recv_seq):
- """解析ASDU"""
- if len(asdu_data) < 6:
- return {"type": "I_FRAME", "error": "ASDU too short"}
- type_id = asdu_data[0]
- vsq = asdu_data[1] # 变量结构限定词
- cot = struct.unpack('<H', asdu_data[2:4])[0] # 传送原因(2字节)
- common_addr = struct.unpack('<H', asdu_data[4:6])[0] # 公共地址(2字节)
- # 信息对象地址根据VSQ确定
- num_objects = vsq & 0x7F
- sq = (vsq & 0x80) >> 7 # 序列标志
- result = {
- "type": "I_FRAME",
- "send_seq": send_seq,
- "recv_seq": recv_seq,
- "type_id": type_id,
- "type_description": self.get_type_description(type_id),
- "vsq": vsq,
- "num_objects": num_objects,
- "sq": sq,
- "cot": cot,
- "cot_description": self.get_cot_description(cot & 0x3F), # 只取低6位
- "common_addr": common_addr,
- "data_values": [],
- # "raw_data": asdu_data[6:].hex() if len(asdu_data) > 6 else "",
- "raw_data": asdu_data.hex() if len(asdu_data) > 6 else ""
- }
- # 解析数据值
- if len(asdu_data) > 6:
- data_part = asdu_data[6:]
- # 根据类型ID解析具体数据
- if type_id == 1: # 单点信息
- result["data_values"] = self.parse_single_point_info(data_part, num_objects, sq)
- elif type_id == 3: # 双点信息
- result["data_values"] = self.parse_double_point_info(data_part, num_objects, sq)
- elif type_id == 9: # 测量值,标准化值
- result["data_values"] = self.parse_normalized_value(data_part, num_objects, sq)
- elif type_id == 11: # 测量值,标度化值
- result["data_values"] = self.parse_scaled_value(data_part, num_objects, sq)
- elif type_id == 13: # 测量值,浮点数
- result["data_values"] = self.parse_float_value(data_part, num_objects, sq)
- return result
- def get_type_description(self, type_id):
- """获取类型标识描述"""
- type_descriptions = {
- 1: "单点信息",
- 2: "带时标的单点信息",
- 3: "双点信息",
- 4: "带时标的双点信息",
- 5: "步位置信息",
- 6: "带时标的步位置信息",
- 7: "32位串信息",
- 8: "带时标的32位串信息",
- 9: "测量值,标准化值",
- 10: "带时标的测量值,标准化值",
- 11: "测量值,标度化值",
- 12: "带时标的测量值,标度化值",
- 13: "测量值,浮点数",
- 14: "带时标的测量值,浮点数",
- 30: "带时标的单点信息",
- 36: "测量值,浮点数带时标",
- 100: "总召唤命令",
- 101: "计数量召唤命令"
- }
- return type_descriptions.get(type_id, f"未知类型 ({type_id})")
- def get_cot_description(self, cot):
- """获取传送原因描述"""
- cot_descriptions = {
- 1: "周期循环",
- 2: "背景扫描",
- 3: "自发",
- 4: "初始化",
- 5: "请求",
- 6: "激活",
- 7: "激活确认",
- 8: "停止激活",
- 9: "停止激活确认",
- 10: "激活结束",
- 20: "响应站召唤"
- }
- return cot_descriptions.get(cot, f"未知原因 ({cot})")
- def parse_single_point_info(self, data, num_objects, sq):
- """解析单点信息"""
- values = []
- pos = 0
- if sq == 0: # 非连续地址
- for i in range(num_objects):
- if pos + 3 <= len(data):
- addr = struct.unpack('<H', data[pos:pos + 2])[0]
- value = data[pos + 2] & 0x01
- quality = data[pos + 3]
- values.append({
- "address": addr,
- "value": bool(value),
- "quality": quality
- })
- pos += 4
- else: # 连续地址
- if len(data) >= 2:
- base_addr = struct.unpack('<H', data[0:2])[0]
- pos = 2
- for i in range(num_objects):
- if pos + 1 <= len(data):
- value = data[pos] & 0x01
- quality = data[pos + 1]
- values.append({
- "address": base_addr + i,
- "value": bool(value),
- "quality": quality
- })
- pos += 2
- return values
- def parse_double_point_info(self, data, num_objects, sq):
- """解析双点信息"""
- values = []
- pos = 0
- if sq == 0: # 非连续地址
- for i in range(num_objects):
- if pos + 3 <= len(data):
- addr = struct.unpack('<H', data[pos:pos + 2])[0]
- value = data[pos + 2] & 0x03
- quality = data[pos + 3]
- values.append({
- "address": addr,
- "value": value,
- "quality": quality
- })
- pos += 4
- else: # 连续地址
- if len(data) >= 2:
- base_addr = struct.unpack('<H', data[0:2])[0]
- pos = 2
- for i in range(num_objects):
- if pos + 1 <= len(data):
- value = data[pos] & 0x03
- quality = data[pos + 1]
- values.append({
- "address": base_addr + i,
- "value": value,
- "quality": quality
- })
- pos += 2
- return values
- def parse_normalized_value(self, data, num_objects, sq):
- """解析标准化值"""
- values = []
- pos = 0
- if sq == 0: # 非连续地址
- for i in range(num_objects):
- if pos + 6 <= len(data):
- addr = struct.unpack('<H', data[pos:pos + 2])[0]
- # 标准化值范围是-1.0到1.0,对应-32768到32767
- raw_value = struct.unpack('<h', data[pos + 2:pos + 4])[0]
- value = raw_value / 32768.0 # 转换为-1.0到1.0范围
- quality = data[pos + 4]
- values.append({
- "address": addr,
- "value": round(value, 6), # 保留6位小数
- "quality": quality,
- "raw_value": raw_value # 保留原始值用于调试
- })
- pos += 6
- else: # 连续地址
- if len(data) >= 2:
- base_addr = struct.unpack('<H', data[0:2])[0]
- pos = 2
- for i in range(num_objects):
- if pos + 3 <= len(data):
- raw_value = struct.unpack('<h', data[pos:pos + 2])[0]
- value = raw_value / 32768.0
- quality = data[pos + 2]
- values.append({
- "address": base_addr + i,
- "value": round(value, 6),
- "quality": quality,
- "raw_value": raw_value
- })
- pos += 3
- return values
- def parse_scaled_value(self, data, num_objects, sq):
- """解析标度化值"""
- values = []
- pos = 0
- if sq == 0: # 非连续地址
- for i in range(num_objects):
- if pos + 6 <= len(data):
- addr = struct.unpack('<H', data[pos:pos + 2])[0]
- # 标度化值实际上是4字节,包含2字节值和2字节质量描述
- value = struct.unpack('<h', data[pos + 2:pos + 4])[0]
- quality = struct.unpack('<H', data[pos + 4:pos + 6])[0]
- # 将值转换为模拟器显示的格式
- scaled_value = value >> 7 # 取高8位作为显示值
- values.append({
- "address": addr,
- "value": scaled_value,
- "quality": quality & 0xFF # 只取低8位质量位
- })
- pos += 6
- else: # 连续地址
- if len(data) >= 2:
- base_addr = struct.unpack('<H', data[0:2])[0]
- pos = 2
- for i in range(num_objects):
- if pos + 4 <= len(data):
- value = struct.unpack('<h', data[pos:pos + 2])[0]
- quality = struct.unpack('<H', data[pos + 2:pos + 4])[0]
- scaled_value = value >> 8
- values.append({
- "address": base_addr + i,
- "value": scaled_value,
- "quality": quality & 0xFF
- })
- pos += 4
- return values
- def parse_float_value(self, data, num_objects, sq):
- """解析浮点数值"""
- values = []
- pos = 0
- if sq == 0: # 非连续地址
- # 每个信息对象:3字节地址 + 4字节浮点数 + 1字节质量
- for i in range(num_objects):
- if pos + 8 <= len(data):
- # 3字节地址(小端序)
- addr = struct.unpack('<I', data[pos:pos + 3] + b'\x00')[0]
- # 4字节IEEE754浮点数(小端序)
- value_bytes = data[pos + 3:pos + 7]
- value = struct.unpack('<f', value_bytes)[0]
- quality = data[pos + 7]
- # 检查数值有效性
- if not (value != value): # 检查NaN
- values.append({
- "address": addr,
- "value": round(value, 6),
- "quality": quality,
- "raw_bytes": value_bytes.hex()
- })
- pos += 8
- else: # 连续地址
- if len(data) >= 3:
- # 第一个信息对象地址
- base_addr = struct.unpack('<I', data[0:3] + b'\x00')[0]
- pos = 3
- # 后续对象只有数值和质量
- for i in range(num_objects):
- if pos + 5 <= len(data):
- value_bytes = data[pos:pos + 4]
- value = struct.unpack('<f', value_bytes)[0]
- quality = data[pos + 4]
- if not (value != value): # 检查NaN
- values.append({
- "address": base_addr + i,
- "value": round(value, 6),
- "quality": quality,
- "raw_bytes": value_bytes.hex()
- })
- pos += 5
- return values
- def receive_data(self):
- """接收数据的主循环"""
- buffer = b''
- while self.running:
- try:
- data = self.socket.recv(1024)
- if not data:
- print("连接被服务器关闭")
- break
- buffer += data
- # 处理缓冲区中的完整帧
- while len(buffer) >= 2:
- if buffer[0] != self.START_BYTE:
- buffer = buffer[1:]
- continue
- if len(buffer) < 2:
- break
- frame_length = buffer[1] + 2 # 长度字段不包括起始字节和长度字节本身
- if len(buffer) < frame_length:
- break
- frame = buffer[:frame_length]
- buffer = buffer[frame_length:]
- # 解析并显示帧
- parsed = self.parse_apdu(frame)
- if parsed:
- self.display_data(parsed)
- # 发送S格式确认帧(监视功能)
- if parsed.get("type") == "I_FRAME":
- self.send_s_frame()
- # 更新接收序号
- if parsed.get("type") == "I_FRAME" and "send_seq" in parsed:
- self.recv_seq = parsed["send_seq"] + 1
- # 自动回复确认帧
- if parsed and parsed.get("type") == "TESTFR_ACT":
- self.socket.send(self.TESTFR_CON)
- except socket.timeout:
- continue
- except Exception as e:
- print(f"接收数据错误: {e}")
- break
- def send_general_interrogation(self):
- """发送总召命令(C_IC_NA_1)"""
- try:
- # 构造总召命令帧
- # ASDU: 类型标识=100, VSQ=1, COT=6(激活), 公共地址, 信息对象地址=0, QOI=20(站召唤)
- asdu = struct.pack('<BBHHIB',
- 100, # 类型标识:总召命令
- 1, # VSQ:1个信息对象
- 6, # COT:激活
- self.common_addr if hasattr(self, 'common_addr') else 1, # 公共地址
- 0, # 信息对象地址
- 20 # QOI:站召唤
- )
- # 构造I格式帧
- frame_length = len(asdu) + 4
- i_frame = struct.pack('<BBH',
- self.START_BYTE,
- frame_length,
- (self.send_seq << 1) # 发送序号
- ) + struct.pack('<H', (self.recv_seq << 1)) + asdu # 接收序号 + ASDU
- self.socket.send(i_frame)
- self.send_seq += 1
- print(f"发送总召命令 (发送序号: {self.send_seq - 1})")
- except Exception as e:
- print(f"发送总召命令失败: {e}")
- def send_clock_sync(self):
- """发送时钟同步命令"""
- try:
- # 获取当前时间
- now = datetime.now()
- # CP56Time2a格式:毫秒(2) + 分钟(1) + 小时(1) + 日期(3)
- ms = (now.second * 1000 + now.microsecond // 1000) & 0xFFFF
- minute = now.minute & 0x3F
- hour = now.hour & 0x1F
- day = now.day & 0x1F
- month = now.month & 0x0F
- year = (now.year - 2000) & 0x7F
- time_bytes = struct.pack('<HBBBB', ms, minute, hour,
- (month << 5) | day, year)
- # 构造时钟同步ASDU
- asdu = struct.pack('<BBHHI',
- 103, # 类型标识:时钟同步命令
- 1, # VSQ
- 6, # COT:激活
- self.common_addr if hasattr(self, 'common_addr') else 1,
- 0 # 信息对象地址
- ) + time_bytes
- frame_length = len(asdu) + 4
- i_frame = struct.pack('<BBH',
- self.START_BYTE,
- frame_length,
- (self.send_seq << 1)
- ) + struct.pack('<H', (self.recv_seq << 1)) + asdu
- self.socket.send(i_frame)
- self.send_seq += 1
- print(f"发送时钟同步命令")
- except Exception as e:
- print(f"发送时钟同步命令失败: {e}")
- def display_data(self, parsed_data):
- """显示解析后的数据"""
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- print(f"\n[{timestamp}] 接收到数据:")
- print(f"类型: {parsed_data.get('type', 'UNKNOWN')}")
- if 'description' in parsed_data:
- print(f"描述: {parsed_data['description']}")
- if 'type_description' in parsed_data:
- print(f"ASDU类型: {parsed_data['type_description']} (ID: {parsed_data['type_id']})")
- print(f"传送原因: {parsed_data['cot_description']} ({parsed_data['cot']})")
- print(f"公共地址: {parsed_data['common_addr']}")
- if 'num_objects' in parsed_data:
- print(f"信息对象数量: {parsed_data['num_objects']}, SQ: {parsed_data.get('sq', 0)}")
- if parsed_data.get('data_values'):
- print("数据值:")
- valid_values = [v for v in parsed_data['data_values'] if v.get('value') is not None]
- for i, value in enumerate(valid_values[:10]): # 显示前10个有效值
- quality_desc = self.get_quality_description(value.get('quality', 0))
- raw_info = f" [原始: {value.get('raw_bytes', '')}]" if 'raw_bytes' in value else ""
- print(f" 地址 {value.get('address', 'N/A')}: {value.get('value', 'N/A')} {quality_desc}{raw_info}")
- if len(valid_values) > 10:
- print(f" ... 还有 {len(valid_values) - 10} 个有效数据点")
- if len(parsed_data['data_values']) > len(valid_values):
- print(f" (过滤了 {len(parsed_data['data_values']) - len(valid_values)} 个异常值)")
- # 显示原始数据用于调试
- if 'raw_data' in parsed_data and parsed_data['raw_data']:
- print(f"原始数据: {parsed_data['raw_data']}")
- # 记录公共地址用于总召
- if 'common_addr' in parsed_data:
- self.common_addr = parsed_data['common_addr']
- def get_quality_description(self, quality):
- """获取质量描述符"""
- if quality == 0:
- return "(良好)"
- elif quality & 0x80:
- return "(无效)"
- elif quality & 0x40:
- return "(未更新)"
- elif quality & 0x20:
- return "(被取代)"
- elif quality & 0x10:
- return "(被阻塞)"
- else:
- return f"(质量: {quality})"
- print("-" * 50)
- def start(self):
- """启动客户端"""
- max_retries = 5
- retry_count = 0
- while retry_count < max_retries:
- if not self.connect():
- retry_count += 1
- if retry_count < max_retries:
- print(f"连接失败,{5}秒后重试 ({retry_count}/{max_retries})")
- time.sleep(5)
- continue
- else:
- print("达到最大重试次数,退出程序")
- return
- self.running = True
- retry_count = 0 # 重置重试计数
- # 启动接收线程
- receive_thread = threading.Thread(target=self.receive_data)
- receive_thread.daemon = True
- receive_thread.start()
- # 发送启动数据传输命令
- time.sleep(1)
- self.send_startdt()
- # 等待启动确认后发送总召
- time.sleep(2)
- self.send_general_interrogation()
- try:
- # 主循环 - 定期发送测试帧和总召
- last_gi_time = time.time()
- while self.running and self.connected:
- current_time = time.time()
- # 每5分钟发送一次总召
- if current_time - last_gi_time >= 300: # 300秒 = 5分钟
- self.send_general_interrogation()
- last_gi_time = current_time
- # 每30秒发送一次测试帧
- time.sleep(30)
- if self.connected:
- self.send_testfr()
- # 检查接收线程是否还活着
- if not receive_thread.is_alive():
- print("接收线程已停止,准备重连")
- break
- except KeyboardInterrupt:
- print("\n程序被用户中断")
- break
- finally:
- self.disconnect()
- # 如果不是用户中断,则尝试重连
- if self.running:
- print("连接断开,5秒后尝试重连...")
- time.sleep(5)
- def main():
- # 配置参数
- HOST = "192.168.50.242"
- PORT = 2404
- print(f"启动IEC 104客户端")
- print(f"目标服务器: {HOST}:{PORT}")
- print("按 Ctrl+C 退出程序\n")
- client = IEC104Client(HOST, PORT)
- client.start()
- if __name__ == "__main__":
- main()
|