""" TD Rinseback UI loader """ # Python import os import can import struct # Qt from PySide2 import QtCore, QtWidgets from PySide2.QtCore import Slot # parent from engine.dynamicloader import DynamicLoader # plugin specific from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.protocols import CAN from leahi_dialin.utils import conversions # td simulator from leahi_dialin.ui.td_messaging import TD_Messaging class Loader(DynamicLoader): """ TD Rinseback UI loader """ def __init__(self): self.td_interface = TD_Messaging() self.can_interface = self.td_interface.can_interface super().__init__(os.path.dirname(__file__)) if self.can_interface is not None: self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_RINSEBACK_CMD_REQUEST.value, self.handle_rinseback_cmd_request) def _init_loader(self): """ finds and creates widgets :return: none """ self.cmdStrings = [ "Confirm Start", "Increase Rate", "Decrease Rate", "Pause", "Resume", "End", "Additional", "Confirm Disconnect", "End Treatment", "Back to Treatment", ] self.tbProgReset = self.find_widget(QtWidgets.QToolButton , 'tbProgReset' ) self.tbProgSend = self.find_widget(QtWidgets.QToolButton , 'tbProgSend' ) self.slTargetVolume = self.find_widget(QtWidgets.QSlider , 'slTargetVolume' ) self.lbTargetVolume = self.find_widget(QtWidgets.QLabel , 'lbTargetVolume' ) self.slDeliveredVolume = self.find_widget(QtWidgets.QSlider , 'slDeliveredVolume' ) self.lbDeliveredVolume = self.find_widget(QtWidgets.QLabel , 'lbDeliveredVolume' ) self.slFlowRate = self.find_widget(QtWidgets.QSlider , 'slFlowRate' ) self.lbFlowRate = self.find_widget(QtWidgets.QLabel , 'lbFlowRate' ) self.slTimeout = self.find_widget(QtWidgets.QSlider , 'slTimeout' ) self.lbTimeout = self.find_widget(QtWidgets.QLabel , 'lbTimeout' ) self.slCountdown = self.find_widget(QtWidgets.QSlider , 'slCountdown' ) self.lbCountdown = self.find_widget(QtWidgets.QLabel , 'lbCountdown' ) self.cbCompleted = self.find_widget(QtWidgets.QCheckBox , 'cbCompleted' ) self.tbReqReset = self.find_widget(QtWidgets.QToolButton , 'tbReqReset' ) self.lbReqCommand = self.find_widget(QtWidgets.QLabel , 'lbReqCommand' ) self.tbRspReset = self.find_widget(QtWidgets.QToolButton , 'tbRspReset' ) self.tbRspSend = self.find_widget(QtWidgets.QToolButton , 'tbRspSend' ) self.sbRspRejectReason = self.find_widget(QtWidgets.QSpinBox , 'sbRspRejectReason' ) def _init_connections(self): """ initializes the widget's connections :return: none """ self.tbProgReset .clicked.connect ( self.init_rinseback_progress ) self.tbProgSend .clicked.connect ( self.do_rinseback_progress ) self.slTargetVolume .valueChanged.connect ( self.do_rinseback_progress ) self.slTargetVolume .valueChanged.connect ( lambda value: self.lbTargetVolume.setText(f"{value}") ) self.slDeliveredVolume .valueChanged.connect ( self.do_rinseback_progress ) self.slDeliveredVolume .valueChanged.connect ( lambda value: self.lbDeliveredVolume.setText(f"{value}") ) self.slFlowRate .valueChanged.connect ( self.do_rinseback_progress ) self.slFlowRate .valueChanged.connect ( lambda value: self.lbFlowRate.setText(f"{value}") ) self.slTimeout .valueChanged.connect ( self.do_rinseback_progress ) self.slTimeout .valueChanged.connect ( lambda value: self.lbTimeout.setText(f"{value}") ) self.slCountdown .valueChanged.connect ( self.do_rinseback_progress ) self.slCountdown .valueChanged.connect ( lambda value: self.lbCountdown.setText(f"{value}") ) self.cbCompleted .stateChanged.connect ( self.do_rinseback_progress ) self.tbReqReset .clicked.connect ( self.init_rinseback_cmd_request ) self.tbRspReset .clicked.connect ( self.init_rinseback_cmd_response ) self.tbRspSend .clicked.connect ( self.do_rinseback_cmd_response ) @Slot() def _init_widgets(self): """ initializes the widget's properties :return: none """ self.init_rinseback_progress() self.init_rinseback_cmd_request() self.init_rinseback_cmd_response() @Slot() def init_rinseback_progress(self): self.slTargetVolume .setValue ( 0 ) self.slDeliveredVolume .setValue ( 0 ) self.slFlowRate .setValue ( 0 ) self.slTimeout .setValue ( 0 ) self.slCountdown .setValue ( 0 ) self.cbCompleted .setChecked ( False ) @Slot() def do_rinseback_progress(self): """ the slot for sending rinseback progress data :return: none """ self.td_interface.td_rinseback_progress( self.slTargetVolume .value(), self.slDeliveredVolume .value(), self.slFlowRate .value(), self.slTimeout .value(), self.slCountdown .value(), self.cbCompleted .isChecked() ) @Slot() def init_rinseback_cmd_request(self): self.lbReqCommand .setText ("--") @Slot() def handle_rinseback_cmd_request(self, message, timestamp = 0.0): """ Called when the user sends a rinseback command request to firmware from UI @return: None """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_integer(message, index) cmdString = self.cmdStrings[value] if value >= 0 and value < len(self.cmdStrings) else "" self.lbReqCommand.setText(f"{cmdString} ({value})") @Slot() def init_rinseback_cmd_response(self): self.sbRspRejectReason .setValue (0) @Slot() def do_rinseback_cmd_response(self): payload = conversions.integer_to_bytearray(int(0 if self.sbRspRejectReason.value() == 0 else 1)) payload += conversions.integer_to_bytearray(self.sbRspRejectReason.value()) self.td_interface.cmd_send_general_response( message_id = MsgIds.MSG_ID_TD_RINSEBACK_CMD_RESPONSE.value, accepted = 0 if self.sbRspRejectReason.value() == 0 else 1, reason = self.sbRspRejectReason.value(), is_pure_data = False, has_parameters = True, parameters_payload = payload )