""" \file DialysateGeneraator.py \brief Comprises API interface for the DG board. """ from DialityCoreSerialProtocol import DialitySerialMessenger from DialityCoreSerialProtocol import START_BYTE from time import sleep """ \mainpage Dialin API Dialin API is comprised primarily by 3 classes: - \ref DG - \ref HD - \ref UI """ class DG: """ \class DG \brief Dialysate Generator (DG) Dialin object API. It provides the basic interface to communicate with the DG board """ def __init__(self, port="/dev/ttyUSB0"): """ DG constructor using serial port name \param port: serial port name, e.g. "/dev/ttyUSB0" \returns DG object that allows communication with board via port \details For example: dg_object = DG(port='/dev/ttyUSB69') or dg_object = DG('/dev/ttyUSB69') """ # Create listener self.__serialPort = DialitySerialMessenger(serial_port=port) self.__serialPort.start() self.__messageID = 0 def fill(self): # Send fill message message = bytearray([int.from_bytes(START_BYTE, byteorder="big"), 0, 0, 0, 0]) self.__messageID = 0 print("fill sent") # Send message self.__send(message) def registerAsyncReceiver(self, message_id, method): t1 = method t2 = message_id def registerSyncReceiver(self, message_id, method): t1 = method t2 = message_id def __send(self, a_message): # First calculate CRC a_message[-1] = self.__calculate_crc(a_message) # Send command via serial port self.__serialPort.write(a_message) # Check message ID self.__messageID = int.from_bytes(a_message[1:2], byteorder='big', signed=False) def __calculate_crc(self, a_message): # a_command includes the last byte for crc return sum(a_message[0:-1]) % 256 if __name__ == "__main__": thedg = DG() # \var thedg sleep(2) thedg.fill() sleep(2) thedg.fill() sleep(1) thedg.fill() sleep(2) thedg.fill()