########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file hd_simulator.py # # @author (last) Michael Garthwaite # @date (last) 22-Apr-2024 # @author (original) Peter Lucia # @date (original) 06-Aug-2020 # ############################################################################ import enum from time import sleep from typing import Callable from inspect import signature from . import messageBuilder from ..common import * from ..protocols.CAN import DenaliMessage, DenaliCanMessenger, DenaliChannels from ..utils import * from ..utils.base import AbstractSubSystem, LogManager class HDSimulator(AbstractSubSystem): NUM_TREATMENT_PARAMETERS = 19 instance_count = 0 # UI version message field positions START_POS_MAJOR = DenaliMessage.PAYLOAD_START_INDEX END_POS_MAJOR = START_POS_MAJOR + 1 START_POS_MINOR = END_POS_MAJOR END_POS_MINOR = START_POS_MINOR + 1 START_POS_MICRO = END_POS_MINOR END_POS_MICRO = START_POS_MICRO + 1 START_POS_BUILD = END_POS_MICRO END_POS_BUILD = START_POS_BUILD + 2 START_POS_COMPAT = END_POS_BUILD END_POS_COMPAT = START_POS_COMPAT + 4 def __init__(self, can_interface: str = "can0", log_level: bool = None, console_out: bool = False, passive_mode: bool = True, auto_response: bool = False): """ The HDSimulator constructor @param can_interface: (str) the can interface name @param log_level: (str) or (None) if not set, contains the logging level @param console_out: (bool) If True, write each dialin message to the console. """ super().__init__() HDSimulator.instance_count = HDSimulator.instance_count + 1 self.auto_response = auto_response self._log_manager = LogManager(log_level=log_level, log_filepath=self.__class__.__name__ + ".log") self.logger = self._log_manager.logger self.console_out = console_out self.can_interface = DenaliCanMessenger(can_interface=can_interface, logger=self.logger, console_out=console_out, passive_mode=passive_mode) self.can_interface.start() self.ui_initiate_treatment_req_timestamp = 0.0 self.ui_set_uf_volume_parameter_timestamp = 0.0 self.ui_new_treatment_parameters_timestamp = 0.0 self.ui_user_confirm_treatment_parameters_timestamp = 0.0 self.ui_tx_end_cmd_timestamp = 0.0 self.ui_hd_set_rtc_req_timestamp = 0.0 self.ui_fw_versions_req_timestamp = 0.0 self.ui_service_info_req_timestamp = 0.0 self.ui_version_info_response_timestamp = 0.0 self.ui_version_info_compatibility_timestamp = 0.0 if self.can_interface is not None: channel_id = DenaliChannels.ui_to_hd_ch_id if auto_response: self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_INITIATE_TREATMENT_REQUEST.value, self._handler_ui_initiate_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_SET_UF_VOLUME_PARAMETER_REQUEST.value, self._handler_ui_pre_treatment_uf_request) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS_REQUEST.value, self._handler_ui_validate_parameters) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_USER_CONFIRM_TREATMENT_PARAMS_REQUEST.value, self._handler_ui_confirm_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_TX_END_CMD_REQUEST.value, self._handler_ui_end_treatment) self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_HD_SET_RTC_REQUEST.value, self._handler_set_rtc_request) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIds.MSG_ID_FW_VERSIONS_REQUEST.value, self._handler_request_hd_version) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIds.MSG_ID_UI_SERVICE_INFO_REQUEST.value, self._handler_system_usage_response) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_to_hd_ch_id, MsgIds.MSG_ID_UI_VERSION_INFO_RESPONSE.value, self._handler_ui_post_ui_version_compatibility) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_to_hd_ch_id, MsgIds.MSG_ID_UI_VERSION_INFO_RESPONSE.value, self._handler_ui_version) self.treatment_parameter_rejections = TreatmentParameterRejections() # initialize variables that will be populated by UI version response self.ui_version = None def add_publication(self, channel_id: DenaliChannels, message_id: MsgIds, function_ptr: Callable) -> None: """ Allows later addition of publication to the HDSimulator @param channel_id: (DenaliChannels) the channel id of the message @param message_id: (MsgIds) the message id @param function_ptr: (Callable) the pointer to the message handler function @return: None """ if self.auto_response: if channel_id > 0 and message_id != MsgIds.MSG_ID_UNUSED and function_ptr is not None: self.can_interface.register_receiving_publication_function(channel_id, message_id, function_ptr) else: self.logger.debug("rejected publication registration {0}, {1}, {2}".format(channel_id, message_id, function_ptr)) def set_ui_all_publication(self, function_ptr: Callable) -> None: """ Allows later addition of publication to the HDSimulator This function needs improvements, it has been implemented to quickly being used by Development Testing team. @param function_ptr: (Callable) the pointer to the message handler function @return: None """ function_signature_exp = "(message:dict)->None" # TODO: update later to get param name and type. if not callable(function_ptr): print("ui all publication rejected (not a function)") self.logger.debug("ui all publication rejected (not a function)") else: function_signature_act = str(signature(function_ptr)).replace(" ", "") if function_signature_act == function_signature_exp: self.can_interface.register_received_all_ui_publication_function(function_ptr) else: print("ui all publication rejected {0},{1}" .format(function_signature_exp, function_signature_act)) self.logger.debug("rejected ui all messages publication registration, expected function signature {0}, got {1} " .format(function_signature_exp, function_signature_act)) def get_ui_version(self): """ Gets the ui version @return: The ui version """ return self.ui_version def _handler_system_usage_response(self, message, timestamp=0.0) -> None: """ Handles a request for system usage @return: None """ self.ui_service_info_req_timestamp = timestamp payload = integer_to_bytearray(1619628663) payload += integer_to_bytearray(1619887863) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SERVICE_SCHEDULE_DATA.value, payload=payload) self.can_interface.send(message, 0) def _handler_set_rtc_request(self, message: dict, timestamp=0.0) -> None: """ Handles a UI request to set the HD RTC @param message: (dict) the message containing the request @return: None """ epoch = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.ui_hd_set_rtc_req_timestamp = timestamp self.logger.debug("Request to set the HD epoch to {0}".format(epoch)) self.cmd_send_set_rtc_response(YES, 0) def cmd_send_set_rtc_response(self, response, reason): """ Sends a set RTC response message @param response: (int) 0=NO, 1=YES @param reason: the rejection reason @return: None """ self.logger.debug("HD: Sending response {0} reason {1}".format(response, reason)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_UI_SET_RTC_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_parameter_validation_response(self, rejections: List[RequestRejectReasons]): """ Sends a treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, enum.Enum) for each in rejections]): self.logger.error("Not all rejections are enums.") return False payload = integer_to_bytearray(0) for rejection in rejections: payload += integer_to_bytearray(rejection.value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_treatment_parameter_manual_validation_response(self, rejections: List[int]): """ Sends a manually built treatment parameter validation response @param rejections: A list of rejection code enums @return: True if successful, False otherwise """ if len(rejections) != self.NUM_TREATMENT_PARAMETERS: self.logger.error("Invalid number of treatment parameter enums provided.") return False if not all([isinstance(each, int) for each in rejections]): self.logger.error("Not all rejections are the correct type.") return False payload = bytearray() for rejection in rejections: payload += integer_to_bytearray(rejection) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NEW_TREATMENT_PARAMS_RESPONSE.value, payload=payload) # Send message self.can_interface.send(message, 0) return True def cmd_send_poweroff_command(self): """ Broadcast that the poweroff command @return: None """ payload = byte_to_bytearray(PowerOffCommands.PW_COMMAND_OPEN.value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_OFF_BUTTON_PRESS_REQUEST.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_poweroff_timeout(self): """ Sends a poweroff timeout @return: None """ payload = byte_to_bytearray(PowerOffCommands.PW_TIMEOUT_CLOSE.value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_OFF_BUTTON_PRESS_REQUEST.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_poweroff_reject(self): """ Sends a poweroff reject @return: None """ payload = byte_to_bytearray(PowerOffCommands.PW_REJECT_SHOW.value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_OFF_BUTTON_PRESS_REQUEST.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_poweroff_imminent(self): """ Broadcast that the system will shut down @return: None """ message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_POWER_OFF_WARNING.value, payload=None) self.can_interface.send(message, 0) def _handler_ui_confirm_treatment(self, message, timestamp=0.0): """ Handler function to detect when a treatment is confirmed @param message: the confirm treatment message @return: None """ request = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.ui_user_confirm_treatment_parameters_timestamp = timestamp if request == 0: self.logger.debug("Received UI cancel confirmation of Treatment Parameters. ") return self.logger.debug("Received UI confirmation of Treatment Parameters. ") self.logger.debug("Priming ...") state = 0 total_seconds = 100 for seconds_remaining in range(total_seconds, -1, -1): if seconds_remaining % (total_seconds // 3) == 0: state += 1 self.cmd_send_priming_time_remaining(state, seconds_remaining, total_seconds) sleep(0.05) self.logger.debug("Finished priming.") def _handler_ui_pre_treatment_uf_request(self, message, timestamp=0.0): """ Handles the ui pre treatment uf request and sends a response @param message: The ui pretreatment uf request message @return: None """ uf_volume = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.logger.debug("Received UF Volume: {0} mL".format(uf_volume)) self.ui_set_uf_volume_parameter_timestamp = timestamp self.cmd_send_uf_treatment_response(1, 0, uf_volume) def _handler_ui_initiate_treatment(self, message,timestamp=0.0): """ Handler function to start a treatment @param message: the start treatment message @return: None """ request = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] if request == 0: self.logger.debug("Selecting treatment parameters") self.cmd_send_hd_operation_mode(HDOpModes.MODE_PRET.value) elif request == 1: self.logger.debug("Canceling treatment") self.cmd_send_hd_operation_mode(HDOpModes.MODE_STAN.value) elif request == 2: self.logger.debug("Starting treatment") self.cmd_send_hd_operation_mode(HDOpModes.MODE_TREA.value) self.ui_initiate_treatment_req_timestamp = timestamp self.cmd_initiate_treatment_response(YES, 0) def cmd_initiate_treatment_response(self, response: int, reason: int): """ Sends a start treatment response message @param response: 0=NO, 1=YES @param reason: the rejection reason @return: None """ self.logger.debug("Sending: {0}".format(response)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_INITIATE_TREATMENT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_operation_mode(self, op_mode: int, sub_mode: int = 0): """ Broadcasts the current HD operation mode @param op_mode: hd operation mode @param sub_mode: hd operation sub-mode @return: None """ if not isinstance(op_mode, int): raise ValueError("Provided mode is not of type 'int'") if not isinstance(sub_mode, int): raise ValueError("Provided mode is not of type 'int'") payload = integer_to_bytearray(op_mode) payload += integer_to_bytearray(sub_mode) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_OP_MODE_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_uf_treatment_response(self, accepted, reason, volume): """ Sends the uf volume adjustment response message in pre-treatment @param accepted: (uint) the acceptance, 1 = accepted, 0 = rejected @param reason: (uint) the reason for rejection @param volume: (float) the acceptable/accepted ultrafiltration volume @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SET_UF_VOLUME_PARAMETER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_end_treatment_response(self): """ Sends an end treatment response @return: None """ payload = integer_to_bytearray(YES) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TX_END_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_validate_parameters(self, message, timestamp=0.0) -> None: """ handler for UI parameters validation @return: None """ rejections = [ self.treatment_parameter_rejections.param_request_valid, self.treatment_parameter_rejections.param_blood_flow_rate, self.treatment_parameter_rejections.param_dialysate_flow_rate, self.treatment_parameter_rejections.param_duration, self.treatment_parameter_rejections.param_heparin_stop_time, self.treatment_parameter_rejections.param_saline_bolus, self.treatment_parameter_rejections.param_acid_concentrate, self.treatment_parameter_rejections.param_bicarbonate_concentrate, self.treatment_parameter_rejections.param_dialyzer_type, self.treatment_parameter_rejections.param_blood_pressure_measure_interval, self.treatment_parameter_rejections.param_rinseback_flow_rate, self.treatment_parameter_rejections.param_arterial_pressure_limit_low, self.treatment_parameter_rejections.param_arterial_pressure_limit_high, self.treatment_parameter_rejections.param_venous_pressure_limit_low, self.treatment_parameter_rejections.param_venous_pressure_limit_high, self.treatment_parameter_rejections.param_heparin_dispensing_rate, self.treatment_parameter_rejections.param_heparin_bolus_volume, self.treatment_parameter_rejections.param_dialysate_temp ] self.ui_new_treatment_parameters_timestamp = timestamp self.cmd_send_treatment_parameter_validation_response(rejections) def test_started(self, test_name: str): """ Logs that a test was started @param test_name: The name of the test @return: None """ self.logger.info("Test Started: {0}".format(test_name)) def test_completed(self): """ Logs that a test was completed @return: None """ self.logger.info("Test Completed") def cmd_send_acknowledge_hd(self): """ the acknowledge from HD @return: none """ payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_send_acknowledge_ui(self): """ the acknowledge from UI @return: none """ payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.ui_to_hd_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_dialog(self): """ the message from HD to UI to show the power off dialog @return: none """ payload = ["A5", "01", "00", "01", "00", "01", "00", "38"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_hide_poweroff_dialog(self): """ the message from HD to UI to hide the power off dialog @return: none """ payload = ["A5", "01", "00", "01", "00", "01", "01", "09"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_notification_dialog(self): """ the message from HD to UI to show the shutting down notification box @return: none """ payload = ["A5", "01", "00", "0E", "00", "00", "24", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_sync_broadcast_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_show_poweroff_rejection_dialog(self): """ the message from HD to UI to show the power off dialog @return: none """ payload = ["A5", "01", "00", "01", "00", "01", "02", "5A"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, "message": payload} self.can_interface.send(message, 0) @staticmethod def wait_for_message_to_be_sent(delay=0.050): """ After each multi-frame message put a 50ms sleep, time.sleep(0.1) it seems it's needed otherwise the test will check a value which has not been received yet. :@param delay: the number of seconds to wait @return: none """ time.sleep(delay) @staticmethod def build_hd_debug_text(vtext): """ the debug text message from HD builder method @param vtext: (str) the debug text @return: none """ message_length = 40 txt = messageBuilder.textToByte(vtext, message_length) # + 1 null term msg = messageBuilder.buildMessage(GuiActionType.HDDebugText, 1 * (message_length + 1), False, txt) return messageBuilder.toFrames(msg) @staticmethod def cmd_set_hd_debug_text(debug_text): """ the debug text message from HD setter/sender method @param debug_text: (str) the debug text @return: none """ frames = HDSimulator.build_hd_debug_text(debug_text) frames = messageBuilder.toCandumpFormat(frames) for frame in frames: subprocess.call(['cansend', 'can0', '020#{}'.format(frame)]) HDSimulator.wait_for_message_to_be_sent() @staticmethod def cmd_set_dg_debug_text(debug_text): """ the debug text message from DG setter/sender method @param debug_text: (str) the debug text @return: none """ frames = HDSimulator.build_dg_debug_text(debug_text) frames = messageBuilder.toCandumpFormat(frames) for frame in frames: subprocess.call(['cansend', 'can0', '070#{}'.format(frame)]) HDSimulator.wait_for_message_to_be_sent() def cmd_set_treatment_parameter_ranges(self, min_treatment_duration: int, max_treatment_duration: int, min_uf_volume: float, max_uf_volume: float, min_dialysate_flow_rate: int, max_dialysate_flow_rate: int) -> None: """ The Treatment adjustment parameter ranges data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) | #4:(F32) | #5:(U32) | #6:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: | |0x1A00| 0x020 | 6 | 1/60 Hz| Y | HD | UI | Treatment adjustment param ranges Data | \ref Data::mDuration_Min | \ref Data::mDuration_Max | \ref Data::mUltrafiltration_Volume_Min | \ref Data::mUltrafiltration_Volume_Max | \ref Data::mDialysate_Flow_Min | \ref Data::mDialysate_Flow_Max | @param min_treatment_duration: (int) Min Treatment Duration @param max_treatment_duration: (int) Max Treatment Duration @param min_uf_volume: (float) Min UF Volume @param max_uf_volume: (float) Max UF Volume @param min_dialysate_flow_rate: (int) Min Dialysate Flow Rate @param max_dialysate_flow_rate: (int) Max Dialysate Flow Rate @return: None """ payload = integer_to_bytearray(min_treatment_duration) payload += integer_to_bytearray(max_treatment_duration) payload += float_to_bytearray(min_uf_volume) payload += float_to_bytearray(max_uf_volume) payload += integer_to_bytearray(min_dialysate_flow_rate) payload += integer_to_bytearray(max_dialysate_flow_rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_TREATMENT_PARAM_CHANGE_RANGES_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_blood_flow_rate(self, flow_set_pt: int, measured_flow: float, rot_speed: float, mot_speed: float, mc_speed: float, mc_current: float, pwm: float, rotor_count: int, pres_flow: int, rotor_hall: int) -> None: """ The Blood Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(S32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | #8:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0500| 0x040 | 7 | 1 Hz | N | HD | All | Blood Flow Data | \ref Data::mFlowSetPoint | \ref Data::mMeasuredFlow | \ref Data::mRotorSpeed | \ref Data::mMotorSpeed | \ref Data::mMotorCtlSpeed | \ref Data::mMotorCtlCurrent | \ref Data::mPWMDutyCycle | \ref Data::mRotorCount | @param flow_set_pt: (int) Flow Set Point @param measured_flow: (float) Measured Flow @param rot_speed: (float) Rot Speed @param mot_speed: (float) Motor Speed @param mc_speed: (float) MC Speed @param mc_current: (float) MC Current @param pwm: (float) PWM @param rotor_count: (int) Rotor Count @param pres_flow: (int) Prescribed flow rate @param rotor_hall: (int) Rotor hall sensor @return: None """ payload = integer_to_bytearray(flow_set_pt) payload += float_to_bytearray(measured_flow) payload += float_to_bytearray(rot_speed) payload += float_to_bytearray(mot_speed) payload += float_to_bytearray(mc_speed) payload += float_to_bytearray(mc_current) payload += float_to_bytearray(pwm) payload += integer_to_bytearray(rotor_count) payload += integer_to_bytearray(pres_flow) payload += integer_to_bytearray(rotor_hall) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_BLOOD_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_dialysate_flow_rate(self, flow_set_pt: int, measured_flow: float, rot_speed: float, mot_speed: float, mc_speed: float, mc_current: float, pwm: float, rotor_count: int, pres_flow: int, rotor_hall: int) -> None: """ The Dialysate Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(S32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0800| 0x040 | 7 | 1 Hz | N | HD | All | Dialysate Flow Data | mFlowSetPoint | mMeasuredFlow | mRotorSpeed | mMotorSpeed | mMotorCtlSpeed | mMotorCtlCurrent | mPWMDutyCycle | @param flow_set_pt: (signed int) Flow Set Point @param measured_flow: (float) Measured Flow @param rot_speed: (float) Rot Speed @param mot_speed: (float) Motor Speed @param mc_speed: (float) MC Speed @param mc_current: (float) MC Current @param pwm: (float) PWM @param rotor_count: (int) Rotor Count @param pres_flow: (int) Prescribed flow rate @param rotor_hall: (int) Rotor hall sensor @return: None """ payload = integer_to_bytearray(flow_set_pt) payload += float_to_bytearray(measured_flow) payload += float_to_bytearray(rot_speed) payload += float_to_bytearray(mot_speed) payload += float_to_bytearray(mc_speed) payload += float_to_bytearray(mc_current) payload += float_to_bytearray(pwm) payload += integer_to_bytearray(rotor_count) payload += integer_to_bytearray(pres_flow) payload += integer_to_bytearray(rotor_hall) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DIALYSATE_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_blood_dialysate_response(self, accepted: int, reason: int, blood_rate: int, dialysate_flow_rate: int) -> None: """ The Blood/dialysate rate change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | #4:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: | |0x1800| 0x020 | 6 | Rsp | Y | HD | UI | Blood/dialysate rate change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mBloodRate | \ref Data::mDialysateRate | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param blood_rate: (int) Blood Flow Rate @param dialysate_flow_rate: (int) Dialysate Flow Rate @return: None """ if not isinstance(accepted, int): accepted = int(accepted) if not isinstance(reason, int): reason = int(reason) if not isinstance(blood_rate, int): blood_rate = int(blood_rate) if not isinstance(dialysate_flow_rate, int): dialysate_flow_rate = int(dialysate_flow_rate) payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(blood_rate) payload += integer_to_bytearray(dialysate_flow_rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_BLOOD_DIAL_RATE_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_duration_response(self, accepted: int, reason: int, duration: int, ultrafiltration: float) -> None: """ The Treatment Duration change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | #5:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: | |0x1B00| 0x020 | 6 | Rsp | Y | HD | UI | Treatment Duration change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mDuration | \ref Data::mUFVolume | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param duration: (int) Treatment Duration @param ultrafiltration: (float) Ultrafiltration Volume @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(duration) payload += float_to_bytearray(ultrafiltration) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_TREATMENT_TIME_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_state_response(self, accepted: int, reason: int, state: int) -> None: """ the Treatment ultrafiltration adjustment response message setter/sender method @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param state: (int) Ultrafiltration State @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(state) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_PAUSE_RESUME_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_accepted(self, state: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_state_response which sends accept true @param state: (int) Ultrafiltration State @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_state_response(EResponse.Accepted, 0, state) def cmd_set_treatment_adjust_ultrafiltration_rejected(self, reason: int, state: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_state_response which sends accept false @param reason: (int) rejection reason @param state: (int) Ultrafiltration State @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_state_response(EResponse.Rejected, reason, state) def cmd_set_treatment_adjust_ultrafiltration_init_response(self, accepted: int, reason: int, volume: float) -> None: """ the ultrafiltration volume change response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |0x5000| 0x020 | 6 | Rsp | Y | HD | UI | Pre UF Volume Adjustment Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param volume: (float) Ultrafiltration Volume @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SET_UF_VOLUME_PARAMETER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_edit_response(self, accepted: int, reason: int, volume: float, duration: int, duration_diff: int, rate: float, rate_diff: float, rate_old: float) -> None: """ the ultrafiltration volume change response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #1:(U32) | #2:(U32) | #3:(F32) | #4:(U32) | #5:(F32) | #6:(U32) | #7:(U32) | #8:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x1300| 0x020 | 6 | Rsp | Y | HD | UI | UF Vol. Change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume | \ref Data::mDuration | \ref Data::mRate | \ref Data::mDurationDiff | \ref Data::mRateDiff | \ref Data::mRateOld | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param volume: (float) Ultrafiltration Volume @param duration: (int) Treatment Duration @param duration_diff: (int) Duration Difference @param rate: (float) Ultrafiltration Rate @param rate_diff: (float) Ultrafiltration Rate Difference @param rate_old: (float) Ultrafiltration Rate Old @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) payload += integer_to_bytearray(duration) payload += integer_to_bytearray(duration_diff) payload += float_to_bytearray(rate) payload += float_to_bytearray(rate_diff) payload += float_to_bytearray(rate_old) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_SETTINGS_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_edit_rejected(self, reason: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_edit_response which only sends a rejection reason and sends other values all as zero @param reason: (int) rejection reason @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_edit_response(0, reason, 0, 0, 0, 0, 0, 0) def cmd_set_treatment_adjust_ultrafiltration_confirm_response(self, accepted: int, reason: int, volume: float, duration: int, rate: float) -> None: """ the ultrafiltration volume Change Confirmation Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(F32) | #4:(U32) | #5:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: | |0x2E00| 0x020 | 6 | Rsp | Y | HD | UI | UF Vol. Change Confirmation Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mVolume | \ref Data::mDuration | \ref Data::mRate | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param volume: (float) Ultrafiltration Volume @param duration: (int) Treatment Duration @param rate: (float) Ultrafiltration Rate @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += float_to_bytearray(volume) payload += integer_to_bytearray(duration) payload += float_to_bytearray(rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_UF_SETTINGS_CHANGE_CONFIRMATION_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_adjust_ultrafiltration_confirm_rejected(self, reason: int) -> None: """ a convenient method for cmd_set_treatment_adjust_ultrafiltration_confirm_response which only sends a rejection reason and sends other values all as zero @param reason: (int) rejection reason @return: none """ self.cmd_set_treatment_adjust_ultrafiltration_confirm_response(0, reason, 0, 0, 0) def cmd_set_treatment_time(self, sec_total: int, sec_elapsed: int, sec_remain: int = 0) -> None: """ the Treatment Time Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x0D00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment Time Data | \ref Data::mTotal | \ref Data::mElapsed | \ref Data::mRemaining | @param sec_total: (int) Treatment Total Duration in Seconds @param sec_elapsed: (int) Treatment Total Elapsed Time in Seconds @param sec_remain: (int) Treatment Remaining Time in Seconds @return: none """ if sec_remain is None: sec_remain = sec_total - sec_elapsed payload = integer_to_bytearray(sec_total) payload += integer_to_bytearray(sec_elapsed) payload += integer_to_bytearray(sec_remain) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_TREATMENT_TIME_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_ultrafiltration_outlet_flow_data(self, ref_uf_vol: float, measured_uf_vol: float, rot_speed: float, mot_speed: float, mc_speed: float, mc_current: float, pwm: float, dop_corr_offset: float, dop_calc_rate: float, uf_calc_rate: float, rotor_hall: int, current_set_uf_rate: float) -> None: """ the Outlet Flow Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(F32) | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: |:--: | |0x0B00| 0x040 | 7 | 1 Hz | N | HD | All | Outlet Flow Data | \ref Data::mRefUFVol | \ref Data::mMeasUFVol | \ref Data::mRotorSpeed | \ref Data::mMotorSpeed | \ref Data::mMotorCtlSpeed | \ref Data::mMotorCtlCurrent | \ref Data::mPWMDtCycle | @param ref_uf_vol: (float) Ref UF Volume @param measured_uf_vol: (float) Measured UF Volume @param rot_speed: (float) Rot Speed @param mot_speed: (float) Motor Speed @param mc_speed: (float) MC Speed @param mc_current: (float) MC Current @param pwm: (float) PWM @param dop_corr_offset: (float) correction offset @param dop_calc_rate: (float) calculated dop flow rate @param uf_calc_rate: (float) calculated uf flow rate @param rotor_hall: (int) rotor hall sensor @param current_set_uf_rate: (float) set UF rate @return: none """ payload = float_to_bytearray(ref_uf_vol) payload += float_to_bytearray(measured_uf_vol) payload += float_to_bytearray(rot_speed) payload += float_to_bytearray(mot_speed) payload += float_to_bytearray(mc_speed) payload += float_to_bytearray(mc_current) payload += float_to_bytearray(pwm) payload += float_to_bytearray(dop_corr_offset) payload += float_to_bytearray(dop_calc_rate) payload += float_to_bytearray(uf_calc_rate) payload += integer_to_bytearray(rotor_hall) payload += float_to_bytearray(current_set_uf_rate) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DIALYSATE_OUT_FLOW_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_pressure_occlusion_data(self, arterial_prs: float, venous_prs: float, blood_pump_occlusion: int, pressure_state: int, art_min_limit: int, art_max_limit: int, ven_min_limit: int, ven_max_limit: int , filtered_arterial_prs: float, filtered_venous_prs : float ) -> None: """ the Pressure/Occlusion Data messages setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(U32) | #4:(U32) | #5:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: | |0x0900| 0x040 | 7 | 1 Hz | N | HD | All | PressureOcclusion Data | \ref Data::mArterialPressure | \ref Data::mVenousPressure | \ref Data::mBloodPumpOcclusion | \ref Data::mDialysateInletPumpOcclusion | \ref Data::mDialysateOutletPumpOcclusion | @param arterial_prs: (float) Arterial Pressure @param venous_prs: (float) Venous Pressure @param blood_pump_occlusion: (uint) Blood Pump Occlusion @param pressure_state: (uint) Pressure state @param art_min_limit: (int) arterial minimum limit @param art_max_limit: (int) arterial maximum limit @param ven_min_limit: (int) venous minimum limit @param ven_max_limit: (int) venous maximum limit # @param filtered_arterial_prs: (float) filtered arterial pressure # @param filtered_venous_prs: (float) filtered venous pressure @return: none """ payload = float_to_bytearray(arterial_prs) payload += float_to_bytearray(venous_prs) payload += integer_to_bytearray(blood_pump_occlusion) payload += integer_to_bytearray(pressure_state) payload += integer_to_bytearray(art_min_limit) payload += integer_to_bytearray(art_max_limit) payload += integer_to_bytearray(ven_min_limit) payload += integer_to_bytearray(ven_max_limit) payload += float_to_bytearray(filtered_arterial_prs) payload += float_to_bytearray(filtered_venous_prs) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_PRESSURE_OCCLUSION_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_states_data(self, sub_mode: int, uf_state: int, saline_state: int, heparin_state: int, rinseback_state: int, recirculate_state: int, blood_prime_state: int, treatment_end_state: int, treatment_stop_state: int, dialysis_state: int) -> None: """ the Treatment States Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: | |0x0F00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment States Data | | #1:(U32) | #2:(U32) | #3:(U32) || |:--: |:--: |:--: || | \ref Data::mSubMode | \ref Data::mUFState | \ref Data::mSalineState || |||| | #4:(U32) | #5:(U32) | #6:(U32) || |:--: |:--: |:--: || | \ref Data::mHeparinState | \ref Data::mRinsebackState | \ref Data::mRecirculateState || |||| | #7:(U32) | #8:(U32) | #9:(U32) | |:--: |:--: |:--: | | \ref Data::vBloodPrimeState | \ref Data::mTreatmentEndState | \ref Data::mTreammentStopState | | #9:(U32) || |:--: || | \ref Data::mDialysisState || @param sub_mode: (int) Sub-Mode @param uf_state: (int) UF State @param saline_state: (int) Saline Bolus State @param heparin_state: (int) Saline Bolus State @param rinseback_state: (int) Rinseback State @param recirculate_state: (int) Recirculate State @param blood_prime_state: (int) Blood Prime State @param treatment_end_state: (int) Treatment End State @param treatment_stop_state: (int) Treatment Stop State @param dialysis_state: (int) Dialysis State @return: none """ payload = integer_to_bytearray(sub_mode) payload += integer_to_bytearray(uf_state) payload += integer_to_bytearray(saline_state) payload += integer_to_bytearray(heparin_state) payload += integer_to_bytearray(rinseback_state) payload += integer_to_bytearray(recirculate_state) payload += integer_to_bytearray(blood_prime_state) payload += integer_to_bytearray(treatment_end_state) payload += integer_to_bytearray(treatment_stop_state) payload += integer_to_bytearray(dialysis_state) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_TREATMENT_STATE_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_start_state(self) -> None: """ starting the treatment for user convenience since Tx is not by default running @return: none """ self.cmd_set_treatment_states_data(TXStates.TREATMENT_DIALYSIS_STATE, TXStates.UF_OFF_STATE, TXStates.SALINE_BOLUS_STATE_IDLE, TXStates.HEPARIN_STATE_OFF, TXStates.RINSEBACK_STOP_INIT_STATE, TXStates.TREATMENT_RECIRC_RECIRC_STATE, TXStates.BLOOD_PRIME_RAMP_STATE, TXStates.TREATMENT_END_WAIT_FOR_RINSEBACK_STATE, TXStates.TREATMENT_STOP_RECIRC_STATE) def cmd_set_hd_operation_mode_data(self, operation_mode: int, operation_sub_mode: int) -> None: """ The HD Operation Mode Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: | |0x2500| 0x040 | 7 | 1 Hz | N | HD | All | HD Operation Mode Data | \ref Data::mOpMode | @param operation_mode: (int) Operation Mode @param operation_sub_mode: (int) Operation Sub-Mode @return: None """ payload = integer_to_bytearray(operation_mode) payload += integer_to_bytearray(operation_sub_mode) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_OP_MODE_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_saline_bolus_data(self, target: int, cumulative: float, delivered: float) -> None: """ the Treatment Saline Bolus Data message sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(F32) | #3:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x2F00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment Saline Bolus Data | \ref Data::mTarget | \ref Data::mCumulative | \ref Data::mDelivered | @param target: (int) Saline Bolus Target Volume @param cumulative: (float) Saline Bolus Cumulative Volume @param delivered: (float) Saline Bolus Delivered Volume @return: none """ payload = integer_to_bytearray(target) payload += float_to_bytearray(cumulative) payload += float_to_bytearray(delivered) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_SALINE_BOLUS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_saline_bolus_response(self, accepted: int, reason: int, target: int) -> None: """ the Saline Bolus Response message sender method | MSG | CAN ID | M.Box | Type | Ack | Src | Dest | Description | #1:(U32) | #2:(U32) | #3:(U32) | |:---:|:------:|:-----:|:----:|:---:|:---:|:----:|:---------------------:|:--------------------:|:-------------------:|:-------------------:| | 20 | 0x020 | 6 | Rsp | Y | HD | UI | Saline Bolus Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mTarget | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param target: (int) Saline Bolus Target Volume @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(target) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_USER_SALINE_BOLUS_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_canbus_fault_count(self, count: int) -> None: """ the CANBus fault count message setter/sender method @param count: (int) Fault Count @return: none """ payload = integer_to_bytearray(count) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_CAN_ERROR_COUNT.value, payload=payload) self.can_interface.send(message, 0) HDSimulator.wait_for_message_to_be_sent() def cmd_send_unknown_hd(self) -> None: """ the unknown message from HD setter/sender method @return: none """ payload = integer_to_bytearray(0) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_UNUSED.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_treatment_heparin_data(self, cumulative: float, required: float) -> None: """ the Treatment Heparin Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #1:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x4D00| 0x040 | 7 | 1 Hz | N | HD | All | Treatment Heparin Data | \ref Data::mCumulative | \ref Data::mRequired | @param cumulative: (float) Heparin Cumulative Volume @param required: (float) Heparin Volume required for treatment @return: none """ payload = float_to_bytearray(cumulative) payload += float_to_bytearray(required) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_HEPARIN_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_heparin_pause_resume_response(self, accepted: int, reason: int) -> None: """ the Heparin Response message setter/sender method | MSG | CAN ID | M.Box | Type | Ack | Src | Dest | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:-----:|:----:|:---:|:---:|:----:|:----------------:|:--------------------:|:-------------------:| |0x4C00| 0x020 | 6 | Rsp | Y | HD | UI | Heparin Response | \ref Data::mAccepted | \ref Data::mReason | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_HEPARIN_PAUSE_RESUME_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_pressures_limit_response(self, accepted: int, reason: int, art_pressure_limit_window: int, ven_pressure_limit_window: int, ven_pressure_asym_limit_window: int) -> None: """ the Blood/dialysate rate change Response message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(S32) | #4:(S32) | #3:(S32) | #4:(S32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: | |0x4700| 0x020 | 6 | Rsp | Y | HD | UI | A/V BP Limit Change Response | \ref Data::mAccepted | \ref Data::mReason | \ref Data::mArterialLow | \ref Data::mArterialHigh | \ref Data::mVenousLow | \ref Data::mVenousHigh | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @param art_pressure_limit_window: (int) Arterial Pressure Limit window (mmHg) @param ven_pressure_limit_window: (int) Venous Pressure Limit window (mmHg) @param ven_pressure_asym_limit_window: (int) Venous Pressure asymmetric limit window (mmHg) @return: none """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += integer_to_bytearray(art_pressure_limit_window) payload += integer_to_bytearray(ven_pressure_limit_window) payload += integer_to_bytearray(ven_pressure_asym_limit_window) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_PRESSURE_LIMITS_CHANGE_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_rinseback_response(self, accepted: int, reason: int) -> None: """ the rinseback state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5300| 0x020 | 6 | Rsp | Y | HD | UI | Rinseback State Change Response | \ref Data::mAccepted | \ref Data::mReason | @param accepted: (int) boolean accept/reject response @param reason : (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RINSEBACK_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_rinseback_data(self, target_vol: float, current_vol: float, flow_rate: int, timeout: int, timeout_countdown: int, is_completed: bool) -> None: """ the rinseback state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(U32) | #4:(U32) | #5:(U32) | #6:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: |:--: |:--: | |0x5600| 0x020 | 6 | 1Hz | N | HD | UI | Rinseback progress data | \ref Data::mTarget | \ref Data::mCurrent | \ref Data::mRate | \ref Data::mTimeoutTotal | \ref Data::mTimeoutCountDown | \ref Data::is_completed | @param target_vol : (float) the target volume in mL @param current_vol : (float) the current volume in mL @param flow_rate : (uint ) the current flow rate in mL/min @param timeout : (uint ) Total Timeout @param timeout_countdown : (uint ) Current Timeout count down @param is_completed : (bool ) Is Rinseback completed. @return: None """ payload = float_to_bytearray(target_vol) payload += float_to_bytearray(current_vol) payload += integer_to_bytearray(flow_rate) payload += integer_to_bytearray(timeout) payload += integer_to_bytearray(timeout_countdown) payload += integer_to_bytearray(is_completed) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RINSEBACK_PROGRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_recirculate_data(self, timeout_total: int, timeout_count_down: int) -> None: """ the rinseback state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5A00| 0x020 | 6 | 1Hz | N | HD | UI | Rinseback progress data | \ref Data::mTimeoutTotal | \ref Data::mTimeoutCountDown | @param timeout_total: (int) Total Timeout @param timeout_count_down: (int) Current Timeout count down @return: None """ payload = integer_to_bytearray(timeout_total) payload += integer_to_bytearray(timeout_count_down) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RECIRC_PROGRESS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_blood_prime_data(self, target: float, current: float): """ the bloodprime state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x5900| 0x020 | 6 | 1Hz | N | HD | UI | bloodprime progress data | \ref Data::mTarget | \ref Data::mCurrent | \ref Data::mRate | @param target : (float) the target volume in mL @param current : (float) the current volume in mL @return: None """ payload = float_to_bytearray(target) payload += float_to_bytearray(current) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_BLOOD_PRIME_PROGRESS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_adjust_recirculate_response(self, accepted: int, reason: int) -> None: """ the recirculate state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5500| 0x020 | 6 | Rsp | Y | HD | UI | Recirculate State Change Response | \ref Data::mAccepted | \ref Data::mReason | @param accepted: (int) boolean accept/reject response @param reason : (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RECIRC_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_end_treatment(self, message: dict, timestamp=0.0) -> None: """ Handler function when received a request to end a treatment @param message: (dict) the end treatment request @return: None """ self.logger.debug("End treatment requested") request = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.ui_tx_end_cmd_timestamp = timestamp if request == 0: self.logger.debug("Request to start rinseback") self.cmd_send_treatment_adjust_end_response(accepted=YES, reason=0) else: self.logger.debug("End treatment unknown request") def cmd_send_treatment_adjust_end_response(self, accepted: int, reason: int): """ the treatment end state change Response message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x5800| 0x020 | 6 | Rsp | Y | HD | UI | Treatment End State Change Response | \ref Data::mAccepted | \ref Data::mReason | @param accepted: (int) boolean accept/reject response @param reason: (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TX_END_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_accelerometer_hd_data(self, x: float, y: float, z: float, x_max: float, y_max: float, z_max: float, x_tilt: float, y_tilt: float, z_tilt: float) -> None: """ the accelerometer hd data message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: | |0x3300| 0x040 | 7 | 1Hz | N | HD | UI | HD Accelerometer data | | #1:(F32) | #2:(F32) | #3:(U32) | |:--: |:--: |:--: | | \ref Data::mX | \ref Data::mY | \ref Data::mX | | #4:(F32) | #5:(F32) | #6:(U32) | |:--: |:--: |:--: | | \ref Data::mXMax | \ref Data::mYMax | \ref Data::mXMax | | #7:(F32) | #8:(F32) | #9:(U32) | |:--: |:--: |:--: | | \ref Data::mXTilt | \ref Data::mYTilt | \ref Data::mXTilt | @param x: float - x axis @param y: float - y axis @param z: float - z axis @param x_max: float - x axis max @param y_max: float - y axis max @param z_max: float - z axis max @param x_tilt: float - x axis tilt @param y_tilt: float - y axis tilt @param z_tilt: float - z axis tilt @return: None """ payload = float_to_bytearray(x) payload += float_to_bytearray(y) payload += float_to_bytearray(z) payload += float_to_bytearray(x_max) payload += float_to_bytearray(y_max) payload += float_to_bytearray(z_max) payload += float_to_bytearray(x_tilt) payload += float_to_bytearray(y_tilt) payload += float_to_bytearray(z_tilt) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_ACCELEROMETER_DATA.value, payload=payload) self.can_interface.send(message, 0) def _handler_request_hd_version(self,message,timestamp=0.0) -> None: """ Handles a request for the HD version @return: None """ self.ui_fw_versions_req_timestamp = timestamp self.logger.debug("Handling request for hd version.") self.cmd_send_version_hd_data(9, 9, 9, 9, 9, 9, 9, 9) self.cmd_send_hd_serial_number() def cmd_send_hd_serial_number(self) -> None: """ Sends the hd serial number response @return: None """ self.logger.debug("Sending hd serial number...") payload = bytearray('0123456789\0', encoding="utf-8") message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SERIAL_NUMBER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_version_hd_data(self, major: int, minor: int, micro: int, build: int, fpga_id: int, fpga_major: int, fpga_minor: int, fpga_lab: int, compatibility_rev: int) -> None: """ [0x1D00] # 29 0x040 Rsp Y HD All HD f/w version U08=Major U08=Minor U08=Micro U16=Build U08=FPGA ID U08=FPGA Major U08=FPGA Minor U08=FPGA Lab U32=compatibility rev --- the dg version response message method @param major: (uint) - Major version number @param minor: (uint) - Minor version number @param micro: (uint) - Micro version number @param build: (uint) - Build version number @param fpga_id: (int) - FPGA id version number @param fpga_major: (int) - FPGA Major version number @param fpga_minor: (int) - FPGA Minor version number @param fpga_lab: (int) - FPGA Lab version number @param compatibility_rev: (uint) - The FWs/UI compatibility revision @return: None """ payload = unsigned_byte_to_bytearray(major) payload += unsigned_byte_to_bytearray(minor) payload += unsigned_byte_to_bytearray(micro) payload += unsigned_short_to_bytearray(build) payload += unsigned_byte_to_bytearray(fpga_id) payload += unsigned_byte_to_bytearray(fpga_major) payload += unsigned_byte_to_bytearray(fpga_minor) payload += unsigned_byte_to_bytearray(fpga_lab) payload += unsigned_integer_to_bytearray(compatibility_rev) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_VERSION_REPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_serial_hd_data(self, serial: str): """ the hd version serial response message method @param serial: serial number @return: None """ payload = bytes(serial, 'ascii') + b'\x00' message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SERIAL_NUMBER_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_state_data(self, sub_mode: int, water_sample_state: int, consumables_self_test_state: int, no_cartridge_self_test_state: int, installation_state: int, dry_self_test_state: int, prime_state: int, recirculate_state: int, patient_connection_state: int, wet_selftests_state: int, pretreatment_rsrvr_state: int ) -> None: """ sends the broadcast message of the pre-treatment states @param sub_mode : (int) the main pre treatment state @param water_sample_state : (int) water sample state @param consumables_self_test_state : (int) consumables self test state @param no_cartridge_self_test_state : (int) no cartridge self-test state @param installation_state : (int) installation state @param dry_self_test_state : (int) dry self-test state @param prime_state : (int) prime state @param recirculate_state : (int) recirculate state @param patient_connection_state : (int) patient connection state @param wet_selftests_state : (int) wet selftest state @param pretreatment_rsrvr_state : (int) reservoir state @return: """ payload = integer_to_bytearray(sub_mode) payload += integer_to_bytearray(water_sample_state) payload += integer_to_bytearray(consumables_self_test_state) payload += integer_to_bytearray(no_cartridge_self_test_state) payload += integer_to_bytearray(installation_state) payload += integer_to_bytearray(dry_self_test_state) payload += integer_to_bytearray(prime_state) payload += integer_to_bytearray(recirculate_state) payload += integer_to_bytearray(patient_connection_state) payload += integer_to_bytearray(wet_selftests_state) payload += integer_to_bytearray(pretreatment_rsrvr_state) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_PRE_TREATMENT_STATE_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_water_sample_response(self, accepted: int, reason: int) -> None: """ send the pretreatment water sample response @param accepted: (int) accept or reject @param reason: (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_SAMPLE_WATER_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_self_test_no_cartridge_progress_data(self, total: int, countdown: int) -> None: """ send the pretreatment no cartridge self-tests progress data @param total: (int) Total time in second @param countdown: (int) count down time in second @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_NO_CART_SELF_TEST_PROGRESS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_self_test_dry_progress_data(self, total: int, countdown: int) -> None: """ send the pretreatment dry self-tests progress data @param total: (int) Total time in second @param countdown: (int) count down time in second @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DRY_SELF_TEST_PROGRESS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_disposables_prime_progress_data(self, timeout: int, countdown: int) -> None: """ Broadcasts the progress time data of pre-treatment disposables priming to the UI @param timeout : (int) the total progress time timeout in seconds @param countdown: (int) the remaining time in seconds @return: None """ payload = bytearray() payload += integer_to_bytearray(timeout) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_PRIMING_STATUS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_prime_start_response(self, accepted: int, reason: int) -> None: """ send the pretreatment prime start response @param accepted: (int) accept or reject @param reason: (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_START_PRIME_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_continue_to_treament_response(self, accepted: int, reason: int) -> None: """ send the pretreatment continue to treatment response @param accepted: (int) accept or reject @param reason: (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_START_TREATMENT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_pre_treatment_patient_connection_confirm_response(self, accepted: int, reason: int) -> None: """ send the pretreatment patient connection confirm response @param accepted: (int) accept or reject @param reason: (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_PATIENT_CONNECTION_CONFIRM_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_post_treatment_disposable_removal_confirm_response(self, accepted: int, reason: int) -> None: """ send post treatment disposable removal confirm response @param accepted: (int) accept or reject @param reason: (int) rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DISPOSABLE_REMOVAL_CONFIRM_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_post_treatment_log_response(self, accepted: bool, reason: int, bood_flow_rate: int, dialysate_flow_rate: int, treatment_duration: int, actual_treatment_duration: int, acid_concentrate_type: int, bicarbonate_concentrate_type: int, potassium_concentration: int, calcium_concentration: int, bicarbonate_concentration: int, sodium_concentration: int, dialysate_temperature: float, dialyzer_type: int, treatment_start_date_time: int, treatment_end_date_time: int, average_blood_flow: float, average_dialysate_flow: float, dialysate_volume_used: float, average_dialysate_temp: float, origin_uf_volume: float, target_uf_volume: float, actual_uf_volume: float, origin_uf_rate: float, target_uf_rate: float, actual_uf_rate: float, saline_bolus_volume: int, heparin_bolus_volume: float, heparin_dispense_rate: float, heparin_pre_stop: int, heparin_delivered_volume: float, heparin_type: int, average_arterial_pressure: float, average_venous_pressure: float, device_id: int, water_sample_test_result: int ) -> None: """ send post treatment log response @param accepted: true if accpeted @param reason: the rejection reason @param bood_flow_rate: bood flow rate @param dialysate_flow_rate: dialysate flow rate @param treatment_duration: treatment duration @param actual_treatment_duration: actual treatment duration @param acid_concentrate_type: acid concentrate type @param bicarbonate_concentrate_type: bicarbonate concentrate type @param potassium_concentration: potassium concentration @param calcium_concentration: calcium concentration @param bicarbonate_concentration: bicarbonate concentration @param sodium_concentration: sodium concentration @param dialysate_temperature: dialysate temperature @param dialyzer_type: dialyzer type @param treatment_start_date_time: treatment start date time @param treatment_end_date_time: treatment end date time @param average_blood_flow: average blood flow @param average_dialysate_flow: average dialysate flow @param dialysate_volume_used: dialysate volume used @param average_dialysate_temp: average dialysate temp @param origin_uf_volume: origin uf volume @param target_uf_volume: target uf volume @param actual_uf_volume: actual uf volume @param origin_uf_rate: origin uf rate @param target_uf_rate: target uf rate @param actual_uf_rate: actual uf rate @param saline_bolus_volume: saline bolus volume @param heparin_bolus_volume: heparin bolus volume @param heparin_dispense_rate: heparin dispense rate @param heparin_pre_stop: heparin pre stop @param heparin_delivered_volume: heparin delivered volume @param heparin_type: heparin type @param average_arterial_pressure: average arterial pressure @param average_venous_pressure: average venous pressure @param device_id: device id @param water_sample_test_result: water sample test result @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) payload += unsigned_integer_to_bytearray(int(bood_flow_rate)) payload += unsigned_integer_to_bytearray(int(dialysate_flow_rate)) payload += unsigned_integer_to_bytearray(int(treatment_duration)) payload += unsigned_integer_to_bytearray(int(actual_treatment_duration)) payload += unsigned_integer_to_bytearray(int(acid_concentrate_type)) payload += unsigned_integer_to_bytearray(int(bicarbonate_concentrate_type)) payload += unsigned_integer_to_bytearray(int(potassium_concentration)) payload += unsigned_integer_to_bytearray(int(calcium_concentration)) payload += unsigned_integer_to_bytearray(int(bicarbonate_concentration)) payload += unsigned_integer_to_bytearray(int(sodium_concentration)) payload += float_to_bytearray(float(dialysate_temperature)) payload += unsigned_integer_to_bytearray(int(dialyzer_type)) payload += unsigned_integer_to_bytearray(int(treatment_start_date_time)) payload += unsigned_integer_to_bytearray(int(treatment_end_date_time)) payload += float_to_bytearray(float(average_blood_flow)) payload += float_to_bytearray(float(average_dialysate_flow)) payload += float_to_bytearray(float(dialysate_volume_used)) payload += float_to_bytearray(float(average_dialysate_temp)) payload += float_to_bytearray(float(origin_uf_volume)) payload += float_to_bytearray(float(target_uf_volume)) payload += float_to_bytearray(float(actual_uf_volume)) payload += float_to_bytearray(float(origin_uf_rate)) payload += float_to_bytearray(float(target_uf_rate)) payload += float_to_bytearray(float(actual_uf_rate)) payload += unsigned_integer_to_bytearray(int(saline_bolus_volume)) payload += float_to_bytearray(float(heparin_bolus_volume)) payload += float_to_bytearray(float(heparin_dispense_rate)) payload += unsigned_integer_to_bytearray(int(heparin_pre_stop)) payload += float_to_bytearray(float(heparin_delivered_volume)) payload += unsigned_integer_to_bytearray(int(heparin_type)) payload += float_to_bytearray(float(average_arterial_pressure)) payload += float_to_bytearray(float(average_venous_pressure)) payload += unsigned_integer_to_bytearray(int(device_id)) payload += unsigned_integer_to_bytearray(int(water_sample_test_result)) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_DATA_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_data(self, blood_flow_rate: float, dialysate_flow_rate: float, uf_rate: float, arterial_pressure: float, venous_pressure: float) -> None: """ send the treatment log data @param blood_flow_rate: (float) blood flow rate @param dialysate_flow_rate: (float) Dialysate Flow Rate @param uf_rate: (float) UF Rate @param arterial_pressure: (float) Arterial Pressure @param venous_pressure: (float) Venous Pressure @return: None """ payload = float_to_bytearray(blood_flow_rate) payload += float_to_bytearray(dialysate_flow_rate) payload += float_to_bytearray(uf_rate) payload += float_to_bytearray(arterial_pressure) payload += float_to_bytearray(venous_pressure) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_PERIODIC_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_alarm(self, alarm_id: int, parameter1: float, parameter2: float) -> None: """ send the treatment log data @param alarm_id: (U32) alarm ID @param parameter1: (F32) paramter 1 (it's not clear yet how many paramters with what type is required and this is only plceholder) @param parameter2: (F32) paramter 2 (it's not clear yet how many paramters with what type is required and this is only plceholder) @return: None """ payload = integer_to_bytearray(alarm_id) payload += float_to_bytearray(parameter1) payload += float_to_bytearray(parameter2) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_ALARM_EVENT.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_treatment_log_event(self, event_id: int, old_value: float, new_value: float) -> None: """ send the treatment log data @param event_id: (U32) alarm ID @param old_value: (F32) the old value @param new_value: (F32) the new value @return: none """ payload = integer_to_bytearray(event_id) payload += float_to_bytearray(old_value) payload += float_to_bytearray(new_value) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_TREATMENT_LOG_EVENT.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_disinfect_response(self, accepted: bool, reason: int) -> None: """ the HD response to the request from UI to initiate a disinfection/flush @param accepted: boolean accepted or rejected @param reason: the rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_DISINFECT_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) @publish(["ui_version"]) def _handler_ui_version(self, message, timestamp=0.0) -> None: """ Handles the ui version response @param message: The ui version response message @return: None """ payload = message['message'] index = DenaliMessage.PAYLOAD_START_INDEX major, index = bytearray_to_byte(payload, index, False) minor, index = bytearray_to_byte(payload, index, False) micro, index = bytearray_to_byte(payload, index, False) build, index = bytearray_to_short(payload, index, False) compt, index = bytearray_to_integer(payload, index, False) self.ui_version_info_response_timestamp = timestamp self.ui_version = f"v{major}.{minor}.{micro}.{build}.{compt}" self.logger.debug(f"UI VERSION: {self.ui_version}") if self.console_out: print("Version:", self.ui_version) def cmd_send_hd_request_ui_version(self) -> None: """ send HD request for UI version @return: None """ message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_UI_VERSION_INFO_REQUEST.value, payload=None) self.can_interface.send(message, 0) def cmd_send_hd_post(self, item: int, passed: bool, done: bool = False) -> None: """ send HD post message the single(item) or the final(done) @param item: the post state/item index @param passed: the post result single or final @param done: if this is the final post message this should be true @return: None """ payload = integer_to_bytearray(passed) if not done: payload += integer_to_bytearray(item) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_POST_FINAL_TEST_RESULT.value if done else MsgIds.MSG_ID_HD_POST_SINGLE_TEST_RESULT.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_confirm_post_tx_next(self, accepted: int, reason: int): """ the HD response to the request from UI to continue after a post treatment review @param accepted: boolean accepted or rejected @param reason: the rejection reason @return: None """ payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_POST_TX_NEXT_CMD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_heparin_response(self, syringePumpVolumeDelivered: float, syringePumpVolumeRequired: float): """ the HD response to the request from UI to continue after a post treatment review @param accepted: boolean accepted or rejected @param reason: the rejection reason @return: None """ payload = float_to_bytearray(syringePumpVolumeDelivered) payload += float_to_bytearray(syringePumpVolumeRequired) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_HEPARIN_DATA.value, payload=payload) self.can_interface.send(message, 0) # ------------------------------------------------ GENERAL MESSAGES ------------------------------------------------ def cmd_send_hd_general_response(self, message_id: int, accepted: int, reason: int, is_pure_data: bool = False, has_parameters: bool = False, parameters_payload: any = 0x00, channel_id=DenaliChannels.hd_to_ui_ch_id) -> None: """ a general method to send any standard response message, by it's id and list of parameters. @param message_id: the id of the message @param accepted: the standard accepted parameter of any response message @param reason: the standard rejection reason parameter of any response message @param is_pure_data: The message only has data @param has_parameters: if the message has parameter this needs to be true. @param parameters_payload: the list of parameters pre-converted and ready to be concatenated to the payload. @param channel_id: (int) indicates the channel @return: None """ payload = "" if not is_pure_data: payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) if has_parameters: payload = parameters_payload message = DenaliMessage.build_message(channel_id=channel_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_general_progress_data(self, message_id: int, total: int, countdown: int) -> None: """ a general method t send any standard progress data message, by it's id @param message_id: the id of the message @param total: the total value of the progress data @param countdown: the remaining or countdown value @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_ack(self, seq: int) -> None: """ sending hd ack message by the sequence seq @param seq: the message sequence number @return: None """ message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=GuiActionType.Acknow, seq=seq) self.can_interface.send(message, 0) def cmd_send_power_on_self_test_version_request(self) -> None: """ Sends the power on self test version request @return: None """ payload = bytearray() message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIdsDialin.MSG_DIALIN_ID_HD_VERSION_REQUEST.value, payload=payload) self.can_interface.send(message, 0) def _handler_ui_post_ui_version_compatibility(self, message: dict, timestamp=0.0) -> None: """ Handles the UI's reporting of its version during the power on self tests @param message: The message data @return: None """ ui_major = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] ui_minor = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] ui_micro = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] ui_build = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] ui_compatibility = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] self.ui_version_info_compatibility_timestamp = timestamp self.logger.debug("UI version power on self test received: " "Major: {0} " "Minor: {1} " "Micro: {2} " "Build: {3} " "Compat: {4} " .format(ui_major, ui_minor, ui_micro, ui_build, ui_compatibility)) def cmd_send_hd_blood_leak_data(self, blood_leak_status: int, blood_leak_state: int, blood_leak_zero_status_counter: int, blood_leak_serial_state: int) -> None: """ A simulated HD broadcast message of blood leak data. @param blood_leak_status: (int) Blood leak status @param blood_leak_state: (int) Blood leak state @param blood_leak_zero_status_counter: (int) Blood leak zero status counter @param blood_leak_serial_state: (int) Blood leak serial comm state @return: None """ payload = unsigned_integer_to_bytearray(blood_leak_status) payload += unsigned_integer_to_bytearray(blood_leak_state) payload += unsigned_integer_to_bytearray(blood_leak_zero_status_counter) payload += unsigned_integer_to_bytearray(blood_leak_serial_state) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_BLOOD_LEAK_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_air_trap_data(self, lower_level: int, upper_level: int) -> None: """ A simulated HD broadcast message of air trap data. @param lower_level: (int) Air trap lower level @param upper_level: (int) Air trap upper level @return: None """ payload = unsigned_integer_to_bytearray(lower_level) payload += unsigned_integer_to_bytearray(upper_level) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_AIR_TRAP_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_air_bubble_data(self, venous_air_bubble_status: int, venous_air_bubble_state: int) -> None: """ A simulated HD broadcast message of air bubble data. @param venous_air_bubble_status: (int) Venous air bubble status @param venous_air_bubble_state: (int) Venous air bubble state @return: None """ payload = unsigned_integer_to_bytearray(venous_air_bubble_status) payload += unsigned_integer_to_bytearray(venous_air_bubble_state) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_HD_BUBBLES_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_service_mode_response(self, accepted: int, reason: int) -> None: """ A simulated HD response message of service mode request @param accepted: (int) acceptance. 1 if accepted @param reason: (int) rejection reason @return: None """ payload = unsigned_integer_to_bytearray(accepted) payload += unsigned_integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_RESPONSE_SERVICE_MODE_REQUEST.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_hd_institutional_record_response(self, minBloodFlowMLPM: int, maxBloodFlowMLPM: int, minDialysateFlowMLPM: int, maxDialysateFlowMLPM: int, minTxDurationMIN: int, maxTxDurationMIN: int, minStopHeparinDispBeforeTxEndMIN: int, maxStopHeparinDispBeforeTxEndMIN: int, minSalineBolusVolumeML: int, maxSalineBolusVolumeML: int, minDialysateTempC: float, maxDialysateTempC: float, minArtPressLimitWindowMMHG: int, maxArtPressLimitWindowMMHG: int, minVenPressLimitWindowMMHG: int, maxVenPressLimitWindowMMHG: int, minVenAsymPressLimitMMHG: int, maxVenAsymPressLimitMMHG: int, minUFVolumeL: float, maxUFVolumeL: float, minHeparinDispRateMLPHR: float, maxHeparinDispRateMLPHR: float, minHeparinBolusVolumeML: float, maxHeparinBolusVolumeML: float, enableChemicalDisinfect: int) -> None: """ A simulated HD response message of hd institutational record response. Default values: _DEFAULT_MIN_BLOOD_FLOW_MLPM = 100 _DEFAULT_MAX_BLOOD_FLOW_MLPM = 500 _DEFAULT_MIN_DIALYSATE_FLOW_MLPM = 100 _DEFAULT_MAX_DIALYSATE_FLOW_MLPM = 600 _DEFAULT_MIN_TX_DURATION_MIN = 60 _DEFAULT_MAX_TX_DURATION_MIN = 480 _DEFAULT_MIN_STOP_HEP_DISP_BEFORE_TX_END_MIN = 0 _DEFAULT_MAX_STOP_HEP_DISP_BEFORE_TX_END_MIN = 480 _DEFAULT_MIN_SALINE_BOLUS_VOLUME_ML = 100 _DEFAULT_MAX_SALINE_BOLUS_VOLUME_ML = 300 _DEFAULT_MIN_DIALYSATE_TEMPERATURE_C = 35.0 _DEFAULT_MAX_DIALYSATE_TEMPERATURE_C = 37.0 _DEFAULT_MIN_ART_PRESS_LIMIT_WINDOW_MMHG = 120 _DEFAULT_MAX_ART_PRESS_LIMIT_WINDOW_MMHG = 200 _DEFAULT_MIN_VEN_PRESS_LIMIT_WINDOW_MMHG = 100 _DEFAULT_MAX_VEN_PRESS_LIMIT_WINDOW_MMHG = 200 _DEFAULT_MIN_VEN_ASYM_PRESS_LIMIT_WINDOW_MMHG = 20 _DEFAULT_MAX_VEN_ASYM_PRESS_LIMIT_WINDOW_MMHG = 35 _DEFAULT_MIN_UF_VOLUME_L = 0.0 _DEFAULT_MAX_UF_VOLUME_L = 8.0 _DEFAULT_MIN_HEPARIN_DISP_RATE_MLPHR = 0.0 _DEFAULT_MAX_HEPARIN_DISP_RATE_MLPHR = 1.0 _DEFAULT_MIN_HEPARIN_BOLUS_VOLUME_ML = 0.0 _DEFAULT_MAX_HEPARIN_BOLUS_VOLUME_ML = 2.0 _DEFAULT_ENABLE_CHEM_DISINFECT = 1 @param minBloodFlowMLPM: (int) Min blood flow in mL/min. @param maxBloodFlowMLPM: (int) Max blood flow in mL/min. @param minDialysateFlowMLPM: (int) Min dialysate flow in mL/min. @param maxDialysateFlowMLPM: (int) Max dialysate flow in mL/min. @param minTxDurationMIN: (int) Min treatment duration in minutes. @param maxTxDurationMIN: (int) Max treatment duration in minutes. @param minStopHeparinDispBeforeTxEndMIN: (int) Min stop heparin dispense before treatment end in minutes. @param maxStopHeparinDispBeforeTxEndMIN: (int) Max stop heparin dispense before treatment end in minutes. @param minSalineBolusVolumeML: (int) Min saline bolus volume in milliliters. @param maxSalineBolusVolumeML: (int) Max saline bolus volume in milliliters. @param minDialysateTempC: (float) Min dialysate temperature in C. @param maxDialysateTempC: (float) Max dialysate temperature in C. @param minArtPressLimitWindowMMHG: (int) Min arterial pressure limit window in mmHg. @param maxArtPressLimitWindowMMHG: (int) Max arterial pressure limit window in mmHg. @param minVenPressLimitWindowMMHG: (int) Min venous pressure limit window in mmHg. @param maxVenPressLimitWindowMMHG: (int) Max venous pressure limit window in mmHg. @param minVenAsymPressLimitMMHG: (int) Min venous asymmetric pressure limit in mmHg. @param maxVenAsymPressLimitMMHG: (int) Max venous asymmetric pressure limit in mmHg. @param minUFVolumeL: (float) Min ultrafiltration volume in mL. @param maxUFVolumeL: (float) Max ultrafiltration volume in mL. @param minHeparinDispRateMLPHR : (float) Min heparin dispense rate in mL/hr. @param maxHeparinDispRateMLPHR: (float) Max heparin dispense rate in mL/hr. @param minHeparinBolusVolumeML: (float) Min heparin bolus volume in mL. @param maxHeparinBolusVolumeML: (float) Max heparin bolus volume in mL. @param enableChemicalDisinfect: (int) Enable/disable chemical disinfect. @return: None """ payload = unsigned_integer_to_bytearray(minBloodFlowMLPM) payload += unsigned_integer_to_bytearray(maxBloodFlowMLPM) payload += unsigned_integer_to_bytearray(minDialysateFlowMLPM) payload += unsigned_integer_to_bytearray(maxDialysateFlowMLPM) payload += unsigned_integer_to_bytearray(minTxDurationMIN) payload += unsigned_integer_to_bytearray(maxTxDurationMIN) payload += unsigned_integer_to_bytearray(minStopHeparinDispBeforeTxEndMIN) payload += unsigned_integer_to_bytearray(maxStopHeparinDispBeforeTxEndMIN) payload += unsigned_integer_to_bytearray(minSalineBolusVolumeML) payload += unsigned_integer_to_bytearray(maxSalineBolusVolumeML) payload += float_to_bytearray(minDialysateTempC) payload += float_to_bytearray(maxDialysateTempC) payload += unsigned_integer_to_bytearray(minArtPressLimitWindowMMHG) payload += unsigned_integer_to_bytearray(maxArtPressLimitWindowMMHG) payload += unsigned_integer_to_bytearray(minVenPressLimitWindowMMHG) payload += unsigned_integer_to_bytearray(maxVenPressLimitWindowMMHG) payload += unsigned_integer_to_bytearray(minVenAsymPressLimitMMHG) payload += unsigned_integer_to_bytearray(maxVenAsymPressLimitMMHG) payload += float_to_bytearray(minUFVolumeL) payload += float_to_bytearray(maxUFVolumeL) payload += float_to_bytearray(minHeparinDispRateMLPHR) payload += float_to_bytearray(maxHeparinDispRateMLPHR) payload += float_to_bytearray(minHeparinBolusVolumeML) payload += float_to_bytearray(maxHeparinBolusVolumeML) payload += unsigned_integer_to_bytearray(enableChemicalDisinfect) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, message_id=MsgIds.MSG_ID_HD_INSTITUTIONAL_RECORD_RESPONSE.value, payload=payload) self.can_interface.send(message, 0)