###########################################################################
#
# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file DialysateGenerator.py
#
# @date 31-Mar-2019
# @author P. Lucia
#
# @brief This class allows sending to and receiving from the DG device.
#
############################################################################
# TODO: Needs to be restructured and existing TODO items need to be addressed.
import struct
import unittest
from time import sleep

from DialIn.CoreCANProtocol import (DenaliCanMessenger, DenaliMessage, DenaliChannels)

# Raw string: "\ref" in a normal string literal would embed a carriage return
# and "\mainpage" is an invalid escape sequence.
r"""
\mainpage Dialin API
Dialin API is comprised primarily by 3 classes:
- \ref DialysateGenerator.DG
- \ref HemodialysisDevice.HD
"""


class DG:
    r"""
    \class DG

    Dialysate Generator (DG) Dialin object API. It provides the basic
    interface to communicate with the DG board
    """

    DG_MSG_ID_FILL_COMMAND = 0x2000
    DG_MSG_ID_BROADCAST = 0x2100
    MSG_ID_LOAD_CELL_A1_OVERRIDE = 0xA005
    MSG_ID_LOAD_CELL_A2_OVERRIDE = 0xA006
    MSG_ID_LOAD_CELL_B1_OVERRIDE = 0xA007
    MSG_ID_LOAD_CELL_B2_OVERRIDE = 0xA008

    # Values of the 'reset' argument of the load cell override commands, as
    # described by their documentation: 1 resets a previous override, 0
    # applies a new override.
    RESET = 1
    NO_RESET = 0

    def __init__(self, can_interface="can0"):
        r"""
        DG constructor using can bus

        \param can_interface: string with can bus name, e.g. "can0"
        \returns DG object that allows communication with board via port

        \details For example:
            dg_object = DG(can_interface='can0') or
            dg_object = DG('can0')
        """
        # Create listener
        self.can_interface = DenaliCanMessenger(can_interface=can_interface)
        # Print a dot (no newline, flushed immediately) for every DG
        # broadcast message received on the DG sync broadcast channel.
        self.can_interface.register_receiving_publication_function(
            channel_id=DenaliChannels.dg_sync_broadcast_ch_id,
            message_id=self.DG_MSG_ID_BROADCAST,
            function=(lambda message: print(".", end='', flush=True)))
        self.can_interface.start()

    def fill(self, start_or_stop='start'):
        r"""
        Request the DG board to 'start' or to 'stop' fill

        \param start_or_stop is a string indicating which action to take,
               e.g., 'start' or 'stop'. Any value other than 'start' sends
               the stop payload.
        \returns True if the first byte of the response payload is 1, False
                 otherwise, None if no response was received
        """
        payload = [1] if start_or_stop == 'start' else [0]

        msg = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_dg_ch_id,
                                          message_id=self.DG_MSG_ID_FILL_COMMAND,
                                          payload=payload)

        # Send message
        received_msg = self.can_interface.send(msg)

        if received_msg is None:
            return None

        return DenaliMessage.get_payload(received_msg)[0] == 1

    @staticmethod
    def _int_to_bytes(val):
        r"""
        Packs an integer into 4 bytes.

        NOTE(review): the original code called an 'integer2ByteArray' helper
        that is not defined anywhere in this file. A little-endian signed
        32-bit layout is assumed here - confirm against the DG firmware.

        \param val: integer value to pack
        \returns bytes object of length 4
        """
        return struct.pack("<i", val)

    @staticmethod
    def _float_to_bytes(val):
        r"""
        Packs a number into a 4 byte float.

        NOTE(review): the original code called a 'float2ByteArray' helper
        that is not defined anywhere in this file. A little-endian 32-bit
        float layout is assumed here - confirm against the DG firmware.

        \param val: numeric value to pack
        \returns bytes object of length 4
        """
        return struct.pack("<f", val)

    def _send_load_cell_override(self, cell, message_id, reset, adc_raw):
        r"""
        Builds and sends one load cell override command and reports the result.

        Shared implementation of the four CmdLoadCell*Override methods, which
        differ only in the message id and in the load cell name they print.

        \param cell: string - load cell name used in the printed text, e.g. "A1"
        \param message_id: integer - message id of the override command
        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: raw adc value to override with
        \returns first byte of the response payload if a response was
                 received, False if no response was received
        """
        payload = self._int_to_bytes(reset) + self._float_to_bytes(adc_raw)

        # NOTE(review): the channel is dialin_to_hd_ch_id although this is a
        # DG command; kept from the original code - confirm the correct
        # Dialin-to-DG channel.
        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=message_id,
                                              payload=payload)

        print("override load cell " + cell + " raw adc value")

        # Send message
        received_message = self.can_interface.send(message)

        if received_message is None:
            print("Timeout!!!!")
            return False

        # response payload is OK or not OK
        status = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        str_res = "reset back to normal" if reset == self.RESET else str(adc_raw)
        print("Load cell " + cell + " raw adc (measured) overridden to " + str_res + str(status))
        return status

    def CmdLoadCellA1Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell A1 override command

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram. 1000 ml = 11,186
        \returns 1 if successful, zero otherwise (False if no response was received)

        TODO: This is built based on HD but needs more infrastructure made for DG
        before being operational
        """
        return self._send_load_cell_override("A1", self.MSG_ID_LOAD_CELL_A1_OVERRIDE,
                                             reset, adc_raw)

    def CmdLoadCellA2Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell A2 override command

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram. 1000 ml = 11,186
        \returns 1 if successful, zero otherwise (False if no response was received)

        TODO: This is built based on HD but needs more infrastructure made for DG
        before being operational
        """
        return self._send_load_cell_override("A2", self.MSG_ID_LOAD_CELL_A2_OVERRIDE,
                                             reset, adc_raw)

    def CmdLoadCellB1Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell B1 override command

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram. 1000 ml = 11,186
        \returns 1 if successful, zero otherwise (False if no response was received)

        TODO: This is built based on HD but needs more infrastructure made for DG
        before being operational
        """
        return self._send_load_cell_override("B1", self.MSG_ID_LOAD_CELL_B1_OVERRIDE,
                                             reset, adc_raw)

    def CmdLoadCellB2Override(self, reset, adc_raw):
        r"""
        Constructs and sends the load cell B2 override command

        \param reset: integer - 1 to reset a previous override, 0 to override
        \param adc_raw: unsigned int - raw adc value. 0.0894 per gram. 1000 ml = 11,186
        \returns 1 if successful, zero otherwise (False if no response was received)

        TODO: This is built based on HD but needs more infrastructure made for DG
        before being operational
        """
        return self._send_load_cell_override("B2", self.MSG_ID_LOAD_CELL_B2_OVERRIDE,
                                             reset, adc_raw)


class Test(unittest.TestCase):
    """Tests of the DG fill command; each test opens the default can interface."""

    # @unittest.skip("Skipping dg_start.")
    def test_dg_start(self):
        """Sends a fill 'start' request and expects a positive response."""
        dg = DG()
        sleep(2)
        success = dg.fill("start")
        self.assertTrue(success)

    # @unittest.skip("Skipping dg_start_stop.")
    def test_dg_start_stop(self):
        """Sends a fill 'start' and then a fill 'stop', expecting both to succeed."""
        dg = DG()

        sleep(2)
        success = dg.fill("start")
        self.assertTrue(success)

        sleep(2)
        success = dg.fill('stop')
        self.assertTrue(success)


if __name__ == '__main__':
    unittest.main(verbosity=2)