########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file hd_simulator.py # # @author (last) Quang Nguyen # @date (last) 11-Aug-2021 # @author (original) Peter Lucia # @date (original) 06-Aug-2020 # ############################################################################ import enum import test from time import sleep from . import messageBuilder from ..common import * from ..protocols.CAN import DenaliMessage, DenaliCanMessenger, DenaliChannels from ..utils import * from ..utils.base import AbstractSubSystem, LogManager class HDSimulator(AbstractSubSystem): NUM_TREATMENT_PARAMETERS = 19 instance_count = 0 # UI version message field positions START_POS_MAJOR = DenaliMessage.PAYLOAD_START_INDEX END_POS_MAJOR = START_POS_MAJOR + 1 START_POS_MINOR = END_POS_MAJOR END_POS_MINOR = START_POS_MINOR + 1 START_POS_MICRO = END_POS_MINOR END_POS_MICRO = START_POS_MICRO + 1 START_POS_BUILD = END_POS_MICRO END_POS_BUILD = START_POS_BUILD + 2 START_POS_COMPAT = END_POS_BUILD END_POS_COMPAT = START_POS_COMPAT + 4 def __init__(self, can_interface: str = "can0", log_level: bool = None, console_out: bool = False, passive_mode: bool = False, auto_response: bool = False): """ The HDSimulator constructor @param can_interface: (str) the can interface name @param log_level: (str) or (None) if not set, contains the logging level @param console_out: (bool) If True, write each dialin message to the console. 
""" super().__init__() HDSimulator.instance_count = HDSimulator.instance_count + 1 self.checked_in = False self.checked_in_last = 0 self._log_manager = LogManager(log_level=log_level, log_filepath=self.__class__.__name__ + ".log") self.logger = self._log_manager.logger self.console_out = console_out self.can_interface = DenaliCanMessenger(can_interface=can_interface, logger=self.logger, log_can=self._log_manager.log_level == "CAN_ONLY", console_out=console_out, passive_mode=passive_mode) self.can_interface.start() if self.can_interface is not None: channel_id = DenaliChannels.ui_to_hd_ch_id if auto_response: self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_CHECK_IN.value, self._handler_ui_first_check_in) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_INITIATE_TREATMENT_REQUEST.value, self._handler_ui_initiate_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_SET_UF_VOLUME_PARAMETER.value, self._handler_ui_pre_treatment_uf_request) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS.value, self._handler_ui_validate_parameters) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_USER_CONFIRM_TREATMENT_PARAMS.value, self._handler_ui_confirm_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_TX_END_CMD.value, self._handler_ui_end_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_HD_SET_RTC_REQUEST.value, self._handler_set_rtc_request) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIds.MSG_ID_REQUEST_FW_VERSIONS.value, self._handler_request_hd_version) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIds.MSG_ID_UI_REQUEST_SERVICE_INFO.value, 
self._handler_system_usage_response) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_to_hd_ch_id, MsgIds.MSG_ID_HD_UI_VERSION_INFO_RESPONSE.value, self._handler_ui_post_ui_version_compatibility) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_to_hd_ch_id, MsgIds.MSG_ID_HD_UI_VERSION_INFO_RESPONSE.value, self._handler_ui_version) self.treatment_parameter_rejections = TreatmentParameterRejections() # initialize variables that will be populated by UI version response self.ui_version = None def get_ui_version(self): """ Gets the ui version @return: The ui version """ return self.ui_version def _handler_system_usage_response(self) -> None: """ Handles a request for system usage @return: None """ payload = integer_to_bytearray(1619628663) payload += integer_to_bytearray(1619887863) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SERVICE_SCHEDULE_DATA.value, payload=payload) self.can_interface.send(message, 0) def _handler_set_rtc_request(self, message: dict) -> None: """ Handles a UI request to set the HD RTC @param message: (dict) the message containing the request @return: None """ epoch = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.logger.debug("Request to set the HD epoch to {0}".format(epoch)) self.cmd_send_set_rtc_response(YES, 0) def cmd_send_set_rtc_response(self, response, reason): """ Sends a set RTC response message @param response: (int) 0=NO, 1=YES @param reason: the rejection reason @return: None """ self.logger.debug("HD: Sending response {0} reason {1}".format(response, reason)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_UI_SET_RTC_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def 
cmd_send_treatment_parameter_validation_response(self, rejections: List[RequestRejectReasons]): """ Sends a treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, enum.Enum) for each in rejections]): self.logger.error("Not all rejections are enums.") return False payload = bytearray() for rejection in rejections: payload += integer_to_bytearray(rejection.value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_treatment_parameter_manual_validation_response(self, rejections: List[int]): """ Sends a manually built treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, int) for each in rejections]): self.logger.error("Not all rejections are the correct type.") return False payload = bytearray() for rejection in rejections: payload += integer_to_bytearray(rejection) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_poweroff_button_pressed(self): """ Broadcast that the poweroff button was pressed @return: None """ payload = bytearray(0x01) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_OFF_BUTTON_PRESS.value, payload=payload) 
self.can_interface.send(message, 0) def cmd_send_broadcast_poweroff_imminent(self): """ Broadcast that the system will shut down @return: None """ payload = integer_to_bytearray(1) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_POWER_OFF_WARNING.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_poweroff_timeout(self): """ Sends a poweroff timeout @return: None """ payload = integer_to_bytearray(1) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_OFF_BUTTON_PRESS.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_confirm_treatment(self, message): """ Handler function to detect when a treatment is confirmed @param message: the confirm treatment message @return: None """ request = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] if request == 0: self.logger.debug("Received UI cancel confirmation of Treatment Parameters. ") return self.logger.debug("Received UI confirmation of Treatment Parameters. 
") self.logger.debug("Priming ...") state = 0 total_seconds = 100 for seconds_remaining in range(total_seconds, -1, -1): if seconds_remaining % (total_seconds // 3) == 0: state += 1 self.cmd_send_priming_time_remaining(state, seconds_remaining, total_seconds) sleep(0.05) self.logger.debug("Finished priming.") def _handler_ui_pre_treatment_uf_request(self, message): """ Handles the ui pre treatment uf request and sends a response @param message: The ui pretreatment uf request message @return: None """ uf_volume = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.logger.debug("Received UF Volume: {0} mL".format(uf_volume)) self.cmd_send_uf_treatment_response(1, 0, uf_volume) def _handler_ui_first_check_in(self, message)->None: """ Handler function to first check in to start the post @param message: the check-in message @return: None """ now = time.time() # if the application is not checking-in (the simulatoe ) within 2 sec it means it has been stopped, # so do the check-in again. 
print("check-in: ", now, now - self.checked_in_last) if now - self.checked_in_last > 5: self.checked_in = False self.checked_in_last = now if self.checked_in: return None print("_handler_ui_first_check_in") self.cmd_send_power_on_self_test_version_request() for i in range(20): self.cmd_send_hd_post(i, True, False) sleep(0.1) self.cmd_send_hd_post(0, True, True) self.cmd_send_hd_operation_mode(4, 0) self.checked_in = True def _handler_ui_initiate_treatment(self, message): """ Handler function to start a treatment @param message: the start treatment message @return: None """ request = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] if request == 0: self.logger.debug("Selecting treatment parameters") self.cmd_send_hd_operation_mode(HDOpModes.MODE_PRET.value) elif request == 1: self.logger.debug("Canceling treatment") self.cmd_send_hd_operation_mode(HDOpModes.MODE_STAN.value) elif request == 2: self.logger.debug("Starting treatment") self.cmd_send_hd_operation_mode(HDOpModes.MODE_TREA.value) self.cmd_initiate_treatment_response(YES, 0) def cmd_initiate_treatment_response(self, response: int, reason: int): """ Sends a start treatment response message @param response: 0=NO, 1=YES @param reason: the rejection reason @return: None """ self.logger.debug("Sending: {0}".format(response)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_INITIATE_TREATMENT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_operation_mode(self, op_mode: int, sub_mode: int = 0): """ Broadcasts the current HD operation mode @param op_mode: hd operation mode @param sub_mode: hd operation sub-mode @return: None """ if not isinstance(op_mode, int): raise ValueError("Provided mode is not of type 'int'") if not isinstance(sub_mode, int): raise 
ValueError("Provided mode is not of type 'int'") payload = integer_to_bytearray(op_mode) payload += integer_to_bytearray(sub_mode) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_OP_MODE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_uf_treatment_response(self, accepted, reason, volume): """ Sends the uf volume adjustment response message in pre-treatment @param accepted: (uint) the acceptance, 1 = accepted, 0 = rejected @param reason: (uint) the reason for rejection @param volume: (float) the acceptable/accepted ultrafiltration volume @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SET_UF_VOLUME_PARAMETER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_end_treatment_response(self): """ Sends an end treatment response @return: None """ payload = integer_to_bytearray(YES) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TX_END_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_validate_parameters(self) -> None: """ handler for UI parameters validation @return: None """ rejections = [ self.treatment_parameter_rejections.param_request_valid, self.treatment_parameter_rejections.param_blood_flow_rate, self.treatment_parameter_rejections.param_dialysate_flow_rate, self.treatment_parameter_rejections.param_duration, self.treatment_parameter_rejections.param_heparin_stop_time, self.treatment_parameter_rejections.param_saline_bolus, self.treatment_parameter_rejections.param_acid_concentrate, self.treatment_parameter_rejections.param_bicarbonate_concentrate, self.treatment_parameter_rejections.param_dialyzer_type, 
self.treatment_parameter_rejections.param_blood_pressure_measure_interval, self.treatment_parameter_rejections.param_rinseback_flow_rate, self.treatment_parameter_rejections.param_arterial_pressure_limit_low, self.treatment_parameter_rejections.param_arterial_pressure_limit_high, self.treatment_parameter_rejections.param_venous_pressure_limit_low, self.treatment_parameter_rejections.param_venous_pressure_limit_high, self.treatment_parameter_rejections.param_heparin_dispensing_rate, self.treatment_parameter_rejections.param_heparin_bolus_volume, self.treatment_parameter_rejections.param_dialysate_temp ] self.cmd_send_treatment_parameter_validation_response(rejections) def test_started(self, test_name: str): """ Logs that a test was started @param test_name: The name of the test @return: None """ self.logger.info("Test Started: {0}".format(test_name)) def test_completed(self): """ Logs that a test was completed @return: None """ self.logger.info("Test Completed") def cmd_send_acknowledge_hd(self): """ the acknowledge from HD @return: none """ payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_send_acknowledge_ui(self): """ the acknowledge from UI @return: none """ payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.ui_to_hd_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_dialog(self): """ the message from HD to UI to show the power off dialog @return: none """ payload = ["A5", "01", "00", "01", "00", "01", "00", "38"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_hide_poweroff_dialog(self): """ the message from HD to UI to hide the power off dialog 
@return: none """ payload = ["A5", "01", "00", "01", "00", "01", "01", "09"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_notification_dialog(self): """ the message from HD to UI to show the shutting down notification box @return: none """ payload = ["A5", "01", "00", "0E", "00", "00", "24", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_sync_broadcast_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_rejection_dialog(self): """ the message from HD to UI to show the power off dialog @return: none """ payload = ["A5", "01", "00", "01", "00", "01", "02", "5A"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) @staticmethod def wait_for_message_to_be_sent(delay=0.050): """ After each multi-frame message put a 50ms sleep, time.sleep(0.1) it seems it's needed otherwise the test will check a value which has not been received yet. 
:@param delay: the number of seconds to wait @return: none """ time.sleep(delay) @staticmethod def build_hd_debug_text(vtext): """ the debug text message from HD builder method @param vtext: (str) the debug text @return: none """ message_length = 40 txt = messageBuilder.textToByte(vtext, message_length) # + 1 null term msg = messageBuilder.buildMessage(GuiActionType.HDDebugText, 1 * (message_length + 1), False, txt) return messageBuilder.toFrames(msg) @staticmethod def cmd_set_hd_debug_text(debug_text): """ the debug text message from HD setter/sender method @param debug_text: (str) the debug text @return: none """ frames = HDSimulator.build_hd_debug_text(debug_text) frames = messageBuilder.toCandumpFormat(frames) for frame in frames: subprocess.call(['cansend', 'can0', '020#{}'.format(frame)]) HDSimulator.wait_for_message_to_be_sent() @staticmethod def cmd_set_dg_debug_text(debug_text): """ the debug text message from DG setter/sender method @param debug_text: (str) the debug text @return: none """ frames = HDSimulator.build_dg_debug_text(debug_text) frames = messageBuilder.toCandumpFormat(frames) for frame in frames: subprocess.call(['cansend', 'can0', '070#{}'.format(frame)]) HDSimulator.wait_for_message_to_be_sent() def cmd_set_treatment_parameter_ranges(self, min_treatment_duration: int, max_treatment_duration: int, min_uf_volume: float, max_uf_volume: float, min_dialysate_flow_rate: int, max_dialysate_flow_rate: int) -> None: """ The Treatment adjustment parameter ranges data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) | #4:(F32) | #5:(U32) | #6:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: | |0x1A00| 0x020 | 6 | 1/60 Hz| Y | HD | UI | Treatment adjustment param ranges Data | \ref Data::mDuration_Min | \ref Data::mDuration_Max | \ref Data::mUltrafiltration_Volume_Min | \ref Data::mUltrafiltration_Volume_Max | \ref 
Data::mDialysate_Flow_Min | \ref Data::mDialysate_Flow_Max | @param min_treatment_duration: (int) Min Treatment Duration @param max_treatment_duration: (int) Max Treatment Duration @param min_uf_volume: (float) Min UF Volume @param max_uf_volume: (float) Max UF Volume @param min_dialysate_flow_rate: (int) Min Dialysate Flow Rate @param max_dialysate_flow_rate: (int) Max Dialysate Flow Rate @return: None """ payload = integer_to_bytearray(min_treatment_duration) payload += integer_to_bytearray(max_treatment_duration) payload += float_to_bytearray(min_uf_volume) payload += float_to_bytearray(max_uf_volume) payload += integer_to_bytearray(min_dialysate_flow_rate) payload += integer_to_bytearray(max_dialysate_flow_rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_TREATMENT_PARAM_CHANGE_RANGES.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_blood_flow_rate(self, flow_set_pt: int, measured_flow: float, rot_speed: float, mot_speed: float, mc_speed: float, mc_current: float, pwm: float, signal_strength: float) -> None: """ The Blood Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(S32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | #8:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0500| 0x040 | 7 | 1 Hz | N | HD | All | Blood Flow Data | \ref Data::mFlowSetPoint | \ref Data::mMeasuredFlow | \ref Data::mRotorSpeed | \ref Data::mMotorSpeed | \ref Data::mMotorCtlSpeed | \ref Data::mMotorCtlCurrent | \ref Data::mPWMDutyCycle | \ref Data::mSigStrenght | @param flow_set_pt: (int) Flow Set Point @param measured_flow: (float) Measured Flow @param rot_speed: (float) Rot Speed @param mot_speed: (float) Motor Speed @param mc_speed: (float) MC Speed @param mc_current: (float) MC Current @param pwm: (float) PWM @param signal_strength: 
(float) Signal strength in percent @return: None """ payload = integer_to_bytearray(flow_set_pt) payload += float_to_bytearray(measured_flow) payload += float_to_bytearray(rot_speed) payload += float_to_bytearray(mot_speed) payload += float_to_bytearray(mc_speed) payload += float_to_bytearray(mc_current) payload += float_to_bytearray(pwm) payload += float_to_bytearray(signal_strength) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_BLOOD_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_dialysate_flow_rate(self, flow_set_pt: int, measured_flow: float, rot_speed: float, mot_speed: float, mc_speed: float, mc_current: float, pwm: float, signal_strength: float) -> None: """ The Dialysate Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(S32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | #8:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0800| 0x040 | 7 | 1 Hz | N | HD | All | Dialysate Flow Data | mFlowSetPoint | mMeasuredFlow | mRotorSpeed | mMotorSpeed | mMotorCtlSpeed | mMotorCtlCurrent | mPWMDutyCycle | \ref Data::mSigStrenght | @param flow_set_pt: (signed int) Flow Set Point @param measured_flow: (float) Measured Flow @param rot_speed: (float) Rot Speed @param mot_speed: (float) Motor Speed @param mc_speed: (float) MC Speed @param mc_current: (float) MC Current @param pwm: (float) PWM @param signal_strength: (float) Signal strength in percent @return: None """ payload = integer_to_bytearray(flow_set_pt) payload += float_to_bytearray(measured_flow) payload += float_to_bytearray(rot_speed) payload += float_to_bytearray(mot_speed) payload += float_to_bytearray(mc_speed) payload += float_to_bytearray(mc_current) payload += float_to_bytearray(pwm) payload += float_to_bytearray(signal_strength) message = 
DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DIALYSATE_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_blood_dialysate_response(self, accepted: int, reason: int, blood_rate: int, dialysate_flow_rate: int) -> None: """ The Blood/dialysate rate change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | #4:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: | |0x1800| 0x020 | 6 | Rsp | Y | HD | UI | Blood/dialysate rate change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mBloodRate | \ref Data::mDialysateRate | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param blood_rate: (int) Blood Flow Rate @param dialysate_flow_rate: (int) Dialysate Flow Rate @return: None """ if not isinstance(accepted, int): accepted = int(accepted) if not isinstance(reason, int): reason = int(reason) if not isinstance(blood_rate, int): blood_rate = int(blood_rate) if not isinstance(dialysate_flow_rate, int): dialysate_flow_rate = int(dialysate_flow_rate) payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(blood_rate) payload += integer_to_bytearray(dialysate_flow_rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_BLOOD_DIAL_RATE_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_duration_response(self, accepted: int, reason: int, duration: int, ultrafiltration: float) -> None: """ The Treatment Duration change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | #5:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: 
|:--: |:--: |:--: |:--: | |0x1B00| 0x020 | 6 | Rsp | Y | HD | UI | Treatment Duration change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mDuration | \ref Data::mUFVolume | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param duration: (int) Treatment Duration @param ultrafiltration: (float) Ultrafiltration Volume @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(duration) payload += float_to_bytearray(ultrafiltration) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_TREATMENT_TIME_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_state_response(self, accepted: int, reason: int, state: int) -> None: """ the Treatment ultrafiltration adjustment response message setter/sender method @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param state: (int) Ultrafiltration State @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(state) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_PAUSE_RESUME_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_accepted(self, state: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_state_response which sends accept true @param state: (int) Ultrafiltration State @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_state_response(EResponse.Accepted, 0, state) def cmd_set_treatment_adjust_ultrafiltration_rejected(self, reason: int, state: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_state_response which sends accept false @param reason: 
(int) rejection reason @param state: (int) Ultrafiltration State @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_state_response(EResponse.Rejected, reason, state) def cmd_set_treatment_adjust_ultrafiltration_init_response(self, accepted: int, reason: int, volume: float) -> None: """ the ultrafiltration volume change response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |0x5000| 0x020 | 6 | Rsp | Y | HD | UI | Pre UF Volume Adjustment Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param volume: (float) Ultrafiltration Volume @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SET_UF_VOLUME_PARAMETER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_edit_response(self, accepted: int, reason: int, volume: float, duration: int, duration_diff: int, rate: float, rate_diff: float, rate_old: float) -> None: """ the ultrafiltration volume change response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #1:(U32) | #2:(U32) | #3:(F32) | #4:(U32) | #5:(F32) | #6:(U32) | #7:(U32) | #8:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x1300| 0x020 | 6 | Rsp | Y | HD | UI | UF Vol. 
Change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume | \ref Data::mDuration | \ref Data::mRate | \ref Data::mDurationDiff | \ref Data::mRateDiff | \ref Data::mRateOld | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param volume: (float) Ultrafiltration Volume @param duration: (int) Treatment Duration @param duration_diff: (int) Duration Difference @param rate: (float) Ultrafiltration Rate @param rate_diff: (float) Ultrafiltration Rate Difference @param rate_old: (float) Ultrafiltration Rate Old @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) payload += integer_to_bytearray(duration) payload += integer_to_bytearray(duration_diff) payload += float_to_bytearray(rate) payload += float_to_bytearray(rate_diff) payload += float_to_bytearray(rate_old) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_SETTINGS_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_edit_rejected(self, reason: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_edit_response which only sends a rejection reason and sends other values all as zero @param reason: (int) rejection reason @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_edit_response(0, reason, 0, 0, 0, 0, 0, 0) def cmd_set_treatment_adjust_ultrafiltration_confirm_response(self, accepted: int, reason: int, volume: float, duration: int, rate: float) -> None: """ the ultrafiltration volume Change Confirmation Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) | #4:(U32) | #5:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: | 
|0x2E00| 0x020 | 6 | Rsp | Y | HD | UI | UF Vol. Change Confirmation Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume | \ref Data::mDuration | \ref Data::mRate | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param volume: (float) Ultrafiltration Volume @param duration: (int) Treatment Duration @param rate: (float) Ultrafiltration Rate @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) payload += integer_to_bytearray(duration) payload += float_to_bytearray(rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_SETTINGS_CHANGE_CONFIRMATION_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_confirm_rejected(self, reason: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_confirm_response which only sends a rejection reason and sends other values all as zero @param reason: (int) rejection reason @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_confirm_response(0, reason, 0, 0, 0) def cmd_set_treatment_time(self, sec_total: int, sec_elapsed: int, sec_remain: int = 0) -> None: """ the Treatment Time Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x0D00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment Time Data | \ref Data::mTotal | \ref Data::mElapsed | \ref Data::mRemaining | @param sec_total: (int) Treatment Total Duration in Seconds @param sec_elapsed: (int) Treatment Total Elapsed Time in Seconds @param sec_remain: (int) Treatment Remaining Time in Seconds @return: none """ if sec_remain is None: sec_remain = sec_total - sec_elapsed payload = integer_to_bytearray(sec_total) payload 
def cmd_set_pressure_occlusion_data(self, arterial_prs: float, venous_prs: float, blood_pump_occlusion: int,
                                    dialysate_inlet_pump_occlusion: int, dialysate_outlet_pump_occlusion) -> None:
    """
    Builds and sends the Pressure/Occlusion Data message on the HD sync
    broadcast channel.

    Payload layout, in order:
      #1 arterial pressure (float), #2 venous pressure (float),
      #3 blood pump occlusion (int), #4 dialysate inlet pump occlusion (int),
      #5 dialysate outlet pump occlusion (int)

    @param arterial_prs: (float) Arterial Pressure
    @param venous_prs: (float) Venous Pressure
    @param blood_pump_occlusion: (uint) Blood Pump Occlusion
    @param dialysate_inlet_pump_occlusion: (uint) Dialysate Inlet Pump Occlusion
    @param dialysate_outlet_pump_occlusion: (uint) Dialysate Outlet Pump Occlusion
    @return: none
    """
    occlusions = (blood_pump_occlusion,
                  dialysate_inlet_pump_occlusion,
                  dialysate_outlet_pump_occlusion)

    # the two pressures come first, then the three occlusion readings
    payload = float_to_bytearray(arterial_prs)
    payload += float_to_bytearray(venous_prs)
    for occlusion in occlusions:
        payload += integer_to_bytearray(occlusion)

    channel = DenaliChannels.hd_sync_broadcast_ch_id
    msg_id = MsgIds.MSG_ID_PRESSURE_OCCLUSION_DATA.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=payload)
    self.can_interface.send(frame, 0)
def cmd_set_treatment_start_state(self) -> None:
    """
    Convenience method that sends one Treatment States Data message with a
    fixed set of states, since the treatment is not running by default.

    @return: none
    """
    # positional order expected by cmd_set_treatment_states_data
    start_states = (
        TXStates.TREATMENT_DIALYSIS_STATE,                 # sub_mode
        TXStates.UF_OFF_STATE,                             # uf_state
        TXStates.SALINE_BOLUS_STATE_IDLE,                  # saline_state
        TXStates.HEPARIN_STATE_OFF,                        # heparin_state
        TXStates.RINSEBACK_STOP_INIT_STATE,                # rinseback_state
        TXStates.TREATMENT_RECIRC_RECIRC_STATE,            # recirculate_state
        TXStates.BLOOD_PRIME_RAMP_STATE,                   # blood_prime_state
        TXStates.TREATMENT_END_WAIT_FOR_RINSEBACK_STATE,   # treatment_end_state
        TXStates.TREATMENT_STOP_RECIRC_STATE,              # treatment_stop_state
    )
    self.cmd_set_treatment_states_data(*start_states)
def cmd_set_treatment_saline_bolus_data(self, target: int, cumulative: float, delivered: float) -> None:
    """
    Builds and sends the Treatment Saline Bolus Data message on the HD sync
    broadcast channel.

    Payload layout, in order:
      #1 target volume (int), #2 cumulative volume (float), #3 delivered volume (float)

    @param target: (int) Saline Bolus Target Volume
    @param cumulative: (float) Saline Bolus Cumulative Volume
    @param delivered: (float) Saline Bolus Delivered Volume
    @return: none
    """
    payload = integer_to_bytearray(target)
    for volume in (cumulative, delivered):
        payload += float_to_bytearray(volume)

    channel = DenaliChannels.hd_sync_broadcast_ch_id
    msg_id = MsgIds.MSG_ID_SALINE_BOLUS_DATA.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=payload)
    self.can_interface.send(frame, 0)
def cmd_set_canbus_fault_count(self, count: int) -> None:
    """
    Builds and sends the CAN bus fault count message on the HD-to-UI channel,
    then calls HDSimulator.wait_for_message_to_be_sent().

    @param count: (int) Fault Count
    @return: none
    """
    fault_count_bytes = integer_to_bytearray(count)
    frame = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id,
                                        message_id=MsgIds.MSG_ID_CAN_ERROR_COUNT.value,
                                        payload=fault_count_bytes)
    self.can_interface.send(frame, 0)

    HDSimulator.wait_for_message_to_be_sent()
def cmd_set_heparin_pause_resume_response(self, accepted: int, reason: int, state: int) -> None:
    """
    Builds and sends the Heparin pause/resume Response message on the
    HD-to-UI channel.

    Payload layout, in order:
      #1 accepted (int), #2 rejection reason (int), #3 heparin state (int)

    @param accepted: (int) boolean accept/reject response
    @param reason: (int) rejection reason
    @param state: (int) Heparin current State
    @return: none
    """
    payload = integer_to_bytearray(accepted)
    for field in (reason, state):
        payload += integer_to_bytearray(field)

    channel = DenaliChannels.hd_to_ui_ch_id
    msg_id = MsgIds.MSG_ID_HD_HEPARIN_PAUSE_RESUME_RESPONSE.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=payload)
    self.can_interface.send(frame, 0)
def cmd_send_treatment_adjust_rinseback_response(self, accepted: int, reason: int) -> None:
    """
    Builds and sends the Rinseback State Change Response message on the
    HD-to-UI channel.

    Payload layout, in order:
      #1 accepted (int), #2 rejection reason (int)

    @param accepted: (int) boolean accept/reject response
    @param reason: (int) rejection reason
    @return: None
    """
    response_bytes = integer_to_bytearray(accepted)
    response_bytes += integer_to_bytearray(reason)

    channel = DenaliChannels.hd_to_ui_ch_id
    msg_id = MsgIds.MSG_ID_HD_RINSEBACK_CMD_RESPONSE.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=response_bytes)
    self.can_interface.send(frame, 0)
def cmd_send_treatment_recirculate_data(self, timeout_total: int, timeout_count_down: int) -> None:
    """
    Builds and sends the recirculate progress data message
    (MSG_ID_HD_RECIRC_PROGRESS) on the HD-to-UI channel.

    Payload layout, in order:
      #1 total timeout (int), #2 timeout count down (int)

    @param timeout_total: (int) Total Timeout
    @param timeout_count_down: (int) Current Timeout count down
    @return: None
    """
    progress_bytes = integer_to_bytearray(timeout_total)
    progress_bytes += integer_to_bytearray(timeout_count_down)

    channel = DenaliChannels.hd_to_ui_ch_id
    msg_id = MsgIds.MSG_ID_HD_RECIRC_PROGRESS.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=progress_bytes)
    self.can_interface.send(frame, 0)
def cmd_send_treatment_adjust_end_response(self, accepted: int, reason: int):
    """
    Builds and sends the Treatment End State Change Response message on the
    HD-to-UI channel.

    Payload layout, in order:
      #1 accepted (int), #2 rejection reason (int)

    @param accepted: (int) boolean accept/reject response
    @param reason: (int) rejection reason
    @return: None
    """
    response_bytes = integer_to_bytearray(accepted)
    response_bytes += integer_to_bytearray(reason)

    channel = DenaliChannels.hd_to_ui_ch_id
    msg_id = MsgIds.MSG_ID_HD_TX_END_CMD_RESPONSE.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=response_bytes)
    self.can_interface.send(frame, 0)
def _handler_request_hd_version(self, message: dict = None) -> None:
    """
    Handles a request for the HD version.

    Replies with a fixed placeholder version (every field set to 9) and then
    sends the HD serial number.

    This method is registered as a receive callback in the constructor in the
    same way as the other handlers of this class, which all take the received
    message as their argument. The parameter is therefore accepted here as
    well, so the callback does not fail with a TypeError when it is invoked
    with the message. It stays optional so existing no-argument calls keep
    working.
    NOTE(review): confirm that the CAN messenger passes the message as a
    single positional argument.

    @param message: (dict) the received version request; its content is not used
    @return: None
    """
    self.logger.debug("Handling request for hd version.")
    self.cmd_send_version_hd_data(9, 9, 9, 9, 9, 9, 9, 9)
    self.cmd_send_hd_serial_number()
def cmd_send_pre_treatment_state_data(self,
                                      sub_mode: int,
                                      water_sample_state: int,
                                      consumables_self_test_state: int,
                                      no_cartridge_self_test_state: int,
                                      installation_state: int,
                                      dry_self_test_state: int,
                                      prime_state: int,
                                      recirculate_state: int,
                                      patient_connection_state: int) -> None:
    """
    Builds and sends the pre-treatment states message on the HD sync
    broadcast channel. The nine states are encoded as integers in the same
    order as the parameters.

    @param sub_mode: (int) the main pre treatment state
    @param water_sample_state: (int) water sample state
    @param consumables_self_test_state: (int) consumables self test state
    @param no_cartridge_self_test_state: (int) no cartridge self-test state
    @param installation_state: (int) installation state
    @param dry_self_test_state: (int) dry self-test state
    @param prime_state: (int) prime state
    @param recirculate_state: (int) recirculate state
    @param patient_connection_state: (int) patient connection state
    @return: None
    """
    remaining_states = (water_sample_state,
                        consumables_self_test_state,
                        no_cartridge_self_test_state,
                        installation_state,
                        dry_self_test_state,
                        prime_state,
                        recirculate_state,
                        patient_connection_state)

    payload = integer_to_bytearray(sub_mode)
    for state in remaining_states:
        payload += integer_to_bytearray(state)

    channel = DenaliChannels.hd_sync_broadcast_ch_id
    msg_id = MsgIds.MSG_ID_PRE_TREATMENT_STATE.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=payload)
    self.can_interface.send(frame, 0)
def cmd_send_pre_treatment_self_test_no_cartridge_progress_data(self, total: int, countdown: int) -> None:
    """
    Builds and sends the pre-treatment no-cartridge self-test progress
    message on the HD-to-UI channel.

    Payload layout, in order:
      #1 total time (int), #2 count down time (int)

    @param total: (int) Total time in second
    @param countdown: (int) count down time in second
    @return: None
    """
    progress_bytes = integer_to_bytearray(total)
    progress_bytes += integer_to_bytearray(countdown)

    channel = DenaliChannels.hd_to_ui_ch_id
    msg_id = MsgIds.MSG_ID_HD_NO_CART_SELF_TEST_PROGRESS.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=progress_bytes)
    self.can_interface.send(frame, 0)
def cmd_send_pre_treatment_prime_start_response(self, accepted: int, reason: int) -> None:
    """
    Builds and sends the pre-treatment prime start response message on the
    HD-to-UI channel.

    Payload layout, in order:
      #1 accepted (int), #2 rejection reason (int)

    @param accepted: (int) accept or reject
    @param reason: (int) rejection reason
    @return: None
    """
    response_bytes = integer_to_bytearray(accepted)
    response_bytes += integer_to_bytearray(reason)

    channel = DenaliChannels.hd_to_ui_ch_id
    msg_id = MsgIds.MSG_ID_HD_START_PRIME_RESPONSE.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=response_bytes)
    self.can_interface.send(frame, 0)
def cmd_send_post_treatment_log_response(self,
                                         accepted: bool,
                                         reason: int,
                                         bood_flow_rate: int,
                                         dialysate_flow_rate: int,
                                         treatment_duration: int,
                                         actual_treatment_duration: int,
                                         acid_concentrate_type: int,
                                         bicarbonate_concentrate_type: int,
                                         potassium_concentration: int,
                                         calcium_concentration: int,
                                         bicarbonate_concentration: int,
                                         sodium_concentration: int,
                                         dialysate_temperature: float,
                                         dialyzer_type: int,
                                         treatment_start_date_time: int,
                                         treatment_end_date_time: int,
                                         average_blood_flow: float,
                                         average_dialysate_flow: float,
                                         dialysate_volume_used: float,
                                         average_dialysate_temp: float,
                                         origin_uf_volume: float,
                                         target_uf_volume: float,
                                         actual_uf_volume: float,
                                         origin_uf_rate: float,
                                         target_uf_rate: float,
                                         actual_uf_rate: float,
                                         saline_bolus_volume: int,
                                         heparin_bolus_volume: float,
                                         heparin_dispense_rate: float,
                                         heparin_pre_stop: int,
                                         heparin_delivered_volume: float,
                                         heparin_type: int,
                                         average_arterial_pressure: float,
                                         average_venous_pressure: float,
                                         device_id: int,
                                         water_sample_test_result: int
                                         ) -> None:
    """
    Builds and sends the post-treatment log response message on the
    HD-to-UI channel.

    The payload starts with accepted and reason (integer encoded), followed
    by every remaining parameter in signature order. Each remaining value is
    first cast with int() and unsigned encoded, or cast with float() and
    float encoded, as listed in the table inside the method body.

    @param accepted: true if accepted
    @param reason: the rejection reason
    @param bood_flow_rate: blood flow rate
    @param dialysate_flow_rate: dialysate flow rate
    @param treatment_duration: treatment duration
    @param actual_treatment_duration: actual treatment duration
    @param acid_concentrate_type: acid concentrate type
    @param bicarbonate_concentrate_type: bicarbonate concentrate type
    @param potassium_concentration: potassium concentration
    @param calcium_concentration: calcium concentration
    @param bicarbonate_concentration: bicarbonate concentration
    @param sodium_concentration: sodium concentration
    @param dialysate_temperature: dialysate temperature
    @param dialyzer_type: dialyzer type
    @param treatment_start_date_time: treatment start date time
    @param treatment_end_date_time: treatment end date time
    @param average_blood_flow: average blood flow
    @param average_dialysate_flow: average dialysate flow
    @param dialysate_volume_used: dialysate volume used
    @param average_dialysate_temp: average dialysate temp
    @param origin_uf_volume: origin uf volume
    @param target_uf_volume: target uf volume
    @param actual_uf_volume: actual uf volume
    @param origin_uf_rate: origin uf rate
    @param target_uf_rate: target uf rate
    @param actual_uf_rate: actual uf rate
    @param saline_bolus_volume: saline bolus volume
    @param heparin_bolus_volume: heparin bolus volume
    @param heparin_dispense_rate: heparin dispense rate
    @param heparin_pre_stop: heparin pre stop
    @param heparin_delivered_volume: heparin delivered volume
    @param heparin_type: heparin type
    @param average_arterial_pressure: average arterial pressure
    @param average_venous_pressure: average venous pressure
    @param device_id: device id
    @param water_sample_test_result: water sample test result
    @return: None
    """

    def as_unsigned(value):
        return unsigned_to_bytearray(int(value))

    def as_float(value):
        return float_to_bytearray(float(value))

    # (encoder, value) pairs in the exact order they appear in the payload
    log_fields = (
        (as_unsigned, bood_flow_rate),
        (as_unsigned, dialysate_flow_rate),
        (as_unsigned, treatment_duration),
        (as_unsigned, actual_treatment_duration),
        (as_unsigned, acid_concentrate_type),
        (as_unsigned, bicarbonate_concentrate_type),
        (as_unsigned, potassium_concentration),
        (as_unsigned, calcium_concentration),
        (as_unsigned, bicarbonate_concentration),
        (as_unsigned, sodium_concentration),
        (as_float, dialysate_temperature),
        (as_unsigned, dialyzer_type),
        (as_unsigned, treatment_start_date_time),
        (as_unsigned, treatment_end_date_time),
        (as_float, average_blood_flow),
        (as_float, average_dialysate_flow),
        (as_float, dialysate_volume_used),
        (as_float, average_dialysate_temp),
        (as_float, origin_uf_volume),
        (as_float, target_uf_volume),
        (as_float, actual_uf_volume),
        (as_float, origin_uf_rate),
        (as_float, target_uf_rate),
        (as_float, actual_uf_rate),
        (as_unsigned, saline_bolus_volume),
        (as_float, heparin_bolus_volume),
        (as_float, heparin_dispense_rate),
        (as_unsigned, heparin_pre_stop),
        (as_float, heparin_delivered_volume),
        (as_unsigned, heparin_type),
        (as_float, average_arterial_pressure),
        (as_float, average_venous_pressure),
        (as_unsigned, device_id),
        (as_unsigned, water_sample_test_result),
    )

    payload = integer_to_bytearray(accepted)
    payload += integer_to_bytearray(reason)
    for encode, value in log_fields:
        payload += encode(value)

    channel = DenaliChannels.hd_to_ui_ch_id
    msg_id = MsgIds.MSG_ID_HD_TREATMENT_LOG_DATA_RESPONSE.value
    frame = DenaliMessage.build_message(channel_id=channel, message_id=msg_id, payload=payload)
    self.can_interface.send(frame, 0)
""" send the treatment log data @param blood_flow_rate: (int) blood flow rate @param dialysate_flow_rate: (int) Dialysate Flow Rate @param uf_rate: (float) UF Rate @param arterial_pressure: (float) Arterial Pressure @param venous_pressure: (float) Venous Pressure @return: None """ payload = integer_to_bytearray(blood_flow_rate) payload += integer_to_bytearray(dialysate_flow_rate) payload += float_to_bytearray(uf_rate) payload += float_to_bytearray(arterial_pressure) payload += float_to_bytearray(venous_pressure) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_PERIODIC_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_alarm(self, alarm_id: int, parameter1: float, parameter2: float) -> None: """ send the treatment log data @param alarm_id: (U32) alarm ID @param parameter1: (F32) paramter 1 (it's not clear yet how many paramters with what type is required and this is only plceholder) @param parameter2: (F32) paramter 2 (it's not clear yet how many paramters with what type is required and this is only plceholder) @return: None """ payload = integer_to_bytearray(alarm_id) payload += float_to_bytearray(parameter1) payload += float_to_bytearray(parameter2) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_ALARM_EVENT.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_event(self, event_id: int, old_value: float, new_value: float) -> None: """ send the treatment log data @param event_id: (U32) alarm ID @param old_value: (F32) the old value @param new_value: (F32) the new value @return: none """ payload = integer_to_bytearray(event_id) payload += float_to_bytearray(old_value) payload += float_to_bytearray(new_value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_EVENT.value, 
payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_disinfection_state(self, sub_mode: int, flush_mode: int, heat_mode: int, chemical_mode: int) -> None: """ Broadcasts the current DG disinfection mode @param sub_mode: (int) disinfect states @param flush_mode: (int) flush states @param heat_mode: (int) heat states @param chemical_mode: (int) chemical states @return: None """ payload = integer_to_bytearray(sub_mode) payload += integer_to_bytearray(flush_mode) payload += integer_to_bytearray(heat_mode) payload += integer_to_bytearray(chemical_mode) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_DISINFECT_STANDBY_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_disinfect_response(self, accepted: bool, reason: int) -> None: """ the HD response to the request from UI to initiate a disinfection/flush @param accepted: boolean accepted or rejected @param reason: the rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DISINFECT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_disinfect_chemical_confirm(self, accepted: bool, reason: int) -> None: """ the HD response to the UI sending the user chimical disinfection steps confirm. 
@param accepted: boolean accepted or rejected @param reason: the rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_CHEM_DISINFECT_CONFIRM_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) @publish(["ui_version"]) def _handler_ui_version(self, message) -> None: """ Handles the ui version response @param message: The ui version response message @return: None """ major = struct.unpack('B', bytearray( message['message'][self.START_POS_MAJOR:self.END_POS_MAJOR])) minor = struct.unpack('B', bytearray( message['message'][self.START_POS_MINOR:self.END_POS_MINOR])) micro = struct.unpack('B', bytearray( message['message'][self.START_POS_MICRO:self.END_POS_MICRO])) build = struct.unpack('H', bytearray( message['message'][self.START_POS_BUILD:self.END_POS_BUILD])) compatibility = struct.unpack('H', bytearray( message['message'][self.START_POS_COMPAT:self.END_POS_COMPAT])) if all([len(each) > 0 for each in [major, minor, micro, build]]): self.ui_version = f"v{major[0]}.{minor[0]}.{micro[0]}-{build[0]}-{compatibility[0]}" self.logger.debug(f"UI VERSION: {self.ui_version}") def cmd_send_hd_request_ui_version(self) -> None: """ send HD request for UI version @return: None """ message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_UI_VERSION_INFO_REQUEST.value, payload=None) self.can_interface.send(message, 0) def cmd_send_hd_post(self, item: int, passed: bool, done: bool = False) -> None: """ send HD post message the single(item) or the final(done) @param item: the post state/item index @param passed: the post result single or final @param done: if this is the final post message this should be true @return: None """ payload = integer_to_bytearray(passed) if not done: payload += integer_to_bytearray(item) message = 
DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_POST_FINAL_TEST_RESULT.value if done else MsgIds.MSG_ID_HD_POST_SINGLE_TEST_RESULT.value, payload=payload) self.can_interface.send(message, 0) # ------------------------------------------------ GENERAL MESSAGES ------------------------------------------------ def cmd_send_hd_general_response(self, message_id: int, accepted: int, reason: int, is_pure_data: bool = False, has_parameters: bool = False, parameters_payload: any = 0x00) -> None: """ a general method to send any standard response message, by it's id and list of paramters. @param message_id: the id of the message @param accepted: the standard accepted parameter of any response message @param reason: the standard rejection reason parameter of any response message @param is_pure_data: The message only has data @param has_parameters: if the message has parameter this needs to be true. @param parameters_payload: the list of parameters pre-converted and ready to be concatenated to the payload. 
@return: None """ payload = "" if not is_pure_data: payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) if has_parameters: payload += integer_to_bytearray(parameters_payload) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_general_progress_data(self, message_id: int, total: int, countdown: int) -> None: """ a general method t send any standard progress data message, by it's id @param message_id: the id of the message @param total: the total value of the progress data @param countdown: the remaining or countdown value @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_ack(self, seq: int) -> None: """ sending hd ack message by the sequence seq @param seq: the message sequence number @return: None """ message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=GuiActionType.Acknow, seq=seq) self.can_interface.send(message, 0) def cmd_send_power_on_self_test_version_request(self) -> None: """ Sends the power on self test version request @return: None """ payload = bytearray() message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIdsDialin.MSG_DIALIN_ID_HD_VERSION_REQUEST.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_post_ui_version_compatibility(self, message: dict) -> None: """ Handles the UI's reporting of its version during the power on self tests @param message: The message data @return: None """ ui_major = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] ui_minor = struct.unpack('i', bytearray( 
message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] ui_micro = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] ui_build = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] ui_compatibility = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] self.logger.debug("UI version power on self test received: " "Major: {0} " "Minor: {1} " "Micro: {2} " "Build: {3} " "Compat: {4} " .format(ui_major, ui_minor, ui_micro, ui_build, ui_compatibility))