###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file   DialysateGenerator.py
#
# @date   16-Oct-2019
# @author L. Baloa
#
# @brief  This class allows sending to and receiving from the DG device.
#
############################################################################
import struct
import unittest
from time import sleep

from DialIn.CoreCANProtocol import (DenaliCanMessenger,
                                    DenaliMessage,
                                    DenaliChannels)

# Raw string: the Doxygen commands below start with a backslash, and in a
# normal string "\r" (as in "\ref") would be read as a carriage return.
r"""
\mainpage Dialin API

Dialin API is comprised primarily of the following classes:
- \ref DialysateGenerator.DG
- \ref HemodialysisDevice.HD
"""


class DG:
    r"""
    \class DG

    Dialysate Generator (DG) Dialin object API.
    It provides the basic interface to communicate with the DG board.
    """

    DG_MSG_ID_FILL_COMMAND = 0x2000
    DG_MSG_ID_BROADCAST = 0x2100

    MSG_ID_LOAD_CELL_A1_OVERRIDE = 0xA005
    MSG_ID_LOAD_CELL_A2_OVERRIDE = 0xA006
    MSG_ID_LOAD_CELL_B1_OVERRIDE = 0xA007
    MSG_ID_LOAD_CELL_B2_OVERRIDE = 0xA008

    # Values accepted by the ``reset`` argument of the override commands:
    # 1 resets a previous override, 0 applies a new override.
    RESET = 1
    NO_RESET = 0

    def __init__(self, can_interface="can0"):
        r"""
        DG constructor using can bus.

        \param can_interface: string with can bus name, e.g. "can0"
        \returns DG object that allows communication with board via port

        \details For example:
            dg_object = DG(can_interface='can0') or
            dg_object = DG('can0')
        """
        # Create listener
        self.can_interface = DenaliCanMessenger(can_interface=can_interface)
        # Print a dot for every DG broadcast message seen on the sync channel.
        self.can_interface.register_receiving_publication_function(
            channel_id=DenaliChannels.dg_sync_broadcast_ch_id,
            message_id=self.DG_MSG_ID_BROADCAST,
            function=(lambda message: print(".", end='', flush=True)))
        self.can_interface.start()

    def fill(self, start_or_stop='start'):
        r"""
        Request the DG board to 'start' or to 'stop' fill.

        \param start_or_stop is a string indicating which action to take,
               e.g., 'start' or 'stop'. Any value other than 'start' is sent
               as a stop request.
        \returns True if ran the command, False otherwise, returns None if
                 timeout
        """
        payload = [1] if start_or_stop == 'start' else [0]

        msg = DenaliMessage.build_message(
            channel_id=DenaliChannels.hd_to_dg_ch_id,
            message_id=self.DG_MSG_ID_FILL_COMMAND,
            payload=payload)

        # Send message
        received_msg = self.can_interface.send(msg)

        # No response is reported as None (timeout).
        if received_msg is None:
            return None

        return DenaliMessage.get_payload(received_msg)[0] == 1

    @staticmethod
    def _int_to_payload(value):
        r"""
        Encode an integer as a list of 4 byte values.

        \param value: integer to encode
        \returns list of ints, the same payload shape that fill() passes to
                 DenaliMessage.build_message
        """
        # NOTE(review): assumes a signed 32-bit little-endian encoding;
        # confirm against the HD implementation / firmware.
        return list(struct.pack("<i", value))

    @staticmethod
    def _float_to_payload(value):
        r"""
        Encode a number as a list of 4 byte values (32-bit float).

        \param value: number to encode
        \returns list of ints
        """
        # NOTE(review): assumes a 32-bit little-endian float; confirm against
        # the HD implementation / firmware.
        return list(struct.pack("<f", value))

    def _load_cell_override(self, cell_name, message_id, reset, adc_raw):
        r"""
        Build and send one load cell override command and report the result.

        \param cell_name: load cell label used in the console output,
               e.g. "A1"
        \param message_id: message id of the override command to send
        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: raw adc value to override with
        \returns first response payload entry, or False on timeout
        """
        payload = self._int_to_payload(reset) + self._float_to_payload(adc_raw)

        # NOTE(review): the channel is still the HD one inherited from the HD
        # code this was built from; presumably a DG channel is needed - confirm.
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=message_id,
            payload=payload)

        print("override load cell " + cell_name + " raw adc value")

        # Send message
        received_message = self.can_interface.send(message)

        if received_message is None:
            print("Timeout!!!!")
            return False

        response = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

        if reset == self.RESET:
            str_res = "reset back to normal"
        else:
            str_res = str(adc_raw)

        print("Load cell " + cell_name + " raw adc (measured) overridden to "
              + str_res + str(response))

        # response payload is OK or not OK
        return response

    def CmdLoadCellA1Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell A1 override command.

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram.
               1000 ml = 11,186
        \returns 1 if successful, zero otherwise

        TODO: This is built based on HD but needs more infrastructure made
        for DG before being operational
        """
        return self._load_cell_override(
            "A1", self.MSG_ID_LOAD_CELL_A1_OVERRIDE, reset, adc_raw)

    def CmdLoadCellA2Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell A2 override command.

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram.
               1000 ml = 11,186
        \returns 1 if successful, zero otherwise

        TODO: This is built based on HD but needs more infrastructure made
        for DG before being operational
        """
        return self._load_cell_override(
            "A2", self.MSG_ID_LOAD_CELL_A2_OVERRIDE, reset, adc_raw)

    def CmdLoadCellB1Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell B1 override command.

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram.
               1000 ml = 11,186
        \returns 1 if successful, zero otherwise

        TODO: This is built based on HD but needs more infrastructure made
        for DG before being operational
        """
        return self._load_cell_override(
            "B1", self.MSG_ID_LOAD_CELL_B1_OVERRIDE, reset, adc_raw)

    def CmdLoadCellB2Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell B2 override command.

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram.
               1000 ml = 11,186
        \returns 1 if successful, zero otherwise

        TODO: This is built based on HD but needs more infrastructure made
        for DG before being operational
        """
        return self._load_cell_override(
            "B2", self.MSG_ID_LOAD_CELL_B2_OVERRIDE, reset, adc_raw)


class Test(unittest.TestCase):
    """Tests that talk to a DG board over the default can bus."""

    # @unittest.skip("Skipping dg_start.")
    def test_dg_start(self):
        """Start a fill and expect the board to accept the command."""
        dg = DG()
        sleep(2)
        success = dg.fill("start")
        self.assertTrue(success)

    # @unittest.skip("Skipping dg_start_stop.")
    def test_dg_start_stop(self):
        """Start and then stop a fill, expecting both to be accepted."""
        dg = DG()
        sleep(2)
        success = dg.fill("start")
        self.assertTrue(success)
        sleep(2)
        success = dg.fill('stop')
        self.assertTrue(success)


if __name__ == '__main__':
    unittest.main(verbosity=2)