""" TD Isolated UF UI Loader """ # Python import os import can import struct import datetime # Qt from PySide2 import QtCore, QtWidgets from PySide2.QtCore import Slot # parent from engine.dynamicloader import DynamicLoader # plugin specific from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.protocols import CAN from leahi_dialin.utils import conversions # td Simulator from leahi_dialin.ui.td_messaging import TD_Messaging class Loader(DynamicLoader): """ TD Isolated UF UI Loader """ def __init__(self): self.td_interface = TD_Messaging() self.can_interface = self.td_interface.can_interface super().__init__(os.path.dirname(__file__)) if self.can_interface is not None: self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_ISOLATED_UF_DURATION_CHANGE_REQUEST.value, self.handle_duration_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_ISOLATED_UF_VOLUME_GOAL_CHANGE_REQUEST.value, self.handle_volume_goal_request) self.can_interface.register_receiving_publication_function( CAN.DenaliChannels.ui_to_td_ch_id, MsgIds.MSG_ID_UI_ISOLATED_UF_CONFIRM_REQUEST.value, self.handle_confirm_request) def _init_loader(self): """ finds and creates widgets :return: none """ self.tbDurationRspSend = self.find_widget(QtWidgets.QToolButton , 'tbDurationRspSend' ) self.tbDurationRspReset = self.find_widget(QtWidgets.QToolButton , 'tbDurationRspReset' ) self.lbDurationReq = self.find_widget(QtWidgets.QLabel , 'lbDurationReq' ) self.sbDurationRspRejectReason = self.find_widget(QtWidgets.QSpinBox , 'sbDurationRspRejectReason' ) self.slDurationRspVolumeGoalMax = self.find_widget(QtWidgets.QSlider , 'slDurationRspVolumeGoalMax' ) self.lbDurationRspVolumeGoalMax = self.find_widget(QtWidgets.QLabel , 'lbDurationRspVolumeGoalMax' ) self.tbVolumeGoalRspSend = self.find_widget(QtWidgets.QToolButton , 'tbVolumeGoalRspSend' ) self.tbVolumeGoalRspCopy = self.find_widget(QtWidgets.QToolButton , 'tbVolumeGoalRspCopy' ) self.tbVolumeGoalRspReset = self.find_widget(QtWidgets.QToolButton , 'tbVolumeGoalRspReset' ) self.lbVolumeGoalReqParams = self.find_widget(QtWidgets.QLabel , 'lbVolumeGoalReqParams' ) self.sbVolumeGoalRspRejectReason = self.find_widget(QtWidgets.QSpinBox , 'sbVolumeGoalRspRejectReason' ) self.slVolumeGoalRspVolumeGoal = self.find_widget(QtWidgets.QSlider , 'slVolumeGoalRspVolumeGoal' ) self.lbVolumeGoalRspVolumeGoal = self.find_widget(QtWidgets.QLabel , 'lbVolumeGoalRspVolumeGoal' ) self.slVolumeGoalRspDuration = self.find_widget(QtWidgets.QSlider , 'slVolumeGoalRspDuration' ) self.lbVolumeGoalRspDuration = self.find_widget(QtWidgets.QLabel , 'lbVolumeGoalRspDuration' ) self.slVolumeGoalRspRate = self.find_widget(QtWidgets.QSlider , 'slVolumeGoalRspRate' ) self.lbVolumeGoalRspRate = self.find_widget(QtWidgets.QLabel , 'lbVolumeGoalRspRate' ) self.tbConfirmRspSend = self.find_widget(QtWidgets.QToolButton , 'tbConfirmRspSend' ) self.tbConfirmRspReset = self.find_widget(QtWidgets.QToolButton , 'tbConfirmRspReset' ) self.lbConfirmReqTimestamp = self.find_widget(QtWidgets.QLabel , 'lbConfirmReqTimestamp' ) self.sbConfirmRspRejectReason = self.find_widget(QtWidgets.QSpinBox , 'sbConfirmRspRejectReason' ) def _init_connections(self): """ initializes the widgets connections :return: none """ self.tbDurationRspSend .clicked .connect(self.do_duration_response) self.tbDurationRspReset .clicked .connect(self.init_duration_response) self.slDurationRspVolumeGoalMax .valueChanged .connect(lambda value: self.lbDurationRspVolumeGoalMax.setText(f"{value}")) self.tbVolumeGoalRspSend .clicked .connect(self.do_volume_goal_response) self.tbVolumeGoalRspCopy .clicked .connect(self.copy_volume_goal_response) self.tbVolumeGoalRspReset .clicked .connect(self.init_volume_goal_response) self.slVolumeGoalRspVolumeGoal .valueChanged .connect(lambda value: self.lbVolumeGoalRspVolumeGoal.setText(f"{value}")) self.slVolumeGoalRspDuration .valueChanged .connect(lambda value: self.lbVolumeGoalRspDuration.setText(f"{value}")) self.slVolumeGoalRspRate .valueChanged .connect(lambda value: self.lbVolumeGoalRspRate.setText(f"{value}")) self.tbConfirmRspSend .clicked .connect(self.do_confirm_response) self.tbConfirmRspReset .clicked .connect(self.init_confirm_response) @Slot() def _init_widgets(self): """ initializes the widgets' properties :return: none """ self.init_duration_response() self.init_volume_goal_response() self.init_confirm_response() @Slot() def init_duration_response(self): """ slot for initializing isolated UF duration change response :return: none """ self.lbDurationReq .setText("-- --") self.sbDurationRspRejectReason .setValue(0) self.slDurationRspVolumeGoalMax .setValue(0) @Slot() def handle_duration_request(self, message, timestamp = 0.0): """ slot called when the user requests isolated UF duration change :return: none """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 value,index = conversions.bytearray_to_integer(message, index) self.lbDurationReq.setText(f"{value:.0f}") @Slot() def do_duration_response(self): """ slot for sending isolated UF duration change response :return: none """ self.td_interface.td_isolated_uf_duration_change_response( vRejectionReason = self.sbDurationRspRejectReason.value(), vVolumeGoalMax = self.slDurationRspVolumeGoalMax.value() ) @Slot() def init_volume_goal_response(self): """ slot for initializing isolated UF volume goal response :return: none """ self.lbVolumeGoalReqParams .setText("-- --") self.sbVolumeGoalRspRejectReason .setValue(0) self.slVolumeGoalRspVolumeGoal .setValue(0) self.slVolumeGoalRspDuration .setValue(0) self.slVolumeGoalRspRate .setValue(0) @Slot() def handle_volume_goal_request(self, message, timestamp = 0.0): """ slot called when the user requests isolated UF volume goal change :return: none """ message = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 self.reqDuration,index = conversions.bytearray_to_integer(message, index, False) self.reqVolumeGoal,index = conversions.bytearray_to_integer(message, index, False) self.lbVolumeGoalReqParams.setText(f"Duration (min) = {self.reqDuration}, Volume Goal (mL) = {self.reqVolumeGoal}") @Slot() def copy_volume_goal_response(self): """ slot for copying the isolated UF volume goal from the request to the response :return: none """ self.slVolumeGoalRspVolumeGoal .setValue(self.reqVolumeGoal) self.slVolumeGoalRspDuration .setValue(self.reqDuration) @Slot() def do_volume_goal_response(self): """ slot for sending isolated UF volume goal change response :return: none """ payload = conversions.integer_to_bytearray(1 if self.sbVolumeGoalRspRejectReason.value() == 0 else 0) payload += conversions.integer_to_bytearray(self.sbVolumeGoalRspRejectReason.value()) payload += conversions.integer_to_bytearray(self.slVolumeGoalRspVolumeGoal.value()) payload += conversions.integer_to_bytearray(self.slVolumeGoalRspDuration.value()) payload += conversions.integer_to_bytearray(self.slVolumeGoalRspRate.value()) self.td_interface.td_isolated_uf_volume_goal_change_response( vRejectionReason = self.sbVolumeGoalRspRejectReason.value(), vVolumeGoal = self.slVolumeGoalRspVolumeGoal.value(), vDuration = self.slVolumeGoalRspDuration.value(), vRate = self.slVolumeGoalRspRate.value() ) @Slot() def init_confirm_response(self): """ slot for initializing isolated UF confirm response :return: none """ self.lbConfirmReqTimestamp .setText("-- --") self.sbConfirmRspRejectReason .setValue(0) @Slot() def handle_confirm_request(self, message, timestamp = 0.0): """ slot called when the user requests isolated UF confirmation :return: none """ message = message['message'] self.lbConfirmReqTimestamp.setText(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}") @Slot() def do_confirm_response(self): """ slot for sending isolated UF confirm response :return: none """ self.td_interface.td_isolated_uf_confirm_response( vRejectionReason = self.sbConfirmRspRejectReason.value() )