########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file dialysate_generator.py # # @author (last) Quang Nguyen # @date (last) 09-Sep-2020 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ import struct from .accelerometer import DGAccelerometer from .alarms import DGAlarms from .drain_pump import DGDrainPump from .hd_proxy import DGHDProxy from .load_cells import DGLoadCells from .pressures import DGPressures from .reservoirs import DGReservoirs from .valves import DGValves from .ro_pump import DGROPump from .heaters import Heaters from .temperature_sensors import TemperatureSensors from .conductivity_sensors import ConductivitySensors from ..protocols.CAN import (DenaliCanMessenger, DenaliMessage, DenaliChannels) from ..utils.base import _AbstractSubSystem, _publish, _LogManager class DG(_AbstractSubSystem): """ Dialysate Generator (DG) Dialin object API. It provides the basic interface to communicate with the DG firmware. """ # DG message IDs MSG_ID_DG_OPERATION_MODE_BROADCAST = 0x0027 MSG_ID_DG_REQUEST_DG_VERSION = 0x001C MSG_ID_DG_DG_VERSION_RESPONSE = 0x001E MSG_ID_LOGIN_TO_DG = 0xA000 MSG_ID_DG_SAFETY_SHUTDOWN_OVERRIDE = 0xA014 MSG_ID_DG_SOFTWARE_RESET_REQUEST = 0xA022 # HD login password DG_LOGIN_PASSWORD = '123' # broadcast message field positions START_POS_DG_OP_MODE = DenaliMessage.PAYLOAD_START_INDEX END_POS_DG_OP_MODE = START_POS_DG_OP_MODE + 4 START_POS_DG_SUB_MODE = END_POS_DG_OP_MODE END_POS_DG_SUB_MODE = START_POS_DG_SUB_MODE + 4 # DG version message field positions START_POS_MAJOR = DenaliMessage.PAYLOAD_START_INDEX END_POS_MAJOR = START_POS_MAJOR + 1 START_POS_MINOR = END_POS_MAJOR END_POS_MINOR = START_POS_MINOR + 1 START_POS_MICRO = END_POS_MINOR END_POS_MICRO = START_POS_MICRO + 1 START_POS_BUILD = END_POS_MICRO END_POS_BUILD = START_POS_BUILD + 2 # FPGA START_POS_FPGA_ID = END_POS_BUILD END_POS_FPGA_ID = START_POS_FPGA_ID + 1 START_POS_FPGA_MAJOR = END_POS_FPGA_ID END_POS_FPGA_MAJOR = START_POS_FPGA_MAJOR + 1 START_POS_FPGA_MINOR = END_POS_FPGA_MAJOR END_POS_FPGA_MINOR = START_POS_FPGA_MINOR + 1 START_POS_FPGA_LAB = END_POS_FPGA_MINOR END_POS_FPGA_LAB = START_POS_FPGA_LAB + 1 # DG operation modes DG_OP_MODE_FAULT = 0 DG_OP_MODE_SERVICE = 1 DG_OP_MODE_INIT_POST = 2 DG_OP_MODE_STANDBY = 3 DG_OP_MODE_STANDBY_SOLO = 4 DG_OP_MODE_RECIRCULATE = 5 DG_OP_MODE_FILL = 6 DG_OP_MODE_DRAIN = 7 DG_OP_MODE_FLUSH = 8 DG_OP_MODE_DISINFECT = 9 DG_OP_MODE_CHEMICAL_DISINFECT = 10 # DG sub_modes DG_POST_STATE_START = 0 # Start initialize & POST mode state DG_POST_STATE_FPGA = 1 # FPGA POST test state DG_POST_STATE_WATCHDOG = 2 # Watchdog POST test state DG_POST_STATE_TEMPERATURE_SENSORS = 3 # Temperature Sensors POST state DG_POST_STATE_HEATERS = 4 # Heaters POST state DG_POST_STATE_COMPLETED = 5 # POST completed successfully state DG_POST_STATE_FAILED = 6 # POST failed state NUM_OF_DG_POST_STATES = 7 # Number of initialize & POST mode states def __init__(self, can_interface="can0", log_level=None): """ Initializes the DG object For example: dg_object = DG(can_interface='can0') or dg_object = DG(can_interface="can0", log_level="DEBUG") Possible log levels: ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "CAN_ONLY", "PRINT_ONLY"] @param can_interface: string with can bus name, e.g. "can0" """ super().__init__() self._log_manager = _LogManager(log_level=log_level, log_filepath=self.__class__.__name__+".log") self.logger = self._log_manager.logger # Create listener self.can_interface = DenaliCanMessenger(can_interface=can_interface, logger=self.logger, log_can=self._log_manager.log_level == "CAN_ONLY") self.can_interface.start() # register handler for HD operation mode broadcast messages if self.can_interface is not None: channel_id = DenaliChannels.dg_sync_broadcast_ch_id msg_id = self.MSG_ID_DG_OPERATION_MODE_BROADCAST self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_dg_op_mode_sync) self.can_interface.register_receiving_publication_function(DenaliChannels.dg_sync_broadcast_ch_id, self.MSG_ID_DG_DG_VERSION_RESPONSE, self._handler_dg_version) # initialize variables that will be populated by DG version response self.dg_version = None self.fpga_version = None # create properties self.dg_operation_mode = self.DG_OP_MODE_INIT_POST self.dg_operation_sub_mode = 0 # Create command groups self.accel = DGAccelerometer(self.can_interface, self.logger) self.hd_proxy = DGHDProxy(self.can_interface, self.logger) self.load_cells = DGLoadCells(self.can_interface, self.logger) self.pressures = DGPressures(self.can_interface, self.logger) self.reservoirs = DGReservoirs(self.can_interface, self.logger) self.valves = DGValves(self.can_interface, self.logger) self.ro_pump = DGROPump(self.can_interface, self.logger) self.drain_pump = DGDrainPump(self.can_interface, self.logger) self.heaters = Heaters(self.can_interface, self.logger) self.temperature_sensors = TemperatureSensors(self.can_interface, self.logger) self.conductivity_sensors = ConductivitySensors(self.can_interface, self.logger) self.alarms = DGAlarms(self.can_interface, self.logger) def get_version(self): """ Gets the DG version. Assumes DG version has already been requested. @return: The dg version string """ return self.dg_version def get_fpga_version(self): """ Gets the fpga version from the DG @return: The FPGA version """ return self.fpga_version def get_operation_mode(self): """ Gets the operation mode @return: The operation mode """ return self.dg_operation_mode def get_operation_sub_mode(self): """ Gets the operation sub mode @return: The operation sub mode """ return self.dg_operation_sub_mode @_publish(["dg_version", "fpga_version"]) def _handler_dg_version(self, message): """ Handler for response from DG regarding its version. @param message: response message from HD regarding valid treatment parameter ranges.\n U08 Major \n U08 Minor \n U08 Micro \n U16 Build \n @return: None if unsuccessful, the version string if unpacked successfully """ major = struct.unpack('B', bytearray(message['message'][self.START_POS_MAJOR:self.END_POS_MAJOR])) minor = struct.unpack('B', bytearray(message['message'][self.START_POS_MINOR:self.END_POS_MINOR])) micro = struct.unpack('B', bytearray(message['message'][self.START_POS_MICRO:self.END_POS_MICRO])) build = struct.unpack('H', bytearray(message['message'][self.START_POS_BUILD:self.END_POS_BUILD])) fpga_id = struct.unpack('B', bytearray( message['message'][self.START_POS_FPGA_ID:self.END_POS_FPGA_ID])) fpga_major = struct.unpack('B', bytearray( message['message'][self.START_POS_FPGA_MAJOR:self.END_POS_FPGA_MAJOR])) fpga_minor = struct.unpack('B', bytearray( message['message'][self.START_POS_FPGA_MINOR:self.END_POS_FPGA_MINOR])) fpga_lab = struct.unpack('B', bytearray( message['message'][self.START_POS_FPGA_LAB:self.END_POS_FPGA_LAB])) if all([len(each) > 0 for each in [major, minor, micro, build]]): self.dg_version = f"v{major[0]}.{minor[0]}.{micro[0]}-{build[0]}" self.logger.debug(f"DG VERSION: {self.dg_version}") if all([len(each) > 0 for each in [fpga_major, fpga_minor, fpga_id, fpga_lab]]): self.fpga_version = f"ID: {fpga_id[0]} v{fpga_major[0]}.{fpga_minor[0]}.{fpga_lab[0]}" self.logger.debug(f"DG FPGA VERSION: {self.fpga_version}") @_publish(["dg_operation_mode", "dg_operation_sub_mode"]) def _handler_dg_op_mode_sync(self, message): """ Handles published DG operation mode messages. Current DG operation mode is captured for reference. @param message: published DG operation mode broadcast message @return: None """ mode = struct.unpack('i', bytearray( message['message'][self.START_POS_DG_OP_MODE:self.END_POS_DG_OP_MODE])) smode = struct.unpack('i', bytearray( message['message'][self.START_POS_DG_SUB_MODE:self.END_POS_DG_SUB_MODE])) self.dg_operation_mode = mode[0] self.dg_operation_sub_mode = smode[0] def cmd_log_in_to_dg(self): """ Constructs and sends a login command via CAN bus. Login required before \n other commands can be sent to the DG. @return: 1 if logged in, 0 if log in failed """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_LOGIN_TO_DG, payload=list(map(int, map(ord, self.DG_LOGIN_PASSWORD)))) self.logger.info("Logging in to the DG...") # Send message received_message = self.can_interface.send(message) if received_message is not None: if received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] == 1: self.logger.info("Successfully logged in to the DG.") else: self.logger.error("Log In Failed.") return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.error("Timeout!!!!") return False def cmd_ui_request_dg_version(self): """ Constructs and sends the ui request for version message """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_DG_REQUEST_DG_VERSION) self.logger.debug("Sending Dialin request for version to DG") self.can_interface.send(message, 0) def cmd_dg_safety_shutdown_override(self): """ Constructs and sends an DG safety shutdown override command via CAN bus. \returns response message if received, False if no response received @return: 1 if successful, zero otherwise """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_DG_SAFETY_SHUTDOWN_OVERRIDE) self.logger.debug("overriding DG safety shutdown") # Send message received_message = self.can_interface.send(message) if received_message is not None: if received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] == 1: self.logger.debug("Safety shutdown signal overridden") else: self.logger.debug("Safety shutdown signal override failed.") return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_dg_software_reset_request(self): """ Constructs and sends an DG software reset request via CAN bus. Constraints: Must be logged into DG. \returns response message if received, False if no response received @return: 1 if successful, zero otherwise """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_DG_SOFTWARE_RESET_REQUEST) self.logger.debug("requesting DG software reset") # Send message received_message = self.can_interface.send(message) if received_message is not None: if received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] == 1: self.logger.debug("DG is resetting...") else: self.logger.debug("DG reset request failed.") return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False