###########################################################################
#
#    Copyright (c) 2020-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   watchdog.py
#
#    @author (last)      Michael Garthwaite
#    @date   (last)      15-Aug-2023
#    @author (original)  Michael Garthwaite
#    @date   (original)  15-Aug-2023
#
############################################################################
from logging import Logger

from .constants import RESET, NO_RESET
from ..common import MsgIds
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem
from ..utils.conversions import integer_to_bytearray


class DGWatchdog(AbstractSubSystem):
    """
    Dialin API sub-class exposing the Dialysate Generator (DG) watchdog commands.
    """

    def __init__(self, can_interface, logger: Logger):
        """
        Stores the collaborators used to send commands and report progress.

        @param can_interface: the Denali CAN interface object
        @param logger: logger that receives this sub-system's debug output
        """

        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

    def cmd_watchdog_task_check_in_override(self, state: int, task: int, reset: int = NO_RESET) -> int:
        """
        Builds the watchdog task check-in override command, sends it on the
        dialin-to-DG channel and reports the response.
        Constraints:
            Must be logged into DG.
            Given task must be valid.
            Given state must be a 0 or 1.

        @param state: integer - 1 for task checked in, 0 for task not checked in
        @param task: integer - ID of task to override
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: the first payload entry of the response (1 if successful,
                 zero otherwise); False when no response was received
        """

        # Payload field order is: reset flag, check-in state, task ID.
        override_payload = (integer_to_bytearray(reset)
                            + integer_to_bytearray(state)
                            + integer_to_bytearray(task))

        request = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dg_ch_id,
            message_id=MsgIds.MSG_ID_DG_WATCHDOG_TASK_CHECKIN_OVERRIDE.value,
            payload=override_payload)

        self.logger.debug("override DG watchdog task check-in state")

        response = self.can_interface.send(request)

        # No response object means the send produced nothing to inspect.
        if response is None:
            self.logger.debug("Timeout!!!!")
            return False

        # The first payload entry carries the OK / not OK result.
        result = response['message'][DenaliMessage.PAYLOAD_START_INDEX]

        if reset == RESET:
            outcome = "reset back to normal"
        elif state != 0:
            outcome = "checked in"
        else:
            outcome = "not checked in"

        self.logger.debug(f"watchdog task check-in overridden to {outcome}:{result!s}")
        return result