Index: CN0_Python_Scripts/parser.py =================================================================== diff -u -r509657069dd7ee800560a5c0b790077ce4cbd31a -r685d8eb11e2a49f2c8f7a270e379186415594601 --- CN0_Python_Scripts/parser.py (.../parser.py) (revision 509657069dd7ee800560a5c0b790077ce4cbd31a) +++ CN0_Python_Scripts/parser.py (.../parser.py) (revision 685d8eb11e2a49f2c8f7a270e379186415594601) @@ -1,161 +1,96 @@ import struct -# ---- Struct formats (little-endian, packed) ---- -# flash_file: int, int, float×6, int, int, int -> 44 bytes -FLASH_FMT = struct.Struct(" 52 bytes -# ADuCM360: unsigned long is 32-bit -> 'I' -ADC_FMT = struct.Struct("<4f6if2I") +######################################################################################################################## +# This function parses and prints the received packet during automated polling +FRAME_SIZE = 7 +_buffer = bytearray() -# cmdPollBinStruct: flash_file + adc_file + float (conductivity) -> 100 bytes -CMD_FMT = struct.Struct("= FRAME_SIZE: + msg_id = _buffer[0:1] -def parse_cmd_poll_bin(data: bytes) -> dict: - """ - Parse a 100-byte binary payload of cmdPollBinStruct into a nested dict. + # resync if invalid ID + if msg_id not in (b'r', b't'): + del _buffer[0] + continue - Expected layout (packed, little-endian): - struct cmdPollBinStruct { - flash_file flashData; // 44 bytes - adc_file adcData; // 52 bytes - float conductivity; // 4 bytes - } - """ - if len(data) != CMD_SIZE: - raise ValueError(f"Expected {CMD_SIZE} bytes, got {len(data)}") + frame = bytes(_buffer[:FRAME_SIZE]) + del _buffer[:FRAME_SIZE] - # Unpack top-level once, then split into sections - vals = CMD_FMT.unpack(data) + _id, value, read_count, error = struct.unpack(' bytes: - """Read exactly n bytes from a serial.Serial; returns empty on timeout.""" - buf = bytearray() - while len(buf) < n: - chunk = ser.read(n - len(buf)) - if not chunk: # timeout - break - buf.extend(chunk) - return bytes(buf) + if len(packet) < i + coeff_size: + raise ValueError(f"Need {coeff_size} bytes for coeffStruct, got {len(packet) - i}") + _, *coeffs = struct.unpack(coeff_fmt, packet[i:i+coeff_size]) + i += coeff_size -def read_cmd_poll_bin(ser) -> dict: - """ - Read and parse one cmdPollBinStruct (100 bytes) from the serial port. - Assumes the device has already been commanded (e.g., 'poll_bin'). - """ - data = read_exact(ser, CMD_SIZE) - if len(data) != CMD_SIZE: - raise TimeoutError(f"Got {len(data)} / {CMD_SIZE} bytes") + print("Coefficients:") + for idx, c in enumerate(coeffs, 1): + print(f" coeff{idx}: {c:.10f}") + # --- versionStruct --- + if len(packet) <= i or packet[i:i+1] != b'v': + raise ValueError(f"Expected 'v' after coeffStruct, got {packet[i:i+1]!r}") + i += 1 # consume 'v' -def print_poll_readout(d: dict) -> None: - """ - Pretty-print the parsed cmdPollBinStruct dict in the required format. - Expects: - d = { - "flashData": { - "voltage", "frequency", "setup", "hold", - "temp_coef", "cell_const", - # (other fields may be present) - }, - "adcData": { - "adc0_hit", "adc1_hit", - "p_curt_gain", "n_curt_gain", "p_volt_gain", "n_volt_gain", - "p_curt", "n_curt", "p_volt", "n_volt", - "rtd_type", "wire_mode", "temp", - }, - "conductivity": float, - } - """ - f = d["flashData"] - a = d["adcData"] - cond = d["conductivity"] + def parse_ver_field(field_bytes: bytes) -> str: + # decode up to first NUL, tolerate missing terminator + raw = field_bytes.split(b'\x00', 1)[0] + return raw.decode('utf-8', errors='replace').strip() - # 1) Excitation + cell setup - print(f"EXC V: {float(f['voltage']):.6f}V") - print(f"EXC FREQ: {float(f['frequency']):.6f}Hz") - print(f"EXC setup time: {float(f['setup']) * 100.0:.6f}%") # if 'setup' is fraction [0..1] - print(f"EXC hold time: {float(f['hold']) * 100.0:.6f}%") # if 'hold' is fraction [0..1] - print(f"TEMP COEF: {float(f['temp_coef']):.6f}%/'C") - print(f"cell K: {float(f['cell_const']):.6f}/cm") + def take_field(max_len: int) -> bytes: + """ + Take up to max_len bytes from packet starting at i. + If fewer than max_len bytes remain, take what's left (tolerate truncated padding). + """ + nonlocal i + chunk = packet[i:i + max_len] + i += len(chunk) + return chunk - # 2) ADC hits and gains + measured p-p values - print(f"ADC0 hits: {int(a['adc0_hit'])}") - print(f"+I gain: {int(a['p_curt_gain'])}") - print(f"+Ip-p: {float(a['p_curt']):.6e}A") - print(f"-I gain: {int(a['n_curt_gain'])}") - print(f"-Ip-p: {float(a['n_curt']):.6e}A") - print(f"+V gain: {int(a['p_volt_gain'])}") - print(f"+Vp-p: {float(a['p_volt']):.6e}V") - print(f"-V gain: {int(a['n_volt_gain'])}") - print(f"-Vp-p: {float(a['n_volt']):.6e}V") - print(f"ADC1 hits: {int(a['adc1_hit'])}") + # Read 3 fields, each up to MAX_VERSION_LENGTH, but do NOT require full padding bytes. + fw_bytes = take_field(MAX_VERSION_LENGTH) + hw_bytes = take_field(MAX_VERSION_LENGTH) + sn_bytes = take_field(MAX_VERSION_LENGTH) - # 3) RTD type/wire + temperature - # If your payload uses numeric codes, map them here; otherwise print as strings. - rtd_type = a['rtd_type'] - wire_mode = a['wire_mode'] - RTD_TYPES = {1000: "PT1000", 100: "PT100"} # adjust if you use enumerations - WIRE_MODES = {2: "2 wire", 3: "3 wire", 4: "4 wire"} + fw = parse_ver_field(fw_bytes) + hw = parse_ver_field(hw_bytes) + sn = parse_ver_field(sn_bytes) - rtd_str = RTD_TYPES.get(rtd_type, str(rtd_type)) - wire_str = WIRE_MODES.get(wire_mode, str(wire_mode)) - print(f"RTD: {rtd_str}") - print(f"RTD wire: {wire_str}") - print(f"TEMP: {float(a['temp']):.6f}'C") + print("Versions:") + print(" FW:", fw) + print(" HW:", hw) + print(" SN:", sn) - # 4) Blank line + conductivity - print() # blank line - print(f"conductivity: {float(cond):.6e}S/cm") + # If there’s extra trailing data, show it (optional but useful) + if i < len(packet): + print(f"Trailing bytes ({len(packet)-i}): {packet[i:]}") - ######################################################################################################################## - -def parse_cal_file_bytes(data: bytes) -> dict: - """Parse 48 bytes as 12 little-endian floats into a dict {coeff1..coeff12}.""" - if len(data) != 12 * 4: - raise ValueError(f"Expected 48 bytes, got {len(data)}") - coeffs = struct.unpack('<12f', data) - return {f'coeff{i+1}': coeffs[i] for i in range(12)} - -def print_cal_file(d: dict) -> None: - for i in range(12): - v = d[f'coeff{i+1}'] - print(f"coeff{i+1}: {v:.10f}") - -