""" TD Ultrafiltration UI Loader """ # Python import os import can import struct # Qt from PySide2 import QtCore, QtWidgets from PySide2.QtCore import Slot # parent from engine.dynamicloader import DynamicLoader # plugin specific from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.protocols import CAN from leahi_dialin.utils import conversions from leahi_dialin.common.td_defs import TDOpModes from leahi_dialin.common.td_defs import TDStandbyStates # hd Simulator from leahi_dialin.ui.td_messaging import TD_Messaging class Loader(DynamicLoader): """ TD Ultrafiltration UI Loader """ def __init__(self): self.td_interface = TD_Messaging() self.can_interface = self.td_interface.can_interface super().__init__(os.path.dirname(__file__)) if self.can_interface is not None: self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_UF_PAUSE_RESUME_REQUEST.value, self.handle_pause_resume_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_ULTRAFILTRATION_VOLUME_TO_VALIDATE.value, self.handle_settings_change_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_USER_CONFIRM_UF_SETTINGS_CHANGE_REQUEST.value, self.handle_confirm_request) def _init_loader(self): """ finds and creates widgets :return: none """ self.uf_scaling = 100.00 # UF Scaling factor to be 2 decimal places self.allSend = self.find_widget(QtWidgets.QToolButton , 'allSendButton' ) self.allReset = self.find_widget(QtWidgets.QToolButton , 'allResetButton' ) self.ufDataSend = self.find_widget(QtWidgets.QToolButton, 'ufDataSendButton' ) self.ufDataReset = self.find_widget(QtWidgets.QToolButton, 'ufDataResetButton' ) self.ufSetVolume = self.find_widget(QtWidgets.QSlider , 'ufSetVolumeSlider' ) self.ufTargetRate = self.find_widget(QtWidgets.QSlider , 'ufTargetRateSlider' ) self.ufVolumeDelivered = self.find_widget(QtWidgets.QSlider , 'ufVolumeDeliveredSlider' ) self.ufState = self.find_widget(QtWidgets.QSpinBox , 'ufStateSpinBox' ) self.ufSetVolumeLabel = self.find_widget(QtWidgets.QLabel , 'ufSetVolumeLabel' ) self.ufTargetRateLabel = self.find_widget(QtWidgets.QLabel , 'ufTargetRateLabel' ) self.ufVolumeDeliveredLabel = self.find_widget(QtWidgets.QLabel , 'ufVolumeDeliveredLabel' ) self.prSend = self.find_widget(QtWidgets.QToolButton , 'prSendButton' ) self.prReset = self.find_widget(QtWidgets.QToolButton , 'prResetButton' ) self.prRequestLabel = self.find_widget(QtWidgets.QLabel , 'prRequestLabel' ) self.prAccepted = self.find_widget(QtWidgets.QRadioButton , 'prAcceptRadioButton' ) self.prRejectReason = self.find_widget(QtWidgets.QSpinBox , 'prRejectReasonSpinBox' ) self.changeSend = self.find_widget(QtWidgets.QToolButton , 'changeSendButton' ) self.changeReset = self.find_widget(QtWidgets.QToolButton , 'changeResetButton' ) self.changeRequestLabel = self.find_widget(QtWidgets.QLabel , 'changeRequestLabel' ) self.changeAccepted = self.find_widget(QtWidgets.QRadioButton , 'changeAcceptRadioButton' ) self.changeRejectReason = self.find_widget(QtWidgets.QSpinBox , 'changeRejectReasonSpinBox' ) self.changeVolume = self.find_widget(QtWidgets.QSlider , 'changeVolumeSlider' ) self.changeVolumeLabel = self.find_widget(QtWidgets.QLabel , 'changeVolumeLabel' ) self.changeDuration = self.find_widget(QtWidgets.QSlider , 'changeDurationSlider' ) self.changeDurationLabel = self.find_widget(QtWidgets.QLabel , 'changeDurationLabel' ) self.changeTimeDiff = self.find_widget(QtWidgets.QSlider , 'changeTimeDiffSlider' ) self.changeTimeDiffLabel = self.find_widget(QtWidgets.QLabel , 'changeTimeDiffLabel' ) self.changeRate = self.find_widget(QtWidgets.QSlider , 'changeRateSlider' ) self.changeRateLabel = self.find_widget(QtWidgets.QLabel , 'changeRateLabel' ) self.changeRateDiff = self.find_widget(QtWidgets.QSlider , 'changeRateDiffSlider' ) self.changeRateDiffLabel = self.find_widget(QtWidgets.QLabel , 'changeRateDiffLabel' ) self.changeOldRate = self.find_widget(QtWidgets.QSlider , 'changeOldRateSlider' ) self.changeOldRateLabel = self.find_widget(QtWidgets.QLabel , 'changeOldRateLabel' ) self.confirmSend = self.find_widget(QtWidgets.QToolButton , 'confirmSendButton' ) self.confirmReset = self.find_widget(QtWidgets.QToolButton , 'confirmResetButton' ) self.confirmRequestLabel = self.find_widget(QtWidgets.QLabel , 'confirmRequestLabel' ) self.confirmAccepted = self.find_widget(QtWidgets.QRadioButton , 'confirmAcceptRadioButton' ) self.confirmRejectReason = self.find_widget(QtWidgets.QSpinBox , 'confirmRejectReasonSpinBox' ) self.confirmVolume = self.find_widget(QtWidgets.QSlider , 'confirmVolumeSlider' ) self.confirmVolumeLabel = self.find_widget(QtWidgets.QLabel , 'confirmVolumeLabel' ) self.confirmDuration = self.find_widget(QtWidgets.QSlider , 'confirmDurationSlider' ) self.confirmDurationLabel = self.find_widget(QtWidgets.QLabel , 'confirmDurationLabel' ) self.confirmRate = self.find_widget(QtWidgets.QSlider , 'confirmRateSlider' ) self.confirmRateLabel = self.find_widget(QtWidgets.QLabel , 'confirmRateLabel' ) def _init_connections(self): """ initializes the widgets connections :return: none """ self.allReset .clicked.connect(self._init_widgets) self.allSend .clicked.connect(self.do_all_tx_data) self.ufDataSend .clicked.connect(self.do_uf_data) self.ufDataReset .clicked.connect(self.init_uf_data) self.ufSetVolume .valueChanged.connect(self.do_uf_data) self.ufTargetRate .valueChanged.connect(self.do_uf_data) self.ufVolumeDelivered .valueChanged.connect(self.do_uf_data) self.ufState .valueChanged.connect(self.do_uf_data) self.prSend.clicked.connect(self.do_pause_resume) self.prReset.clicked.connect(self.init_pause_resume) self.changeSend.clicked.connect(self.do_settings_change) self.changeReset.clicked.connect(self.init_settings_change) self.changeVolume.valueChanged.connect(lambda value: self.changeVolumeLabel.setText(f"{value/1000:.2f}")) self.changeDuration.valueChanged.connect(lambda value: self.changeDurationLabel.setText(f"{value}")) self.changeTimeDiff.valueChanged.connect(lambda value: self.changeTimeDiffLabel.setText(f"{value}")) self.changeRate.valueChanged.connect(lambda value: self.changeRateLabel.setText(f"{value/1000:.2f}")) self.changeRateDiff.valueChanged.connect(lambda value: self.changeRateDiffLabel.setText(f"{value/1000:.2f}")) self.changeOldRate.valueChanged.connect(lambda value: self.changeOldRateLabel.setText(f"{value/1000:.2f}")) self.confirmSend.clicked.connect(self.do_confirm) self.confirmReset.clicked.connect(self.init_confirm) self.confirmVolume.valueChanged.connect(lambda value: self.confirmVolumeLabel.setText(f"{value/1000:.2f}")) self.confirmDuration.valueChanged.connect(lambda value: self.confirmDurationLabel.setText(f"{value}")) self.confirmRate.valueChanged.connect(lambda value: self.confirmRateLabel.setText(f"{value/1000:.2f}")) @Slot() def _init_widgets(self): """ initializes the widgets' properties :return: none """ self.init_uf_data() self.init_settings_change() self.init_pause_resume() self.init_confirm() @Slot() def init_uf_data(self): """ slot for initializing UF settings change :return: none """ self.ufSetVolume .setValue(0) self.ufTargetRate .setValue(0) self.ufVolumeDelivered .setValue(0) self.ufState .setValue(0) @Slot() def do_uf_data(self): """ the slot for UF Data :return: none """ set_volume = self.ufSetVolume .value() / self.uf_scaling target_rate = self.ufTargetRate .value() / self.uf_scaling volume_delivered = self.ufVolumeDelivered.value() / self.uf_scaling state = self.ufState .value() self.ufSetVolumeLabel .setText(f"{set_volume:.2f}" ) self.ufTargetRateLabel .setText(f"{target_rate:.2f}" ) self.ufVolumeDeliveredLabel .setText(f"{volume_delivered:.2f}" ) self.td_interface.td_ultrafiltration( set_volume , target_rate , volume_delivered , state ) @Slot() def init_settings_change(self): """ slot for initializing UF settings change :return: none """ self.changeRequestLabel .setText("--") self.changeAccepted .setChecked(True) self.changeRejectReason .setValue(0) self.changeVolume .setValue(0) self.changeDuration .setValue(0) self.changeTimeDiff .setValue(0) self.changeRate .setValue(0) self.changeRateDiff .setValue(0) self.changeOldRate .setValue(0) @Slot() def handle_settings_change_request(self, message, timestamp = 0.0): """ Called when the user requests UF settings change to firmware from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_float(message, index) self.changeRequestLabel.setText(f"{value:.0f}") @Slot() def do_settings_change(self): """ slot for UF settings change :return: none """ payload = conversions.integer_to_bytearray(int(1 if self.changeAccepted.isChecked() else 0)) payload += conversions.integer_to_bytearray(self.changeRejectReason.value()) payload += conversions.float_to_bytearray(self.changeVolume.value()) payload += conversions.integer_to_bytearray(self.changeDuration.value()) payload += conversions.integer_to_bytearray(self.changeTimeDiff.value()) payload += conversions.float_to_bytearray(self.changeRate.value()/1000) payload += conversions.float_to_bytearray(self.changeRateDiff.value()/1000) payload += conversions.float_to_bytearray(self.changeOldRate.value()/1000) self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_TD_RESP_ULTRAFILTRATION_VOLUME_TO_VALIDATE.value, accepted = 1 if self.changeAccepted.isChecked() else 0, reason = self.changeRejectReason.value(), is_pure_data = False, has_parameters = True, parameters_payload = payload ) @Slot() def init_pause_resume(self): """ slot for initializing UF pause/resume :return: none """ self.prRequestLabel .setText("--") self.prAccepted .setChecked(True) self.prRejectReason .setValue(0) @Slot() def handle_pause_resume_request(self, message, timestamp = 0.0): """ Called when the user requests pause/resume to firmware from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 state,index = conversions.bytearray_to_integer(message, index) self.prRequestLabel.setText(f"Resume({state})" if state else f"Pause({state})") @Slot() def do_pause_resume(self): """ slot for UF pause/resume :return: none """ self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_TD_UF_PAUSE_RESUME_RESPONSE.value, accepted = 1 if self.prAccepted.isChecked() else 0, reason = self.prRejectReason.value() ) @Slot() def init_confirm(self): """ slot for initializing UF confirm :return: none """ self.confirmAccepted .setChecked(True) self.confirmRejectReason .setValue(0) self.confirmVolume .setValue(0) self.confirmDuration .setValue(0) self.confirmRate .setValue(0) @Slot() def handle_confirm_request(self, message, timestamp = 0.0): """ Called when the user requests confirm to firmware from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_float(message, index) self.confirmRequestLabel.setText(f"{value:.0f}") @Slot() def do_confirm(self): """ slot for UF confirm :return: none """ payload = conversions.integer_to_bytearray(int(1 if self.confirmAccepted.isChecked() else 0)) payload += conversions.integer_to_bytearray(self.confirmRejectReason.value()) payload += conversions.float_to_bytearray(self.confirmVolume.value()) payload += conversions.integer_to_bytearray(self.confirmDuration.value()) payload += conversions.float_to_bytearray(self.confirmRate.value()/1000) self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_USER_UF_SETTINGS_CHANGE_CONFIRMATION_RESPONSE.value, accepted = 1 if self.confirmAccepted.isChecked() else 0, reason = self.confirmRejectReason.value(), is_pure_data = False, has_parameters = True, parameters_payload = payload ) @Slot() def do_all_tx_data(self): """ slot for treatment set point Data :return: none """ self.do_uf_data() self.do_settings_change() self.do_pause_resume() self.do_confirm()