Index: dialin/dg/alarms.py =================================================================== diff -u --- dialin/dg/alarms.py (revision 0) +++ dialin/dg/alarms.py (revision d490b41a0158d5d0f97b3c1c646c7045bfc4db27) @@ -0,0 +1,150 @@ +########################################################################### +# +# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file alarms.py +# +# @author (last) Quang Nguyen +# @date (last) 02-Sep-2020 +# @author (original) Quang Nguyen +# @date (original) 02-Sep-2020 +# +############################################################################ +from ..protocols.CAN import (DenaliMessage, + DenaliChannels) +from ..utils.conversions import integer_to_bytearray +from ..utils.base import _AbstractSubSystem, _publish +from .constants import RESET, NO_RESET +from collections import OrderedDict +import struct +from logging import Logger +from ..common.alarm_defs import AlarmList + + +class DGAlarms(_AbstractSubSystem): + """ + DG interface containing alarm related commands. + """ + + # alarms message IDs + MSG_ID_ALARM_ACTIVATE = 0x0003 + MSG_ID_ALARM_CLEAR = 0x0004 + MSG_ID_DG_ALARM_STATE_OVERRIDE = 0xA001 + + START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX + END_POS_ALARM_ID = START_POS_ALARM_ID + 2 + + def __init__(self, can_interface, logger: Logger): + """ + @param can_interface: Denali Can Messenger object + """ + super().__init__() + self.can_interface = can_interface + self.logger = logger + + if self.can_interface is not None: + channel_id = DenaliChannels.dg_alarm_broadcast_ch_id + msg_id = self.MSG_ID_ALARM_ACTIVATE + self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_activate) + + channel_id = DenaliChannels.dg_alarm_broadcast_ch_id + msg_id = self.MSG_ID_ALARM_CLEAR + self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_clear) + + # alarm states based on received DG alarm activation and alarm clear messages + self.alarm_states = [False] * 500 + + self.ids = OrderedDict() + for attr in dir(AlarmList): + if not callable(getattr(AlarmList, attr)) and attr.startswith("ALARM_ID"): + self.ids[attr] = getattr(AlarmList, attr) + self.ids = OrderedDict(sorted(self.ids.items(), key=lambda key: key[1].value)) + + def get_alarm_states(self): + """ + Gets all states for all alarms + + @return: List of booleans of size 500 + """ + return self.alarm_states + + def get_alarm_state(self, alarm_id): + """ + Gets alarm state for given alarm + + @return: Alarm state + """ + return self.alarm_states[alarm_id] + + def get_alarm_ids(self): + """ + Returns a dictionary of the alarm short name and the corresponding id + + @return: OrderedDict of the alarm ids + """ + return self.ids + + @_publish(["alarm_states"]) + def _handler_alarm_activate(self, message): + """ + Handles published DG alarm activation messages. + + @param message: published DG alarm activation message + @return: none + """ + + alarm_id = struct.unpack('