ASDU.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import struct
  2. class ASDU:
  3. def __init__(self, type_id, vsq, cot, common_addr, io_elements):
  4. self.type_id = type_id
  5. self.vsq = vsq
  6. self.cot = cot
  7. self.common_addr = common_addr
  8. self.io_elements = io_elements
  9. @classmethod
  10. def unpack(cls, data):
  11. if len(data) < 6:
  12. return None
  13. type_id = data[0]
  14. vsq = data[1]
  15. cot = struct.unpack('!H', data[2:4])[0]
  16. common_addr = struct.unpack('!H', data[4:6])[0]
  17. io_elements = []
  18. data = data[6:]
  19. num_elements = vsq & 0x7F
  20. is_sequence = (vsq & 0x80) != 0
  21. if is_sequence:
  22. if len(data) < 3:
  23. return None
  24. io_addr = struct.unpack('!I', b'\x00' + data[:3])[0]
  25. data = data[3:]
  26. for i in range(num_elements):
  27. element_data, data = cls._parse_io_element(type_id, data)
  28. io_elements.append({
  29. 'address': io_addr + i,
  30. 'value': element_data['value'],
  31. 'quality': element_data['quality']
  32. })
  33. else:
  34. for _ in range(num_elements):
  35. if len(data) < 3:
  36. return None
  37. io_addr = struct.unpack('!I', b'\x00' + data[:3])[0]
  38. data = data[3:]
  39. element_data, data = cls._parse_io_element(type_id, data)
  40. io_elements.append({
  41. 'address': io_addr,
  42. 'value': element_data['value'],
  43. 'quality': element_data['quality']
  44. })
  45. return cls(type_id, vsq, cot, common_addr, io_elements)
  46. @staticmethod
  47. def _parse_io_element(type_id, data):
  48. if type_id in [1, 30]:
  49. value = data[0] & 0x01
  50. quality = data[0] >> 7
  51. element_data = {
  52. 'value': value,
  53. 'quality': quality
  54. }
  55. if type_id == 30:
  56. time_data = data[1:8]
  57. element_data['time'] = cls._parse_time(time_data)
  58. return element_data, data[8:]
  59. return element_data, data[1:]
  60. elif type_id in [3, 31]:
  61. value = data[0] & 0x03
  62. quality = data[0] >> 7
  63. element_data = {
  64. 'value': value,
  65. 'quality': quality
  66. }
  67. if type_id == 31:
  68. time_data = data[1:8]
  69. element_data['time'] = cls._parse_time(time_data)
  70. return element_data, data[8:]
  71. return element_data, data[1:]
  72. elif type_id == 9:
  73. value = struct.unpack('!h', data[:2])[0]
  74. quality = data[2] >> 7
  75. element_data = {
  76. 'value': value,
  77. 'quality': quality
  78. }
  79. return element_data, data[3:]
  80. elif type_id == 11:
  81. value = struct.unpack('!h', data[:2])[0]
  82. quality = data[2] >> 7
  83. element_data = {
  84. 'value': value,
  85. 'quality': quality
  86. }
  87. return element_data, data[3:]
  88. elif type_id == 13:
  89. value = struct.unpack('!f', data[:4])[0]
  90. quality = data[4] >> 7
  91. element_data = {
  92. 'value': value,
  93. 'quality': quality
  94. }
  95. return element_data, data[5:]
  96. elif type_id == 151:
  97. value = data[0] & 0x0F
  98. quality = data[0] >> 4
  99. element_data = {
  100. 'value': value,
  101. 'quality': quality
  102. }
  103. return element_data, data[1:]
  104. else:
  105. return {'raw_data': data}, b''
  106. @staticmethod
  107. def _parse_time(time_data):
  108. if len(time_data) < 7:
  109. return None
  110. milliseconds = struct.unpack('!H', time_data[:2])[0]
  111. minute = time_data[2] & 0x3F
  112. hour = time_data[3] & 0x1F
  113. day = time_data[4] & 0x1F
  114. month = time_data[5] & 0x0F
  115. year = (time_data[6] & 0x7F) + 2000
  116. return {
  117. 'year': year,
  118. 'month': month,
  119. 'day': day,
  120. 'hour': hour,
  121. 'minute': minute,
  122. 'milliseconds': milliseconds
  123. }