| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
import struct
class ASDU:
    """A parsed application service data unit (ASDU).

    Attributes:
        type_id: Type identification byte (first header byte).
        vsq: Raw variable structure qualifier byte. The low 7 bits are the
            number of information elements, the top bit selects the
            "sequence" addressing mode.
        cot: 16-bit cause-of-transmission field.
        common_addr: 16-bit common address field.
        io_elements: List of dicts, one per information element. Each dict
            has an ``'address'`` key plus the keys produced by
            ``_parse_io_element`` (``'value'``/``'quality'``, optionally
            ``'time'``, or ``'raw_data'`` for an unsupported type id).

    NOTE(review): every multi-byte field is decoded big-endian (``'!'``).
    The type ids look like IEC 60870-5-101/104, which transmits the least
    significant octet first -- confirm against the data source before
    changing the byte order.
    """

    # Encoded size in bytes of one information element (without its
    # address), keyed by type id. Used to detect truncated frames up front.
    _ELEMENT_SIZES = {1: 1, 3: 1, 9: 3, 11: 3, 13: 5, 30: 8, 31: 8, 151: 1}

    def __init__(self, type_id, vsq, cot, common_addr, io_elements):
        self.type_id = type_id
        self.vsq = vsq
        self.cot = cot
        self.common_addr = common_addr
        self.io_elements = io_elements

    @classmethod
    def unpack(cls, data):
        """Parse *data* into an ``ASDU``.

        Args:
            data: Bytes-like object starting at the type identification byte.

        Returns:
            An ``ASDU`` instance, or ``None`` when *data* is too short for
            the header, for an information object address, or for an
            information element of a known type.
        """
        if len(data) < 6:
            return None

        type_id = data[0]
        vsq = data[1]
        cot, common_addr = struct.unpack('!HH', data[2:6])
        payload = data[6:]

        count = vsq & 0x7F
        is_sequence = (vsq & 0x80) != 0
        element_size = cls._ELEMENT_SIZES.get(type_id)

        base_addr = None
        if is_sequence:
            # Sequence mode: one address up front, elements get consecutive
            # addresses starting from it.
            if len(payload) < 3:
                return None
            base_addr = int.from_bytes(payload[:3], 'big')
            payload = payload[3:]

        io_elements = []
        for index in range(count):
            if is_sequence:
                io_addr = base_addr + index
            else:
                # Non-sequence mode: every element carries its own address.
                if len(payload) < 3:
                    return None
                io_addr = int.from_bytes(payload[:3], 'big')
                payload = payload[3:]

            # Report a truncated element the same way as a truncated header
            # instead of letting IndexError / struct.error escape.
            if element_size is not None and len(payload) < element_size:
                return None

            element_data, payload = cls._parse_io_element(type_id, payload)
            # Keep every key the element parser produced (including 'time'
            # and 'raw_data'), not just 'value' and 'quality'.
            io_elements.append({'address': io_addr, **element_data})

            if element_size is None:
                # Unsupported type: the parser swallowed the rest of the
                # payload as raw data, so there is nothing left to split.
                break

        return cls(type_id, vsq, cot, common_addr, io_elements)

    @staticmethod
    def _parse_io_element(type_id, data):
        """Decode one information element from the front of *data*.

        Args:
            type_id: Type identification selecting the element layout.
            data: Bytes-like object positioned at the element.

        Returns:
            A ``(element_data, remaining)`` tuple. ``element_data`` has
            ``'value'`` and ``'quality'`` keys (plus ``'time'`` for type ids
            30 and 31). For an unsupported type id it is
            ``{'raw_data': data}`` and ``remaining`` is ``b''``.

        Raises:
            IndexError, struct.error: If *data* is shorter than the element.
        """
        if type_id in (1, 30, 3, 31):
            # Types 1/30 carry a 1-bit state, types 3/31 a 2-bit state;
            # the top bit of the same byte is reported as the quality.
            mask = 0x01 if type_id in (1, 30) else 0x03
            element_data = {
                'value': data[0] & mask,
                'quality': data[0] >> 7,
            }
            if type_id in (30, 31):
                # A 7-byte time tag follows the state byte. This is a
                # staticmethod, so the helper is reached via the class name.
                element_data['time'] = ASDU._parse_time(data[1:8])
                return element_data, data[8:]
            return element_data, data[1:]

        if type_id in (9, 11):
            # Signed 16-bit value followed by one qualifier byte.
            value = struct.unpack('!h', data[:2])[0]
            return {'value': value, 'quality': data[2] >> 7}, data[3:]

        if type_id == 13:
            # 32-bit float followed by one qualifier byte.
            value = struct.unpack('!f', data[:4])[0]
            return {'value': value, 'quality': data[4] >> 7}, data[5:]

        if type_id == 151:
            # Low nibble is the value, high nibble the quality.
            return {'value': data[0] & 0x0F, 'quality': data[0] >> 4}, data[1:]

        return {'raw_data': data}, b''

    @staticmethod
    def _parse_time(time_data):
        """Decode a 7-byte time tag into a dict.

        Args:
            time_data: Bytes-like object holding at least 7 bytes.

        Returns:
            A dict with ``'year'``, ``'month'``, ``'day'``, ``'hour'``,
            ``'minute'`` and ``'milliseconds'`` keys, or ``None`` when fewer
            than 7 bytes are supplied.
        """
        if len(time_data) < 7:
            return None

        return {
            # The 7-bit year field is an offset from 2000.
            'year': (time_data[6] & 0x7F) + 2000,
            'month': time_data[5] & 0x0F,
            'day': time_data[4] & 0x1F,
            'hour': time_data[3] & 0x1F,
            'minute': time_data[2] & 0x3F,
            'milliseconds': struct.unpack('!H', time_data[:2])[0],
        }
|