from enum import Enum from DialityCoreCanProtocol import DenaliMessage from DialityCoreCanProtocol import DenaliChannels import sys class DialOutStates(Enum): STOP = 0 RUN = 1 PAUSE = 2 class HD_DialOut: """ :class HD_Dialout Encapsulates command for testing dialout flow module. """ can = None # Command message ID MSG_ID_SET_DIALOUT_FLOW_STATE = 0x8017 MSG_ID_SET_DIALOUT_FLOW_RX = 0x8018 # Broadcast message ID MSG_ID_DIALYSATE_UF_DATA = 0x0009 DIALOUT_STATES = list(DialOutStates) # variable storing information received by Handler def __init__(self, can_interface): """ Constructor for the HD_Dialout class :param can_interface: reference to DenaliMessenger """ self.__can_interface = can_interface self.__can_interface.registerReceivingPublicationFunction(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=self.MSG_ID_DIALYSATE_UF_DATA, function=self.receiveDialysateUFDataHandler) self.DialOutBroadcast = {'state': 'None', 'target_volume': 0, 'measured_volume': 0, 'pwm': 0} def setUFState(self, new_state): """ setUFState function sets the new states to STOP, RUN or PAUSE :param new_state: from DialOutStates.STOP, DialOutStates.RUN or DialoutStates.PAUSE :return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters """ return_value = None received_msg = None if new_state in self.DIALOUT_STATES: payload = [new_state.value] msg = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_SET_DIALOUT_FLOW_STATE, payload=payload) received_msg = self.__can_interface.send(msg) else: print("Argument to setUFState method are incorrect", file=sys.stderr) if received_msg is not None: return_value = True if DenaliMessage.getPayload(received_msg)[0] == 1 else False return return_value def setUFRx(self, rx_total_volume_ml, rx_time_min, rx_flow_rate): """ setUFRx function sets prescription for UF. Total Volume, time in minutes and flow rate :param rx_total_volume_ml (integer) is total volume in ml :param rx_time_min (integer) is rx time :param rx_flow_rate (integer) is described flow rate :return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters """ return_value = None received_msg = None if isinstance(rx_total_volume_ml, int) and isinstance(rx_time_min, int) and \ isinstance(rx_flow_rate, int): payload = list(rx_total_volume_ml.to_bytes(2, DenaliMessage.BYTE_ORDER)) payload += list(rx_time_min.to_bytes(2, DenaliMessage.BYTE_ORDER)) payload += list(rx_flow_rate.to_bytes(2, DenaliMessage.BYTE_ORDER)) msg = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_SET_DIALOUT_FLOW_RX, payload=payload) received_msg = self.__can_interface.send(msg) else: print("Arguments to setUFRx are incorrect", file=sys.stderr) if received_msg is not None: return_value = True if DenaliMessage.getPayload(received_msg)[0] == 1 else False return return_value def receiveDialysateUFDataHandler(self, message): payload = DenaliMessage.getPayload(message) state_num = int.from_bytes(bytearray(payload[0:2]), byteorder=DenaliMessage.BYTE_ORDER, signed=True) self.DialOutBroadcast['state'] = DialOutStates(state_num).name self.DialOutBroadcast['target_volume'] = int.from_bytes(bytearray(payload[2:4]), byteorder=DenaliMessage.BYTE_ORDER, signed=True) self.DialOutBroadcast['measured_volume'] = int.from_bytes(bytearray(payload[4:6]), byteorder=DenaliMessage.BYTE_ORDER, signed=True) self.DialOutBroadcast['pwm'] = int.from_bytes(bytearray(payload[6:8]), byteorder=DenaliMessage.BYTE_ORDER, signed=True)