########################################################################### # # Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file DialysateGenerator.py # # @date 16-Oct-2019 # @author L. Baloa # # @brief This class allows sending to and receiving from the DG device. # ############################################################################ from DialityCoreCanProtocol import * from time import sleep """ \mainpage Dialin API Dialin API is comprised primarily by 3 classes: - \ref DialysateGenerator.DG - \ref HemodialysisDevice.HD """ class DG: """ \class DG Dialysate Generator (DG) Dialin object API. It provides the basic interface to communicate with the DG board """ DG_MSG_ID_FILL_COMMAND = 0x2000 DG_MSG_ID_BROADCAST = 0X2100 def __init__(self, can__interface="can0"): """ DG constructor using can bus \param can__interface: string with can bus name, e.g. "can0" \returns DG object that allows communication with board via port \details For example: dg_object = DG(can__interface='can0') or dg_object = DG('can0') """ # Create listener self.__can_interface = DenaliCanMessenger(can_interface=can__interface) self.__can_interface.registerReceivingPublicationFunction(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=self.DG_MSG_ID_BROADCAST, function=( lambda message: print(".", end='', flush=True))) self.__can_interface.start() def fill(self, start_or_stop='start'): """ request the DG board to 'start' or to 'stop' fill \param start_or_stop is a string indicating which action to take, e.g., 'start' or 'stop' \returns True if ran the command, False otherwise, returns None if timeout """ payload = [1] if start_or_stop == 'start' else [0] msg = DenaliMessage.buildMessage(channel_id=DenaliChannels.hd_to_dg_ch_id, message_id=self.DG_MSG_ID_FILL_COMMAND, payload=payload) # Send message received_msg = self.__can_interface.send(msg) returnValue = None if received_msg is not None: returnValue = True if DenaliMessage.getPayload(received_msg)[0] == 1 else False return returnValue def test_dg_start(): dg = DG() sleep(2) resp = dg.fill('start') test_passed = isinstance(resp, bool) if test_passed: print("DG started and DG response is {}".format(test_passed)) assert test_passed def test_dg_stop(): dg = DG() sleep(1) dg.fill('start') sleep(2) resp = dg.fill('stop') test_passed = isinstance(resp, bool) if test_passed: print("DG started/stopped and DG response is {}".format(test_passed)) assert test_passed