###########################################################################
#
#    Copyright (c) 2020-2022 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file test_ui_proxy.py
#
#    @author (last)      Behrouz NematiPour
#    @date   (last)      17-Mar-2022
#    @author (original)  Peter Lucia
#    @date   (original)  16-Dec-2020
#
############################################################################
"""Unit tests for the treatment-parameter handling exposed through ``HD().ui``."""
import unittest
import sys
import struct
from collections import OrderedDict

# The dialin package is resolved relative to this file's location, so the
# search path must be extended before any ``dialin`` import below.
sys.path.append("../../")
from dialin import HD
from dialin.utils import setup_virtual_can_interface
from dialin.utils.conversions import integer_to_bytearray, float_to_bytearray
from dialin.protocols.CAN import (DenaliMessage, DenaliChannels)
from dialin.common import MsgIds
from dialin.utils import is_interface_up, is_interface_present


class Test(unittest.TestCase):
    """Exercise ``cmd_set_treatment_parameters`` and its message handler."""

    # (params key, TreatmentParameters member name), in the order asserted by
    # test_treatment_parameters.
    _PARAM_ENUM_NAMES = (
        ("bld_flow", "TREATMENT_PARAM_BLOOD_FLOW_RATE_ML_MIN"),
        ("dia_flow", "TREATMENT_PARAM_DIALYSATE_FLOW_RATE_ML_MIN"),
        ("duration", "TREATMENT_PARAM_TREATMENT_DURATION_MIN"),
        ("hep_rate", "TREATMENT_PARAM_HEPARIN_DISPENSE_RATE_ML_HR"),
        ("hep_bol", "TREATMENT_PARAM_HEPARIN_BOLUS_VOLUME_ML"),
        ("hep_stop", "TREATMENT_PARAM_HEPARIN_PRESTOP_MIN"),
        ("sal_bol", "TREATMENT_PARAM_SALINE_BOLUS_VOLUME_ML"),
        ("acid", "TREATMENT_PARAM_ACID_CONCENTRATE"),
        ("bicarb", "TREATMENT_PARAM_BICARB_CONCENTRATE"),
        ("dialyzer", "TREATMENT_PARAM_DIALYZER_TYPE"),
        ("dia_temp", "TREATMENT_PARAM_DIALYSATE_TEMPERATURE_C"),
        ("art_low", "TREATMENT_PARAM_ARTERIAL_PRESSURE_LOW_LIMIT_MMHG"),
        ("art_high", "TREATMENT_PARAM_ARTERIAL_PRESSURE_HIGH_LIMIT_MMHG"),
        ("ven_low", "TREATMENT_PARAM_VENOUS_PRESSURE_LOW_LIMIT_MMHG"),
        ("ven_high", "TREATMENT_PARAM_VENOUS_PRESSURE_HIGH_LIMIT_MMHG"),
        ("bp_intvl", "TREATMENT_PARAM_BLOOD_PRESSURE_MEAS_INTERVAL_MIN"),
        ("rb_flow", "TREATMENT_PARAM_RINSEBACK_FLOW_RATE_ML_MIN"),
    )

    # (params key, converter) in the exact order the payload is assembled for
    # MSG_ID_UI_NEW_TREATMENT_PARAMS.
    _PAYLOAD_LAYOUT = (
        ("bld_flow", integer_to_bytearray),
        ("dia_flow", integer_to_bytearray),
        ("duration", integer_to_bytearray),
        ("hep_rate", float_to_bytearray),
        ("hep_bol", float_to_bytearray),
        ("hep_stop", integer_to_bytearray),
        ("sal_bol", integer_to_bytearray),
        ("acid", integer_to_bytearray),
        ("bicarb", integer_to_bytearray),
        ("dialyzer", integer_to_bytearray),
        ("dia_temp", float_to_bytearray),
        ("art_low", integer_to_bytearray),
        ("art_high", integer_to_bytearray),
        ("ven_low", integer_to_bytearray),
        ("ven_high", integer_to_bytearray),
        ("bp_intvl", integer_to_bytearray),
        ("rb_flow", integer_to_bytearray),
    )

    def __init__(self, *args, **kwargs):
        """Create the ``HD`` object and the parameter set shared by the tests.

        ``setup_virtual_can_interface()`` is called first when ``can0`` is
        either not present or not up.
        """
        # NOTE(review): unittest builds one TestCase instance per test method,
        # so this runs once per test at load time. Kept in __init__ (rather
        # than setUp/setUpClass) to leave HD construction timing unchanged.
        super().__init__(*args, **kwargs)
        if not is_interface_present("can0") or not is_interface_up("can0"):
            setup_virtual_can_interface()
        self.hd = HD()
        self.params = OrderedDict([
            ("bld_flow", 100),
            ("dia_flow", 100),
            ("duration", 60),
            ("hep_rate", 0.1),
            ("hep_bol", 0.1),
            ("hep_stop", 1),
            ("sal_bol", 100),
            ("acid", 0),
            ("bicarb", 0),
            ("dialyzer", 0),
            ("dia_temp", 35.0),
            ("art_low", -300),
            ("art_high", -200),
            ("ven_low", -100),
            ("ven_high", 100),
            ("bp_intvl", 15),
            ("rb_flow", 50),
        ])

    def test_treatment_parameters(self):
        """Each value passed to the setter is readable back by its enum index."""
        self.hd.ui.cmd_set_treatment_parameters(**self.params)
        ui = self.hd.ui
        for key, member_name in self._PARAM_ENUM_NAMES:
            index = getattr(ui.TreatmentParameters, member_name).value
            # msg=key identifies which parameter mismatched on failure.
            self.assertEqual(ui.treatment_parameters[index],
                             self.params[key],
                             msg=key)

    def test_treatment_parameters_settings_normal(self):
        """The handler returns None for a fully populated payload."""
        chunks = [convert(self.params[key])
                  for key, convert in self._PAYLOAD_LAYOUT]
        # Concatenate starting from the first converted chunk so the payload
        # keeps whatever type the converters return.
        payload = chunks[0]
        for chunk in chunks[1:]:
            payload += chunk

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.ui_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS.value,
            payload=payload)

        self.assertIsNone(self.hd.ui._handler_treatment_param_settings(message))

    def test_treatment_parameters_settings_wrong_type(self):
        """The handler raises ``struct.error`` when given an empty payload."""
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.ui_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_UI_NEW_TREATMENT_PARAMS.value,
            payload=bytearray())

        with self.assertRaises(struct.error):
            self.hd.ui._handler_treatment_param_settings(message)


if __name__ == '__main__':
    # unittest.main() terminates the interpreter itself with a status that
    # reflects the test result, so no sys.exit() wrapper is needed. (Passing
    # the bool from wasSuccessful() to sys.exit() would map success to exit
    # status 1.)
    unittest.main(verbosity=2)