| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- import socket
- import struct
- import time
- import traceback
- from enum import Enum
- from tmp.iec104.ASDU import ASDU
- class IEC104Type(Enum):
- M_SP_NA_1 = 1 # 单点遥信
- M_DP_NA_1 = 3 # 双点遥信
- M_ME_NA_1 = 9 # 测量值,规一化值
- M_ME_NB_1 = 11 # 测量值,标度化值
- M_ME_NC_1 = 13 # 测量值,短浮点数
- M_SP_TB_1 = 30 # 带时标单点遥信
- M_DP_TB_1 = 31 # 带时标双点遥信
- CUSTOM_151 = 151 # 自定义遥信类型
- @classmethod
- def get_description(cls, type_id):
- descriptions = {
- 1: "单点遥信",
- 3: "双点遥信",
- 9: "测量值(规一化)",
- 11: "测量值(标度化)",
- 13: "测量值(短浮点)",
- 30: "带时标单点遥信",
- 31: "带时标双点遥信",
- 151: "自定义遥信"
- }
- return descriptions.get(type_id, f"未知类型({type_id})")
- class IEC104Client:
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.socket = None
- self.send_seq = 0
- self.recv_seq = 0
- self.max_retries = 3
- self.retry_interval = 5 # 重试间隔(秒)
- def connect(self):
- retry_count = 0
- while retry_count < self.max_retries:
- try:
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.settimeout(10) # 设置超时时间
- self.socket.connect((self.host, self.port))
- self.send_seq = 0
- self.recv_seq = 0
- print(f"已连接到 {self.host}:{self.port}")
- return True
- except Exception as e:
- retry_count += 1
- print(f"连接失败 (尝试 {retry_count}/{self.max_retries}): {e}")
- if retry_count < self.max_retries:
- time.sleep(self.retry_interval)
- return False
- def disconnect(self):
- if self.socket:
- try:
- self.socket.close()
- except:
- pass
- self.socket = None
- print("连接已断开")
- def send_startdt(self):
- try:
- # 发送STARTDT ACT
- apci = APCI(4, self.send_seq, self.recv_seq)
- startdt_act = apci.pack() + b'\x07\x00\x00\x00' # U格式帧,STARTDT ACT
- self.socket.send(startdt_act)
- self.send_seq += 1
- print("已发送STARTDT激活命令")
- # 设置超时等待STARTDT CON
- self.socket.settimeout(5) # 5秒超时
- try:
- data = self.socket.recv(1024)
- if len(data) >= 6 and data[6:10] == b'\x0B\x00\x00\x00': # STARTDT CON
- print("收到STARTDT确认,连接已激活")
- return True
- # 如果收到其他响应,可能是服务器拒绝了连接
- if len(data) > 0:
- print(f"收到非预期响应: {data.hex()}")
- else:
- print("未收到STARTDT确认响应")
- return False
- except socket.timeout:
- print("等待STARTDT确认超时")
- return False
- except ConnectionResetError:
- print("发送STARTDT时连接被重置,服务器可能拒绝了连接")
- print(traceback.format_exc())
- return False
- except Exception as e:
- print(f"发送STARTDT时发生错误: {e}")
- print(traceback.format_exc())
- return False
- def send_general_interrogation(self):
- try:
- apci = APCI(14, self.send_seq, self.recv_seq)
- gi_asdu = struct.pack('!BBHHHBBH',
- 100, # 类型ID 100 = 总召
- 0x01, # VSQ (1个元素)
- 0x06, # COT (激活)
- 0x00, # 公共地址
- 0x00, # 信息对象地址
- 0x00, # 限定词 (QOI)
- 0x14, # 总召限定词 (20)
- 0x00) # 无时标
- gi_frame = apci.pack() + gi_asdu
- self.socket.send(gi_frame)
- self.send_seq += 1
- print("发送总召命令")
- return True
- except Exception as e:
- print(f"发送总召命令错误: {e}")
- return False
- def receive_data(self):
- while True:
- try:
- data = self.socket.recv(1024)
- if not data:
- print("连接被远程主机关闭")
- break
- apci = APCI.unpack(data[:6])
- if not apci:
- continue
- self.recv_seq = apci.send_seq
- if apci.length == 4:
- if data[6:10] == b'\x0B\x00\x00\x00':
- print("收到STARTDT CON")
- elif data[6:10] == b'\x13\x00\x00\x00':
- print("收到STOPDT CON")
- continue
- asdu = ASDU.unpack(data[6:6 + apci.length - 4])
- if not asdu:
- continue
- self._process_asdu(asdu)
- if asdu.cot in [1, 3, 5, 7, 9, 11, 20]:
- self._send_ack()
- except socket.timeout:
- print("接收数据超时,发送测试帧保持连接...")
- self._send_test_frame()
- continue
- except ConnectionResetError:
- print("连接被远程主机重置")
- break
- except Exception as e:
- print(f"接收数据错误: {e}")
- break
- def _process_asdu(self, asdu):
- type_desc = IEC104Type.get_description(asdu.type_id)
- cot_desc = self._get_cot_description(asdu.cot)
- print(f"\n收到ASDU: 类型={type_desc}, 原因={cot_desc}({asdu.cot}), 公共地址={asdu.common_addr}")
- for io in asdu.io_elements:
- print(f" 点地址: {io['address']}, 数据: {self._format_io_data(asdu.type_id, io['data'])}")
- if 'time' in io['data']:
- time_data = io['data']['time']
- print(f" 时间: {time_data['year']}-{time_data['month']:02d}-{time_data['day']:02d} "
- f"{time_data['hour']:02d}:{time_data['minute']:02d}:{time_data['milliseconds'] / 1000:.3f}")
- def _format_io_data(self, type_id, data):
- if type_id in [1, 30, 151]:
- return f"值={data['value']}, 质量={data['quality']}"
- elif type_id in [3, 31]:
- states = ["中间状态", "分", "合", "不确定"]
- state = states[data['value']] if data['value'] < len(states) else "未知"
- return f"状态={state}({data['value']}), 质量={data['quality']}"
- elif type_id in [9, 11, 13]:
- return f"值={data['value']}, 质量={data['quality']}"
- else:
- return str(data)
- def _get_cot_description(self, cot):
- descriptions = {
- 1: "周期/循环",
- 2: "背景扫描",
- 3: "突发",
- 5: "请求或被请求",
- 6: "激活",
- 7: "激活确认",
- 8: "停止激活",
- 9: "停止激活确认",
- 20: "响应总召"
- }
- return descriptions.get(cot, f"未知原因({cot})")
- def _send_ack(self):
- try:
- apci = APCI(4, self.send_seq, self.recv_seq)
- ack_frame = apci.pack() + b'\x00\x00\x00\x00'
- self.socket.send(ack_frame)
- self.send_seq += 1
- except Exception as e:
- print(f"发送确认帧错误: {e}")
- def _send_test_frame(self):
- try:
- apci = APCI(4, self.send_seq, self.recv_seq)
- test_frame = apci.pack() + b'\x43\x00\x00\x00' # 测试帧
- self.socket.send(test_frame)
- self.send_seq += 1
- except Exception as e:
- print(f"发送测试帧错误: {e}")
- def main():
- client = IEC104Client("192.168.50.242", 2404)
- try:
- while True:
- if client.connect():
- if client.send_startdt():
- if client.send_general_interrogation():
- client.receive_data()
- print(f"将在 {client.retry_interval} 秒后尝试重新连接...")
- time.sleep(client.retry_interval)
- except KeyboardInterrupt:
- print("\n用户中断")
- except Exception as e:
- print(f"发生错误: {e}")
- print(traceback.format_exc())
- finally:
- client.disconnect()
- if __name__ == "__main__":
- main()
|