""" TD Isolated UF UI Loader """ # Python import os import can import struct import datetime # Qt from PySide2 import QtCore, QtWidgets from PySide2.QtCore import Slot # parent from engine.dynamicloader import DynamicLoader # plugin specific from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.protocols import CAN from leahi_dialin.utils import conversions # td Simulator from leahi_dialin.ui.td_messaging import TD_Messaging class Loader(DynamicLoader): """ TD Isolated UF UI Loader """ def __init__(self): self.td_interface = TD_Messaging() self.can_interface = self.td_interface.can_interface super().__init__(os.path.dirname(__file__)) if self.can_interface is not None: self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_ISOLATED_UF_DURATION_CHANGE_REQUEST.value, self.handle_duration_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_ISOLATED_UF_VOLUME_GOAL_CHANGE_REQUEST.value, self.handle_volume_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_ISOLATED_UF_CONFIRM_REQUEST.value, self.handle_confirm_request) def _init_loader(self): """ finds and creates widgets :return: none """ self.tbDurationRspSend = self.find_widget(QtWidgets.QToolButton , 'tbDurationRspSend' ) self.tbDurationRspReset = self.find_widget(QtWidgets.QToolButton , 'tbDurationRspReset' ) self.lbDurationReq = self.find_widget(QtWidgets.QLabel , 'lbDurationReq' ) self.sbDurationRspRejectReason = self.find_widget(QtWidgets.QSpinBox , 'sbDurationRspRejectReason' ) self.slDurationRspVolumeMax = self.find_widget(QtWidgets.QSlider , 'slDurationRspVolumeMax' ) self.lbDurationRspVolumeMax = self.find_widget(QtWidgets.QLabel , 'lbDurationRspVolumeMax' ) self.tbVolumeRspSend = self.find_widget(QtWidgets.QToolButton , 'tbVolumeRspSend' ) self.tbVolumeRspCopy = self.find_widget(QtWidgets.QToolButton , 'tbVolumeRspCopy' ) self.tbVolumeRspReset = self.find_widget(QtWidgets.QToolButton , 'tbVolumeRspReset' ) self.lbVolumeReqParams = self.find_widget(QtWidgets.QLabel , 'lbVolumeReqParams' ) self.sbVolumeRspRejectReason = self.find_widget(QtWidgets.QSpinBox , 'sbVolumeRspRejectReason' ) self.slVolumeRspVolume = self.find_widget(QtWidgets.QSlider , 'slVolumeRspVolume' ) self.lbVolumeRspVolume = self.find_widget(QtWidgets.QLabel , 'lbVolumeRspVolume' ) self.slVolumeRspDuration = self.find_widget(QtWidgets.QSlider , 'slVolumeRspDuration' ) self.lbVolumeRspDuration = self.find_widget(QtWidgets.QLabel , 'lbVolumeRspDuration' ) self.slVolumeRspRate = self.find_widget(QtWidgets.QSlider , 'slVolumeRspRate' ) self.lbVolumeRspRate = self.find_widget(QtWidgets.QLabel , 'lbVolumeRspRate' ) self.tbConfirmRspSend = self.find_widget(QtWidgets.QToolButton , 'tbConfirmRspSend' ) self.tbConfirmRspReset = self.find_widget(QtWidgets.QToolButton , 'tbConfirmRspReset' ) self.lbConfirmReqTimestamp = self.find_widget(QtWidgets.QLabel , 'lbConfirmReqTimestamp' ) self.sbConfirmRspRejectReason = self.find_widget(QtWidgets.QSpinBox , 'sbConfirmRspRejectReason' ) def _init_connections(self): """ initializes the widgets connections :return: none """ self.tbDurationRspSend .clicked .connect(self.do_duration_response) self.tbDurationRspReset .clicked .connect(self.init_duration_response) self.slDurationRspVolumeMax .valueChanged .connect(lambda value: self.lbDurationRspVolumeMax.setText(f"{value}")) self.tbVolumeRspSend .clicked .connect(self.do_volume_response) self.tbVolumeRspCopy .clicked .connect(self.copy_volume_response) self.tbVolumeRspReset .clicked .connect(self.init_volume_response) self.slVolumeRspVolume .valueChanged .connect(lambda value: self.lbVolumeRspVolume.setText(f"{value}")) self.slVolumeRspDuration .valueChanged .connect(lambda value: self.lbVolumeRspDuration.setText(f"{value}")) self.slVolumeRspRate .valueChanged .connect(lambda value: self.lbVolumeRspRate.setText(f"{value}")) self.tbConfirmRspSend .clicked .connect(self.do_confirm_response) self.tbConfirmRspReset .clicked .connect(self.init_confirm_response) @Slot() def _init_widgets(self): """ initializes the widgets' properties :return: none """ self.init_duration_response() self.init_volume_response() self.init_confirm_response() @Slot() def init_duration_response(self): """ slot for initializing isolated UF duration change response :return: none """ self.lbDurationReq .setText("-- --") self.sbDurationRspRejectReason .setValue(0) self.slDurationRspVolumeMax .setValue(0) @Slot() def handle_duration_request(self, message, timestamp = 0.0): """ slot called when the user requests isolated UF duration change :return: none """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_integer(message, index) self.lbDurationReq.setText(f"{value:.0f}") @Slot() def do_duration_response(self): """ slot for sending isolated UF duration change response :return: none """ self.td_interface.td_isolated_uf_duration_change_response( vRejectionReason = self.sbDurationRspRejectReason.value(), vVolumeMax = self.slDurationRspVolumeMax.value() ) @Slot() def init_volume_response(self): """ slot for initializing isolated UF volume goal response :return: none """ self.lbVolumeReqParams .setText("-- --") self.sbVolumeRspRejectReason .setValue(0) self.slVolumeRspVolume .setValue(0) self.slVolumeRspDuration .setValue(0) self.slVolumeRspRate .setValue(0) @Slot() def handle_volume_request(self, message, timestamp = 0.0): """ slot called when the user requests isolated UF volume goal change :return: none """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 self.reqDuration,index = conversions.bytearray_to_integer(message, index, False) self.reqVolume,index = conversions.bytearray_to_integer(message, index, False) self.lbVolumeReqParams.setText(f"Duration (min) = {self.reqDuration}, Volume (mL) = {self.reqVolume}") @Slot() def copy_volume_response(self): """ slot for copying the isolated UF volume goal from the request to the response :return: none """ self.slVolumeRspVolume .setValue(self.reqVolume) self.slVolumeRspDuration .setValue(self.reqDuration) @Slot() def do_volume_response(self): """ slot for sending isolated UF volume goal change response :return: none """ payload = conversions.integer_to_bytearray(1 if self.sbVolumeRspRejectReason.value() == 0 else 0) payload += conversions.integer_to_bytearray(self.sbVolumeRspRejectReason.value()) payload += conversions.integer_to_bytearray(self.slVolumeRspVolume.value()) payload += conversions.integer_to_bytearray(self.slVolumeRspDuration.value()) payload += conversions.integer_to_bytearray(self.slVolumeRspRate.value()) self.td_interface.td_isolated_uf_volume_change_response( vRejectionReason = self.sbVolumeRspRejectReason.value(), vVolume = self.slVolumeRspVolume.value(), vDuration = self.slVolumeRspDuration.value(), vRate = self.slVolumeRspRate.value() ) @Slot() def init_confirm_response(self): """ slot for initializing isolated UF confirm response :return: none """ self.lbConfirmReqTimestamp .setText("-- --") self.sbConfirmRspRejectReason .setValue(0) @Slot() def handle_confirm_request(self, message, timestamp = 0.0): """ slot called when the user requests isolated UF confirmation :return: none """ message = message['message'] self.lbConfirmReqTimestamp.setText(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}") @Slot() def do_confirm_response(self): """ slot for sending isolated UF confirm response :return: none """ self.td_interface.td_isolated_uf_confirm_response( vRejectionReason = self.sbConfirmRspRejectReason.value() )