###########################################################################
#
#    Copyright (c) 2022-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   post_treatment.py
#
#    @author (last)      Micahel Garthwaite
#    @date   (last)      20-Apr-2022
#    @author (original)  Micahel Garthwaite
#    @date   (original)  22-Feb-2022
#
############################################################################
import struct
from enum import unique
from logging import Logger

from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliChannels
from ..utils.base import AbstractSubSystem, publish, DialinEnum
from ..utils.conversions import bytearray_to_integer


@unique
class HDPostTreatmentDrainStates(DialinEnum):
    """
    Named values for the reservoir drain states used during post treatment.

    NOTE(review): presumably these are the values carried in the second field
    of the post treatment state broadcast - confirm against the firmware.
    """
    DRAIN_RESERVOIR_SWITCH_STATE = 0
    DRAIN_RESERVOIR_START_DRAIN_STATE = 1
    DRAIN_RESERVOIR_DRAIN_STATE = 2
    DRAIN_RESERVOIR_COMPLETE_STATE = 3
    NUM_OF_DRAIN_STATES = 4


class HDPostTreatment(AbstractSubSystem):
    """
    Hemodialysis Delivery (HD) Dialin API sub-class for post treatment related commands.
    """

    def __init__(self, can_interface, logger: Logger):
        """
        HDPostTreatment constructor

        Stores the CAN interface and logger, subscribes the sync handler to the
        HD post treatment state message (only when a CAN interface is given),
        and zeroes the cached sub-mode and drain state.

        @param can_interface: CAN messenger used to register the receive handler; may be None
        @param logger: (Logger) logger kept on the instance
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        # Without a CAN interface there is nothing to subscribe to.
        if self.can_interface is not None:
            register = self.can_interface.register_receiving_publication_function
            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_HD_POST_TREATMENT_STATE.value
            register(channel_id, msg_id, self._handler_post_treatment_sub_mode_sync)

        # Cached values; overwritten by the sync handler on each received message.
        self.post_treatment_sub_mode = 0
        self.post_treatment_drain_state = 0

    def get_post_treatment_sub_mode(self) -> int:
        """
        Gets the most recently stored post treatment sub-mode.

        @return: (int) post treatment submode
        """
        return self.post_treatment_sub_mode

    @publish(["post_treatment_sub_mode", "post_treatment_drain_state"])
    def _handler_post_treatment_sub_mode_sync(self, message):
        """
        Handles published post treatment sub mode data messages. Two consecutive
        integer fields are decoded from the payload, starting at the first field
        position: the sub-mode, followed by the drain state.

        @param message: published post treatment sub mode broadcast message; its
                        'message' entry holds the raw payload
        @return: none
        """
        data = message['message']
        cursor = MsgFieldPositions.START_POS_FIELD_1

        # NOTE(review): the trailing False presumably selects unsigned decoding -
        # confirm against bytearray_to_integer.
        sub_mode, cursor = bytearray_to_integer(data, cursor, False)
        self.post_treatment_sub_mode = sub_mode

        # The helper returns the next read position, so the second field follows directly.
        drain_state, cursor = bytearray_to_integer(data, cursor, False)
        self.post_treatment_drain_state = drain_state