import re import math import serial # import pyserial import serial.tools.list_ports as sertools import time import struct import parsePackets from datetime import datetime loop_time = 10 ########################################################################################## # Execute initialization function def initializeAllSensors(ser): ser.reset_input_buffer() # Clears the input buffer printTimestamp() ser.write(b'a\r\n') time.sleep(0.2) while True: serial_data = ser.readline() if b'\x03' in serial_data: print(serial_data) break printTimestamp() ########################################################################################## # Get initialization status def getInitStatus(ser): ser.reset_input_buffer() # Clears the input buffer printTimestamp() ser.write(b'l\r\n') time.sleep(0.2) serial_data = ser.readline() print(serial_data) printTimestamp() ########################################################################################## # Read single sensor value Measurement Vales def readSingleSensorMeasurement(ser, sensorNum): ser.reset_input_buffer() # Clears the input buffer select_unit = f'j {sensorNum}\r\n' printTimestamp() ser.write(select_unit.encode()) time.sleep(0.2) printTimestamp() ser.write(b'h\r\n') # printTimestamp() time.sleep(0.2) # printTimestamp() serial_data = ser.readline() print(serial_data) # printTimestamp() parsePackets.parseMesaurementData(serial_data) ########################################################################################## # Function to read all the sensor values def getAllSensorMeasurements(ser,): # Read the data from the corresponding channel ser.reset_input_buffer() # Clears the input buffer printTimestamp() ser.write(b'g\r\n') printTimestamp() time.sleep(0.2) printTimestamp() serial_data = ser.readline() print(serial_data) printTimestamp() parsePackets.parseMesaurementData(serial_data) ########################################################################################## def packMeasurementSettingsCSV(): # Define measurement settings values SinFreq = 11000.0 DacVoltPP = 400.0 BiasVolt = 200.0 HstiaRtiaSel = 7 AdcPgaGain = 2 DftNum = 256 ADCAvgNum = 16 # Create CSV string: mst,,,,,,, csv_command = f"mst,{SinFreq},{DacVoltPP},{BiasVolt},{HstiaRtiaSel},{AdcPgaGain},{DftNum},{ADCAvgNum}\r\n" return csv_command ########################################################################################## cfgSettingsParam = [ "sinfreq", "dacpp", "bias", "rtia", "pga", "dftnum", "avgnum" ] UPDATE_CFG_STATUS = { 0: "ERR_UNRECOGNIZED_PARAM", 1: "NO_CHANGE_TO_TIA", 2: "NO_CHANGE_TO_PGA", 3: "NO_CHANGE_TO_DFT_NUM", 4: "NO_CHANGE_TO_AVG_NUM", 5: "SUCCESS" } def updateMeasurementSettingsStruct(ser): ser.reset_input_buffer() printTimestamp() # Get CSV command csv_command = packMeasurementSettingsCSV() print(f"Sending mst command in CSV format: {csv_command.strip()}") ser.write(csv_command.encode()) # Wait for response: expect 7 bytes (one per parameter) time.sleep(0.2) response = ser.read(7) # MAX_CONDUCTIVITY_MST_PARAM_IDX = 7 UPDATE_CFG_STATUS = { 0: "ERR_UNRECOGNIZED_PARAM", 1: "NO_CHANGE_TO_TIA", 2: "NO_CHANGE_TO_PGA", 3: "NO_CHANGE_TO_DFT_NUM", 4: "NO_CHANGE_TO_AVG_NUM", 5: "SUCCESS" } if len(response) == len(cfgSettingsParam): print("Received update status for all parameters:") for idx, status_byte in enumerate(response): status_code = int(status_byte) status_message = UPDATE_CFG_STATUS.get(status_code, "UNKNOWN_STATUS") param_name = cfgSettingsParam[idx] print(f"{param_name}: {status_message}") else: print(f"Expected {len(cfgSettingsParam)} bytes, received {len(response)} bytes. Response incomplete.") ########################################################################################## # Function to read all the sensor values def getMeasurementSettings(ser): # Read the data from the corresponding channel ser.reset_input_buffer() # Clears the input buffer printTimestamp() ser.write(b'k\r\n') time.sleep(0.2) serial_data = ser.read(28) print(serial_data) printTimestamp() parsePackets.parseMeasurementSettings(serial_data) ########################################################################################## def updateEEPROMData(ser, *values): """ Sends a 'save' command with 1 to N EEPROM values over UART in CSV format, and interprets the response using Update_EEPROM_Status codes. Args: ser: The serial.Serial object. *values: A variable number of float or double values to be saved to EEPROM. """ UPDATE_EEPROM_STATUS = { 0: "INVALID_CMD", 1: "NO_VALID_VALUES", 2: "TOO_MANY_VALUES", 3: "WAITING_FOR_CONFIRMATION", 4: "CONFIRMATION_TIMEOUT", 5: "ERR_DOUBLE_VALUES", 6: "ERR_FLOAT_VALUES", 7: "VALUES_VERIFIED", 8: "OPERATION_ABORTED", 9: "INVALID_RESPONSE", 10: "SUCCESS" } if not values: print("No values provided to update EEPROM.") return # Construct the CSV command command = "upe," + ",".join(f"{v:.6f}" for v in values) + "\r\n" print(f"Sending command: {command.strip()}") # Send the command ser.reset_input_buffer() ser.write(command.encode()) # Wait for response time.sleep(0.2) response = ser.readline().decode(errors='ignore').strip() try: status_code = int(response) status_message = UPDATE_EEPROM_STATUS.get(status_code, "UNKNOWN_STATUS") print(f"EEPROM Update Response Code: {status_code} - {status_message}") except ValueError: print(f"Unexpected response: {response}") ########################################################################################## # Function to read all the sensor values def getEEPROMdata(ser): # Read the data from the corresponding channel ser.reset_input_buffer() # Clears the input buffer printTimestamp() ser.write(b'e\r\n') time.sleep(0.2) serial_data = ser.read(128) print(serial_data) printTimestamp() parsePackets.parseEEPROMdata(serial_data) ########################################################################################## # Function to read sensor data for 4 and 6 in loop for 10 seconds def loopSingleSensorReadings(ser): start_time = time.time() # sensor_nums = [4, 6] # index = 0 while time.time() - start_time <= loop_time: # sensor_num = sensor_nums[index % 2] # Alternate between 4 and 6 # readSingleSensorMeasurement(ser, str(sensor_num)) readSingleSensorMeasurement(ser, str(6)) # index += 1 ########################################################################################## # Function to read all sensor data in loop for 10 seconds def loopAllSensorReadings(ser): start_time = time.time() while time.time() - start_time <= loop_time: getAllSensorMeasurements(ser) ########################################################################################## def printTimestamp(): current_time = datetime.now() print("Current timestamp:", current_time) ########################################################################################## def getAllSensorData(ser): ser.reset_input_buffer() printTimestamp() ser.write(b'm\r\n') start_time = time.time() while time.time() - start_time < 100: if ser.in_waiting: serial_data = ser.readline() printTimestamp() parsePackets.parseMesaurementData(serial_data) ser.write(b'n\r\n') printTimestamp() ##########################################################################################