# NOTE(review): this module is collapsed onto one physical line. In Python a
# '#' starts a comment that runs to the end of the line, so as written only
# `import struct` executes; FRAME_SIZE, _buffer, parse_autopoll and the
# version-parsing code below are comment text and are never defined.
#
# NOTE(review): text is missing between "struct.unpack('" and " str:". The lost
# span held the struct format string, the remainder of parse_autopoll, the
# header of the enclosing function that binds `packet`, `i` and
# MAX_VERSION_LENGTH (all referenced below but not defined in the visible
# text), and the `def parse_ver_field(...) ->` header. Possibly removed by
# markup stripping of a "<...>" span -- unconfirmed. Restore this file from the
# original source instead of reconstructing the frame layout by guesswork.
import struct ######################################################################################################################## # This function parses and prints the received packet during automated polling FRAME_SIZE = 7 _buffer = bytearray() def parse_autopoll(data: bytes): global _buffer _buffer.extend(data) while len(_buffer) >= FRAME_SIZE: msg_id = _buffer[0:1] # resync if invalid ID if msg_id not in (b'r', b't'): del _buffer[0] continue frame = bytes(_buffer[:FRAME_SIZE]) del _buffer[:FRAME_SIZE] _id, value, read_count, error = struct.unpack(' str: # decode up to first NUL, tolerate missing terminator raw = field_bytes.split(b'\x00', 1)[0] return raw.decode('utf-8', errors='replace').strip() def take_field(max_len: int) -> bytes: """ Take up to max_len bytes from packet starting at i. If fewer than max_len bytes remain, take what's left (tolerate truncated padding). """ nonlocal i chunk = packet[i:i + max_len] i += len(chunk) return chunk # Read 3 fields, each up to MAX_VERSION_LENGTH, but do NOT require full padding bytes. fw_bytes = take_field(MAX_VERSION_LENGTH) hw_bytes = take_field(MAX_VERSION_LENGTH) sn_bytes = take_field(MAX_VERSION_LENGTH) fw = parse_ver_field(fw_bytes) hw = parse_ver_field(hw_bytes) sn = parse_ver_field(sn_bytes) print("Versions:") print(" FW:", fw) print(" HW:", hw) print(" SN:", sn) # If there’s extra trailing data, show it (optional but useful) if i < len(packet): print(f"Trailing bytes ({len(packet)-i}): {packet[i:]}") ########################################################################################################################