iec104_client.py 25 KB


  1. import socket
  2. import struct
  3. import threading
  4. import time
  5. from datetime import datetime
  6. class IEC104Client:
  7. def __init__(self, host, port):
  8. self.host = host
  9. self.port = port
  10. self.socket = None
  11. self.connected = False
  12. self.running = False
  13. # IEC 104 协议常量
  14. self.START_BYTE = 0x68
  15. self.TESTFR_ACT = b'\x68\x04\x43\x00\x00\x00'
  16. self.TESTFR_CON = b'\x68\x04\x83\x00\x00\x00'
  17. self.STARTDT_ACT = b'\x68\x04\x07\x00\x00\x00'
  18. self.STARTDT_CON = b'\x68\x04\x0B\x00\x00\x00'
  19. self.STOPDT_ACT = b'\x68\x04\x13\x00\x00\x00'
  20. self.send_seq = 0
  21. self.recv_seq = 0
  22. def connect(self):
  23. """连接到IEC 104服务器"""
  24. try:
  25. self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  26. self.socket.settimeout(10)
  27. self.socket.connect((self.host, self.port))
  28. self.connected = True
  29. print(f"成功连接到 {self.host}:{self.port}")
  30. return True
  31. except Exception as e:
  32. print(f"连接失败: {e}")
  33. return False
  34. def disconnect(self):
  35. """断开连接"""
  36. self.running = False
  37. if self.socket:
  38. try:
  39. # 发送停止数据传输命令
  40. self.socket.send(self.STOPDT_ACT)
  41. self.socket.close()
  42. except:
  43. pass
  44. self.connected = False
  45. print("连接已断开")
  46. def send_startdt(self):
  47. """发送启动数据传输命令"""
  48. try:
  49. self.socket.send(self.STARTDT_ACT)
  50. print("发送启动数据传输命令")
  51. except Exception as e:
  52. print(f"发送启动命令失败: {e}")
  53. def send_testfr(self):
  54. """发送测试帧"""
  55. try:
  56. self.socket.send(self.TESTFR_ACT)
  57. print("发送测试帧")
  58. except Exception as e:
  59. print(f"发送测试帧失败: {e}")
  60. def send_s_frame(self):
  61. """发送S格式监视帧确认"""
  62. try:
  63. # S格式帧:68 04 01 00 + 接收序号
  64. s_frame = b'\x68\x04\x01\x00' + struct.pack('<H', (self.recv_seq << 1))
  65. self.socket.send(s_frame)
  66. except Exception as e:
  67. print(f"发送S帧失败: {e}")
  68. def parse_apdu(self, data):
  69. """解析APDU(应用协议数据单元)"""
  70. if len(data) < 6:
  71. return None
  72. # 检查起始字节
  73. if data[0] != self.START_BYTE:
  74. return None
  75. length = data[1]
  76. # U格式帧(无编号控制功能)
  77. if length == 4 and len(data) >= 6:
  78. ctrl1 = data[2]
  79. ctrl2 = data[3]
  80. ctrl3 = data[4]
  81. ctrl4 = data[5]
  82. if ctrl1 == 0x07 and ctrl2 == 0x00:
  83. return {"type": "STARTDT_ACT", "description": "启动数据传输激活"}
  84. elif ctrl1 == 0x0B and ctrl2 == 0x00:
  85. return {"type": "STARTDT_CON", "description": "启动数据传输确认"}
  86. elif ctrl1 == 0x43 and ctrl2 == 0x00:
  87. return {"type": "TESTFR_ACT", "description": "测试帧激活"}
  88. elif ctrl1 == 0x83 and ctrl2 == 0x00:
  89. return {"type": "TESTFR_CON", "description": "测试帧确认"}
  90. # I格式帧(信息传输)
  91. elif length > 4:
  92. return self.parse_i_frame(data)
  93. return {"type": "UNKNOWN", "data": data.hex()}
  94. def parse_i_frame(self, data):
  95. """解析I格式帧"""
  96. if len(data) < 8:
  97. return None
  98. # 提取序列号
  99. send_seq = struct.unpack('<H', data[2:4])[0] >> 1
  100. recv_seq = struct.unpack('<H', data[4:6])[0] >> 1
  101. # ASDU(应用服务数据单元)
  102. if len(data) > 6:
  103. asdu = data[6:]
  104. print('报文:', data.hex())
  105. return self.parse_asdu(asdu, send_seq, recv_seq)
  106. return {
  107. "type": "I_FRAME",
  108. "send_seq": send_seq,
  109. "recv_seq": recv_seq,
  110. "data": data[6:].hex() if len(data) > 6 else ""
  111. }
  112. def parse_asdu(self, asdu_data, send_seq, recv_seq):
  113. """解析ASDU"""
  114. if len(asdu_data) < 6:
  115. return {"type": "I_FRAME", "error": "ASDU too short"}
  116. type_id = asdu_data[0]
  117. vsq = asdu_data[1] # 变量结构限定词
  118. cot = struct.unpack('<H', asdu_data[2:4])[0] # 传送原因(2字节)
  119. common_addr = struct.unpack('<H', asdu_data[4:6])[0] # 公共地址(2字节)
  120. # 信息对象地址根据VSQ确定
  121. num_objects = vsq & 0x7F
  122. sq = (vsq & 0x80) >> 7 # 序列标志
  123. result = {
  124. "type": "I_FRAME",
  125. "send_seq": send_seq,
  126. "recv_seq": recv_seq,
  127. "type_id": type_id,
  128. "type_description": self.get_type_description(type_id),
  129. "vsq": vsq,
  130. "num_objects": num_objects,
  131. "sq": sq,
  132. "cot": cot,
  133. "cot_description": self.get_cot_description(cot & 0x3F), # 只取低6位
  134. "common_addr": common_addr,
  135. "data_values": [],
  136. # "raw_data": asdu_data[6:].hex() if len(asdu_data) > 6 else "",
  137. "raw_data": asdu_data.hex() if len(asdu_data) > 6 else ""
  138. }
  139. # 解析数据值
  140. if len(asdu_data) > 6:
  141. data_part = asdu_data[6:]
  142. # 根据类型ID解析具体数据
  143. if type_id == 1: # 单点信息
  144. result["data_values"] = self.parse_single_point_info(data_part, num_objects, sq)
  145. elif type_id == 3: # 双点信息
  146. result["data_values"] = self.parse_double_point_info(data_part, num_objects, sq)
  147. elif type_id == 9: # 测量值,标准化值
  148. result["data_values"] = self.parse_normalized_value(data_part, num_objects, sq)
  149. elif type_id == 11: # 测量值,标度化值
  150. result["data_values"] = self.parse_scaled_value(data_part, num_objects, sq)
  151. elif type_id == 13: # 测量值,浮点数
  152. result["data_values"] = self.parse_float_value(data_part, num_objects, sq)
  153. return result
  154. def get_type_description(self, type_id):
  155. """获取类型标识描述"""
  156. type_descriptions = {
  157. 1: "单点信息",
  158. 2: "带时标的单点信息",
  159. 3: "双点信息",
  160. 4: "带时标的双点信息",
  161. 5: "步位置信息",
  162. 6: "带时标的步位置信息",
  163. 7: "32位串信息",
  164. 8: "带时标的32位串信息",
  165. 9: "测量值,标准化值",
  166. 10: "带时标的测量值,标准化值",
  167. 11: "测量值,标度化值",
  168. 12: "带时标的测量值,标度化值",
  169. 13: "测量值,浮点数",
  170. 14: "带时标的测量值,浮点数",
  171. 30: "带时标的单点信息",
  172. 36: "测量值,浮点数带时标",
  173. 100: "总召唤命令",
  174. 101: "计数量召唤命令"
  175. }
  176. return type_descriptions.get(type_id, f"未知类型 ({type_id})")
  177. def get_cot_description(self, cot):
  178. """获取传送原因描述"""
  179. cot_descriptions = {
  180. 1: "周期循环",
  181. 2: "背景扫描",
  182. 3: "自发",
  183. 4: "初始化",
  184. 5: "请求",
  185. 6: "激活",
  186. 7: "激活确认",
  187. 8: "停止激活",
  188. 9: "停止激活确认",
  189. 10: "激活结束",
  190. 20: "响应站召唤"
  191. }
  192. return cot_descriptions.get(cot, f"未知原因 ({cot})")
  193. def parse_single_point_info(self, data, num_objects, sq):
  194. """解析单点信息"""
  195. values = []
  196. pos = 0
  197. if sq == 0: # 非连续地址
  198. for i in range(num_objects):
  199. if pos + 3 <= len(data):
  200. addr = struct.unpack('<H', data[pos:pos + 2])[0]
  201. value = data[pos + 2] & 0x01
  202. quality = data[pos + 3]
  203. values.append({
  204. "address": addr,
  205. "value": bool(value),
  206. "quality": quality
  207. })
  208. pos += 4
  209. else: # 连续地址
  210. if len(data) >= 2:
  211. base_addr = struct.unpack('<H', data[0:2])[0]
  212. pos = 2
  213. for i in range(num_objects):
  214. if pos + 1 <= len(data):
  215. value = data[pos] & 0x01
  216. quality = data[pos + 1]
  217. values.append({
  218. "address": base_addr + i,
  219. "value": bool(value),
  220. "quality": quality
  221. })
  222. pos += 2
  223. return values
  224. def parse_double_point_info(self, data, num_objects, sq):
  225. """解析双点信息"""
  226. values = []
  227. pos = 0
  228. if sq == 0: # 非连续地址
  229. for i in range(num_objects):
  230. if pos + 3 <= len(data):
  231. addr = struct.unpack('<H', data[pos:pos + 2])[0]
  232. value = data[pos + 2] & 0x03
  233. quality = data[pos + 3]
  234. values.append({
  235. "address": addr,
  236. "value": value,
  237. "quality": quality
  238. })
  239. pos += 4
  240. else: # 连续地址
  241. if len(data) >= 2:
  242. base_addr = struct.unpack('<H', data[0:2])[0]
  243. pos = 2
  244. for i in range(num_objects):
  245. if pos + 1 <= len(data):
  246. value = data[pos] & 0x03
  247. quality = data[pos + 1]
  248. values.append({
  249. "address": base_addr + i,
  250. "value": value,
  251. "quality": quality
  252. })
  253. pos += 2
  254. return values
  255. def parse_normalized_value(self, data, num_objects, sq):
  256. """解析标准化值"""
  257. values = []
  258. pos = 0
  259. if sq == 0: # 非连续地址
  260. for i in range(num_objects):
  261. if pos + 6 <= len(data):
  262. addr = struct.unpack('<H', data[pos:pos + 2])[0]
  263. # 标准化值范围是-1.0到1.0,对应-32768到32767
  264. raw_value = struct.unpack('<h', data[pos + 2:pos + 4])[0]
  265. value = raw_value / 32768.0 # 转换为-1.0到1.0范围
  266. quality = data[pos + 4]
  267. values.append({
  268. "address": addr,
  269. "value": round(value, 6), # 保留6位小数
  270. "quality": quality,
  271. "raw_value": raw_value # 保留原始值用于调试
  272. })
  273. pos += 6
  274. else: # 连续地址
  275. if len(data) >= 2:
  276. base_addr = struct.unpack('<H', data[0:2])[0]
  277. pos = 2
  278. for i in range(num_objects):
  279. if pos + 3 <= len(data):
  280. raw_value = struct.unpack('<h', data[pos:pos + 2])[0]
  281. value = raw_value / 32768.0
  282. quality = data[pos + 2]
  283. values.append({
  284. "address": base_addr + i,
  285. "value": round(value, 6),
  286. "quality": quality,
  287. "raw_value": raw_value
  288. })
  289. pos += 3
  290. return values
  291. def parse_scaled_value(self, data, num_objects, sq):
  292. """解析标度化值"""
  293. values = []
  294. pos = 0
  295. if sq == 0: # 非连续地址
  296. for i in range(num_objects):
  297. if pos + 6 <= len(data):
  298. addr = struct.unpack('<H', data[pos:pos + 2])[0]
  299. # 标度化值实际上是4字节,包含2字节值和2字节质量描述
  300. value = struct.unpack('<h', data[pos + 2:pos + 4])[0]
  301. quality = struct.unpack('<H', data[pos + 4:pos + 6])[0]
  302. # 将值转换为模拟器显示的格式
  303. scaled_value = value >> 7 # 取高8位作为显示值
  304. values.append({
  305. "address": addr,
  306. "value": scaled_value,
  307. "quality": quality & 0xFF # 只取低8位质量位
  308. })
  309. pos += 6
  310. else: # 连续地址
  311. if len(data) >= 2:
  312. base_addr = struct.unpack('<H', data[0:2])[0]
  313. pos = 2
  314. for i in range(num_objects):
  315. if pos + 4 <= len(data):
  316. value = struct.unpack('<h', data[pos:pos + 2])[0]
  317. quality = struct.unpack('<H', data[pos + 2:pos + 4])[0]
  318. scaled_value = value >> 8
  319. values.append({
  320. "address": base_addr + i,
  321. "value": scaled_value,
  322. "quality": quality & 0xFF
  323. })
  324. pos += 4
  325. return values
  326. def parse_float_value(self, data, num_objects, sq):
  327. """解析浮点数值"""
  328. values = []
  329. pos = 0
  330. if sq == 0: # 非连续地址
  331. # 每个信息对象:3字节地址 + 4字节浮点数 + 1字节质量
  332. for i in range(num_objects):
  333. if pos + 8 <= len(data):
  334. # 3字节地址(小端序)
  335. addr = struct.unpack('<I', data[pos:pos + 3] + b'\x00')[0]
  336. # 4字节IEEE754浮点数(小端序)
  337. value_bytes = data[pos + 3:pos + 7]
  338. value = struct.unpack('<f', value_bytes)[0]
  339. quality = data[pos + 7]
  340. # 检查数值有效性
  341. if not (value != value): # 检查NaN
  342. values.append({
  343. "address": addr,
  344. "value": round(value, 6),
  345. "quality": quality,
  346. "raw_bytes": value_bytes.hex()
  347. })
  348. pos += 8
  349. else: # 连续地址
  350. if len(data) >= 3:
  351. # 第一个信息对象地址
  352. base_addr = struct.unpack('<I', data[0:3] + b'\x00')[0]
  353. pos = 3
  354. # 后续对象只有数值和质量
  355. for i in range(num_objects):
  356. if pos + 5 <= len(data):
  357. value_bytes = data[pos:pos + 4]
  358. value = struct.unpack('<f', value_bytes)[0]
  359. quality = data[pos + 4]
  360. if not (value != value): # 检查NaN
  361. values.append({
  362. "address": base_addr + i,
  363. "value": round(value, 6),
  364. "quality": quality,
  365. "raw_bytes": value_bytes.hex()
  366. })
  367. pos += 5
  368. return values
  369. def receive_data(self):
  370. """接收数据的主循环"""
  371. buffer = b''
  372. while self.running:
  373. try:
  374. data = self.socket.recv(1024)
  375. if not data:
  376. print("连接被服务器关闭")
  377. break
  378. buffer += data
  379. # 处理缓冲区中的完整帧
  380. while len(buffer) >= 2:
  381. if buffer[0] != self.START_BYTE:
  382. buffer = buffer[1:]
  383. continue
  384. if len(buffer) < 2:
  385. break
  386. frame_length = buffer[1] + 2 # 长度字段不包括起始字节和长度字节本身
  387. if len(buffer) < frame_length:
  388. break
  389. frame = buffer[:frame_length]
  390. buffer = buffer[frame_length:]
  391. # 解析并显示帧
  392. parsed = self.parse_apdu(frame)
  393. if parsed:
  394. self.display_data(parsed)
  395. # 发送S格式确认帧(监视功能)
  396. if parsed.get("type") == "I_FRAME":
  397. self.send_s_frame()
  398. # 更新接收序号
  399. if parsed.get("type") == "I_FRAME" and "send_seq" in parsed:
  400. self.recv_seq = parsed["send_seq"] + 1
  401. # 自动回复确认帧
  402. if parsed and parsed.get("type") == "TESTFR_ACT":
  403. self.socket.send(self.TESTFR_CON)
  404. except socket.timeout:
  405. continue
  406. except Exception as e:
  407. print(f"接收数据错误: {e}")
  408. break
  409. def send_general_interrogation(self):
  410. """发送总召命令(C_IC_NA_1)"""
  411. try:
  412. # 构造总召命令帧
  413. # ASDU: 类型标识=100, VSQ=1, COT=6(激活), 公共地址, 信息对象地址=0, QOI=20(站召唤)
  414. asdu = struct.pack('<BBHHIB',
  415. 100, # 类型标识:总召命令
  416. 1, # VSQ:1个信息对象
  417. 6, # COT:激活
  418. self.common_addr if hasattr(self, 'common_addr') else 1, # 公共地址
  419. 0, # 信息对象地址
  420. 20 # QOI:站召唤
  421. )
  422. # 构造I格式帧
  423. frame_length = len(asdu) + 4
  424. i_frame = struct.pack('<BBH',
  425. self.START_BYTE,
  426. frame_length,
  427. (self.send_seq << 1) # 发送序号
  428. ) + struct.pack('<H', (self.recv_seq << 1)) + asdu # 接收序号 + ASDU
  429. self.socket.send(i_frame)
  430. self.send_seq += 1
  431. print(f"发送总召命令 (发送序号: {self.send_seq - 1})")
  432. except Exception as e:
  433. print(f"发送总召命令失败: {e}")
  434. def send_clock_sync(self):
  435. """发送时钟同步命令"""
  436. try:
  437. # 获取当前时间
  438. now = datetime.now()
  439. # CP56Time2a格式:毫秒(2) + 分钟(1) + 小时(1) + 日期(3)
  440. ms = (now.second * 1000 + now.microsecond // 1000) & 0xFFFF
  441. minute = now.minute & 0x3F
  442. hour = now.hour & 0x1F
  443. day = now.day & 0x1F
  444. month = now.month & 0x0F
  445. year = (now.year - 2000) & 0x7F
  446. time_bytes = struct.pack('<HBBBB', ms, minute, hour,
  447. (month << 5) | day, year)
  448. # 构造时钟同步ASDU
  449. asdu = struct.pack('<BBHHI',
  450. 103, # 类型标识:时钟同步命令
  451. 1, # VSQ
  452. 6, # COT:激活
  453. self.common_addr if hasattr(self, 'common_addr') else 1,
  454. 0 # 信息对象地址
  455. ) + time_bytes
  456. frame_length = len(asdu) + 4
  457. i_frame = struct.pack('<BBH',
  458. self.START_BYTE,
  459. frame_length,
  460. (self.send_seq << 1)
  461. ) + struct.pack('<H', (self.recv_seq << 1)) + asdu
  462. self.socket.send(i_frame)
  463. self.send_seq += 1
  464. print(f"发送时钟同步命令")
  465. except Exception as e:
  466. print(f"发送时钟同步命令失败: {e}")
  467. def display_data(self, parsed_data):
  468. """显示解析后的数据"""
  469. timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  470. print(f"\n[{timestamp}] 接收到数据:")
  471. print(f"类型: {parsed_data.get('type', 'UNKNOWN')}")
  472. if 'description' in parsed_data:
  473. print(f"描述: {parsed_data['description']}")
  474. if 'type_description' in parsed_data:
  475. print(f"ASDU类型: {parsed_data['type_description']} (ID: {parsed_data['type_id']})")
  476. print(f"传送原因: {parsed_data['cot_description']} ({parsed_data['cot']})")
  477. print(f"公共地址: {parsed_data['common_addr']}")
  478. if 'num_objects' in parsed_data:
  479. print(f"信息对象数量: {parsed_data['num_objects']}, SQ: {parsed_data.get('sq', 0)}")
  480. if parsed_data.get('data_values'):
  481. print("数据值:")
  482. valid_values = [v for v in parsed_data['data_values'] if v.get('value') is not None]
  483. for i, value in enumerate(valid_values[:10]): # 显示前10个有效值
  484. quality_desc = self.get_quality_description(value.get('quality', 0))
  485. raw_info = f" [原始: {value.get('raw_bytes', '')}]" if 'raw_bytes' in value else ""
  486. print(f" 地址 {value.get('address', 'N/A')}: {value.get('value', 'N/A')} {quality_desc}{raw_info}")
  487. if len(valid_values) > 10:
  488. print(f" ... 还有 {len(valid_values) - 10} 个有效数据点")
  489. if len(parsed_data['data_values']) > len(valid_values):
  490. print(f" (过滤了 {len(parsed_data['data_values']) - len(valid_values)} 个异常值)")
  491. # 显示原始数据用于调试
  492. if 'raw_data' in parsed_data and parsed_data['raw_data']:
  493. print(f"原始数据: {parsed_data['raw_data']}")
  494. # 记录公共地址用于总召
  495. if 'common_addr' in parsed_data:
  496. self.common_addr = parsed_data['common_addr']
  497. def get_quality_description(self, quality):
  498. """获取质量描述符"""
  499. if quality == 0:
  500. return "(良好)"
  501. elif quality & 0x80:
  502. return "(无效)"
  503. elif quality & 0x40:
  504. return "(未更新)"
  505. elif quality & 0x20:
  506. return "(被取代)"
  507. elif quality & 0x10:
  508. return "(被阻塞)"
  509. else:
  510. return f"(质量: {quality})"
  511. print("-" * 50)
  512. def start(self):
  513. """启动客户端"""
  514. max_retries = 5
  515. retry_count = 0
  516. while retry_count < max_retries:
  517. if not self.connect():
  518. retry_count += 1
  519. if retry_count < max_retries:
  520. print(f"连接失败,{5}秒后重试 ({retry_count}/{max_retries})")
  521. time.sleep(5)
  522. continue
  523. else:
  524. print("达到最大重试次数,退出程序")
  525. return
  526. self.running = True
  527. retry_count = 0 # 重置重试计数
  528. # 启动接收线程
  529. receive_thread = threading.Thread(target=self.receive_data)
  530. receive_thread.daemon = True
  531. receive_thread.start()
  532. # 发送启动数据传输命令
  533. time.sleep(1)
  534. self.send_startdt()
  535. # 等待启动确认后发送总召
  536. time.sleep(2)
  537. self.send_general_interrogation()
  538. try:
  539. # 主循环 - 定期发送测试帧和总召
  540. last_gi_time = time.time()
  541. while self.running and self.connected:
  542. current_time = time.time()
  543. # 每5分钟发送一次总召
  544. if current_time - last_gi_time >= 300: # 300秒 = 5分钟
  545. self.send_general_interrogation()
  546. last_gi_time = current_time
  547. # 每30秒发送一次测试帧
  548. time.sleep(30)
  549. if self.connected:
  550. self.send_testfr()
  551. # 检查接收线程是否还活着
  552. if not receive_thread.is_alive():
  553. print("接收线程已停止,准备重连")
  554. break
  555. except KeyboardInterrupt:
  556. print("\n程序被用户中断")
  557. break
  558. finally:
  559. self.disconnect()
  560. # 如果不是用户中断,则尝试重连
  561. if self.running:
  562. print("连接断开,5秒后尝试重连...")
  563. time.sleep(5)
  564. def main():
  565. # 配置参数
  566. HOST = "192.168.50.242"
  567. PORT = 2404
  568. print(f"启动IEC 104客户端")
  569. print(f"目标服务器: {HOST}:{PORT}")
  570. print("按 Ctrl+C 退出程序\n")
  571. client = IEC104Client(HOST, PORT)
  572. client.start()
  573. if __name__ == "__main__":
  574. main()