from enum import Enum
import sys

import matplotlib.pyplot as plt
import numpy as np

from DialityCoreCanProtocol import DenaliMessage
from DialityCoreCanProtocol import DenaliChannels


class DialOutStates(Enum):
    """
    Defines dialout states, i.e., STOP, RUN and PAUSE
    """
    STOP = 0
    RUN = 1
    PAUSE = 2


class HD_DialOut:
    r"""
    Encapsulates commands for testing the dialout flow module.

    An instance registers itself for the dialysate UF broadcast on construction, keeps the most recently
    decoded values in DialOutBroadcast, and accumulates a history of them for plotBroadCastSignals.
    """

    # NOTE(review): not referenced anywhere in this class; kept for backward compatibility.
    can = None

    # Command message ID
    MSG_ID_SET_DIALOUT_FLOW_STATE = 0x8017
    MSG_ID_SET_DIALOUT_FLOW_RX = 0x8018

    # Broadcast message ID
    MSG_ID_DIALYSATE_UF_DATA = 0x0009

    DIALOUT_STATES = list(DialOutStates)

    # Largest value that fits in the 2-byte unsigned fields of the prescription payload.
    _UINT16_MAX = 0xFFFF

    # variable storing information received by Handler

    def __init__(self, can_interface):
        r"""
        Constructor for the HD_Dialout class

        \param can_interface: reference to DenaliMessenger
        """
        self.__can_interface = can_interface

        ## DialOutBroadcast a dictionary storing latest broadcast values of the following keys:
        #  state, target_volume, measured_volume, pwm, motor_current, motor_speed.
        self.DialOutBroadcast = {'state': 'None',
                                 'target_volume': 0,
                                 'measured_volume': 0,
                                 'pwm': 0,
                                 'motor_current': 0,
                                 'motor_speed': 0}

        # History of received broadcasts, one entry appended per handled message.
        self.__BroadCastSignals = {'time': [],
                                   'state': [],
                                   'target_volume': [],
                                   'measured_volume': [],
                                   'pwm': []}

        # Counter incremented once per handled broadcast; used as the x axis of the plot.
        self.__time = 0

        # Register the handler only after all the state it touches exists, so that a broadcast delivered
        # during (or right after) registration never sees a partially constructed instance.
        self.__can_interface.registerReceivingPublicationFunction(
            channel_id=DenaliChannels.hd_sync_broadcast_ch_id,
            message_id=self.MSG_ID_DIALYSATE_UF_DATA,
            function=self.receiveDialysateUFDataHandler)

    @staticmethod
    def _responseConfirmed(received_msg):
        r"""
        Translates the response to a command into the value returned by the setters.

        \param received_msg: message returned by the CAN interface send call, or None
        \return: None if there is no response, TRUE if the first payload byte is 1, FALSE otherwise
        """
        if received_msg is None:
            return None
        return bool(DenaliMessage.getPayload(received_msg)[0] == 1)

    def setUFState(self, new_state):
        r"""
        setUFState function sets the new states to STOP, RUN or PAUSE

        \param new_state: from DialOutStates.STOP, DialOutStates.RUN or DialoutStates.PAUSE
        \return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters
        """
        if new_state not in self.DIALOUT_STATES:
            print("Argument to setUFState method are incorrect", file=sys.stderr)
            return None

        msg = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                         message_id=self.MSG_ID_SET_DIALOUT_FLOW_STATE,
                                         payload=[new_state.value])
        received_msg = self.__can_interface.send(msg)

        return self._responseConfirmed(received_msg)

    def setUFRx(self, rx_total_volume_ml, rx_time_min, rx_flow_rate):
        r"""
        setUFRx function sets prescription for UF. Total Volume, time in minutes and flow rate

        Each value is sent as a 2-byte unsigned integer, so it must be an int in the range 0..65535;
        anything else is reported on stderr and treated as an incorrect parameter.

        \param rx_total_volume_ml (integer) is total volume in ml
        \param rx_time_min (integer) is rx time
        \param rx_flow_rate (integer) is described flow rate
        \return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters
        """
        rx_values = (rx_total_volume_ml, rx_time_min, rx_flow_rate)

        # The range check keeps int.to_bytes from raising OverflowError, so that bad input yields the
        # documented None result instead of an exception.
        if not all(isinstance(value, int) and 0 <= value <= self._UINT16_MAX for value in rx_values):
            print("Arguments to setUFRx are incorrect", file=sys.stderr)
            return None

        payload = []
        for value in rx_values:
            payload += list(value.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER, signed=False))

        msg = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                         message_id=self.MSG_ID_SET_DIALOUT_FLOW_RX,
                                         payload=payload)
        received_msg = self.__can_interface.send(msg)
        print(received_msg)

        return self._responseConfirmed(received_msg)

    def receiveDialysateUFDataHandler(self, message):
        r"""
        Receiver method that handles UF broadcast data from the HD.

        The payload is decoded as six consecutive 2-byte signed integers: state, target volume,
        measured volume, pwm, motor current and motor speed. The decoded values replace the content of
        DialOutBroadcast, and state, target volume, measured volume and pwm are appended to the history.
        A message whose state is not a DialOutStates value is reported on stderr and ignored.

        \param message: Denali message received
        \return: None
        """
        payload = DenaliMessage.getPayload(message)

        def decode(offset):
            # 2-byte signed integer starting at the given payload offset.
            return int.from_bytes(bytearray(payload[offset:offset + 2]),
                                  byteorder=DenaliMessage.BYTE_ORDER, signed=True)

        state_num = decode(0)
        try:
            state_name = DialOutStates(state_num).name
        except ValueError:
            # Do not let a malformed frame raise out of the receive callback; nothing has been
            # modified at this point, so the last valid broadcast stays in place.
            print("Unknown dialout state %d in UF broadcast, message ignored" % state_num, file=sys.stderr)
            return

        latest = self.DialOutBroadcast
        latest['state'] = state_name
        latest['target_volume'] = decode(2)
        latest['measured_volume'] = np.int16(decode(4))
        latest['pwm'] = np.int16(decode(6))
        latest['motor_current'] = np.int16(decode(8))
        latest['motor_speed'] = np.int16(decode(10))

        history = self.__BroadCastSignals
        history['time'].append(self.__time)
        # The state is scaled by 100 before being stored for plotting alongside the other signals.
        history['state'].append(state_num * 100)
        history['target_volume'].append(latest['target_volume'])
        history['measured_volume'].append(latest['measured_volume'])
        history['pwm'].append(latest['pwm'])

        self.__time += 1

    def plotBroadCastSignals(self):
        r"""
        Temporary method to demo STOP, PLAY and PAUSE behavior with simulator.

        Plots the recorded state, target volume, measured volume and pwm against the broadcast counter
        and blocks in plt.show().
        NOTE(review): the x axis is labelled "Seconds" but holds the number of received broadcasts;
        this presumably assumes one broadcast per second - confirm.

        \return: None
        """
        time = self.__BroadCastSignals['time']
        state = self.__BroadCastSignals['state']
        target = self.__BroadCastSignals['target_volume']
        volume = self.__BroadCastSignals['measured_volume']
        pwm = self.__BroadCastSignals['pwm']

        plt.plot(time, state, 'o', time, target, 'b-', time, volume, 'b--', time, pwm, 'g--')
        plt.legend(('state', 'target vol', 'act vol', 'pwm'))
        plt.xlabel("Seconds")
        plt.title("Demo for the outlet pump control")
        plt.show()