###########################################################################
#
# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file PressureOcclusion.py
#
# @date 1-Apr-2020
# @author P. Lucia
#
# @brief
#
#
############################################################################
from DialIn.CoreCANProtocol import (DenaliMessage, DenaliChannels)


class HDBasics:
    r"""
    \class HDBasics

    \brief Hemodialysis Device (HD) Dialin API sub-class for basic HD commands.

    Both commands build a Denali message on the dialin-to-HD channel, send it
    through the supplied CAN interface and report the first payload byte of
    the response.
    """

    # NOTE: the docstrings in this class are raw strings (r"""...""") because
    # they contain Doxygen backslash commands (\brief, \param, \returns, \n).
    # In a normal string literal "\b", "\r" and "\n" are translated to control
    # characters and "\c" / "\p" are invalid escape sequences.

    # Basic message IDs
    MSG_ID_LOGIN_TO_HD = 0x8000
    MSG_ID_HD_MSG = 0x8001

    # HD login password
    HD_LOGIN_PASSWORD = '123'

    def __init__(self, can_interface):
        r"""
        HDBasics constructor

        \param can_interface: the denali can interface object; its send()
               method is used to transmit every command built by this class
        """
        self.can_interface = can_interface

    def _send_and_read_status(self, message):
        r"""
        Sends a message over the CAN interface and extracts the status byte
        (first payload byte) of the response.

        \param message: message object produced by DenaliMessage.build_message
        \returns first payload byte of the response, or None when send()
                 returned None (reported on stdout as a timeout)
        """
        response = self.can_interface.send(message)

        if response is None:
            print("Timeout!!!!")
            return None

        return response['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_log_in_to_hd(self):
        r"""
        Constructs and sends a login command via CAN bus. Login required before \n
        other commands can be sent to the HD.

        \returns 1 if logged in, 0 if log in failed (the first payload byte of
                 the response is returned unchanged); False if no response
                 was received
        """
        # The password is transmitted as one character code per payload entry.
        password_payload = [ord(ch) for ch in self.HD_LOGIN_PASSWORD]

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=self.MSG_ID_LOGIN_TO_HD,
            payload=password_payload)

        print("login")

        # Send message
        status = self._send_and_read_status(message)
        if status is None:
            return False

        if status == 1:
            print("Logged In")
        else:
            print("Log In Failed.")

        return status

    def cmd_hd_message_insert(self, msg):
        r"""
        Constructs and sends the insert HD message command. Given message will \n
        be inserted into the HD's received message queue for processing.

        \param msg: byte array - properly formatted HD message to insert
        \returns 1 if successful, zero otherwise (the first payload byte of
                 the response is returned unchanged); False if no response
                 was received
        """
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=self.MSG_ID_HD_MSG,
            payload=msg)

        print("insert HD message")

        # Send message
        status = self._send_and_read_status(message)
        if status is None:
            return False

        # response payload is OK or not OK
        print("Inserted message: " + str(status))
        return status