########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file fans.py # # @author (last) Micahel Garthwaite # @date (last) 07-Mar-2023 # @author (original) Dara Navaei # @date (original) 18-Nov-2020 # ############################################################################ import struct from enum import unique from logging import Logger from .constants import NO_RESET, RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish, DialinEnum from ..utils.checks import check_broadcast_interval_override_ms from ..utils.conversions import integer_to_bytearray, float_to_bytearray @unique class DGFansNames(DialinEnum): FAN_INLET_1 = 0 FAN_INLET_2 = 1 FAN_INLET_3 = 2 FAN_OUTLET_1 = 3 FAN_OUTLET_2 = 4 FAN_OUTLET_3 = 5 class Fans(AbstractSubSystem): """ Dialysate Generator (DG) Dialin API sub-class for fans related commands. """ def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali CAN Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.dg_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_DG_FANS_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_fans_sync) # Publish variables self.dg_fans_duty_cycle = 0.0 self.dg_fans_target_rpm = 0.0 self.inlet_1_rpm = 0.0 self.inlet_2_rpm = 0.0 self.inlet_3_rpm = 0.0 self.outlet_1_rpm = 0.0 self.outlet_2_rpm = 0.0 self.outlet_3_rpm = 0.0 self.rpm_alarm_time = 0 self.dg_fans_timestamp = 0.0 def get_fans_target_duty_cycle(self): """ Gets the fans target duty cycle @return: Fans target duty cycle """ return self.dg_fans_duty_cycle def get_fan_inlet_1_rpm(self): """ Gets the inlet 1 fan RPM @return: Fan inlet 1 RPM """ return self.inlet_1_rpm def get_fan_inlet_2_rpm(self): """ Gets the inlet 2 fan RPM @return: Fan inlet 2 RPM """ return self.inlet_2_rpm def get_fan_inlet_3_rpm(self): """ Gets the inlet 3 fan RPM @return: Fan inlet 3 RPM """ return self.inlet_3_rpm def get_fan_outlet_1_rpm(self): """ Gets the outlet 1 fan RPM @return: Fan outlet 1 RPM """ return self.outlet_1_rpm def get_fan_outlet_2_rpm(self): """ Gets the outlet 2 fan RPM @return: Fan outlet 2 RPM """ return self.outlet_2_rpm def get_fan_outlet_3_rpm(self): """ Gets the outlet 3 fan RPM @return: Fan outlet 3 RPM """ return self.outlet_3_rpm def get_dg_fans_target_rpm(self): """ Gets the fans target RPM @return: Fans target RPM """ return self.dg_fans_target_rpm def get_fans_time_left_to_rpm_alarm(self): """ Gets the fans time left to RPM alarm @return: Fans time left to RPM alarm """ return self.rpm_alarm_time @publish(["dg_fans_timestamp",'dg_fans_duty_cycle', 'dg_fans_target_rpm', 'inlet_1_rpm', 'inlet_2_rpm', 'inlet_3_rpm', 'outlet_1_rpm', 'outlet_2_rpm', 'outlet_3_rpm', 'rpm_alarm_time']) def _handler_fans_sync(self, message, timestamp=0.0): """ Handles published thermistors message. @param message: published thermistors message @return: none """ self.dg_fans_duty_cycle = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.dg_fans_target_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] self.inlet_1_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] self.inlet_2_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] self.inlet_3_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] self.outlet_1_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6]))[0] self.outlet_2_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7]))[0] self.outlet_3_rpm = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8]))[0] self.rpm_alarm_time = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_9:MsgFieldPositions.END_POS_FIELD_9]))[0] self.dg_fans_timestamp = timestamp def cmd_fans_rpm_override(self, fan: int, rpm: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dg fan RPM override command Constraints: Must be logged into DG. @param fan: (int) fan ID that is status is overridden @param rpm: (int) RPM that the fan will be overridden to @param reset: (int) 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ reset_value = integer_to_bytearray(reset) f = integer_to_bytearray(fan) r = float_to_bytearray(rpm) payload = reset_value + r + f message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_FANS_RPM_OVERRIDE.value, payload=payload) self.logger.debug("Override fan RPM") # Send message received_message = self.can_interface.send(message) # If there is no content... if received_message is not None: self.logger.debug("Fan " + str(DGFansNames(fan).name) + " to: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_fans_rpm_alarm_start_time_offset(self, time: int) -> int: """ Constructs and sends the DG fan RPM alarm start time offset command Constraints: Must be logged into DG. @param time: (int) time offset in milliseconds @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(time) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_SET_FANS_RPM_ALARM_START_TIME_OFFSET.value, payload=payload) self.logger.debug("Override fan RPM alarm start time offset") # Send message received_message = self.can_interface.send(message) # If there is no content... if received_message is not None: self.logger.debug("RPM alarm start time offset set to: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_fans_data_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the fans data publish interval. Constraints: Must be logged into DG. Given interval must be non-zero and a multiple of the DG general task interval (50 ms). @param ms: (int) interval (in ms) to override with @param reset: (int) 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_FANS_DATA_PUBLISH_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("Override fans data publish interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "Reset back to normal" else: str_res = str(mis) self.logger.debug( "Fans data broadcast interval overridden to " + str_res + " ms: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_fans_duty_cycle_override(self, duty_cycle: float, reset: int = NO_RESET) -> int: """ Constructs and sends the DG fans duty cycle override command Constraints: Must be logged into DG. @param duty_cycle: (float) the duty cycle that the fans are overridden to @param reset: (int) 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ reset_value = integer_to_bytearray(reset) dc = float_to_bytearray(duty_cycle / 100.0) payload = reset_value + dc message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_FANS_DUTY_CYCLE_OVERRIDE.value, payload=payload) self.logger.debug("Override fans duty cycle") # Send message received_message = self.can_interface.send(message) # If there is no content... if received_message is not None: self.logger.debug("Set fans duty cycle to: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False