########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file hd_simulator.py # # @author (last) Peter Lucia # @date (last) 10-Nov-2020 # @author (original) Peter Lucia # @date (original) 06-Aug-2020 # ############################################################################ import enum from time import sleep from typing import List, Union import time from . import messageBuilder from .hd_simulator_alarms import HDAlarmsSimulator from ..common import * from ..protocols.CAN import (DenaliMessage, DenaliCanMessenger, DenaliChannels) from ..utils import * from ..utils.base import _AbstractSubSystem, _LogManager class HDSimulator(_AbstractSubSystem): NUM_TREATMENT_PARAMETERS = 18 instanceCount = 0 def __init__(self, can_interface:str="can0", log_level:bool= None, console_out:bool= False, passive_mode:bool=False): """ The HDSimulator constructor @param can_interface: (str) the can interface name @param log_level: (str) or (None) if not set, contains the logging level @param console_out: (bool) If True, write each dialin message to the console. """ super().__init__() HDSimulator.instanceCount = HDSimulator.instanceCount + 1 self._log_manager = _LogManager(log_level=log_level, log_filepath=self.__class__.__name__ + ".log") self.logger = self._log_manager.logger self.console_out = console_out self.can_interface = DenaliCanMessenger(can_interface=can_interface, logger=self.logger, log_can=self._log_manager.log_level == "CAN_ONLY", console_out=console_out, passive_mode=passive_mode) self.can_interface.start() if self.can_interface is not None: channel_id = DenaliChannels.ui_to_hd_ch_id self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_INITIATE_TREATMENT_REQUEST.value, self._handler_ui_initiate_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_SET_UF_VOLUME_PARAMETER.value, self._handler_ui_pre_treatment_uf_request) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS.value, self._handler_ui_validate_parameters) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_USER_CONFIRM_TREATMENT_PARAMS.value, self._handler_ui_confirm_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_TX_END_CMD.value, self._handler_ui_end_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_HD_SET_RTC_REQUEST.value, self._handler_set_rtc_request) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIds.MSG_ID_REQUEST_FW_VERSIONS.value, self._handler_request_hd_version) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIdsDialin.MSG_DIALIN_ID_UI_SYSTEM_USAGE_REQUEST.value, self._handler_system_usage_response) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_to_hd_ch_id, MsgIdsDialin.MSG_DIALIN_ID_UI_POST_REPORT_VERSION.value, self._handler_ui_post_ui_version_compatibility) self.alarms_simulator = HDAlarmsSimulator(self.can_interface, self.logger) self.treatment_parameter_rejections = TreatmentParameterRejections() def cmd_send_hd_post_single_result(self, result: int, test_num: int) -> None: """ Reports a single result for power on self test @param result: success or fail, where success = 1, 0 = fail @param test_num: the test index [0-n] @return: None """ payload = integer_to_bytearray(result) payload += integer_to_bytearray(test_num) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_POST_SINGLE_TEST_RESULT.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_post_final_result(self, result: int) -> None: """ Reports a final result for power on self test @param result: success or fail, where success = 1, 0 = fail @return: None """ payload = integer_to_bytearray(result) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_POST_FINAL_TEST_RESULT.value, payload=payload) self.can_interface.send(message, 0) def _handler_system_usage_response(self, message: dict) -> None: """ Handles a request for system usage @param message: (dict) the message @return: None """ payload = integer_to_bytearray(1619628663) payload += integer_to_bytearray(1619887863) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIdsDialin.MSG_DIALIN_ID_HD_SYSTEM_USAGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def _handler_set_rtc_request(self, message: dict) -> None: """ Handles a UI request to set the HD RTC @param message: (dict) the message containing the request @return: None """ START_POS = DenaliMessage.PAYLOAD_START_INDEX END_POS = START_POS + 4 epoch = struct.unpack('i', bytearray( message['message'][START_POS:END_POS]))[0] self.logger.debug("Request to set the HD epoch to {0}".format(epoch)) self.cmd_send_set_rtc_response(YES, 0) def cmd_send_set_rtc_response(self, response, reason): """ Sends a set RTC response message @param response: (int) 0=NO, 1=YES @return: None """ self.logger.debug("HD: Sending response {0} reason {1}".format(response, reason)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_UI_SET_RTC_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_parameter_validation_response(self, rejections: List[RequestRejectReasons]): """ Sends a treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, enum.Enum) for each in rejections]): self.logger.error("Not all rejections are enums.") return False payload = bytearray() for rejection in rejections: payload += integer_to_bytearray(rejection.value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_treatment_parameter_manual_validation_response(self, rejections: List[int]): """ Sends a manually built treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, int) for each in rejections]): self.logger.error("Not all rejections are the correct type.") return False payload = bytearray() for rejection in rejections: payload += integer_to_bytearray(rejection) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_poweroff_button_pressed(self): """ Broadcast that the poweroff button was pressed @return: None """ payload = bytearray(0x01) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_OFF_BUTTON_PRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_broadcast_poweroff_imminent(self): """ Broadcast that the system will shut down @return: None """ payload = integer_to_bytearray(1) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_POWER_OFF_WARNING.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_poweroff_timeout(self): """ Sends a poweroff timeout @return: None """ payload = integer_to_bytearray(1) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_OFF_BUTTON_PRESS.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_confirm_treatment(self, message): """ Handler function to detect when a treatment is confirmed @param message: the confirm treatment message @return: None """ START_POS = DenaliMessage.PAYLOAD_START_INDEX END_POS = START_POS + 4 request = struct.unpack('i', bytearray( message['message'][START_POS:END_POS]))[0] if request == 0: self.logger.debug("Received UI cancel confirmation of Treatment Parameters. ") return self.logger.debug("Received UI confirmation of Treatment Parameters. ") self.logger.debug("Priming ...") state = 0 total_seconds = 100 for seconds_remaining in range(total_seconds, -1, -1): if seconds_remaining % (total_seconds // 3) == 0: state += 1 self.cmd_send_priming_time_remaining(state, seconds_remaining, total_seconds) sleep(0.05) self.logger.debug("Finished priming.") def _handler_ui_pre_treatment_uf_request(self, message): """ Handles the ui pre treatment uf request and sends a response @param message: The ui pretreatment uf request message @return: None """ START_POS = DenaliMessage.PAYLOAD_START_INDEX END_POS = START_POS + 4 uf_volume = struct.unpack('f', bytearray( message['message'][START_POS:END_POS]))[0] self.logger.debug("Received UF Volume: {0} mL".format(uf_volume)) self.cmd_send_uf_treatment_response(1, 0, uf_volume) def _handler_ui_initiate_treatment(self, message): """ Handler function to start a treatment @param message: the start treatment message @return: None """ START_POS = DenaliMessage.PAYLOAD_START_INDEX END_POS = START_POS + 4 request = struct.unpack('i', bytearray( message['message'][START_POS:END_POS]))[0] if request == 0: self.logger.debug("Selecting treatment parameters") self.cmd_send_hd_operation_mode(HDOpModes.MODE_PRET.value) elif request == 1: self.logger.debug("Canceling treatment") self.cmd_send_hd_operation_mode(HDOpModes.MODE_STAN.value) elif request == 2: self.logger.debug("Starting treatment") self.cmd_send_hd_operation_mode(HDOpModes.MODE_TREA.value) self.cmd_initiate_treatment_response(YES, 0) def cmd_initiate_treatment_response(self, response: int, reason: int): """ Sends a start treatment response message @param response: 0=NO, 1=YES @return: None """ self.logger.debug("Sending: {0}".format(response)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_INITIATE_TREATMENT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_operation_mode(self, opMode, subMode): """ Broadcasts the current HD operation mode @param: (int) opMode @param: (int) subMode @return: None """ if not isinstance(opMode, int): raise ValueError("Provided mode is not of type 'int'") if not isinstance(subMode, int): raise ValueError("Provided mode is not of type 'int'") payload = integer_to_bytearray(opMode) payload += integer_to_bytearray(subMode) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_OP_MODE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_uf_treatment_response(self, accepted, reason, volume): """ Sends the uf volume adjustment response message in pre-treatment :param accepted: (uint) the acceptance, 1 = accepted, 0 = rejected :param reason: (uint) the reason for rejection :param volume: (float) the acceptable/accepted ultrafiltration volume :return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SET_UF_VOLUME_PARAMETER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_end_treatment_response(self): """ Sends an end treatment response @return: None """ payload = integer_to_bytearray(YES) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TX_END_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_validate_parameters(self, message): """ @param message: @return: """ rejections = [ self.treatment_parameter_rejections.param_request_valid, self.treatment_parameter_rejections.param_blood_flow_rate, self.treatment_parameter_rejections.param_dialysate_flow_rate, self.treatment_parameter_rejections.param_duration, self.treatment_parameter_rejections.param_heparin_stop_time, self.treatment_parameter_rejections.param_saline_bolus, self.treatment_parameter_rejections.param_acid_concentrate, self.treatment_parameter_rejections.param_bicarbonate_concentrate, self.treatment_parameter_rejections.param_dialyzer_type, self.treatment_parameter_rejections.param_blood_pressure_measure_interval, self.treatment_parameter_rejections.param_rinseback_flow_rate, self.treatment_parameter_rejections.param_arterial_pressure_limit_low, self.treatment_parameter_rejections.param_arterial_pressure_limit_high, self.treatment_parameter_rejections.param_venous_pressure_limit_low, self.treatment_parameter_rejections.param_venous_pressure_limit_high, self.treatment_parameter_rejections.param_heparin_dispensing_rate, self.treatment_parameter_rejections.param_heparin_bolus_volume, self.treatment_parameter_rejections.param_dialysate_temp ] self.cmd_send_treatment_parameter_validation_response(rejections) def test_started(self, test_name: str): """ Logs that a test was started @param test_name: The name of the test @return: None """ self.logger.info("Test Started: {0}".format(test_name)) def test_completed(self): """ Logs that a test was completed @return: None """ self.logger.info("Test Completed") def cmd_send_acknowledge_hd(self): """ the acknowledge from HD :return: none """ payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_send_acknowledge_ui(self): """ the acknowledge from UI :return: none """ payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.ui_to_hd_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_dialog(self): """ the message from HD to UI to show the power off dialog :return: none """ payload = ["A5", "01", "00", "01", "00", "01", "00", "38"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_hide_poweroff_dialog(self): """ the message from HD to UI to hide the power off dialog :return: none """ payload = ["A5", "01", "00", "01", "00", "01", "01", "09"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_notification_dialog(self): """ the message from HD to UI to show the shutting down notification box :return: none """ payload = ["A5", "01", "00", "0E", "00", "00", "24", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_sync_broadcast_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_rejection_dialog(self): """ the message from HD to UI to show the power off dialog :return: none """ payload = ["A5", "01", "00", "01", "00", "01", "02", "5A"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) @staticmethod def wait_for_message_to_be_sent(delay=0.050): """ After each multi-frame message put a 50ms sleep, time.sleep(0.1) it seems it's needed otherwise the test will check a value which has not been received yet. :@param delay: the number of seconds to wait :return: none """ time.sleep(delay) @staticmethod def build_hd_debug_text(vText): """ the debug text message from HD builder method :param vText: (str) the debug text :return: none """ message_length = 40 txt = messageBuilder.textToByte(vText, message_length) # + 1 null term msg = messageBuilder.buildMessage(GuiActionType.HDDebugText, 1 * (message_length + 1), False, txt) return messageBuilder.toFrames(msg) @staticmethod def cmd_set_hd_debug_text(debug_text): """ the debug text message from HD setter/sender method :param debug_text: (str) the debug text :return: none """ frames = HDSimulator.build_hd_debug_text(debug_text) frames = messageBuilder.toCandumpFormat(frames) for frame in frames: subprocess.call(['cansend', 'can0', '020#{}'.format(frame)]) HDSimulator.wait_for_message_to_be_sent() @staticmethod def cmd_set_dg_debug_text(debug_text): """ the debug text message from DG setter/sender method :param debug_text: (str) the debug text :return: none """ frames = HDSimulator.build_dg_debug_text(debug_text) frames = messageBuilder.toCandumpFormat(frames) for frame in frames: subprocess.call(['cansend', 'can0', '070#{}'.format(frame)]) HDSimulator.wait_for_message_to_be_sent() def cmd_set_treatment_parameter_ranges(self, vMinTreatmentDuration, vMaxTreatmentDuration, vMinUFVolume, vMaxUFVolume, vMinDialysateFlowRate, vMaxDialysateFlowRate): """ The Treatment adjustment parameter ranges data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) | #4:(F32) | #5:(U32) | #6:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: | |0x1A00| 0x020 | 6 | 1/60 Hz| Y | HD | UI | Treatment adjustment param ranges Data | \ref Data::mDuration_Min | \ref Data::mDuration_Max | \ref Data::mUltrafiltration_Volume_Min | \ref Data::mUltrafiltration_Volume_Max | \ref Data::mDialysate_Flow_Min | \ref Data::mDialysate_Flow_Max | :param vMinTreatmentDuration: (int) Min Treatment Duration :param vMaxTreatmentDuration: (int) Max Treatment Duration :param vMinUFVolume: (float) Min UF Volume :param vMaxUFVolume: (float) Max UF Volume :param vMinDialysateFlowRate: (int) Min Dialysate Flow Rate :param vMaxDialysateFlowRate: (int) Max Dialysate Flow Rate :return: None """ payload = integer_to_bytearray(vMinTreatmentDuration) payload += integer_to_bytearray(vMaxTreatmentDuration) payload += float_to_bytearray(vMinUFVolume) payload += float_to_bytearray(vMaxUFVolume) payload += integer_to_bytearray(vMinDialysateFlowRate) payload += integer_to_bytearray(vMaxDialysateFlowRate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_TREATMENT_PARAM_CHANGE_RANGES.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_blood_flow_rate(self, vFlowSetPt, vMeasFlow, vRotSpd, vMotSpd, vMCSpd, vMCCurr, vPWM, vSigStrength): """ The Blood Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(S32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | #8:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0500| 0x040 | 7 | 1 Hz | N | HD | All | Blood Flow Data | \ref Data::mFlowSetPoint | \ref Data::mMeasuredFlow | \ref Data::mRotorSpeed | \ref Data::mMotorSpeed | \ref Data::mMotorCtlSpeed | \ref Data::mMotorCtlCurrent | \ref Data::mPWMDutyCycle | \ref Data::mSigStrenght | :param vFlowSetPt: (signed int) Flow Set Point :param vMeasFlow: (float) Measured Flow :param vRotSpd: (float) Rot Speed :param vMotSpd: (float) Motor Speed :param vMCSpd: (float) MC Speed :param vMCCurr: (float) MC Current :param vPWM: (float) PWM :param vSigStrength: (float) Signal strength in percent :return: None """ payload = integer_to_bytearray(vFlowSetPt) payload += float_to_bytearray(vMeasFlow) payload += float_to_bytearray(vRotSpd) payload += float_to_bytearray(vMotSpd) payload += float_to_bytearray(vMCSpd) payload += float_to_bytearray(vMCCurr) payload += float_to_bytearray(vPWM) payload += float_to_bytearray(vSigStrength) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_BLOOD_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_dialysate_flow_rate(self, vFlowSetPt, vMeasFlow, vRotSpd, vMotSpd, vMCSpd, vMCCurr, vPWM, vSigStrength): """ The Dialysate Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(S32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | #8:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0800| 0x040 | 7 | 1 Hz | N | HD | All | Dialysate Flow Data | mFlowSetPoint | mMeasuredFlow | mRotorSpeed | mMotorSpeed | mMotorCtlSpeed | mMotorCtlCurrent | mPWMDutyCycle | \ref Data::mSigStrenght | :param vFlowSetPt: (signed int) Flow Set Point :param vMeasFlow: (float) Measured Flow :param vRotSpd: (float) Rot Speed :param vMotSpd: (float) Motor Speed :param vMCSpd: (float) MC Speed :param vMCCurr: (float) MC Current :param vPWM: (float) PWM :param vSigStrength: (float) Signal strength in percent :return: None """ payload = integer_to_bytearray(vFlowSetPt) payload += float_to_bytearray(vMeasFlow) payload += float_to_bytearray(vRotSpd) payload += float_to_bytearray(vMotSpd) payload += float_to_bytearray(vMCSpd) payload += float_to_bytearray(vMCCurr) payload += float_to_bytearray(vPWM) payload += float_to_bytearray(vSigStrength) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DIALYSATE_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_blood_dialysate_response(self, accepted, reason, blood_rate, dialysate_flow_rate): """ The Blood/dialysate rate change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | #4:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: | |0x1800| 0x020 | 6 | Rsp | Y | HD | UI | Blood/dialysate rate change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mBloodRate | \ref Data::mDialysateRate | :param accepted: (int) boolean accept/reject response :param reason: (int) rejection reason :param blood_rate: (int) Blood Flow Rate :param dialysate_flow_rate: (int) Dialysate Flow Rate :return: None """ if not isinstance(accepted, int): accepted = int(accepted) if not isinstance(reason, int): reason = int(reason) if not isinstance(blood_rate, int): blood_rate = int(blood_rate) if not isinstance(dialysate_flow_rate, int): dialysate_flow_rate = int(dialysate_flow_rate) payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(blood_rate) payload += integer_to_bytearray(dialysate_flow_rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_BLOOD_DIAL_RATE_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_duration_response(self, vAccepted, vReason, vDuration, vUltrafiltration): """ The Treatment Duration change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | #5:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: | |0x1B00| 0x020 | 6 | Rsp | Y | HD | UI | Treatment Duration change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mDuration | \ref Data::mUFVolume | :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vDuration: (int) Treatment Duration :param vUltrafiltration: (float) Ultrafiltration Volume :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += integer_to_bytearray(vDuration) payload += float_to_bytearray(vUltrafiltration) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_TREATMENT_TIME_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_state_response(self, vAccepted, vReason, vState): """ the Treatment ultrafiltration adjustment response message setter/sender method :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vState: (int) Ultrafiltration State :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += integer_to_bytearray(vState) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_PAUSE_RESUME_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_accepted(self, vState): """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_state_response which sends accept true :return: none """ self.cmd_set_treatment_adjust_ultrafiltration_state_response(EResponse.Accepted, 0, vState) def cmd_set_treatment_adjust_ultrafiltration_rejected(self, vReason, vState): """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_state_response which sends accept false :return: none """ self.cmd_set_treatment_adjust_ultrafiltration_state_response(EResponse.Rejected, vReason, vState) def cmd_set_treatment_adjust_ultrafiltration_init_response(self, vAccepted, vReason, vVolume): """ the ultrafiltration volume change response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |0x5000| 0x020 | 6 | Rsp | Y | HD | UI | Pre UF Volume Adjustment Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vVolume: (float) Ultrafiltration Volume :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += float_to_bytearray(vVolume) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SET_UF_VOLUME_PARAMETER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_edit_response(self, vAccepted, vReason, vVolume, vDuration, vDurationDiff, vRate, vRateDiff, vRateOld): """ the ultrafiltration volume change response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #1:(U32) | #2:(U32) | #3:(F32) | #4:(U32) | #5:(F32) | #6:(U32) | #7:(U32) | #8:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x1300| 0x020 | 6 | Rsp | Y | HD | UI | UF Vol. Change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume | \ref Data::mDuration | \ref Data::mRate | \ref Data::mDurationDiff | \ref Data::mRateDiff | \ref Data::mRateOld | :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vVolume: (float) Ultrafiltration Volume :param vDuration: (int) Treatment Duration :param vDurationDiff: (int) Duration Difference :param vRate: (float) Ultrafiltration Rate :param vRateDiff: (float) Ultrafiltration Rate Difference :param vRateOld: (float) Ultrafiltration Rate Old :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += float_to_bytearray(vVolume) payload += integer_to_bytearray(vDuration) payload += integer_to_bytearray(vDurationDiff) payload += float_to_bytearray(vRate) payload += float_to_bytearray(vRateDiff) payload += float_to_bytearray(vRateOld) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_SETTINGS_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_edit_rejected(self, vReason): """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_edit_response which only sends a rejection reason and sends other values all as zero :param vReason: (int) rejection reason :return: none """ self.cmd_set_treatment_adjust_ultrafiltration_edit_response(0, vReason, 0, 0, 0, 0, 0, 0) def cmd_set_treatment_adjust_ultrafiltration_confirm_response(self, vAccepted, vReason, vVolume, vDuration, vRate): """ the ultrafiltration volume Change Confirmation Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) | #4:(U32) | #5:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: | |0x2E00| 0x020 | 6 | Rsp | Y | HD | UI | UF Vol. Change Confirmation Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume | \ref Data::mDuration | \ref Data::mRate | :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vVolume: (float) Ultrafiltration Volume :param vDuration: (int) Treatment Duration :param vRate: (float) Ultrafiltration Rate :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += float_to_bytearray(vVolume) payload += integer_to_bytearray(vDuration) payload += float_to_bytearray(vRate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_SETTINGS_CHANGE_CONFIRMATION_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_confirm_rejected(self, vReason): """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_confirm_response which only sends a rejection reason and sends other values all as zero :param vReason: (int) rejection reason :return: none """ self.cmd_set_treatment_adjust_ultrafiltration_confirm_response(0, vReason, 0, 0, 0) def cmd_set_treatment_time(self, vSecsTotal, vSecsElap, vSecsRem=None): """ the Treatment Time Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x0D00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment Time Data | \ref Data::mTotal | \ref Data::mElapsed | \ref Data::mRemaining | :param vSecsTotal: (int) Treatment Total Duration in Seconds :param vSecsElap: (int) Treatment Total Elapsed Time in Seconds :param vSecsRem: (int) Treatment Remaining Time in Seconds :return: none """ if vSecsRem is None: vSecsRem = vSecsTotal - vSecsElap payload = integer_to_bytearray(vSecsTotal) payload += integer_to_bytearray(vSecsElap) payload += integer_to_bytearray(vSecsRem) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_TREATMENT_TIME.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_ultrafiltration_outlet_flow_data(self, vRefUFVol, vMeasUFVol, vRotSpd, vMotSpd, vMCSpd, vMCCurr, vPWM): """ the Outlet Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0B00| 0x040 | 7 | 1 Hz | N | HD | All | Outlet Flow Data | \ref Data::mRefUFVol | \ref Data::mMeasUFVol | \ref Data::mRotorSpeed | \ref Data::mMotorSpeed | \ref Data::mMotorCtlSpeed | \ref Data::mMotorCtlCurrent | \ref Data::mPWMDtCycle | :param vRefUFVol: (float) Ref UF Volume :param vMeasUFVol: (float) Measured UF Volume :param vRotSpd: (float) Rot Speed :param vMotSpd: (float) Motor Speed :param vMCSpd: (float) MC Speed :param vMCCurr: (float) MC Current :param vPWM: (float) PWM :return: none """ payload = float_to_bytearray(vRefUFVol) payload += float_to_bytearray(vMeasUFVol) payload += float_to_bytearray(vRotSpd) payload += float_to_bytearray(vMotSpd) payload += float_to_bytearray(vMCSpd) payload += float_to_bytearray(vMCCurr) payload += float_to_bytearray(vPWM) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DIALYSATE_OUT_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_pressure_occlusion_data(self, vArterialPressure, vVenousPressure, vBloodPumpOcclusion, vDialysateInletPumpOcclusion, vDialysateOutletPumpOcclusion): """ the Pressure/Occlusion Data messages setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(U32) | #4:(U32) | #5:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: | |0x0900| 0x040 | 7 | 1 Hz | N | HD | All | PressureOcclusion Data | \ref Data::mArterialPressure | \ref Data::mVenousPressure | \ref Data::mBloodPumpOcclusion | \ref Data::mDialysateInletPumpOcclusion | \ref Data::mDialysateOutletPumpOcclusion | :param vArterialPressure: (float) Arterial Pressure :param vVenousPressure: (float) Venous Pressure :param vBloodPumpOcclusion: (uint) Blood Pump Occlusion :param vDialysateInletPumpOcclusion: (uint) Dialysate Inlet Pump Occlusion :param vDialysateOutletPumpOcclusion: (uint) Dialysate Outlet Pump Occlusion :return: none """ payload = float_to_bytearray(vArterialPressure) payload += float_to_bytearray(vVenousPressure) payload += integer_to_bytearray(vBloodPumpOcclusion) payload += integer_to_bytearray(vDialysateInletPumpOcclusion) payload += integer_to_bytearray(vDialysateOutletPumpOcclusion) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_PRESSURE_OCCLUSION_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_states_data(self, vSubMode, vUFState, vSalineState, vHeparingState, vRinsebackState, vRecirculateState, vBloodPrimeState, vTreatmentEndState, vTreammentStopState): """ the Treatment States Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: | |0x0F00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment States Data | | #1:(U32) | #2:(U32) | #3:(U32) | #4:(U32) | |:--: |:--: |:--: |:--: | | \ref Data::mSubMode | \ref Data::mUFState | \ref Data::mSalineState | \ref Data::mHeparinState | |||| | #5:(U32) | #6:(U32) ||| |:--: |:--: ||| | \ref Data::mRinsebackState | \ref Data::mRecirculateState ||| |||| | #7:(U32) | #8:(U32) | #9:(U32) || |:--: |:--: |:--: || | \ref Data::vBloodPrimeState | \ref Data::mTreatmentEndState | \ref Data::vTreammentStopState || :param vSubMode: (int) Sub-Mode :param vUFState: (int) UF State :param vSalineState: (int) Saline Bolus State :param vHeparingState: (int) Saline Bolus State :param vRinsebackState: (int) Rinseback State :param vRecirculateState: (int) Recirculate State :param vBloodPrimeState: (int) Blood Prime State :param vTreatmentEndState: (int) Treatment End State :param vTreammentStopState: (int) Treatment Stop State :return: none """ payload = integer_to_bytearray(vSubMode) payload += integer_to_bytearray(vUFState) payload += integer_to_bytearray(vSalineState) payload += integer_to_bytearray(vHeparingState) payload += integer_to_bytearray(vRinsebackState) payload += integer_to_bytearray(vRecirculateState) payload += integer_to_bytearray(vBloodPrimeState) payload += integer_to_bytearray(vTreatmentEndState) payload += integer_to_bytearray(vTreammentStopState) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_TREATMENT_STATE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_start_state(self): """ starting the treatment for user convenience since Tx is not by default running :return: none """ self.cmd_set_treatment_states_data(TXStates.TREATMENT_DIALYSIS_STATE, TXStates.UF_OFF_STATE, TXStates.SALINE_BOLUS_STATE_IDLE, TXStates.HEPARIN_STATE_OFF) def cmd_set_hd_operation_mode_data(self, operation_mode: int, operation_sub_mode: int) -> None: """ The HD Operation Mode Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: | |0x2500| 0x040 | 7 | 1 Hz | N | HD | All | HD Operation Mode Data | \ref Data::mOpMode | :param operation_mode: (int) Operation Mode :param operation_sub_mode: (int) Operation Sub-Mode :return: None """ payload = integer_to_bytearray(operation_mode) payload += integer_to_bytearray(operation_sub_mode) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_OP_MODE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_saline_bolus_data(self, vTarget, vCumulative, vDelivered): """ the Treatment Saline Bolus Data message sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(F32) | #3:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x2F00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment Saline Bolus Data | \ref Data::mTarget | \ref Data::mCumulative | \ref Data::mDelivered | :param vTarget: (int) Saline Bolus Target Volume :param vCumulative: (float) Saline Bolus Cumulative Volume :param vDelivered: (float) Saline Bolus Delivered Volume :return: none """ payload = integer_to_bytearray(vTarget) payload += float_to_bytearray(vCumulative) payload += float_to_bytearray(vDelivered) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_SALINE_BOLUS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_saline_bolus_response(self, vAccepted, vReason, vTarget, vState): """ the Saline Bolus Response message sender method | MSG | CAN ID | M.Box | Type | Ack | Src | Dest | Description | #1:(U32) | #2:(U32) | #3:(U32) | #3:(U32) | |:---:|:------:|:-----:|:----:|:---:|:---:|:----:|:---------------------:|:--------------------:|:-------------------:|:-------------------:|:-------------------:| | 20 | 0x020 | 6 | Rsp | Y | HD | UI | Saline Bolus Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mTarget | \ref Data::mState | :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vTarget: (int) Saline Bolus Target Volume :param vState: (int) Saline Bolus current State :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += integer_to_bytearray(vTarget) payload += integer_to_bytearray(vState) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_SALINE_BOLUS_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_canbus_fault_count(self, count): """ the CANBus fault count message setter/sender method :param count: (int) Fault Count :return: none """ payload = integer_to_bytearray(count) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_CAN_ERROR_COUNT.value, payload=payload) self.can_interface.send(message, 0) HDSimulator.wait_for_message_to_be_sent() def cmd_send_unknown_hd(self): """ the unknown message from HD setter/sender method :return: none """ payload = integer_to_bytearray(0) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_UNUSED.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_heparin_data(self, vCumulative): """ the Treatment Heparin Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: | |0x4D00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment Heparin Data | \ref Data::mCumulative | :param vCumulative: (float) Heparin Cumulative Volume :return: none """ payload = float_to_bytearray(vCumulative) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_HEPARIN_DATA_BROADCAST.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_heparin_pause_resume_response(self, vAccepted, vReason, vState): """ the Heparin Response message setter/sender method | MSG | CAN ID | M.Box | Type | Ack | Src | Dest | Description | #1:(U32) | #2:(U32) | #3:(U32) | |:----:|:------:|:-----:|:----:|:---:|:---:|:----:|:----------------:|:--------------------:|:-------------------:|:-------------------:| |0x4C00| 0x020 | 6 | Rsp | Y | HD | UI | Heparin Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mState | :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vState: (int) Heparin current State :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += integer_to_bytearray(vState) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_HEPARIN_PAUSE_RESUME_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_pressures_limit_response(self, vAccepted, vReason, vArterialLow, vArterialHigh, vVenousLow, vVenousHigh): """ the Blood/dialysate rate change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(S32) | #4:(S32) | #3:(S32) | #4:(S32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: | |0x4700| 0x020 | 6 | Rsp | Y | HD | UI | A/V BP Limit Change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mArterialLow | \ref Data::mArterialHigh | \ref Data::mVenousLow | \ref Data::mVenousHigh | :param vAccepted: (int) boolean accept/reject response :param vReason: (int) rejection reason :param vArterialLow: (int) Arterial Pressure Limit Low (mmHg) :param vArterialHigh: (int) Arterial Pressure Limit High (mmHg) :param vVenousLow: (int) Venous Pressure Limit Low (mmHg) :param vVenousHigh: (int) Venous Pressure Limit High (mmHg) :return: none """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) payload += integer_to_bytearray(vArterialLow) payload += integer_to_bytearray(vArterialHigh) payload += integer_to_bytearray(vVenousLow) payload += integer_to_bytearray(vVenousHigh) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_PRESSURE_LIMITS_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_rinseback_response(self, vAccepted, vReason): """ the rinseback state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5300| 0x020 | 6 | Rsp | Y | HD | UI | Rinseback State Change Response | \ref Data::mAccepted | \ref Data::mReason | :param vAccepted: (int) boolean accept/reject response :param vReason : (int) rejection reason :return: None """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RINSEBACK_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_rinseback_data(self, vTarget, vCurrent, vRate, vTimeoutTotal, vTimeoutCountDown): """ the rinseback state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(U32) | #4:(U32) | #5:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: | |0x5600| 0x020 | 6 | 1Hz | N | HD | UI | Rinseback progress data | \ref Data::mTarget | \ref Data::mCurrent | \ref Data::mRate | \ref Data::mTimeoutTotal | \ref Data::mTimeoutCountDown | :param vTarget : (float) the target volume in mL :param vCurrent : (float) the current volume in mL :param vRate : (uint ) the current flow rate in mL/min :param vTimeoutTotal : (uint ) Total Timeout :param vTimeoutCountDown: (uint ) Current Timeout count down :return: None """ payload = float_to_bytearray(vTarget) payload += float_to_bytearray(vCurrent) payload += integer_to_bytearray(vRate) payload += integer_to_bytearray(vTimeoutTotal) payload += integer_to_bytearray(vTimeoutCountDown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RINSEBACK_PROGRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_recirculate_data(self,vTimeoutTotal, vTimeoutCountDown): """ the rinseback state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5A00| 0x020 | 6 | 1Hz | N | HD | UI | Rinseback progress data | \ref Data::mTimeoutTotal | \ref Data::mTimeoutCountDown | :param vTimeoutTotal : (uint ) Total Timeout :param vTimeoutCountDown: (uint ) Current Timeout count down :return: None """ payload = integer_to_bytearray(vTimeoutTotal) payload += integer_to_bytearray(vTimeoutCountDown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RECIRC_PROGRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_blood_prime_data(self, vTarget, vCurrent): """ the bloodprime state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x5900| 0x020 | 6 | 1Hz | N | HD | UI | bloodprime progress data | \ref Data::mTarget | \ref Data::mCurrent | \ref Data::mRate | :param vTarget : (float) the target volume in mL :param vCurrent : (float) the current volume in mL :return: None """ payload = float_to_bytearray(vTarget) payload += float_to_bytearray(vCurrent) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_BLOOD_PRIME_PROGRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_recirculate_response(self, vAccepted, vReason): """ the recirculate state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5500| 0x020 | 6 | Rsp | Y | HD | UI | Recirculate State Change Response | \ref Data::mAccepted | \ref Data::mReason | :param vAccepted: (int) boolean accept/reject response :param vReason : (int) rejection reason :return: None """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RECIRC_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_end_treatment(self, message): """ Handler function when received a request to end a treatment @param message: (dict) the end treatment request @return: None """ self.logger.debug("End treatment requested") request = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] if request == 0: self.logger.debug("Request to start rinseback") self.cmd_send_treatment_adjust_end_response(vAccepted=YES, vReason=0) else: self.logger.debug("End treatment unknown request") def cmd_send_treatment_adjust_end_response(self, vAccepted, vReason): """ the treatment end state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5800| 0x020 | 6 | Rsp | Y | HD | UI | Treatment End State Change Response | \ref Data::mAccepted | \ref Data::mReason | :param vAccepted: (int) boolean accept/reject response :param vReason : (int) rejection reason :return: None """ payload = integer_to_bytearray(vAccepted) payload += integer_to_bytearray(vReason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TX_END_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_accelerometer_hd_data(self, vX, vY, vZ, vXMax, vYMax, vZMax, vXTilt, vYTilt, vZTilt ): """ the accelerometer hd data message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: | |0x3300| 0x040 | 7 | 1Hz | N | HD | UI | HD Accelerometer data | | #1:(F32) | #2:(F32) | #3:(U32) | |:--: |:--: |:--: | | \ref Data::mX | \ref Data::mY | \ref Data::mX | | #4:(F32) | #5:(F32) | #6:(U32) | |:--: |:--: |:--: | | \ref Data::mXMax | \ref Data::mYMax | \ref Data::mXMax | | #7:(F32) | #8:(F32) | #9:(U32) | |:--: |:--: |:--: | | \ref Data::mXTilt | \ref Data::mYTilt | \ref Data::mXTilt | :param vX: x axis :param vY: y axis :param vZ: z axis :param vXMax: x axis max :param vYMax: y axis max :param vZMax: z axis max :param vXTilt: x axis tilt :param vYTilt: y axis tilt :param vZTilt: z axis tilt :return: None """ payload = float_to_bytearray(vX) payload += float_to_bytearray(vY) payload += float_to_bytearray(vZ) payload += float_to_bytearray(vXMax) payload += float_to_bytearray(vYMax) payload += float_to_bytearray(vZMax) payload += float_to_bytearray(vXTilt) payload += float_to_bytearray(vYTilt) payload += float_to_bytearray(vZTilt) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_ACCELEROMETER_DATA.value, payload=payload) self.can_interface.send(message, 0) def _handler_request_hd_version(self, message: dict): """ Handles a request for the HD version @param message: (dict) the received message @return: None """ self.logger.debug("Handling request for hd version.") self.cmd_send_version_hd_data(9, 9, 9, 9, 9, 9, 9, 9) self.cmd_send_hd_serial_number() def cmd_send_hd_serial_number(self): """ Sends the hd serial number response @return: None """ self.logger.debug("Sending hd serial number...") payload = bytearray('0123456789\0', encoding="utf-8") message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SERIAL_NUMBER.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_version_hd_data(self, vMajor, vMinor, vMicro, vBuild, vFPGA_id, vFPGA_Major, vFPGA_Minor, vFPGA_Lab ): """ the hd version response message method :param vMajor: Major version number :param vMinor: Minor version number :param vMicro: Micro version number :param vBuild: Build version number :param vFPGA_id: FPGA id version number :param vFPGA_Major: FPGA Major version number :param vFPGA_Minor: FPGA Minor version number :param vFPGA_Lab: FPGA Lab version number :return: None """ payload = byte_to_bytearray(vMajor) payload += byte_to_bytearray(vMinor) payload += byte_to_bytearray(vMicro) payload += short_to_bytearray(vBuild) payload += byte_to_bytearray(vFPGA_id) payload += byte_to_bytearray(vFPGA_Major) payload += byte_to_bytearray(vFPGA_Minor) payload += byte_to_bytearray(vFPGA_Lab) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_VERSION.value, payload=payload) self.can_interface.send(message, 0) def alarm(self) -> HDAlarmsSimulator: """ Gets the alarm simulator object @return: (HDAlarmsSimulator) the alarms simulator """ return self.alarms_simulator def cmd_send_pre_treatment_state_data(self, sub_mode, water_sample_state, consumables_self_test_state, no_cartridge_self_test_state, installation_state, dry_self_test_state, prime_state, recirculate_state, patient_connection_state): """ sends the broadcast message of the pre-treatment states :param sub_mode : (U32) the main pre treatment state :param water_sample_state : (U32) water sample state :param consumables_self_test_state : (U32) consumables self test state :param no_cartridge_self_test_state : (U32) no cartridge self-test state :param installation_state : (U32) installation state :param dry_self_test_state : (U32) dry self-test state :param prime_state : (U32) prime state :param recirculate_state : (U32) recirculate state :param patient_connection_state : (U32) patient connection state :return: """ payload = integer_to_bytearray(sub_mode) payload += integer_to_bytearray(water_sample_state) payload += integer_to_bytearray(consumables_self_test_state) payload += integer_to_bytearray(no_cartridge_self_test_state) payload += integer_to_bytearray(installation_state) payload += integer_to_bytearray(dry_self_test_state) payload += integer_to_bytearray(prime_state) payload += integer_to_bytearray(recirculate_state) payload += integer_to_bytearray(patient_connection_state) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_PRE_TREATMENT_STATE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_water_sample_response(self, accepted, reason ): """ send the pretreatment water sample response :param accepted: (U32) accept or reject :param reason: (U32) rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SAMPLE_WATER_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_self_test_no_cartridge_progress_data(self, total, countdown): """ send the pretreatment no cartridge self-tests progress data :param accepted: (U32) Total time in second :param reason: (U32) count down time in second :return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NO_CART_SELF_TEST_PROGRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_self_test_dry_progress_data(self, total, countdown): """ send the pretreatment dry self-tests progress data :param accepted: (U32) Total time in second :param reason: (U32) count down time in second :return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DRY_SELF_TEST_PROGRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_disposables_prime_progress_data(self, timeout, countdown): """ Broadcasts the progress time data of pre-treatment disposables priming to the UI :param timeout : (int) the total progress time timeout in seconds :param countdown: (int) the remaining time in seconds :return: None """ payload = bytearray() payload += integer_to_bytearray(timeout) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_PRIMING_STATUS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_prime_start_response(self, accepted, reason ): """ send the pretreatment prime start response :param accepted: (U32) accept or reject :param reason: (U32) rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_START_PRIME_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_continue_to_treament_response(self, accepted, reason ): """ send the pretreatment continue to treatment response :param accepted: (U32) accept or reject :param reason: (U32) rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_START_TREATMENT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_patient_connection_confirm_response(self, accepted, reason ): """ send the pretreatment patient connection confirm response :param accepted: (U32) accept or reject :param reason: (U32) rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_PATIENT_CONNECTION_CONFIRM_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_post_treatment_disposable_removal_confirm_response(self, accepted, reason ): """ send post treatment disposable removal confirm response :param accepted: (U32) accept or reject :param reason: (U32) rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DISPOSABLE_REMOVAL_CONFIRM_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_post_treatment_disposable_removal_confirm_response(self, accepted, reason ): """ send post treatment disposable removal confirm response :param accepted: (U32) accept or reject :param reason: (U32) rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DISPOSABLE_REMOVAL_CONFIRM_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_post_treatment_log_response(self, accepted: bool, reason: int, bood_flow_rate: int, dialysate_flow_rate: int, treatment_duration: int, actual_treatment_duration: int, acid_concentrate_type: int, bicarbonate_concentrate_type: int, potassium_concentration: int, calcium_concentration: int, bicarbonate_concentration: int, sodium_concentration: int, dialysate_temperature: float, dialyzer_type: int, treatment_date_time: int, average_blood_flow: float, average_dialysate_flow: float, dialysate_volume_used: float, average_dialysate_temp: float, target_uf_volume: float, actual_uf_volume: float, target_uf_rate: float, actual_uf_rate: float, saline_bolus_volume: int, heparin_type: int, heparin_concentration: int, heparin_bolus_volume: float, heparin_dispense_rate: float, heparin_pre_stop: int, heparin_delivered_volume: float, average_arterial_pressure: float, average_venous_pressure: float, end_treatment_early_alarm: int, device_id: int, water_sample_test_result: int ) -> None: """ send post treatment log response :param accepted: true if accpeted :param reason: the rejection reason :param bood_flow_rate: bood flow rate :param dialysate_flow_rate: dialysate flow rate :param treatment_duration: treatment duration :param actual_treatment_duration: actual treatment duration :param acid_concentrate_type: acid concentrate type :param bicarbonate_concentrate_type: bicarbonate concentrate type :param potassium_concentration: potassium concentration :param calcium_concentration: calcium concentration :param bicarbonate_concentration: bicarbonate concentration :param sodium_concentration: sodium concentration :param dialysate_temperature: dialysate temperature :param dialyzer_type: dialyzer type :param treatment_date_time: treatment date time :param average_blood_flow: average blood flow :param average_dialysate_flow: average dialysate flow :param dialysate_volume_used: dialysate volume used :param average_dialysate_temp: average dialysate temp :param target_uf_volume: target uf volume :param actual_uf_volume: actual uf volume :param target_uf_rate: target uf rate :param actual_uf_rate: actual uf rate :param saline_bolus_volume: saline bolus volume :param heparin_type: heparin type :param heparin_concentration: heparin concentration :param heparin_bolus_volume: heparin bolus volume :param heparin_dispense_rate: heparin dispense rate :param heparin_pre_stop: heparin pre stop :param heparin_delivered_volume: heparin delivered volume :param average_arterial_pressure: average arterial pressure :param average_venous_pressure: average venous pressure :param end_treatment_early_alarm: end treatment early alarm :param device_id: device id :param water_sample_test_result: water sample test result :return: """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += unsigned_to_bytearray(int(bood_flow_rate)) payload += unsigned_to_bytearray(int(dialysate_flow_rate)) payload += unsigned_to_bytearray(int(treatment_duration)) payload += unsigned_to_bytearray(int(actual_treatment_duration)) payload += unsigned_to_bytearray(int(acid_concentrate_type)) payload += unsigned_to_bytearray(int(bicarbonate_concentrate_type)) payload += unsigned_to_bytearray(int(potassium_concentration)) payload += unsigned_to_bytearray(int(calcium_concentration)) payload += unsigned_to_bytearray(int(bicarbonate_concentration)) payload += unsigned_to_bytearray(int(sodium_concentration)) payload += float_to_bytearray(float(dialysate_temperature)) payload += unsigned_to_bytearray(int(dialyzer_type)) payload += unsigned_to_bytearray(int(treatment_date_time)) payload += float_to_bytearray(float(average_blood_flow)) payload += float_to_bytearray(float(average_dialysate_flow)) payload += float_to_bytearray(float(dialysate_volume_used)) payload += float_to_bytearray(float(average_dialysate_temp)) payload += float_to_bytearray(float(target_uf_volume)) payload += float_to_bytearray(float(actual_uf_volume)) payload += float_to_bytearray(float(target_uf_rate)) payload += float_to_bytearray(float(actual_uf_rate)) payload += unsigned_to_bytearray(int(saline_bolus_volume)) payload += unsigned_to_bytearray(int(heparin_type)) payload += unsigned_to_bytearray(int(heparin_concentration)) payload += float_to_bytearray(float(heparin_bolus_volume)) payload += float_to_bytearray(float(heparin_dispense_rate)) payload += unsigned_to_bytearray(int(heparin_pre_stop)) payload += float_to_bytearray(float(heparin_delivered_volume)) payload += float_to_bytearray(float(average_arterial_pressure)) payload += float_to_bytearray(float(average_venous_pressure)) payload += unsigned_to_bytearray(int(end_treatment_early_alarm)) payload += unsigned_to_bytearray(int(device_id)) payload += unsigned_to_bytearray(int(water_sample_test_result)) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_DATA_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_data(self, bloodFlowRate: int, dialysateFlowRate: int, ufRate: float, arterialPressure: float, venousPressure: float) -> None: """ send the treatment log data :param bloodFlowRate: (U32) blood flow rate :param dialysateFlowRate: (U32) Dialysate Flow Rate :param ufRate: (F32) UF Rate :param arterialPressure: (F32) Arterial Pressure :param venousPressure: (F32) Venous Pressure :return: None """ payload = integer_to_bytearray(bloodFlowRate) payload += integer_to_bytearray(dialysateFlowRate) payload += float_to_bytearray(ufRate) payload += float_to_bytearray(arterialPressure) payload += float_to_bytearray(venousPressure) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_PERIODIC_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_alarm(self, alarmID: int, parameter1: float, parameter2: float) -> None: """ send the treatment log data :param alarmID: (U32) alarm ID :param parameter1: (F32) paramter 1 (it's not clear yet how many paramters with what type is required and this is only plceholder) :param parameter2: (F32) paramter 2 (it's not clear yet how many paramters with what type is required and this is only plceholder) :return: None """ payload = integer_to_bytearray(alarmID) payload += float_to_bytearray(parameter1) payload += float_to_bytearray(parameter2) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_ALARM_EVENT.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_event(self, eventID: int, oldValue: float, newValue: float) -> None: """ send the treatment log data :param alarmID: (U32) alarm ID :param oldValue: (F32) the old value :param newValue: (F32) the new value :return: none """ payload = integer_to_bytearray(eventID) payload += float_to_bytearray(oldValue) payload += float_to_bytearray(newValue) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_EVENT.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_disinfect_response(self, accepted: bool, reason: int) -> None: """ the HD response to the request from UI to initiate a disinfection/flush :param accepted: boolean accepted or rejected :param reason: the rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id= DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DISINFECT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_disinfect_chemical_confirm(self, accepted: bool, reason: int) -> None: """ the HD response to the UI sending the user chimical disinfection steps confirm. :param accepted: boolean accepted or rejected :param reason: the rejection reason :return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id= DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_CHEM_DISINFECT_CONFIRM_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) # ------------------------------------------------ GENERAL MESSAGES ------------------------------------------------ def cmd_send_general_hd_response(self, message_id: int, accepted: int, reason: int, is_pure_data: bool = False, has_parameters: bool = False, parameters_payload: any = 0x00) -> None: """ a general method to send any standard response message, by it's id and list of paramters. :param message_id: the id of the message :param accepted: the standard accepted parameter of any response message :param reason: the standard rejection reason parameter of any response message :param has_parameters: if the message has parameter this needs to be true. :param parameters_payload: the list of parameters pre-converted and ready to be concatenated to the payload. :return: None """ if not is_pure_data: payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) if has_parameters: payload += parameters_payload else: if has_parameters: payload = parameters_payload message = DenaliMessage.build_message(channel_id= DenaliChannels.hd_to_ui_ch_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_send_general_hd_progress_data(self, message_id: int, total: int, countdown: int) -> None: """ a general method t send any standard progress data message, by it's id :param message_id: the id of the message :param total: the total value of the progress data :param countdown: the remaining or countdown value :return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id= DenaliChannels.hd_to_ui_ch_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_ack_send_hd(self, seq: int) -> None: """ sending hd ack message by the sequence seq :param seq: the message sequence number :return: None """ message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=GuiActionType.Acknow, seq=seq) self.can_interface.send(message, 0) def cmd_send_power_on_self_test_version_request(self) -> None: """ Sends the power on self test version request @return: None """ payload = bytearray() message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIdsDialin.MSG_DIALIN_ID_HD_VERSION_REQUEST.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_post_ui_version_compatibility(self, message: dict) -> None: """ Handles the UI's reporting of its version during the power on self tests @param message: The message data @return: None """ ui_major = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] ui_minor = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] ui_micro = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] ui_build = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] ui_compatibility = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] self.logger.debug("UI version power on self test received: " "Major: {0} " "Minor: {1} " "Micro: {2} " "Build: {3} " "Compat: {4} " .format(ui_major, ui_minor, ui_micro, ui_build, ui_compatibility))