########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file concentrate_pumps.py # # @author (last) Quang Nguyen # @date (last) 29-Oct-2020 # @author (original) Quang Nguyen # @date (original) 29-Oct-2020 # ############################################################################ import struct from .constants import RESET,NO_RESET from ..utils.conversions import integer_to_bytearray, float_to_bytearray from ..utils.checks import check_broadcast_interval_override_ms from ..protocols.CAN import (DenaliMessage, DenaliChannels) from ..utils.base import _AbstractSubSystem, _publish, DialinEnum from ..common.msg_defs import MsgIds from logging import Logger class ConcentratePumpsEnum(DialinEnum): CP1 = 0 CP2 = 1 class ConcentratePumps(_AbstractSubSystem): """ ConcentratePumps Dialysate Generator (DG) Dialin API sub-class for concentrate pumps related commands. """ # Conductivity sensor broadcast message field positions START_POS_CP1_TARGET = DenaliMessage.PAYLOAD_START_INDEX END_POS_CP1_TARGET = START_POS_CP1_TARGET + 4 START_POS_CP1 = END_POS_CP1_TARGET END_POS_CP1 = START_POS_CP1 + 4 START_POS_CP2_TARGET = END_POS_CP1 END_POS_CP2_TARGET = START_POS_CP2_TARGET + 4 START_POS_CP2 = END_POS_CP2_TARGET END_POS_CP2 = START_POS_CP2 + 4 def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.dg_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_DG_CONCENTRATE_PUMP_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_concentrate_pumps_sync) self.concentrate_pump_cp1_current_set_speed = 0.0 self.concentrate_pump_cp1_measured_speed = 0.0 self.concentrate_pump_cp2_current_set_speed = 0.0 self.concentrate_pump_cp2_measured_speed = 0.0 def get_concentrate_pumps(self): """ Gets the current concentrate pump data value @return: List containing concentrate pump data values: [ concentrate_pump_cp1_current_set_speed, concentrate_pump_cp1_measured_speed, concentrate_pump_cp2_current_set_speed, concentrate_pump_cp2_measured_speed ] """ return [self.concentrate_pump_cp1_current_set_speed, self.concentrate_pump_cp1_measured_speed, self.concentrate_pump_cp2_current_set_speed, self.concentrate_pump_cp2_measured_speed] @_publish(["concentrate_pump_cp1_current_set_speed", "concentrate_pump_cp1_measured_speed", "concentrate_pump_cp2_current_set_speed", "concentrate_pump_cp2_measured_speed"]) def _handler_concentrate_pumps_sync(self, message): """ Handles published concentrate pumps' data messages. Concentrate pumps' speed data are captured for reference. @param message: published concentrate pumps' data message @return: None """ cp1_current = struct.unpack('f', bytearray(message['message'][self.START_POS_CP1_TARGET:self.END_POS_CP1_TARGET])) cp1_measured = struct.unpack('f', bytearray(message['message'][self.START_POS_CP1:self.END_POS_CP1])) cp2_current = struct.unpack('f', bytearray(message['message'][self.START_POS_CP2_TARGET:self.END_POS_CP2_TARGET])) cp2_measured = struct.unpack('f', bytearray(message['message'][self.START_POS_CP2:self.END_POS_CP2])) self.concentrate_pump_cp1_current_set_speed = cp1_current[0] self.concentrate_pump_cp1_measured_speed = cp1_measured[0] self.concentrate_pump_cp2_current_set_speed = cp2_current[0] self.concentrate_pump_cp2_measured_speed = cp2_measured[0] def cmd_concentrate_pump_state_change_request(self, pump_id: int, on: bool = False) -> int: """ Constructs and sends the concentrate pump state change request command @param pump_id: unsigned int - concentrate pump ID @param on: bool - 1 to turn on, 0 to turn off @return: 1 if successful, zero otherwise Concentrate pump IDs: \n 0 = CP1 \n 1 = CP2 \n """ payload = integer_to_bytearray(0) + integer_to_bytearray(on) + integer_to_bytearray(pump_id) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_CONCENTRATE_PUMP_STATE_CHANGE_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) if on: self.logger.debug("Requested to turn on concentrate pump: CP" + str(pump_id)) else: self.logger.debug("Requested to turn off concentrate pump: CP" + str(pump_id)) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.error("Timeout!!!!") return False def cmd_concentrate_pump_target_speed_override(self, pump_id: int, speed: float) -> int: """ Constructs and sends the concentrate pump target speed override command @param pump_id: unsigned int - concentrate pump ID @param speed: float - target speed value to override concentrate pump with @return: 1 if successful, zero otherwise Concentrate pump IDs: \n 0 = CP1 \n 1 = CP2 \n """ reset_byte_array = integer_to_bytearray(NO_RESET) speed_byte_array = float_to_bytearray(speed) pump_id_byte_array = integer_to_bytearray(pump_id) payload = reset_byte_array + speed_byte_array + pump_id_byte_array message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_CONCENTRATE_PUMP_TARGET_SPEED_OVERRIDE.value, payload=payload) self.logger.debug("override target speed: " + str(speed) + " - for pump: " + str(pump_id)) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.error("Timeout!!!!") return False def cmd_concentrate_pump_measured_speed_override(self, pump_id: int, speed: float, reset: int = NO_RESET) -> int: """ Constructs and sends the concentrate pump measured speed override command @param pump_id: unsigned int - concentrate pump ID @param speed: float - measured speed value to override concentrate pump with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise Concentrate pump IDs: \n 0 = CP1 \n 1 = CP2 \n """ reset_byte_array = integer_to_bytearray(reset) speed_byte_array = float_to_bytearray(speed) pump_id_byte_array = integer_to_bytearray(pump_id) payload = reset_byte_array + speed_byte_array + pump_id_byte_array message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_CONCENTRATE_PUMP_MEASURED_SPEED_OVERRIDE.value, payload=payload) if reset == RESET: self.logger.debug("reset back to normal value for pump: " + str(pump_id)) else: self.logger.debug("override measured speed: " + str(speed) + " - for pump: " + str(pump_id)) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.error("Timeout!!!!") return False def cmd_concentrate_pump_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the concentrate pump data broadcast interval override command Constraints: Must be logged into DG. Given interval must be non-zero and a multiple of the DG general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False reset_byte_array = integer_to_bytearray(reset) ms_byte_array = integer_to_bytearray(ms) payload = reset_byte_array + ms_byte_array message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_CONCENTRATE_PUMP_PUBLISH_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override DG concentrate pump data broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.error("Timeout!!!!") return False