###########################################################################
#
#    Copyright (c) 2021-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   events.py
#
#    @author (last)      Micahel Garthwaite
#    @date   (last)      29-Aug-2023
#    @author (original)  Dara Navaei
#    @date   (original)  12-Oct-2021
#
############################################################################
import struct
from logging import Logger

from ..common import *
from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliChannels
from ..utils.base import AbstractSubSystem, publish
from datetime import datetime
from time import time


class DGEvents(AbstractSubSystem):
    """
    Dialysate Generator (DG) Dialin API sub-class for events related commands.

    Received DG event messages are decoded and appended, per event type, to an internal dictionary of lists that
    can be queried with get_dg_nth_event() and get_dg_events() and emptied with clear_dg_event_list().
    """

    # Placeholder compared against the previous op mode field of an op mode tuple in _handler_events_sync
    UNKNOWN_STATE = "UNKNOWN_PREVIOUS_STATE"

    def __init__(self, can_interface, logger: Logger):
        """
        Registers the DG event and DG op mode message handlers (when a CAN interface is provided) and builds the
        event storage and lookup dictionaries.

        @param can_interface: Denali CAN Messenger object. If None, no message handlers are registered.
        @param logger: Logger object, stored on the instance as self.logger
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger
        self.dg_events_timestamp = 0.0
        # NOTE(review): dg_event_op_mode, dg_event_sub_mode and dg_event_op_mode_timestamp are not initialised
        # here; they are first assigned in _handler_dg_op_mode_sync.

        if self.can_interface is not None:
            # DG event messages
            channel_id = DenaliChannels.dg_to_ui_ch_id
            msg_id = MsgIds.MSG_ID_DG_EVENT.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_events_sync)
            # DG operation mode broadcast messages
            channel_id = DenaliChannels.dg_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_DG_OP_MODE_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_dg_op_mode_sync)

        # Define the dictionaries
        # Event name -> list of recorded event tuples
        self._dg_event_dictionary = dict()
        # Event data type name -> python struct format character (None if the type is not recognised below)
        self._dg_event_data_type = dict()

        # Dictionary of the mode as key and the sub mode states enum class as the value
        self._dg_op_mode_2_sub_mode = {DGOpModes.DG_MODE_FAUL.name: DGFaultStates,
                                       DGOpModes.DG_MODE_INIT.name: DGInitStates,
                                       DGOpModes.DG_MODE_STAN.name: DGStandByModeStates,
                                       DGOpModes.DG_MODE_GENE.name: DGGenIdleModeStates,
                                       DGOpModes.DG_MODE_FILL.name: DGFillModeStates,
                                       DGOpModes.DG_MODE_DRAI.name: DGDrainModeStates,
                                       DGOpModes.DG_MODE_FLUS.name: DGFlushStates,
                                       DGOpModes.DG_MODE_HEAT.name: DGHeatDisinfectStates,
                                       DGOpModes.DG_MODE_HCOL.name: DGHeatDisinfectActiveCoolStates,
                                       DGOpModes.DG_MODE_CHEM.name: DGChemicalDisinfectStates,
                                       DGOpModes.DG_MODE_CHFL.name: DGChemDisinfectFlushStates,
                                       DGOpModes.DG_MODE_ROPS.name: DGROPermeateSampleStates}

        # Loop through the list of the DG events enums and initialise the event dictionary. Each event is a key in
        # the dictionary and the value is a list.
        for event in DGEventList:
            self._dg_event_dictionary[DGEventList(event).name] = []

        # Loop through the list of the event data type enum and update the dictionary
        for data_type in DGEventDataType:
            event_data_type = DGEventDataType(data_type).name
            struct_unpack_type = None

            # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is
            # the corresponding format in the python struct. BOOL types are unpacked with the same format as U32.
            if 'U32' in event_data_type or 'BOOL' in event_data_type:
                struct_unpack_type = 'I'
            elif 'S32' in event_data_type:
                struct_unpack_type = 'i'
            elif 'F32' in event_data_type:
                struct_unpack_type = 'f'

            self._dg_event_data_type[event_data_type] = struct_unpack_type

    def get_dg_nth_event(self, event_id, event_number=0):
        """
        Returns the nth requested DG event

        If no event of this type has been recorded, an empty list is returned. If event_number is greater than the
        number of recorded events, the most recent event is returned. If event_number is equal to the number of
        recorded events, the computed index is -1, which also selects the most recent event.

        @param event_id the ID of the DG event types (i.e. DG_EVENT_STARTUP)
        @param event_number the event number that is requested. The default is 0 meaning the last occurred event

        @returns the requested DG event number
        """
        list_length = len(self._dg_event_dictionary[DGEventList(event_id).name])

        if list_length == 0:
            event = []
        elif event_number > list_length:
            event = self._dg_event_dictionary[DGEventList(event_id).name][list_length - 1]
        else:
            # Events are appended in arrival order, so counting back from the end gives the nth most recent one
            event = self._dg_event_dictionary[DGEventList(event_id).name][list_length - event_number - 1]

        return event

    def clear_dg_event_list(self):
        """
        Clears the DG event list

        Every per-event list is emptied in place; the dictionary keys are kept.

        @returns none
        """
        for key in self._dg_event_dictionary:
            self._dg_event_dictionary[key].clear()

    def get_dg_events(self, event_id, number_of_events=1):
        """
        Returns the requested number of a certain DG event ID

        When more events are stored than requested, a new list holding the last number_of_events events is
        returned, most recent first. When number_of_events is 0, or when the number of stored events is less than
        or equal to number_of_events, the stored list itself is returned (oldest first).

        NOTE(review): in those last two cases the returned object is the internal list, not a copy, so
        clear_dg_event_list() and newly received events will change it.

        @param event_id the ID of the DG event types (i.e. DG_EVENT_STARTUP)
        @param number_of_events the last number of messages of a certain event type

        @returns a list of the requested DG event type
        """
        list_of_events = []

        # If there are not enough event lists send all the events that are available
        if len(self._dg_event_dictionary[DGEventList(event_id).name]) <= number_of_events:
            list_of_events = self._dg_event_dictionary[DGEventList(event_id).name]
        else:
            # Get the all the events
            complete_list = self._dg_event_dictionary[DGEventList(event_id).name]
            # Since the last are located at the end of the list, iterate backwards for the defined
            # event messages
            for i in range(len(complete_list) - 1, len(complete_list) - number_of_events - 1, -1):
                list_of_events.append(complete_list[i])

        # A request for 0 events returns everything that has been recorded for this event type
        if number_of_events == 0:
            list_of_events = self._dg_event_dictionary[DGEventList(event_id).name]

        return list_of_events

    @publish(["dg_events_timestamp", '_dg_event_dictionary'])
    def _handler_events_sync(self, message, timestamp=0.0):
        """
        Handles published events message

        The event ID is unpacked from field 1 of the message payload. The decoded event is appended as a tuple to
        the list of its event type in the event dictionary and the given timestamp is stored in
        dg_events_timestamp.

        FIXME(review): part of this method is missing from this copy of the file (see the FIXME in the body). The
        method must be restored from version control before this module can be used.

        @param message: published DG events data message
        @param timestamp: timestamp stored in dg_events_timestamp once the event has been recorded
        @returns none
        """
        event_data_1 = 0
        event_data_2 = 0
        op_mode = 0
        sub_mode = 0
        sub_state = 0
        current_sub_tuple = []

        event_id = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0]

        if event_id == DGEventList.DG_EVENT_OPERATION_STATUS.value:
            # Get the data type
            event_data_type_1 = struct.unpack('i', bytearray(
                message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0]
            # NOTE(review): the lookup table is keyed by DGEventDataType names (see __init__) but the key is built
            # here from HDEventDataType - this only works if both enums use the same member names; confirm.
            struct_data_type = self._dg_event_data_type[HDEventDataType(event_data_type_1).name]
            # FIXME(review): SOURCE TEXT LOST. The statement below is reproduced exactly as found. Everything
            # between the opening quote of the struct format string and ' current_sub_mode_timestamp:' is missing:
            # the rest of the payload decoding, the definitions of event_state_name, last_op_tuple,
            # current_sub_mode_enum_class and current_sub_mode_timestamp, and the headers of the conditional
            # blocks that the code below belongs to. The nesting of the statements that follow is therefore
            # inferred and must be confirmed against the original file.
            op_mode = struct.unpack(' current_sub_mode_timestamp:
                # If the previous and current of the last two tuples do not match, then an operation mode transition
                # has occurred and the previous state is converted from the previous class and the current op mode
                # is converted from current operation states enum class.
                # i.e last = (timestamp, event type, 3, 8) and one before = (timestamp, event type, 8, 3)
                # previous and current do not match so in the last type (timestamp, event type, 8, 3) the prev state
                # should be from op mode 8 and the current state should be from op mode 3
                previous_op_mode = last_op_tuple[len(last_op_tuple) - 2]
                if previous_op_mode != DGEvents.UNKNOWN_STATE:
                    previous_sub_mode_enum_class = self._dg_op_mode_2_sub_mode[previous_op_mode]
                    event_data_1 = previous_sub_mode_enum_class(event_data_1).name
                # Unknown previous state. Display value instead of name.
                else:
                    event_data_1 = str(event_data_1)
                event_data_2 = current_sub_mode_enum_class(event_data_2).name
            else:
                if event_data_2 != 0:
                    event_data_1 = current_sub_mode_enum_class(event_data_1).name
                    event_data_2 = current_sub_mode_enum_class(event_data_2).name
                else:
                    # The second-to-last element of the current sub mode tuple selects the enum class used for
                    # the previous state
                    previous_sub_mode = current_sub_tuple[len(current_sub_tuple) - 2]
                    previous_sub_mode_enum_class = self._dg_op_mode_2_sub_mode[previous_sub_mode]
                    event_data_1 = previous_sub_mode_enum_class(event_data_1).name
                    event_data_2 = current_sub_mode_enum_class(event_data_2).name
            # Tuple layout: (local date and time string, event name, event data 1, event data 2)
            event_tuple = (datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S.%f'), event_state_name,
                           event_data_1, event_data_2)
        elif event_state_name == DGEventList.DG_EVENT_OPERATION_STATUS.name:
            # Operation status tuple layout differs: (time() value, op mode, sub mode, sub state)
            event_tuple = (time(), op_mode, sub_mode, sub_state)

        # Update event dictionary
        self._dg_event_dictionary[event_state_name].append(event_tuple)
        self.dg_events_timestamp = timestamp

    @publish(["dg_event_op_mode_timestamp", "dg_event_op_mode", "dg_event_sub_mode"])
    def _handler_dg_op_mode_sync(self, message, timestamp=0.0):
        """
        Handles published DG operation mode messages. Current DG operation mode is captured for reference.

        The operation mode is unpacked from field 1 and the sub mode from field 2 of the message payload, both with
        the struct format 'i'.

        @param message: published DG operation mode broadcast message
        @param timestamp: timestamp stored in dg_event_op_mode_timestamp
        @return: None
        """
        mode = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))
        smode = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))

        # struct.unpack returns a tuple, the value is its first element.
        # These three attributes are created here; they are not set in __init__.
        self.dg_event_op_mode = mode[0]
        self.dg_event_sub_mode = smode[0]
        self.dg_event_op_mode_timestamp = timestamp