import struct from datetime import datetime import data_analysis ######################################################################################################################## # This function parses and prints the received packet during automated polling FRAME_SIZE = 7 _buffer = bytearray() def parse_autopoll(data: bytes): global _buffer _buffer.extend(data) while len(_buffer) >= FRAME_SIZE: msg_id = _buffer[0:1] # resync if invalid ID if msg_id not in (b'r', b't'): del _buffer[0] continue frame = bytes(_buffer[:FRAME_SIZE]) del _buffer[:FRAME_SIZE] _id, value, read_count, error = struct.unpack(' str: raw = field_bytes.split(b'\x00', 1)[0] return raw.decode('utf-8', errors='replace').strip() def take_field(max_len: int) -> bytes: nonlocal i chunk = packet[i:i+max_len] i += len(chunk) return chunk fw_bytes = take_field(MAX_VERSION_LENGTH) hw_bytes = take_field(MAX_VERSION_LENGTH) sn_bytes = take_field(MAX_VERSION_LENGTH) fw = parse_ver_field(fw_bytes) hw = parse_ver_field(hw_bytes) sn = parse_ver_field(sn_bytes) # Return structured data return { "coeffs": coeffs, "fw": fw, "hw": hw, "sn": sn, } ######################################################################################################################## def verify_coeffs_and_versions(packet: bytes, expected_coeffs, expected_fw, expected_hw, expected_sn, float_tol=1e-5): parsed = parse_coeff_and_version(packet) coeffs_ok = True coeff_compare = [] for i, (exp, got) in enumerate(zip(expected_coeffs, parsed["coeffs"]), start=1): match = abs(exp - got) <= float_tol coeff_compare.append((i, exp, got, match)) if not match: coeffs_ok = False fw_ok = (parsed["fw"] == expected_fw) hw_ok = (parsed["hw"] == expected_hw) sn_ok = (parsed["sn"] == expected_sn) overall_ok = coeffs_ok and fw_ok and hw_ok and sn_ok return { "overall_match": overall_ok, # Coeffs already carry expected & received per row "coeff_details": coeff_compare, # Versions: include both expected and parsed for pretty printing "versions": { "fw": {"expected": expected_fw, "received": parsed["fw"], "match": fw_ok}, "hw": {"expected": expected_hw, "received": parsed["hw"], "match": hw_ok}, "sn": {"expected": expected_sn, "received": parsed["sn"], "match": sn_ok}, }, # Keep the raw parsed payload too (optional) "parsed": parsed, } ######################################################################################################################## def print_verification_results(status): print(f"Overall Match: {status['overall_match']}\n") # ---- Coeff Table ---- print(f"{'Coeff':<8} {'Expected':>12} {'Received':>12} {'Match':>8}") for idx, exp, got, ok in status["coeff_details"]: print(f"{idx:<8} {exp:>12.2f} {got:>12.2f} {str(ok):>8}") # ---- Version Table ---- print("\n" + f"{'Field':<12} {'Expected':>12} {'Received':>12} {'Match':>8}") for field in ("fw", "hw", "sn"): row = status["versions"][field] label = field.upper() print(f"{label:<12} {row['expected']:>12} {row['received']:>12} {str(row['match']):>8}") ########################################################################################################################