###########################################################################
#
#    Copyright (c) 2021-2025 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file events.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      04-May-2026
#    @author (original)  Dara Navaei
#    @date   (original)  12-Oct-2021
#
############################################################################

# Module imports
import struct
from logging import Logger
from datetime import datetime

# Project imports
from leahi_dialin.common import dd_enum_repository
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliChannels
from leahi_dialin.utils.abstract_classes import AbstractSubSystem
from leahi_dialin.utils.base import publish


class DDEvents(AbstractSubSystem):
    """
    Dialysate Delivery (DD) Dialin API sub-class for events related commands.

    Received events are kept in a dictionary keyed by the event name. Each value is a list of tuples in the
    order of reception. For every event except DD_EVENT_OPERATION_STATUS the tuple is
    (timestamp string, event name, data 1, data 2); for DD_EVENT_OPERATION_STATUS it is
    (timestamp string, op mode, sub mode, sub state).
    """

    UNKNOWN_STATE = "UNKNOWN_PREVIOUS_STATE"

    # Format of the timestamp string stored as the first item of every event tuple
    _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'

    def __init__(self, can_interface: DenaliCanMessenger, logger: Logger):
        """
        Registers the event handlers on the CAN interface and creates the empty event storage.

        @param can_interface: Denali CAN Messenger object (no handler is registered when it is None)
        @param logger: Logger object stored on the instance
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        if self.can_interface is not None:
            self.can_interface.register_receiving_publication_function(
                channel_id=DenaliChannels.dd_sync_broadcast_ch_id,
                message_id=MsgIds.MSG_ID_DD_EVENT.value,
                function=self._handler_events_sync)
            self.can_interface.register_receiving_publication_function(
                channel_id=DenaliChannels.dd_sync_broadcast_ch_id,
                message_id=MsgIds.MSG_ID_DD_OP_MODE_DATA.value,
                function=self._handler_dd_op_mode_sync)

        self.dd_events_timestamp = 0.0          #: The timestamp of the last Event message
        self.dd_event_op_mode = 0               #: The new Operation Mode value
        self.dd_event_sub_mode = 0              #: The new Operation Sub-Mode value
        self.dd_event_op_mode_timestamp = 0.0   #: The timestamp of the last Operation Mode change message

        # Dictionary of the mode as key and the sub mode states enum class as the value
        op_modes = dd_enum_repository.DDOpModes
        self._dd_op_mode_2_sub_mode = {
            op_modes.MODE_FAUL.name: dd_enum_repository.DDFaultStates,
            op_modes.MODE_SERV.name: dd_enum_repository.DDServiceStates,
            op_modes.MODE_INIT.name: dd_enum_repository.DDInitStates,
            op_modes.MODE_STAN.name: dd_enum_repository.DDStandbyStates,
            op_modes.MODE_PREG.name: dd_enum_repository.DDPreGenDialysateStates,
            op_modes.MODE_GEND.name: dd_enum_repository.DDGenDialysateModeStates,
            op_modes.MODE_POSG.name: dd_enum_repository.DDPostGenDialysateStates,
            op_modes.MODE_HEAT.name: dd_enum_repository.DDHeatDisinfectStates,
            op_modes.MODE_HCOL.name: dd_enum_repository.DDHeaterCoolingStates,
            op_modes.MODE_ROPS.name: dd_enum_repository.DDROPermeateStates,
            op_modes.MODE_NLEG.name: dd_enum_repository.DDNotLegalStates,
        }

        # Initialize the event dictionary: each DD event name is a key and the value is the (initially empty)
        # list of the received events of that type.
        self._dd_event_dictionary = {event.name: [] for event in dd_enum_repository.DDEventList}

    def get_dd_nth_event(self, event_id, event_number=0):
        """
        Returns the nth requested DD event

        @param event_id the ID of the DD event types (i.e. DD_EVENT_STARTUP)
        @param event_number the event number that is requested counted back from the last occurred event. The
        default is 0 meaning the last occurred event. If it is equal to or greater than the number of the stored
        events, the last occurred event is returned.

        @returns the requested DD event tuple or an empty list if no event of this type has been stored
        """
        events = self._dd_event_dictionary[dd_enum_repository.DDEventList(event_id).name]

        if not events:
            return []
        if event_number >= len(events):
            # Request is beyond the stored history, fall back to the last occurred event
            return events[-1]
        return events[len(events) - event_number - 1]

    def clear_dd_event_list(self):
        """
        Clears the DD event list

        The lists are emptied in place so the dictionary keeps one (empty) list per event type.

        @returns none
        """
        for events in self._dd_event_dictionary.values():
            events.clear()

    def get_dd_events(self, event_id, number_of_events=1):
        """
        Returns the requested number of a certain DD event ID

        @param event_id the ID of the DD event types (i.e. DD_EVENT_STARTUP)
        @param number_of_events the last number of messages of a certain event type. 0 means all of the stored
        events.

        @returns a new list of the requested DD event type. When more events are stored than requested, the list
        contains the last number_of_events events starting with the most recent one. When all the stored events
        are returned (not enough events or number_of_events is 0) they are in the order of reception.
        """
        events = self._dd_event_dictionary[dd_enum_repository.DDEventList(event_id).name]

        # If all the events are requested or there are not enough events, send all the events that are available.
        # A copy is returned so later received or cleared events do not change the list handed to the caller.
        if number_of_events == 0 or len(events) <= number_of_events:
            return list(events)

        # Since the last events are located at the end of the list, walk backwards for the requested number
        last_index = len(events) - 1
        return [events[i] for i in range(last_index, last_index - number_of_events, -1)]

    @staticmethod
    def _unpack_field(payload, data_type, start, end):
        """
        Unpacks one value of the provided data type from the payload[start:end] bytes

        @param payload: the message payload
        @param data_type: the DataTypes member that provides the struct format
        @param start: the start position of the field
        @param end: the end position of the field (exclusive)
        @returns the unpacked value
        """
        return struct.unpack(data_type.unpack_attrib(), bytearray(payload[start:end]))[0]

    def _unpack_typed_value(self, payload, type_start, type_end, value_start, value_end):
        """
        Unpacks an event data value that is preceded by a U32 field holding its data type

        @param payload: the message payload
        @param type_start: the start position of the data type field
        @param type_end: the end position of the data type field (exclusive)
        @param value_start: the start position of the value field
        @param value_end: the end position of the value field (exclusive)
        @returns the unpacked value
        """
        data_type_id = self._unpack_field(payload, DataTypes.U32, type_start, type_end)
        return self._unpack_field(payload, DataTypes(data_type_id), value_start, value_end)

    def _previous_state_name(self, previous_op_mode, state):
        """
        Converts a sub mode state that belongs to the previous operation mode to its name

        @param previous_op_mode: the name of the previous operation mode or DDEvents.UNKNOWN_STATE
        @param state: the sub mode state value
        @returns the state name, or the value as a string if the previous operation mode is not known
        """
        if previous_op_mode == DDEvents.UNKNOWN_STATE:
            # Unknown previous state. Display value instead of name.
            return str(state)
        return self._dd_op_mode_2_sub_mode[previous_op_mode](state).name

    def _translate_sub_mode_change(self, current_timestamp, previous_state, new_state):
        """
        Converts the previous and new states of a sub mode change event from values to names

        Must be called before the sub mode change event that is being processed is stored.

        @param current_timestamp: the timestamp string of the event that is being processed
        @param previous_state: the previous sub mode state value of the event
        @param new_state: the new sub mode state value of the event
        @returns a tuple of (previous state name, new state name)
        """
        events = dd_enum_repository.DDEventList
        op_mode_events = self._dd_event_dictionary[events.DD_EVENT_OP_MODE_CHANGE.name]
        sub_mode_events = self._dd_event_dictionary[events.DD_EVENT_SUB_MODE_CHANGE.name]

        # Get the last tuple of the op mode changes: (timestamp, event type, prev op mode, current op mode)
        if op_mode_events:
            last_op_tuple = op_mode_events[-1]
        else:
            # No op mode event has been received before the sub mode event was received. Use broadcast messages
            # to determine current op mode. Previous mode can not be known
            last_op_tuple = (current_timestamp, events.DD_EVENT_OP_MODE_CHANGE.name, DDEvents.UNKNOWN_STATE,
                             dd_enum_repository.DDOpModes(self.dd_event_op_mode).name)

        previous_op_mode = last_op_tuple[-2]
        current_op_mode = last_op_tuple[-1]

        # Get the class of the states enums of the current operation mode that is running
        current_sub_mode_enum_class = self._dd_op_mode_2_sub_mode[current_op_mode]

        if sub_mode_events:
            # If the last operation mode change is newer than the last stored sub mode change, an operation mode
            # transition has occurred in between, so the previous state belongs to the previous operation mode.
            last_op_mode_time = datetime.strptime(last_op_tuple[0], self._TIMESTAMP_FORMAT)
            last_sub_mode_time = datetime.strptime(sub_mode_events[-1][0], self._TIMESTAMP_FORMAT)
            op_mode_changed = last_op_mode_time > last_sub_mode_time
        else:
            # First sub mode event: a new state of 0 is treated as the start of a new operation mode, in which
            # case the previous state belongs to the previous operation mode.
            op_mode_changed = new_state == 0

        if op_mode_changed:
            previous_state_name = self._previous_state_name(previous_op_mode, previous_state)
        else:
            previous_state_name = current_sub_mode_enum_class(previous_state).name

        return previous_state_name, current_sub_mode_enum_class(new_state).name

    @publish(["msg_id_dd_event", "dd_events_timestamp", '_dd_event_dictionary'])
    def _handler_events_sync(self, message, timestamp=0.0):
        """
        Handles published events message

        The event is decoded, its data is converted to names for operation mode and sub mode changes, and the
        resulting tuple is appended to the list of its event type. The first item of the tuple is the local time
        of reception formatted as a string.

        @param message: published DD events data message
        @param timestamp: the timestamp provided with the message, stored in dd_events_timestamp
        @returns none
        """
        events = dd_enum_repository.DDEventList
        payload = message['message']

        event_id = self._unpack_field(payload, DataTypes.U32,
                                      MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1)
        # Convert the event ID to enum
        event_enum = events(event_id)

        # Timestamps are stored as strings; they are parsed back whenever two events need to be compared
        current_timestamp = datetime.now().strftime(self._TIMESTAMP_FORMAT)

        if event_enum is events.DD_EVENT_OPERATION_STATUS:
            # Field 4 carries the op mode, sub mode and sub state as three consecutive bytes (the fourth byte
            # is not used). The typed data fields of this message are not part of the stored tuple and
            # therefore they are not decoded.
            status_start = MsgFieldPositions.START_POS_FIELD_4
            op_mode = self._unpack_field(payload, DataTypes.U08, status_start, status_start + 1)
            sub_mode = self._unpack_field(payload, DataTypes.U08, status_start + 1, status_start + 2)
            sub_state = self._unpack_field(payload, DataTypes.U08, status_start + 2, status_start + 3)
            event_tuple = (current_timestamp, op_mode, sub_mode, sub_state)
        else:
            # Each event data is a U32 data type field followed by the value field
            event_data_1 = self._unpack_typed_value(
                payload,
                MsgFieldPositions.START_POS_FIELD_2, MsgFieldPositions.END_POS_FIELD_2,
                MsgFieldPositions.START_POS_FIELD_3, MsgFieldPositions.END_POS_FIELD_3)
            event_data_2 = self._unpack_typed_value(
                payload,
                MsgFieldPositions.START_POS_FIELD_4, MsgFieldPositions.END_POS_FIELD_4,
                MsgFieldPositions.START_POS_FIELD_5, MsgFieldPositions.END_POS_FIELD_5)

            if event_enum is events.DD_EVENT_OP_MODE_CHANGE:
                # Update event data from integer to name if it's op mode change
                event_data_1 = dd_enum_repository.DDOpModes(event_data_1).name
                event_data_2 = dd_enum_repository.DDOpModes(event_data_2).name
            elif event_enum is events.DD_EVENT_SUB_MODE_CHANGE:
                # The states enum class depends on the operation mode(s) the states belong to
                event_data_1, event_data_2 = self._translate_sub_mode_change(current_timestamp,
                                                                             event_data_1, event_data_2)

            event_tuple = (current_timestamp, event_enum.name, event_data_1, event_data_2)

        # Update event dictionary
        self._dd_event_dictionary[event_enum.name].append(event_tuple)
        self.dd_events_timestamp = timestamp

    @publish(["msg_id_dd_op_mode_data", "dd_event_op_mode_timestamp", "dd_event_op_mode", "dd_event_sub_mode"])
    def _handler_dd_op_mode_sync(self, message, timestamp=0.0):
        """
        Handles published DD operation mode messages. Current DD operation mode is captured for reference.

        @param message: published DD operation mode broadcast message
        @param timestamp: the timestamp provided with the message, stored in dd_event_op_mode_timestamp
        @return: None
        """
        # Pairs of (target attribute, data type) in the order of the fields in the message.
        # NOTE(review): process_into_vars is not defined in this file; presumably it is inherited and decodes
        # the fields into the named attributes - confirm against AbstractSubSystem.
        decoder_list = [
            ('self.dd_event_op_mode', DataTypes.U32),
            ('self.dd_event_sub_mode', DataTypes.U32),
        ]

        self.process_into_vars(decoder_list=decoder_list, message=message)
        self.dd_event_op_mode_timestamp = timestamp