""" Treatment Adjustment Pressure Limits UI Loader """ # Python import os import can import struct # Qt from PySide2 import QtCore, QtWidgets from PySide2.QtCore import Slot # parent from engine.dynamicloader import DynamicLoader # plugin specific from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.protocols import CAN from leahi_dialin.utils import conversions # TD simulator from leahi_dialin.ui.td_messaging import TD_Messaging class Loader(DynamicLoader): """ Treatment Adjustment Pressure Limits UI Loader """ def __init__(self): self.td_interface = TD_Messaging() self.can_interface = self.td_interface.can_interface super().__init__(os.path.dirname(__file__)) if self.can_interface is not None: self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_TREATMENT_SET_POINTS_CHANGE_REQUEST.value, self.handle_set_points_change_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_TREATMENT_SET_POINT_BLOOD_FLOW_CHANGE_REQUEST.value, self.handle_blood_flow_change_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_TREATMENT_SET_POINT_DIALYSATE_FLOW_CHANGE_REQUEST.value, self.handle_dialysate_flow_change_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_TREATMENT_SET_POINT_DIALYSATE_TEMPERATURE_CHANGE_REQUEST.value, self.handle_dialysate_temperature_change_request) def _init_loader(self): """ finds and creates widgets :return: none """ self.tbSetPointsReqReset = self.find_widget(QtWidgets.QToolButton , 'tbSetPointsReqReset' ) self.lbSetPointsReqBloodFlowRate = self.find_widget(QtWidgets.QLabel , 'lbSetPointsReqBloodFlowRate' ) self.lbSetPointsReqDialysateFlowRate = self.find_widget(QtWidgets.QLabel , 'lbSetPointsReqDialysateFlowRate' ) self.lbSetPointsReqDialysateTemperature = self.find_widget(QtWidgets.QLabel , 'lbSetPointsReqDialysateTemperature' ) self.lbSetPointsReqAcidConcentrate = self.find_widget(QtWidgets.QLabel , 'lbSetPointsReqAcidConcentrate' ) self.lbSetPointsReqBicarbConcentrate = self.find_widget(QtWidgets.QLabel , 'lbSetPointsReqBicarbConcentrate' ) self.tbSetPointsRspSend = self.find_widget(QtWidgets.QToolButton , 'tbSetPointsRspSend' ) self.tbSetPointsRspReset = self.find_widget(QtWidgets.QToolButton , 'tbSetPointsRspReset' ) self.sbSetPointsRspRejectionReason = self.find_widget(QtWidgets.QSpinBox , 'sbSetPointsRspRejectionReason' ) self.tbBloodFlowRateRspSend = self.find_widget(QtWidgets.QToolButton , 'tbBloodFlowRateRspSend' ) self.tbBloodFlowRateRspReset = self.find_widget(QtWidgets.QToolButton , 'tbBloodFlowRateRspReset' ) self.lbBloodFlowRateReq = self.find_widget(QtWidgets.QLabel , 'lbBloodFlowRateReq' ) self.sbBloodFlowRateRspRejectionReason = self.find_widget(QtWidgets.QSpinBox , 'sbBloodFlowRateRspRejectionReason' ) self.tbDialysateFlowRateRspSend = self.find_widget(QtWidgets.QToolButton , 'tbDialysateFlowRateRspSend' ) self.tbDialysateFlowRateRspReset = self.find_widget(QtWidgets.QToolButton , 'tbDialysateFlowRateRspReset' ) self.lbDialysateFlowRateReq = self.find_widget(QtWidgets.QLabel , 'lbDialysateFlowRateReq' ) self.sbDialysateFlowRateRspRejectionReason = self.find_widget(QtWidgets.QSpinBox , 'sbDialysateFlowRateRspRejectionReason' ) self.tbDialysateTemperatureRspSend = self.find_widget(QtWidgets.QToolButton , 'tbDialysateTemperatureRspSend' ) self.tbDialysateTemperatureRspReset = self.find_widget(QtWidgets.QToolButton , 'tbDialysateTemperatureRspReset' ) self.lbDialysateTemperatureReq = self.find_widget(QtWidgets.QLabel , 'lbDialysateTemperatureReq' ) self.sbDialysateTemperatureRspRejectionReason = self.find_widget(QtWidgets.QSpinBox , 'sbDialysateTemperatureRspRejectionReason') def _init_connections(self): """ initializes the widgets connections :return: none """ self.tbSetPointsReqReset .clicked .connect(self.do_set_points_request_reset ) self.tbSetPointsRspSend .clicked .connect(self.do_set_points_response_send ) self.tbSetPointsRspReset .clicked .connect(self.do_set_points_response_reset ) self.tbBloodFlowRateRspSend .clicked .connect(self.do_blood_flow_rate_response_send ) self.tbBloodFlowRateRspReset .clicked .connect(self.do_blood_flow_rate_response_reset ) self.tbDialysateFlowRateRspSend .clicked .connect(self.do_dialysate_flow_rate_response_send ) self.tbDialysateFlowRateRspReset .clicked .connect(self.do_dialysate_flow_rate_response_reset ) self.tbDialysateTemperatureRspSend .clicked .connect(self.do_dialysate_temperature_response_send ) self.tbDialysateTemperatureRspReset .clicked .connect(self.do_dialysate_temperature_response_reset ) def _init_widgets(self): """ initializes the widgets' properties :return: none """ self.do_set_points_reset() @Slot() def do_set_points_reset(self): self.do_set_points_request_reset() self.do_set_points_response_reset() @Slot() def do_set_points_request_reset(self): self.lbSetPointsReqBloodFlowRate .setText("--") self.lbSetPointsReqDialysateFlowRate .setText("--") self.lbSetPointsReqDialysateTemperature .setText("--") self.lbSetPointsReqAcidConcentrate .setText("--") self.lbSetPointsReqBicarbConcentrate .setText("--") @Slot() def handle_set_points_change_request(self, message, timestamp = 0.0): """ Called when the user requests set points change from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 param1,index = conversions.bytearray_to_integer (message, index) param2,index = conversions.bytearray_to_integer (message, index) param3,index = conversions.bytearray_to_float (message, index) param4,index = conversions.bytearray_to_integer (message, index) param5,index = conversions.bytearray_to_integer (message, index) self.lbSetPointsReqBloodFlowRate .setText( f"{param1}" ) self.lbSetPointsReqDialysateFlowRate .setText( f"{param2}" ) self.lbSetPointsReqDialysateTemperature .setText( f"{param3:.1f}" ) self.lbSetPointsReqAcidConcentrate .setText( f"{param4}" ) self.lbSetPointsReqBicarbConcentrate .setText( f"{param5}" ) @Slot() def do_set_points_response_send(self): self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_TD_TREATMENT_SET_POINTS_CHANGE_RESPONSE.value, accepted = 1 if self.sbSetPointsRspRejectionReason.value() == 0 else 0, reason = self.sbSetPointsRspRejectionReason.value(), is_pure_data = False, has_parameters = False, ) @Slot() def do_set_points_response_reset(self): self.sbSetPointsRspRejectionReason.setValue(0) @Slot() def handle_blood_flow_change_request(self, message, timestamp = 0.0): """ Called when the user requests blood flow rate change from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_integer(message, index) self.lbBloodFlowRateReq.setText(f"{value}") @Slot() def do_blood_flow_rate_response_send(self): self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_TD_TREATMENT_SET_POINT_BLOOD_FLOW_CHANGE_RESPONSE.value, accepted = 1 if self.sbBloodFlowRateRspRejectionReason.value() == 0 else 0, reason = self.sbBloodFlowRateRspRejectionReason.value(), is_pure_data = False, has_parameters = False, ) @Slot() def do_blood_flow_rate_response_reset(self): self.lbBloodFlowRateReq .setText("--") self.sbBloodFlowRateRspRejectionReason .setValue(0) @Slot() def do_dialysate_flow_rate_reset(self): self.do_dialysate_flow_rate_response_reset() @Slot() def handle_dialysate_flow_change_request(self, message, timestamp = 0.0): """ Called when the user requests dialysate flow rate change from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_integer (message, index) self.lbDialysateFlowRateReq.setText(f"{value}") @Slot() def do_dialysate_flow_rate_response_send(self): self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_TD_TREATMENT_SET_POINT_DIALYSATE_FLOW_CHANGE_RESPONSE.value, accepted = 1 if self.sbDialysateFlowRateRspRejectionReason.value() == 0 else 0, reason = self.sbDialysateFlowRateRspRejectionReason.value(), is_pure_data = False, has_parameters = False, ) @Slot() def do_dialysate_flow_rate_response_reset(self): self.lbDialysateFlowRateReq .setText("--") self.sbDialysateFlowRateRspRejectionReason .setValue(0) @Slot() def handle_dialysate_temperature_change_request(self, message, timestamp = 0.0): """ Called when the user requests dialysate temperature change from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_float(message, index) self.lbDialysateTemperatureReq.setText(f"{value:.1f}") @Slot() def do_dialysate_temperature_response_send(self): self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_TD_TREATMENT_SET_POINT_DIALYSATE_TEMPERATURE_CHANGE_RESPONSE.value, accepted = 1 if self.sbDialysateTemperatureRspRejectionReason.value() == 0 else 0, reason = self.sbDialysateTemperatureRspRejectionReason.value(), is_pure_data = False, has_parameters = False, ) @Slot() def do_dialysate_temperature_response_reset(self): self.lbDialysateTemperatureReq .setText("--") self.sbDialysateTemperatureRspRejectionReason .setValue(0)