########################################################################### # # Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file fans.py # # @author (last) Dara Navaei # @date (last) 27-May-2022 # @author (original) Dara Navaei # @date (original) 04-Aug-2021 # ############################################################################ import struct from ..utils.conversions import integer_to_bytearray, float_to_bytearray from ..utils.checks import check_broadcast_interval_override_ms from .constants import NO_RESET, RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import (DenaliMessage, DenaliChannels) from ..utils.base import AbstractSubSystem, publish, DialinEnum from logging import Logger from enum import unique @unique class HDFansNames(DialinEnum): FAN_INLET_1 = 0 class HDFans(AbstractSubSystem): """ @brief Hemodialysis Device (HD) Dialin API sub-class for fans related commands. """ def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali CAN Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.hd_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_HD_FANS_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_fans_sync) # Publish variables self.duty_cycle = 0.0 self.target_rpm = 0.0 self.inlet_1_rpm = 0.0 self.rpm_alarm_time = 0 def get_fans_target_duty_cycle(self): """ Gets the fans target duty cycle @return: Fans target duty cycle """ return self.duty_cycle def get_fan_inlet_1_rpm(self): """ Gets the inlet 1 fan RPM @return: Fan inlet 1 RPM """ return self.inlet_1_rpm def get_fan_target_rpm(self): """ Gets the fans target RPM @return: target RPM """ return self.target_rpm def get_fans_time_left_to_rpm_alarm(self): """ Gets the fans time left to RPM alarm @return: Fans time left to RPM alarm """ return self.rpm_alarm_time @publish(['duty_cycle', 'target_rpm', 'inlet_1_rpm', 'rpm_alarm_time']) def _handler_fans_sync(self, message): """ Handles published thermistors message. @param message: published thermistors message @return: none """ self.duty_cycle = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.target_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] self.inlet_1_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] self.rpm_alarm_time = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] def cmd_fans_rpm_override(self, fan: int, rpm: float, reset: int = NO_RESET) -> int: """ Constructs and sends the HD fan RPM override command Constraints: Must be logged into HD. @param fan: (int) fan ID that is status is overridden @param rpm: (int) RPM that the fan will be overridden to @param reset: (int) 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ reset_value = integer_to_bytearray(reset) f = integer_to_bytearray(fan) r = float_to_bytearray(rpm) payload = reset_value + r + f message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_FANS_RPM_OVERRIDE.value, payload=payload) self.logger.debug("Override fan RPM") # Send message received_message = self.can_interface.send(message) # If there is no content... if received_message is not None: self.logger.debug("Fan " + str(HDFansNames(fan).name) + " to: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_fans_rpm_alarm_start_time_offset(self, time: int) -> int: """ Constructs and sends the HD fan RPM alarm start time offset command Constraints: Must be logged into HD. @param time: (int) time offset in milliseconds @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(time) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_SET_FANS_RPM_ALARM_START_TIME_OFFSET.value, payload=payload) self.logger.debug("Override fan RPM alarm start time offset") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("RPM alarm start time offset set to: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_fans_data_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the fans data publish interval. Constraints: Must be logged into HD. Given interval must be non-zero and a multiple of the HD general task interval (50 ms). @param ms: (int) interval (in ms) to override with @param reset: (int) 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_FANS_PUBLISH_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("Override fans data publish interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "Reset back to normal" else: str_res = str(mis) self.logger.debug( "Fans data broadcast interval overridden to " + str_res + " ms: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_fans_duty_cycle_override(self, duty_cycle: float, reset: int = NO_RESET) -> int: """ Constructs and sends the HD fans duty cycle override command Constraints: Must be logged into HD. @param duty_cycle: (float) the duty cycle that the fans are overridden to @param reset: (int) 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ reset_value = integer_to_bytearray(reset) dc = float_to_bytearray(duty_cycle / 100.0) payload = reset_value + dc message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_FANS_DUTY_CYCLE_OVERRIDE.value, payload=payload) self.logger.debug("Override fans duty cycle") # Send message received_message = self.can_interface.send(message) # If there is no content... if received_message is not None: self.logger.debug("Set fans duty cycle to: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False