""" \file HemodialysisDevice.py \brief Comprises API interface for the DG board. """ from DialityCoreSerialProtocol import DialitySerialMessenger from DialityCoreSerialProtocol import START_BYTE from time import sleep """ \mainpage Dialin API Dialin API is comprised primarily by 3 classes: - \ref DG - \ref HD - \ref UI """ class HD: """ \class HD \brief Hemodialysis Device (HD) Dialin object API. It provides the basic interface to communicate with the HD board """ def __init__(self, port="/dev/ttyUSB0"): """ HD constructor using serial port name \param port: serial port name, e.g. "/dev/ttyUSB0" \returns HD object that allows communication with board via port \details For example: hd_object = HD(port='/dev/ttyUSB69') or hd_object = HD('/dev/ttyUSB69') """ # Create listener self.__serialPort = DialitySerialMessenger(serial_port=port) self.__serialPort.start() self.__messageID = 0 def login(self): # Send fill message message = bytearray([int.from_bytes(START_BYTE, byteorder="big"), 0, 0, 0, 0, 0, 0, 0]) self.__messageID = 8000 #message[1:2] = int.from_bytes(self.__messageID, byteorder='little') message[1] = int.from_bytes(b'\x40',byteorder='big') message[2] = int.from_bytes(b'\x1F',byteorder='big') message[3] = int.from_bytes(b'\x03',byteorder='big') message[4] = int.from_bytes(b'\x31', byteorder='big') message[5] = int.from_bytes(b'\x32', byteorder='big') message[6] = int.from_bytes(b'\x33', byteorder='big') print("login") # Send message send_event = self.__send(message) message_received = send_event.wait(1) if(message_received): print("I have a response") received_message = self.__getmessage() if len(received_message) != 0: print(received_message) print("Logged:" + str(received_message[4])) return received_message[4] else: print("Timeout!!!!") return False def OffButton(self): # Send fill message message = bytearray([int.from_bytes(START_BYTE, byteorder="big"), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) self.__messageID = 8000 # message[1:2] = int.from_bytes(self.__messageID, byteorder='little') message[1] = int.from_bytes(b'\x42', byteorder='big') message[2] = int.from_bytes(b'\x1F', byteorder='big') message[3] = int.from_bytes(b'\x05', byteorder='big') message[4] = int.from_bytes(b'\x00', byteorder='big') message[5] = int.from_bytes(b'\x01', byteorder='big') message[6] = int.from_bytes(b'\x00', byteorder='big') message[7] = int.from_bytes(b'\x00', byteorder='big') message[8] = int.from_bytes(b'\x00', byteorder='big') print("override off button") # Send message send_event = self.__send(message) message_received = send_event.wait(1) if (message_received): print("I have a response") received_message = self.__getmessage() if len(received_message) != 0: print(received_message) print("Logged:" + str(received_message[4])) return received_message[4] else: print("Timeout!!!!") return False def registerAsyncReceiver(self, message_id, method): t1 = method t2 = message_id def registerSyncReceiver(self, message_id, method): t1 = method t2 = message_id def __send(self, a_message): # First calculate CRC ##a_message[-1] = self.__calculate_crc(a_message) print('Sending: '+str(a_message)) # Send command via serial port return self.__serialPort.write(a_message) def __getmessage(self): return self.__serialPort.getMessage() def __calculate_crc(self, a_message): # a_command includes the last byte for crc return sum(a_message[0:-1]) % 256 if __name__ == "__main__": thehd = HD() # \var thedg sleep(2) thehd.login() sleep(2) thehd.OffButton()