Index: HD_DialOutFlow.py =================================================================== diff -u -r1d42a84787098eecf6f1117ef2169d735f35c194 -r55ee2778e43810b13382aed3a287edb06f332f69 --- HD_DialOutFlow.py (.../HD_DialOutFlow.py) (revision 1d42a84787098eecf6f1117ef2169d735f35c194) +++ HD_DialOutFlow.py (.../HD_DialOutFlow.py) (revision 55ee2778e43810b13382aed3a287edb06f332f69) @@ -7,15 +7,16 @@ class DialOutStates(Enum): + """ + Defines dialout states, i.e., STOP, RUN and PAUSE + """ STOP = 0 RUN = 1 PAUSE = 2 class HD_DialOut: """ - :class HD_Dialout - Encapsulates command for testing dialout flow module. """ can = None @@ -35,31 +36,33 @@ """ Constructor for the HD_Dialout class - :param can_interface: reference to DenaliMessenger + \param can_interface: reference to DenaliMessenger """ self.__can_interface = can_interface self.__can_interface.registerReceivingPublicationFunction(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, message_id=self.MSG_ID_DIALYSATE_UF_DATA, function=self.receiveDialysateUFDataHandler) + ## DialOutBroadcast a dictionary storing latest broadcast values of the following keys: + # state, target_volume, measured_volume, pwm, motor_current, motor_speed. self.DialOutBroadcast = {'state': 'None', 'target_volume': 0, 'measured_volume': 0, 'pwm': 0, 'motor_current': 0, 'motor_speed': 0} - self.BroadCastSignals = {'time': [], - 'state': [], - 'target_volume': [], - 'measured_volume': [], - 'pwm': []} - self.time = 0 + self.__BroadCastSignals = {'time': [], + 'state': [], + 'target_volume': [], + 'measured_volume': [], + 'pwm': []} + self.__time = 0 def setUFState(self, new_state): """ setUFState function sets the new states to STOP, RUN or PAUSE - :param new_state: from DialOutStates.STOP, DialOutStates.RUN or DialoutStates.PAUSE - :return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters + \param new_state: from DialOutStates.STOP, DialOutStates.RUN or DialoutStates.PAUSE + \return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters """ return_value = None received_msg = None @@ -81,10 +84,10 @@ def setUFRx(self, rx_total_volume_ml, rx_time_min, rx_flow_rate): """ setUFRx function sets prescription for UF. Total Volume, time in minutes and flow rate - :param rx_total_volume_ml (integer) is total volume in ml - :param rx_time_min (integer) is rx time - :param rx_flow_rate (integer) is described flow rate - :return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters + \param rx_total_volume_ml (integer) is total volume in ml + \param rx_time_min (integer) is rx time + \param rx_flow_rate (integer) is described flow rate + \return: TRUE if confirmed, FALSE if HD had error or None for time out or incorrect parameters """ return_value = None received_msg = None @@ -110,39 +113,49 @@ return return_value def receiveDialysateUFDataHandler(self, message): + """ + Receiver method that handles UF broadcast data from the HD. + \param message: Denali message received + \return: None + """ payload = DenaliMessage.getPayload(message) - state_num = int.from_bytes(bytearray(payload[0:2]), byteorder=DenaliMessage.BYTE_ORDER, signed=True) + state_num = int.from_bytes(bytearray(payload[0:2]), byteorder=DenaliMessage.BYTE_ORDER, signed=True) self.DialOutBroadcast['state'] = DialOutStates(state_num).name self.DialOutBroadcast['target_volume'] = int.from_bytes(bytearray(payload[2:4]), byteorder=DenaliMessage.BYTE_ORDER, signed=True) self.DialOutBroadcast['measured_volume'] = np.int16(int.from_bytes(bytearray(payload[4:6]), - byteorder=DenaliMessage.BYTE_ORDER, signed=True)) + byteorder=DenaliMessage.BYTE_ORDER, + signed=True)) self.DialOutBroadcast['pwm'] = np.int16(int.from_bytes(bytearray(payload[6:8]), - byteorder=DenaliMessage.BYTE_ORDER, signed=True)) - self.DialOutBroadcast['motor_current'] = np.int16(int.from_bytes(bytearray(payload[8:10]), byteorder=DenaliMessage.BYTE_ORDER, signed=True)) + self.DialOutBroadcast['motor_current'] = np.int16(int.from_bytes(bytearray(payload[8:10]), + byteorder=DenaliMessage.BYTE_ORDER, + signed=True)) self.DialOutBroadcast['motor_speed'] = np.int16(int.from_bytes(bytearray(payload[10:12]), - byteorder=DenaliMessage.BYTE_ORDER, signed=True)) + byteorder=DenaliMessage.BYTE_ORDER, signed=True)) - self.BroadCastSignals['time'].append(self.time) - self.BroadCastSignals['state'].append(state_num*100) - self.BroadCastSignals['target_volume'].append(self.DialOutBroadcast['target_volume']) - self.BroadCastSignals['measured_volume'].append(self.DialOutBroadcast['measured_volume']) - self.BroadCastSignals['pwm'].append(self.DialOutBroadcast['pwm']) - self.time += 1 + self.__BroadCastSignals['time'].append(self.__time) + self.__BroadCastSignals['state'].append(state_num * 100) + self.__BroadCastSignals['target_volume'].append(self.DialOutBroadcast['target_volume']) + self.__BroadCastSignals['measured_volume'].append(self.DialOutBroadcast['measured_volume']) + self.__BroadCastSignals['pwm'].append(self.DialOutBroadcast['pwm']) + self.__time += 1 - def plotBroadCastSignals(self): + """ + Temporary method to demo STOP, PLAY and PAUSE behavior with simulator. + \return: None + """ - time = self.BroadCastSignals['time'] - state = self.BroadCastSignals['state'] - target = self.BroadCastSignals['target_volume'] - volume = self.BroadCastSignals['measured_volume'] - pwm = self.BroadCastSignals['pwm'] + time = self.__BroadCastSignals['time'] + state = self.__BroadCastSignals['state'] + target = self.__BroadCastSignals['target_volume'] + volume = self.__BroadCastSignals['measured_volume'] + pwm = self.__BroadCastSignals['pwm'] - plt.plot(time, state,'o', time, target, 'b-', time, volume, 'b--', time, pwm, 'g--') + plt.plot(time, state, 'o', time, target, 'b-', time, volume, 'b--', time, pwm, 'g--') plt.legend(('state', 'target vol', 'act vol', 'pwm')) plt.xlabel("Seconds")