########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file hd_simulator.py # # @date 29-July-2020 # @author P. Lucia # # @brief This class simulates the hd when interfacing with the UI # ############################################################################ from ..protocols.CAN import (DenaliMessage, DenaliCanMessenger, DenaliChannels) from ..utils.base import _AbstractSubSystem, _LogManager, _publish from ..utils.conversions import integer_to_bytearray from ..hd.buttons import HDButtons from .hd_simulator_alarms import HDProxyAlarms import enum from typing import List from ..common.msg_defs import RequestRejectReasons, MsgIds import struct from time import sleep YES=1 NO=0 class HDSimulator(_AbstractSubSystem): NUM_TREATMENT_PARAMETERS = 18 def __init__(self, can_interface="can0", log_level=None): super().__init__() self._log_manager = _LogManager(log_level=log_level, log_filepath=self.__class__.__name__+".log") self.logger = self._log_manager.logger self.can_interface = DenaliCanMessenger(can_interface=can_interface, logger=self.logger, log_can=self._log_manager.log_level == "CAN_ONLY") self.can_interface.start() if self.can_interface is not None: channel_id = DenaliChannels.ui_to_hd_ch_id self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_START_TREATMENT.value, self._handler_ui_start_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS.value, self._handler_ui_validate_parameters) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_USER_CONFIRM_TREATMENT_PARAMS.value, self._handler_ui_confirm_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_TREATMENT_END_REQUEST.value, self._handler_ui_end_treatment) self.alarms_simulator = HDProxyAlarms(self.can_interface, self.logger) def cmd_send_treatment_parameter_validation_response(self, rejections: List[RequestRejectReasons]): """ Sends a treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, enum.Enum) for each in rejections]): self.logger.error("Not all rejections are enums.") return False payload = bytearray() for rejection in rejections: payload += integer_to_bytearray(rejection.value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_treatment_parameter_manual_validation_response(self, rejections: int): """ Sends a manually built treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, int) for each in rejections]): self.logger.error("Not all rejections are enums.") return False payload = bytearray() for rejection in rejections: payload += integer_to_bytearray(rejection) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_poweroff_button_pressed(self): """ Broadcast that the poweroff button was pressed @return: None """ payload = bytearray(0x01) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=HDButtons.MSG_ID_HD_POWEROFF_OPEN, payload=payload) self.can_interface.send(message, 0) def cmd_send_broadcast_poweroff_imminent(self): """ Broadcast that the system will shut down @return: None """ payload = integer_to_bytearray(1) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=HDButtons.MSG_ID_HD_POWEROFF_BROADCAST, payload=payload) self.can_interface.send(message, 0) def cmd_send_poweroff_timeout(self): """ Sends a poweroff timeout @return: None """ payload = integer_to_bytearray(1) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=HDButtons.MSG_ID_HD_POWEROFF_TIMEOUT, payload=payload) self.can_interface.send(message, 0) def cmd_send_priming_time_remaining(self, state, seconds_remaining, seconds_total): """ Broadcasts the number of seconds remaining in priming to the UI @return: None """ payload = bytearray() payload += integer_to_bytearray(state) payload += integer_to_bytearray(seconds_remaining) payload += integer_to_bytearray(seconds_total) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_PRIMING_STATUS.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_confirm_treatment(self, message): """ Handler function to detect when a treatment is confirmed @param message: the confirm treatment message @return: None """ self.logger.debug("Received UI confirmation of Treatment Parameters. ") state = 0 total_seconds = 100 for seconds_remaining in range(total_seconds, -1, -1): if seconds_remaining % (total_seconds // 3) == 0: state += 1 self.cmd_send_priming_time_remaining(state, seconds_remaining, total_seconds) sleep(0.05) def _handler_ui_start_treatment(self, message): """ Handler function to start a treatment @param message: the start treatment message @return: None """ START_POS = DenaliMessage.PAYLOAD_START_INDEX END_POS = START_POS + 4 request = struct.unpack('i', bytearray( message['message'][START_POS:END_POS]))[0] if request == 0: self.logger.debug("Selecting treatment parameters") elif request == 1: self.logger.debug("Canceling treatment") elif request == 2: self.logger.debug("Starting treatment") self.cmd_send_start_treatment_response(YES, 0) def cmd_send_start_treatment_response(self, response, reason): """ Sends a start treatment response message @param response: 0=NO, 1=YES @return: None """ print("Sending: {0}".format(response)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_START_TREATMENT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_end_treatment(self, message): """ @param message: @return: """ self.logger.debug("Ending Treatment") def _handler_ui_validate_parameters(self, message): """ @param message: @return: """ rejections = [ RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # requestValid RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # bloodFlowRate RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # dialysateFlowRate RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # duration RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # heparinStopTime RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # salineBolus RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # acidConcentrate RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # bicarbonateConcentrate RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # dialyzerType RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # bloodPressureMeasureInterval RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # rinsebackFlowRate RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # arterialPressureLimitLow RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # arterialPressureLimitHigh RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # venousPressureLimitLow RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # venousPressureLimitHigh RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # heparinDispensingRate RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # heparinBolusVolume RequestRejectReasons.REQUEST_REJECT_REASON_NONE, # dialysateTemp ] self.cmd_send_treatment_parameter_validation_response(rejections)