###########################################################################
#
#    Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   alarms.py
#
#    @author (last)      Sean Nash
#    @date   (last)      12-Nov-2021
#    @author (original)  Quang Nguyen
#    @date   (original)  02-Sep-2020
#
############################################################################
import struct
from logging import Logger

from .constants import RESET, NO_RESET
from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..utils.checks import check_broadcast_interval_override_ms
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish
from ..utils.conversions import integer_to_bytearray


class DGAlarms(AbstractSubSystem):
    """
    DG interface containing alarm related commands.

    Tracks per-alarm state / condition flags from DG alarm broadcast messages
    and provides override / clear commands sent over the CAN interface.
    """

    # The alarm ID occupies the first two bytes of the message payload.
    START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_ALARM_ID = START_POS_ALARM_ID + 2

    # Number of alarm slots tracked in alarm_states / alarm_conditions.
    NUM_ALARMS = 500

    def __init__(self, can_interface, logger: Logger):
        """
        @param can_interface: Denali Can Messenger object
        @param logger: Logger used for debug output
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # State is created before any handler is registered so that a handler
        # can never run against an instance that lacks these attributes.

        # alarm states based on received DG alarm activation and alarm clear messages
        self.alarm_states = [False] * self.NUM_ALARMS
        # alarm condition states based on received DG alarm activation and clear condition messages
        self.alarm_conditions = [False] * self.NUM_ALARMS
        self.safety_shutdown_active = False

        if self.can_interface is not None:
            channel_id = DenaliChannels.dg_alarm_broadcast_ch_id
            subscriptions = (
                (MsgIds.MSG_ID_ALARM_TRIGGERED, self._handler_alarm_activate),
                (MsgIds.MSG_ID_ALARM_CLEARED, self._handler_alarm_clear),
                (MsgIds.MSG_ID_ALARM_CONDITION_CLEARED, self._handler_alarm_condition_clear),
                (MsgIds.MSG_ID_DG_ALARM_INFO, self._handler_alarm_information_sync),
            )
            for msg_id, handler in subscriptions:
                self.can_interface.register_receiving_publication_function(channel_id,
                                                                           msg_id.value,
                                                                           handler)

    def get_alarm_states(self):
        """
        Gets all states for all alarms

        @return: List of booleans of size 500 (NUM_ALARMS)
        """
        return self.alarm_states

    def get_alarm_conditions(self):
        """
        Gets all alarm condition states for all alarms

        @return: List of booleans of size 500 (NUM_ALARMS)
        """
        return self.alarm_conditions

    def get_alarm_state(self, alarm_id):
        """
        Gets alarm state for given alarm

        @param alarm_id: integer - index of the alarm in the alarm state list
        @return: Alarm state
        """
        return self.alarm_states[alarm_id]

    def get_safety_shutdown_activated(self):
        """
        Gets the state of the DG safety shutdown signal.

        @return: (bool) safety shutdown line is activated (T/F)
        """
        return self.safety_shutdown_active

    # ------------------------------------------------------------------
    # NOTE(review): in the copy of this file that was reviewed, everything
    # between "struct.unpack('" in _handler_alarm_activate and the "-> int:"
    # of cmd_alarm_state_override was missing. The helper and the four
    # handlers below, plus the signature of cmd_alarm_state_override, were
    # reconstructed from what survived: the handler names registered in
    # __init__, the two-byte alarm ID slice constants, the comments on
    # alarm_states / alarm_conditions and the docstring parameter list.
    # Confirm every reconstructed line against version control before merging.
    # ------------------------------------------------------------------

    def _unpack_alarm_id(self, message) -> int:
        """
        Extracts the alarm ID from a received alarm message.

        @param message: received DG alarm message
        @return: integer alarm ID
        """
        raw_id = bytearray(message['message'][self.START_POS_ALARM_ID:self.END_POS_ALARM_ID])
        # NOTE(review): '<H' (little-endian unsigned 16-bit) is assumed from the
        # surviving "'<" prefix and the two-byte slice - TODO confirm.
        return struct.unpack('<H', raw_id)[0]

    @publish(["alarm_states"])
    def _handler_alarm_activate(self, message):
        """
        Handles published DG alarm activation messages.

        @param message: published DG alarm activation message
        @return: none
        """
        alarm_id = self._unpack_alarm_id(message)
        self.alarm_states[alarm_id] = True
        self.alarm_conditions[alarm_id] = True

    # NOTE(review): published attribute list assumed - TODO confirm.
    @publish(["alarm_states"])
    def _handler_alarm_clear(self, message):
        """
        Handles published DG alarm clear messages.

        @param message: published DG alarm clear message
        @return: none
        """
        alarm_id = self._unpack_alarm_id(message)
        self.alarm_states[alarm_id] = False

    # NOTE(review): published attribute list assumed - TODO confirm.
    @publish(["alarm_conditions"])
    def _handler_alarm_condition_clear(self, message):
        """
        Handles published DG alarm condition clear messages.

        @param message: published DG alarm condition clear message
        @return: none
        """
        alarm_id = self._unpack_alarm_id(message)
        self.alarm_conditions[alarm_id] = False

    # NOTE(review): published attribute list assumed - TODO confirm.
    @publish(["safety_shutdown_active"])
    def _handler_alarm_information_sync(self, message):
        """
        Handles published DG alarm information broadcast messages.

        @param message: published DG alarm information message
        @return: none
        """
        # NOTE(review): field position and 'i' format are assumptions; the
        # original parsing code was not visible - TODO confirm payload layout.
        raw_flag = bytearray(message['message'][
            MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])
        self.safety_shutdown_active = struct.unpack('i', raw_flag)[0] == 1

    # NOTE(review): parameter order taken from the docstring; the NO_RESET
    # default is assumed by analogy with
    # cmd_alarm_info_broadcast_interval_override - TODO confirm.
    def cmd_alarm_state_override(self, alarm: int, state: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm state override command
        Constraints:
        Must be logged into DG.
        Given alarm must be valid.
        If inactivating alarm, given alarm must be recoverable (clearable).

        @param alarm: integer - ID of alarm to override
        @param state: integer - 1 for alarm active, 0 for alarm inactive
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # Payload layout: reset flag, requested state, alarm ID.
        payload = (integer_to_bytearray(reset)
                   + integer_to_bytearray(state)
                   + integer_to_bytearray(alarm))

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_DG_ALARM_STATE_OVERRIDE.value,
                                              payload=payload)

        # Send message
        received_message = self.can_interface.send(message)

        # No response received
        if received_message is None:
            return False

        if reset == RESET:
            str_res = "reset back to normal"
        else:
            str_res = "active" if state != 0 else "inactive"

        status = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        self.logger.debug("Alarm %s %s: %s", alarm, str_res, status)
        # response payload is OK or not OK
        return 1 == status

    def cmd_clear_all_alarms(self) -> int:
        """
        Constructs and sends the clear all active alarms command to the DG.
        This will clear even non-recoverable alarms.
        Constraints:
        Must be logged into DG.

        @return: 1 if successful, zero otherwise
        """
        # Key value sent as the command payload (signed form of 0xD2C3B4A5).
        payload = integer_to_bytearray(-758926171)  # 0xD2C3B4A5

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_DG_SUPER_CLEAR_ALARMS_CMD.value,
                                              payload=payload)

        # Send message
        received_message = self.can_interface.send(message)

        # No response received
        if received_message is None:
            return False

        self.logger.debug("All DG alarms cleared.")
        # response payload is OK or not OK
        return 1 == received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_alarm_info_broadcast_interval_override(self, ms: int = 1000, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm information broadcast interval override command
        Constraints:
        Must be logged into DG.
        Given interval must be non-zero and a multiple of the DG general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        if not check_broadcast_interval_override_ms(ms):
            return False

        # Payload layout: reset flag, interval in ms.
        payload = integer_to_bytearray(reset) + integer_to_bytearray(ms)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dg_ch_id,
            message_id=MsgIds.MSG_ID_DG_ALARM_INFO_SEND_INTERVAL_OVERRIDE.value,
            payload=payload)

        self.logger.debug("override DG alarm information broadcast interval")

        # Send message
        received_message = self.can_interface.send(message)

        # No response received
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        if reset == RESET:
            str_res = "reset back to normal: "
        else:
            str_res = str(ms) + " ms: "

        status = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        self.logger.debug("DG alarm information broadcast interval overridden to %s%s",
                          str_res, status)
        # response payload is OK or not OK; compared against 1 like the other
        # commands so that a non-zero failure code is not reported as success
        return 1 == status