import struct

# ---- Struct formats (little-endian, packed) ----
# flash_file: int, int, float×6, int, int, int -> 44 bytes
FLASH_FMT = struct.Struct("<2i6f3i")

# adc_file: float×4, int×6, float, unsigned long×2 -> 52 bytes
# ADuCM360: unsigned long is 32-bit -> 'I'
ADC_FMT = struct.Struct("<4f6if2I")

# cmdPollBinStruct: flash_file + adc_file + float (conductivity) -> 100 bytes
CMD_FMT = struct.Struct("<2i6f3i4f6if2If")
CMD_SIZE = CMD_FMT.size

# Field names in wire order; their lengths drive the slicing of the
# unpacked tuple so the names and the format strings cannot drift apart
# silently.
_FLASH_KEYS = (
    "baud_rate",
    "rs485_address",
    "voltage",
    "frequency",
    "temp_coef",
    "cell_const",
    "setup",
    "hold",
    "lcd_uc1601s_br",
    "lcd_uc1601s_pm",
    "lcd_uc1601s_tc",
)
_ADC_KEYS = (
    "p_curt",
    "n_curt",
    "p_volt",
    "n_volt",
    "p_curt_gain",
    "p_volt_gain",
    "n_curt_gain",
    "n_volt_gain",
    "rtd_type",
    "wire_mode",
    "temp",
    "adc0_hit",
    "adc1_hit",
)

# Display names for the numeric codes carried in adcData.
# Adjust if the firmware uses enumerations instead of these raw values.
_RTD_TYPES = {1000: "PT1000", 100: "PT100"}
_WIRE_MODES = {2: "2 wire", 3: "3 wire", 4: "4 wire"}

# Calibration file: 12 little-endian floats -> 48 bytes
_CAL_FMT = struct.Struct("<12f")
_CAL_COEFF_COUNT = 12


# NOTE(review): the original `def` line of this function was lost when the
# file was mangled; the name is reconstructed from the sibling
# `read_cmd_poll_bin` -- confirm against existing callers.
def parse_cmd_poll_bin(data: bytes) -> dict:
    """
    Parse a 100-byte binary payload of cmdPollBinStruct into a nested dict.

    Expected layout (packed, little-endian):
        struct cmdPollBinStruct {
            flash_file flashData;   // 44 bytes
            adc_file   adcData;     // 52 bytes
            float      conductivity; // 4 bytes
        }

    Returns a dict with keys "flashData" (11 fields), "adcData" (13 fields)
    and "conductivity" (float).

    Raises ValueError if `data` is not exactly CMD_SIZE bytes long.
    """
    if len(data) != CMD_SIZE:
        raise ValueError(f"Expected {CMD_SIZE} bytes, got {len(data)}")

    # Unpack top-level once, then split into sections
    vals = CMD_FMT.unpack(data)
    flash_end = len(_FLASH_KEYS)
    adc_end = flash_end + len(_ADC_KEYS)

    return {
        "flashData": dict(zip(_FLASH_KEYS, vals[:flash_end])),
        "adcData": dict(zip(_ADC_KEYS, vals[flash_end:adc_end])),
        # Conductivity is the single float that follows both sub-structs
        "conductivity": vals[adc_end],
    }


# ---- Optional: reliable read helper for UART (PySerial) ----
def read_exact(ser, n: int) -> bytes:
    """
    Read up to n bytes from `ser`, looping over short reads.

    `ser` only needs a `read(size)` method that returns bytes and returns
    an empty result when nothing arrives (e.g. a serial.Serial timeout).
    On such an empty read the loop stops and the bytes collected so far
    are returned, so the result may be shorter than n (possibly empty);
    callers must check its length.
    """
    buf = bytearray()
    while len(buf) < n:
        chunk = ser.read(n - len(buf))
        if not chunk:  # timeout
            break
        buf.extend(chunk)
    return bytes(buf)


def read_cmd_poll_bin(ser) -> dict:
    """
    Read and parse one cmdPollBinStruct (100 bytes) from the serial port.
    Assumes the device has already been commanded (e.g., 'poll_bin').

    Returns the nested dict produced by `parse_cmd_poll_bin`.

    Raises TimeoutError if fewer than CMD_SIZE bytes arrive.
    """
    data = read_exact(ser, CMD_SIZE)
    if len(data) != CMD_SIZE:
        raise TimeoutError(f"Got {len(data)} / {CMD_SIZE} bytes")
    return parse_cmd_poll_bin(data)


def print_poll_readout(d: dict) -> None:
    """
    Pretty-print the parsed cmdPollBinStruct dict in the required format.

    Expects:
        d = {
            "flashData": {
                "voltage", "frequency", "setup", "hold",
                "temp_coef", "cell_const",
                # (other fields may be present)
            },
            "adcData": {
                "adc0_hit", "adc1_hit",
                "p_curt_gain", "n_curt_gain", "p_volt_gain", "n_volt_gain",
                "p_curt", "n_curt", "p_volt", "n_volt",
                "rtd_type", "wire_mode", "temp",
            },
            "conductivity": float,
        }

    Raises KeyError if any of the listed keys is missing.
    """
    f = d["flashData"]
    a = d["adcData"]
    cond = d["conductivity"]

    # 1) Excitation + cell setup
    print(f"EXC V: {float(f['voltage']):.6f}V")
    print(f"EXC FREQ: {float(f['frequency']):.6f}Hz")
    print(f"EXC setup time: {float(f['setup']) * 100.0:.6f}%")  # if 'setup' is fraction [0..1]
    print(f"EXC hold time: {float(f['hold']) * 100.0:.6f}%")  # if 'hold' is fraction [0..1]
    print(f"TEMP COEF: {float(f['temp_coef']):.6f}%/'C")
    print(f"cell K: {float(f['cell_const']):.6f}/cm")

    # 2) ADC hits and gains + measured p-p values
    print(f"ADC0 hits: {int(a['adc0_hit'])}")
    print(f"+I gain: {int(a['p_curt_gain'])}")
    print(f"+Ip-p: {float(a['p_curt']):.6e}A")
    print(f"-I gain: {int(a['n_curt_gain'])}")
    print(f"-Ip-p: {float(a['n_curt']):.6e}A")
    print(f"+V gain: {int(a['p_volt_gain'])}")
    print(f"+Vp-p: {float(a['p_volt']):.6e}V")
    print(f"-V gain: {int(a['n_volt_gain'])}")
    print(f"-Vp-p: {float(a['n_volt']):.6e}V")
    print(f"ADC1 hits: {int(a['adc1_hit'])}")

    # 3) RTD type/wire + temperature
    # Known numeric codes are mapped to names; anything else is printed as-is.
    rtd_type = a['rtd_type']
    wire_mode = a['wire_mode']
    rtd_str = _RTD_TYPES.get(rtd_type, str(rtd_type))
    wire_str = _WIRE_MODES.get(wire_mode, str(wire_mode))
    print(f"RTD: {rtd_str}")
    print(f"RTD wire: {wire_str}")
    print(f"TEMP: {float(a['temp']):.6f}'C")

    # 4) Blank line + conductivity
    print()  # blank line
    print(f"conductivity: {float(cond):.6e}S/cm")


########################################################################################################################
def parse_cal_file_bytes(data: bytes) -> dict:
    """
    Parse 48 bytes as 12 little-endian floats into a dict {coeff1..coeff12}.

    Raises ValueError if `data` is not exactly 48 bytes long.
    """
    if len(data) != _CAL_FMT.size:
        raise ValueError(f"Expected {_CAL_FMT.size} bytes, got {len(data)}")
    coeffs = _CAL_FMT.unpack(data)
    return {f'coeff{i}': value for i, value in enumerate(coeffs, start=1)}


def print_cal_file(d: dict) -> None:
    """
    Print coeff1..coeff12 from `d`, one per line, with 10 decimal places.

    Raises KeyError if any of the twelve coefficient keys is missing.
    """
    for i in range(1, _CAL_COEFF_COUNT + 1):
        v = d[f'coeff{i}']
        print(f"coeff{i}: {v:.10f}")