########################################################################### # # Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file DialysateGenerator.py # # @date 16-Oct-2019 # @author L. Baloa # # @brief This class allows sending to and receiving from the DG device. # ############################################################################ from DialityCoreCanProtocol import * from time import sleep """ :mainpage Dialin API Dialin API is comprised primarily by 3 classes: - :ref DialysateGenerator.DG - :ref HemodialysisDevice.HD """ def PublicationReceiverHandler(message): print(".", end='', flush=True) class DG: """ :class DG Dialysate Generator (DG) Dialin object API. It provides the basic interface to communicate with the DG board """ def __init__(self, can__interface="can0"): """ DG constructor using can bus :param can__interface: can bus, e.g. "can0" :returns DG object that allows communication with board via port :details For example: dg_object = DG(can__interface='can0') or dg_object = DG('can0') """ # Create listener self.__can_interface = DialinCanMessenger(can_interface=can__interface) self.__can_interface.registerReceivingPublicationFunction(channel_id=DialinChannels.dg_sync_broadcast_ch_id, message_id=0x7100, function=PublicationReceiverHandler) self.__can_interface.start() def fill(self, start_or_stop='start'): """ request the DG board to fill :returns True if ran the command, False otherwise, returns None if timeout """ payload = [1] if start_or_stop == 'start' else [0] msg = DialinMessage.buildMessage(channel_id=DialinChannels.ui_to_hd_ch_id, message_id=1000, payload=payload) # Send message received_msg = self.__can_interface.send(msg) returnValue = None if received_msg is not None: returnValue = True if DialinMessage.getPayload(received_msg)[0] == 1 else False return returnValue def __del__(self): self.__can_interface.stop() def test_fill_print(msg): if msg is not None: print("f", end='', flush=True) else: print("t", end='', flush=True) if __name__ == "__main__": test_dg = DG() sleep(2) test_packet = test_dg.fill('start') test_fill_print(test_packet) sleep(10) test_packet = test_dg.fill('stop') test_fill_print(test_packet) del test_dg # sleep(1) # test_packet = test_dg.fill('start') # test_fill_print(test_packet) # sleep(2) # test_packet = test_dg.fill('start') # test_fill_print(test_packet)