###########################################################################
#
#    Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   heaters.py
#
#    @author (last)      Peter Lucia
#    @date   (last)      10-Nov-2020
#    @author (original)  Dara Navaei
#    @date   (original)  29-May-2020
#
############################################################################
import struct
from ..utils.conversions import integer_to_bytearray
from ..utils.conversions import float_to_bytearray
from ..common.msg_defs import MsgIds, MsgFieldPositions
from .constants import NO_RESET
from ..protocols.CAN import (DenaliMessage, DenaliChannels)
from ..utils.base import _AbstractSubSystem, _publish, DialinEnum
from logging import Logger
from enum import unique


@unique
class HeatersStartStop(DialinEnum):
    """
    Start/stop states understood by the heater start/stop commands.
    """
    STOP = 0
    START = 1


class Heaters(_AbstractSubSystem):
    """
    Dialysate Generator (DG) Dialin API sub-class for heaters related commands.
    """

    def __init__(self, can_interface, logger: Logger):
        """
        Initializes the cached duty cycles to zero and, when a CAN interface
        is provided, registers the handler for the DG heaters data broadcast.

        @param can_interface: Denali CAN Messenger object
        @param logger: Logger used for the debug output of the commands
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        # Latest values received from the heaters broadcast message
        self.main_primary_heater_duty_cycle = 0
        self.small_primary_heater_duty_cycle = 0
        self.trimmer_heater_duty_cycle = 0

        if self.can_interface is not None:
            channel_id = DenaliChannels.dg_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_DG_HEATERS_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_heaters_sync)

    def get_main_primary_heater_duty_cycle(self):
        """
        Gets the main primary heater duty cycle

        @return: Main primary heater duty cycle
        """
        return self.main_primary_heater_duty_cycle

    def get_small_primary_heater_duty_cycle(self):
        """
        Gets the small primary heater duty cycle

        @return: Small primary heater duty cycle
        """
        return self.small_primary_heater_duty_cycle

    def get_trimmer_heater_duty_cycle(self):
        """
        Gets the trimmer heater duty cycle

        @return: Trimmer heater duty cycle
        """
        return self.trimmer_heater_duty_cycle

    @staticmethod
    def _unpack_float(raw_message, start, end):
        """
        Decodes one float field of a received message.

        @param raw_message: the sequence of message bytes
        @param start: index of the first byte of the field
        @param end: index one past the last byte of the field
        @returns the decoded float
        """
        # 'f' uses the native byte order of the host, as the original code did
        return struct.unpack('f', bytearray(raw_message[start:end]))[0]

    @staticmethod
    def _is_start_request(state):
        """
        Tells whether the given state asks for a heater to be started.

        The START enum member, its name ("START") and its value (1) are all
        accepted. Anything else is treated as a stop request.

        @param state: the requested start/stop state
        @returns True if the state requests a start, False otherwise
        """
        start = HeatersStartStop.START
        return state in (start, start.name, start.value)

    @_publish(["main_primary_heater_duty_cycle", "small_primary_heater_duty_cycle", "trimmer_heater_duty_cycle"])
    def _handler_heaters_sync(self, message):
        """
        Handles published heaters message

        @param message: published heaters data message
        @returns none
        """
        raw_message = message['message']

        # Decode every field before storing any of them, so that a malformed
        # message does not leave the cached values partially updated.
        main_primary_heater_pwm = self._unpack_float(
            raw_message, MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1)
        small_primary_heater_pwm = self._unpack_float(
            raw_message, MsgFieldPositions.START_POS_FIELD_2, MsgFieldPositions.END_POS_FIELD_2)
        trimmer_heater_pwm = self._unpack_float(
            raw_message, MsgFieldPositions.START_POS_FIELD_3, MsgFieldPositions.END_POS_FIELD_3)

        self.main_primary_heater_duty_cycle = main_primary_heater_pwm
        self.small_primary_heater_duty_cycle = small_primary_heater_pwm
        self.trimmer_heater_duty_cycle = trimmer_heater_pwm

    def cmd_start_stop_primary_heater(self, state=HeatersStartStop.STOP.name):
        """
        Constructs and sends a start/stop primary heater command
        Constraints:
            A target temperature for primary heater between 10 and 90 deg C must have been given to DG previously.

        @param state: start/stop state of the primary heater, given as a HeatersStartStop member, its name
                      (e.g. "START") or its integer value. Any other value stops the heater. The default is stop.
        @returns none
        """
        if self._is_start_request(state):
            payload = integer_to_bytearray(1)
            operation = 'Turning on '
        else:
            payload = integer_to_bytearray(0)
            operation = 'Turning off '

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_START_STOP_PRIMARY_HEATER.value,
                                              payload=payload)

        self.logger.debug(operation + " the Primary heater")
        # NOTE(review): the second argument is presumably a timeout of 0 (do not wait for a response) - confirm
        self.can_interface.send(message, 0)

    def cmd_start_stop_trimmer_heater(self, state=HeatersStartStop.STOP.name):
        """
        Constructs and sends a start/stop trimmer heater command
        Constraints:
            A target temperature for trimmer heater between 10 and 90 deg C must have been given to DG previously.

        @param state: start/stop state of the trimmer heater, given as a HeatersStartStop member, its name
                      (e.g. "START") or its integer value. Any other value stops the heater. The default is stop.
        @returns none
        """
        if self._is_start_request(state):
            payload = integer_to_bytearray(1)
            operation = 'Turning on '
        else:
            payload = integer_to_bytearray(0)
            operation = 'Turning off'

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_DG_START_STOP_TRIMMER_HEATER_CMD.value,
                                              payload=payload)

        # This command acts on the trimmer heater, so the log must say so
        self.logger.debug(operation + " the Trimmer heater")
        # NOTE(review): the second argument is presumably a timeout of 0 (do not wait for a response) - confirm
        self.can_interface.send(message, 0)

    def cmd_set_dialysate_target_temperature(self, primary_target_temp=37.0, trimmer_target_temp=38.0):
        """
        Constructs and sends primary and trimmer heater target temperature

        @param primary_target_temp: (float) Primary heater target temperature
        @param trimmer_target_temp: (float) Trimmer heater target temperature
        @returns none
        """
        primary_target = float_to_bytearray(primary_target_temp)
        trimmer_target = float_to_bytearray(trimmer_target_temp)
        # The payload carries the primary target first, then the trimmer target
        payload = primary_target + trimmer_target

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_SET_DG_DIALYSATE_TEMP_TARGETS.value,
                                              payload=payload)

        self.logger.debug("Setting Primary Heater to {} C and Trimmer Heater to {} C".format(primary_target_temp,
                                                                                             trimmer_target_temp))
        # NOTE(review): the second argument is presumably a timeout of 0 (do not wait for a response) - confirm
        self.can_interface.send(message, 0)

    def cmd_heaters_broadcast_interval_override(self, ms, reset=NO_RESET):
        """
        Constructs and sends broadcast time interval.
        Constraints:
            Must be logged into DG.
            Given interval must be non-zero and a multiple of the DG general task interval (50 ms).

        @param ms: (int) Publish time interval in ms
        @param reset: (int) 1 to reset a previous override, 0 to override
        @returns the first payload byte of the response (1 if successful, zero otherwise),
                 or False if no response was received
        """
        reset_value = integer_to_bytearray(reset)
        interval_value = integer_to_bytearray(ms)
        # The payload carries the reset flag first, then the interval
        payload = reset_value + interval_value

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_HEATERS_PUBLISH_INTERVAL_ORVERRIDE.value,
                                              payload=payload)

        self.logger.debug("Sending {} ms publish interval to the Heaters module".format(ms))

        # Send message
        received_message = self.can_interface.send(message)

        # No response was received
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        # Response payload is OK or not
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]