########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file test_ui_proxy.py # # @author (last) Micahel Garthwaite # @date (last) 17-Aug-2023 # @author (original) Peter Lucia # @date (original) 16-Dec-2020 # ############################################################################ import unittest import sys import struct sys.path.append("../../") from dialin import HD from dialin.utils import setup_virtual_can_interface from collections import OrderedDict from dialin.utils.conversions import integer_to_bytearray, float_to_bytearray from dialin.protocols.CAN import (DenaliMessage, DenaliChannels) from dialin.common import MsgIds, TreatmentParameters from dialin.utils import is_interface_up, is_interface_present class Test(unittest.TestCase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not is_interface_present("can0") or not is_interface_up("can0"): setup_virtual_can_interface() self.hd = HD() self.params = OrderedDict([ ("bld_flow", 100 ), ("dia_flow", 100 ), ("duration", 60 ), ("hep_rate", 0.1 ), ("hep_bol" , 0.2 ), ("hep_stop", 1 ), ("sal_bol" , 100 ), ("acid" , 0 ), ("bicarb" , 0 ), ("dialyzer", 0 ), ("hep_type", 0 ), ("dia_temp", 35.0 ), ("art_win" , 120 ), ("ven_win" , 100 ), ("ven_asy" , 20 ), ("bp_intvl", 15 ), ("rb_flow" , 50 ) ]) def test_treatment_parameters(self): result = [] for key, value in self.params.items(): result.append(value) self.hd.ui.cmd_set_treatment_parameters(**self.params) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_BLOOD_FLOW_RATE_ML_MIN.value] , self.params.get("bld_flow", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_DIALYSATE_FLOW_RATE_ML_MIN.value] , self.params.get("dia_flow", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_TREATMENT_DURATION_MIN.value] , self.params.get("duration", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_HEPARIN_DISPENSE_RATE_ML_HR.value] , self.params.get("hep_rate", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_HEPARIN_BOLUS_VOLUME_ML.value] , self.params.get("hep_bol" , None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_HEPARIN_PRESTOP_MIN.value] , self.params.get("hep_stop", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_SALINE_BOLUS_VOLUME_ML.value] , self.params.get("sal_bol" , None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_ACID_CONCENTRATE.value] , self.params.get("acid" , None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_BICARB_CONCENTRATE.value] , self.params.get("bicarb" , None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_DIALYZER_TYPE.value] , self.params.get("dialyzer", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_HEPARIN_TYPE.value] , self.params.get("hep_type", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_DIALYSATE_TEMPERATURE_C.value] , self.params.get("dia_temp", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_ART_PRES_LIMIT_WINDOW.value] , self.params.get("art_win" , None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_VEN_PRES_LIMIT_WINDOW.value] , self.params.get("ven_win" , None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_VEN_PRES_LIMIT_ASYMMETRIC.value] , self.params.get("ven_asy" , None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_BLOOD_PRESSURE_MEAS_INTERVAL_MIN.value] , self.params.get("bp_intvl", None)) self.assertEqual(self.hd.ui.treatment_parameters[TreatmentParameters.TREATMENT_PARAM_RINSEBACK_FLOW_RATE_ML_MIN.value] , self.params.get("rb_flow" , None)) def test_treatment_parameters_settings_normal(self): test_timestamp = 0.0 payload = integer_to_bytearray(self.params.get("bld_flow", None)) payload += integer_to_bytearray(self.params.get("dia_flow", None)) payload += integer_to_bytearray(self.params.get("duration", None)) payload += float_to_bytearray(self.params.get("hep_rate", None)) payload += float_to_bytearray(self.params.get("hep_bol", None)) payload += integer_to_bytearray(self.params.get("hep_stop", None)) payload += integer_to_bytearray(self.params.get("sal_bol", None)) payload += integer_to_bytearray(self.params.get("acid", None)) payload += integer_to_bytearray(self.params.get("bicarb", None)) payload += integer_to_bytearray(self.params.get("dialyzer", None)) payload += integer_to_bytearray(self.params.get("hep_type", None)) payload += float_to_bytearray(self.params.get("dia_temp", None)) payload += integer_to_bytearray(self.params.get("art_win", None)) payload += integer_to_bytearray(self.params.get("ven_win", None)) payload += integer_to_bytearray(self.params.get("ven_asy", None)) payload += integer_to_bytearray(self.params.get("bp_intvl", None)) payload += integer_to_bytearray(self.params.get("rb_flow", None)) message = DenaliMessage.build_message(channel_id=DenaliChannels.ui_to_hd_ch_id, message_id=MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS_REQUEST.value, payload=payload) self.assertIsNone(self.hd.ui._handler_treatment_param_settings(message, test_timestamp)) def test_treatment_parameters_settings_wrong_type(self): test_timestamp = 0.0 payload = bytearray() message = DenaliMessage.build_message(channel_id=DenaliChannels.ui_to_hd_ch_id, message_id=MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS_REQUEST.value, payload=payload) self.assertRaises(struct.error, self.hd.ui._handler_treatment_param_settings, message, test_timestamp) if __name__ == '__main__': sys.exit(unittest.main(verbosity=2).result.wasSuccessful())