########################################################################### # # Copyright (c) 2022-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file cpld.py # # @author (last) Micahel Garthwaite # @date (last) 17-Aug-2023 # @author (original) Darren Cox # @date (original) 29-Sep-2022 # ############################################################################ import struct from enum import unique from logging import Logger from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish, DialinEnum @unique class CPLD_LED_COLORS(DialinEnum): CPLD_LED_COLOR_OFF = 0 CPLD_LED_COLOR_ORANGE_HEAT_DISINFECT = 1 CPLD_LED_COLOR_YELLOW_CHEM_DISINFECT = 2 CPLD_LED_COLOR_BLUE_FLUSH_DISINFECT = 3 class Cpld(AbstractSubSystem): """ CPLD Dialysate Generator (DG) Dialin API sub-class for CPLD related commands. """ def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.dg_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_DG_CPLD_STATUS_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_cpld_sync) self.cpld_wdog = 0 self.cpld_audio = 0 self.cpld_fault_led = 0 self.cpld_clean_led = 0 self.dg_cpld_timestamp = 0.0 def get_cpld_wdog(self) -> int: """ Gets the current CPLD watchdog value @return: cpld_wdog - 0 if timeout, 1 if OK """ return self.cpld_wdog def get_cpld_audio(self) -> int: """ Gets the current CPLD audio value @return: cpld_audio - 0=OFF, 1=ON """ return self.cpld_audio def get_cpld_fault_led(self) -> int: """ Gets the current CPLD Fault LED value @return: cpld_fault_led - 0=OFF, 1=ON """ return self.cpld_fault_led def get_cpld_clean_led(self) -> int: """ Gets the current CPLD Clean LED value CPLD_LED_COLOR_OFF = 0 CPLD_LED_COLOR_ORANGE_HEAT_DISINFECT = 1 CPLD_LED_COLOR_YELLOW_CHEM_DISINFECT = 2 CPLD_LED_COLOR_BLUE_FLUSH_DISINFECT = 3 @return: cpld_clean_led - CPLD_LED_COLORS enum """ return self.cpld_clean_led @publish(["dg_cpld_timestamp","cpld_wdog", "cpld_audio", "cpld_fault_led", "cpld_clean_led"]) def _handler_cpld_sync(self, message, timestamp=0.0): """ Handles published CPLD data messages. CPLD data are captured for reference. @param message: published CPLD data message @return: None """ self.cpld_wdog = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.START_POS_FIELD_1 + 1]))[0] self.cpld_audio = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1 + 1:MsgFieldPositions.START_POS_FIELD_1 + 2]))[0] self.cpld_fault_led = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1 + 2:MsgFieldPositions.START_POS_FIELD_1 + 3]))[0] self.cpld_clean_led = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1 + 3:MsgFieldPositions.START_POS_FIELD_1 + 4]))[0] self.dg_cpld_timestamp = timestamp