import struct


class ASDU:
    """A decoded application service data unit.

    Holds the header fields (type identifier, variable structure qualifier,
    cause of transmission, common address) and the list of decoded
    information object elements.

    NOTE(review): every multi-byte field here is decoded big-endian
    (network order, ``'!'``). The field names suggest IEC 60870-5-101/104,
    which as far as I know transmits the least significant octet first --
    confirm the byte order against the peer device before relying on
    addresses, measured values or time tags.
    """

    # Length in bytes of an information object address.
    _IO_ADDRESS_SIZE = 3
    # Length in bytes of the time tag that follows the value in types 30/31.
    _TIME_TAG_SIZE = 7

    def __init__(self, type_id, vsq, cot, common_addr, io_elements):
        """Store the already-decoded header fields and element list."""
        self.type_id = type_id
        self.vsq = vsq
        self.cot = cot
        self.common_addr = common_addr
        self.io_elements = io_elements

    @classmethod
    def unpack(cls, data):
        """Decode *data* into an ``ASDU``.

        Layout read from *data*: type id (1 byte), VSQ (1 byte), cause of
        transmission (2 bytes), common address (2 bytes), then the
        information objects. The low 7 bits of the VSQ give the element
        count; the high bit selects "sequence" mode, where one 3-byte
        address is followed by consecutive elements whose addresses
        increment by one.

        Args:
            data: Bytes-like object containing the encoded unit.

        Returns:
            An ``ASDU`` instance, or ``None`` when *data* is too short for
            the header, an address, or an element of the given type.

            Each entry of ``io_elements`` is a dict with ``'address'``,
            ``'value'`` and ``'quality'``. Time-tagged types (30, 31) add a
            ``'time'`` key. For an unrecognised type id, ``'value'`` and
            ``'quality'`` are ``None`` and the undecoded remainder is stored
            under ``'raw_data'``.
        """
        if len(data) < 6:
            return None

        type_id = data[0]
        vsq = data[1]
        cot, common_addr = struct.unpack('!HH', data[2:6])
        payload = data[6:]

        num_elements = vsq & 0x7F
        is_sequence = (vsq & 0x80) != 0

        base_addr = None
        if is_sequence:
            # Sequence mode carries a single address for the whole run.
            base_addr, payload = cls._parse_io_address(payload)
            if base_addr is None:
                return None

        io_elements = []
        for index in range(num_elements):
            if is_sequence:
                io_addr = base_addr + index
            else:
                io_addr, payload = cls._parse_io_address(payload)
                if io_addr is None:
                    return None

            element_data, payload = cls._parse_io_element(type_id, payload)
            if element_data is None:
                # Truncated element: report it like any other short input.
                return None

            entry = {
                'address': io_addr,
                'value': element_data.get('value'),
                'quality': element_data.get('quality'),
            }
            for optional_key in ('time', 'raw_data'):
                if optional_key in element_data:
                    entry[optional_key] = element_data[optional_key]
            io_elements.append(entry)

            if 'raw_data' in element_data:
                # The element size of an unknown type is not known, so the
                # rest of the payload cannot be split into further elements.
                break

        return cls(type_id, vsq, cot, common_addr, io_elements)

    @classmethod
    def _parse_io_address(cls, data):
        """Read a 3-byte big-endian address from the front of *data*.

        Returns:
            ``(address, remaining_data)``, or ``(None, data)`` when fewer
            than three bytes are available.
        """
        size = cls._IO_ADDRESS_SIZE
        if len(data) < size:
            return None, data
        # Pad to four bytes so the value can be read as an unsigned 32-bit int.
        address = struct.unpack('!I', b'\x00' + data[:size])[0]
        return address, data[size:]

    @classmethod
    def _parse_io_element(cls, type_id, data):
        """Decode one information element of *type_id* from *data*.

        Supported type ids and what is read:
            1, 30  -- 1 byte; value is bit 0, quality is bit 7.
            3, 31  -- 1 byte; value is bits 0-1, quality is bit 7.
            30, 31 -- additionally a 7-byte time tag after the first byte.
            9, 11  -- signed 16-bit value, then 1 byte whose bit 7 is quality.
            13     -- 32-bit float value, then 1 byte whose bit 7 is quality.
            151    -- 1 byte; value is the low nibble, quality the high nibble.

        Returns:
            ``(element_dict, remaining_data)``. For an unrecognised type id
            the dict is ``{'raw_data': data}`` and the remainder is empty.
            When *data* is too short for the element, ``(None, data)``.
        """
        if type_id in (1, 3, 30, 31):
            has_time = type_id in (30, 31)
            size = 1 + (cls._TIME_TAG_SIZE if has_time else 0)
            if len(data) < size:
                return None, data
            value_mask = 0x01 if type_id in (1, 30) else 0x03
            element_data = {
                'value': data[0] & value_mask,
                'quality': data[0] >> 7,
            }
            if has_time:
                element_data['time'] = cls._parse_time(data[1:size])
            return element_data, data[size:]

        if type_id in (9, 11):
            if len(data) < 3:
                return None, data
            element_data = {
                'value': struct.unpack('!h', data[:2])[0],
                'quality': data[2] >> 7,
            }
            return element_data, data[3:]

        if type_id == 13:
            if len(data) < 5:
                return None, data
            element_data = {
                'value': struct.unpack('!f', data[:4])[0],
                'quality': data[4] >> 7,
            }
            return element_data, data[5:]

        if type_id == 151:
            if len(data) < 1:
                return None, data
            element_data = {
                'value': data[0] & 0x0F,
                'quality': data[0] >> 4,
            }
            return element_data, data[1:]

        return {'raw_data': data}, b''

    @staticmethod
    def _parse_time(time_data):
        """Decode a 7-byte time tag into a dict of calendar fields.

        Bytes 0-1 are read as a big-endian unsigned ``milliseconds`` field;
        the following bytes give minute (6 bits), hour (5 bits), day
        (5 bits), month (4 bits) and a 7-bit year offset added to 2000.

        NOTE(review): if this is a CP56Time2a tag, the first field would be
        little-endian and would include the seconds -- confirm.

        Returns:
            A dict with ``year``, ``month``, ``day``, ``hour``, ``minute``
            and ``milliseconds``, or ``None`` if fewer than 7 bytes are given.
        """
        if len(time_data) < 7:
            return None
        return {
            'year': (time_data[6] & 0x7F) + 2000,
            'month': time_data[5] & 0x0F,
            'day': time_data[4] & 0x1F,
            'hour': time_data[3] & 0x1F,
            'minute': time_data[2] & 0x3F,
            'milliseconds': struct.unpack('!H', time_data[:2])[0],
        }