###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file DialysateGenerator.py
#
# @date 16-Oct-2019
# @author L. Baloa
#
# @brief This class allows sending to and receiving from the DG device.
#
############################################################################

from DialityCoreCanProtocol import *
from time import sleep

# NOTE: every docstring that carries Doxygen commands is a raw string.
# In a normal string literal "\r" (as in \returns / \ref) is a carriage
# return and "\p", "\c", "\d", "\m" are invalid escape sequences.
r"""
\mainpage Dialin API

Dialin API is composed primarily of the following classes:
- \ref DialysateGenerator.DG
- \ref HemodialysisDevice.HD
"""


def PublicationReceiverHandler(message):
    r"""
    Callback registered for DG broadcast messages.

    Prints a single "." (no newline, flushed immediately) for every
    message it is called with; the message content itself is not used.

    \param message: the received message (ignored)
    """
    print(".", end='', flush=True)


class DG:
    r"""
    \class DG

    Dialysate Generator (DG) Dialin object API. It provides the basic
    interface to communicate with the DG board
    """

    # Message identifiers used by this class.
    DG_MSG_ID_FILL_COMMAND = 0xA000
    DG_MSG_ID_BROADCAST = 0xA100

    def __init__(self, can__interface="can0"):
        r"""
        DG constructor using can bus

        \param can__interface: string with can bus name, e.g. "can0"
        \returns DG object that allows communication with board via port

        \details For example:
            dg_object = DG(can__interface='can0')
        or
            dg_object = DG('can0')
        """
        # Create listener
        self.__can_interface = DenaliCanMessenger(can_interface=can__interface)

        # Route DG broadcast messages to the module-level handler.
        self.__can_interface.registerReceivingPublicationFunction(
            channel_id=DenaliChannels.dg_sync_broadcast_ch_id,
            message_id=self.DG_MSG_ID_BROADCAST,
            function=PublicationReceiverHandler)

        self.__can_interface.start()

    def fill(self, start_or_stop='start'):
        r"""
        request the DG board to 'start' or to 'stop' fill

        \param start_or_stop is a string indicating which action to take,
               e.g., 'start' or 'stop'. Only the exact string 'start' is
               sent as a start request (payload [1]); any other value is
               sent as a stop request (payload [0]).
        \returns True if ran the command, False otherwise, returns None if
                 timeout (i.e. no response message was received)
        """
        payload = [1] if start_or_stop == 'start' else [0]

        msg = DenaliMessage.buildMessage(
            channel_id=DenaliChannels.hd_to_dg_ch_id,
            message_id=self.DG_MSG_ID_FILL_COMMAND,
            payload=payload)

        # Send message
        received_msg = self.__can_interface.send(msg)

        # No response message: report None rather than a boolean result.
        if received_msg is None:
            return None

        # The first payload entry of the response equals 1 on success.
        return DenaliMessage.getPayload(received_msg)[0] == 1


def test_fill_print(msg):
    r"""
    Print a one-character marker for the result of a fill request.

    Prints "f" when msg is not None and "t" when msg is None, without a
    newline and flushed immediately.

    \param msg: value returned by DG.fill
    """
    if msg is not None:
        print("f", end='', flush=True)
    else:
        print("t", end='', flush=True)


if __name__ == "__main__":
    # Manual smoke test: start a fill, wait, then stop it.
    test_dg = DG()
    sleep(2)

    test_packet = test_dg.fill('start')
    test_fill_print(test_packet)

    sleep(10)

    test_packet = test_dg.fill('stop')
    test_fill_print(test_packet)

    # del test_dg
    # sleep(1)
    # test_packet = test_dg.fill('start')
    # test_fill_print(test_packet)
    # sleep(2)
    # test_packet = test_dg.fill('start')
    # test_fill_print(test_packet)