########################################################################### # # Copyright (c) 2024-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file analog_devices_conductivity_board.py # # @author (original) Christina Heine # @date (original) 08-October-2024 # @author (last) Eliza Petersen # @date (last) 15-November-2024 # ############################################################################ from serial import Serial import re import time class AnalogDevicesConductivityBoard: def __init__(self): self.ser = None self.run = False self.pressure = None self.id = None self.address = None def create_connection(self, port_name, address): """Creates serial connection to device using known port number.""" self.address = address.encode('UTF-8') self.ser = Serial(port_name, baudrate=115200) def get_poll(self): self.ser.write(self.address + b' poll\n') # poll = str(self.ser.read(385).decode()) #decode by number of characters - this may change i = 0 poll = "" while i <= 21: # decode by number of lines of data messages current_line_text = str(self.ser.readline().decode()) poll = poll + current_line_text i += 1 return poll def get_poll_more_lines(self, lines_index): #This function is used for debugging self.ser.write(self.address + b' poll\n') # poll = str(self.ser.read(385).decode()) #decode by number of characters - this may change i = 0 poll = "" while i <= lines_index: # decode by number of lines of data messages current_line_text = str(self.ser.readline().decode()) poll = poll + current_line_text i += 1 return poll def decode_last_poll(self, last_poll, lines_index): # This function is used for debugging i = 0 while i <= lines_index: # decode by number of lines of data messages current_line_text = str(self.ser.readline().decode()) poll = poll + current_line_text i += 1 return poll def get_cond(self): measurements = self.get_poll() cond = float(re.search('conductivity: (.*)S/cm', measurements).group(1)) * 1000000.00 return cond def set_frequency(self, freq): freq = str(freq) byte_freq = freq.encode('UTF-8') self.ser.write(self.address + b' setfreq ' + byte_freq + b'\n') time.sleep(1) self.ser.write(self.address + b' poll\n') i = 0 set_freq_message = "" while i <= 23: # current_line_text = str(self.ser.readline().decode()) set_freq_message = set_freq_message + current_line_text i += 1 return set_freq_message def set_potential(self, voltage): voltage = str(voltage) byte_voltage = voltage.encode('UTF-8') self.ser.write(self.address + b' setvolt ' + byte_voltage + b'\n') time.sleep(0.5) # This delay is critical or else next poll will happen before the set voltage line is # added to the poll, and won't properly clear out the extra message that adds to the buffer after setting # potential, affecting the next get poll self.ser.write(self.address + b' poll\n') i = 0 set_pot_message = "" while i <= 23: current_line_text = str(self.ser.readline().decode()) set_pot_message = set_pot_message + current_line_text i += 1 return set_pot_message def get_set_frequency(self): measurements = self.get_poll() set_freq = float(re.search('EXC FREQ: (.*)Hz', measurements).group(1)) return set_freq def get_set_potential(self): measurements = self.get_poll() set_pot = float(re.search('EXC V: (.*)V', measurements).group(1)) return set_pot def check_frequency(self, target_frequency): frequency_set = False set_frequency = self.get_set_frequency() if set_frequency != target_frequency: print(f'Frequency incorrectly set! \n' f'Target frequency: {target_frequency} V\n' f'Set frequency: {set_frequency} V') return frequency_set else: frequency_set = True return frequency_set def check_potential(self, target_potential): potential_set = False set_potential = self.get_set_potential() if set_potential != target_potential: print(f'Potential incorrectly set! \n' f'Target potential: {target_potential} V\n' f'Set potential: {set_potential} V') else: potential_set = True return potential_set