Index: dialin/dg/load_cells.py
===================================================================
diff -u -r900f99812cee2e022fbbd46cd916b08a1397dda7 -r615852271ed083494ef644c55536c5dad1b25c83
--- dialin/dg/load_cells.py (.../load_cells.py) (revision 900f99812cee2e022fbbd46cd916b08a1397dda7)
+++ dialin/dg/load_cells.py (.../load_cells.py) (revision 615852271ed083494ef644c55536c5dad1b25c83)
@@ -29,7 +29,7 @@
     Dialysate Generator (DG) Dialin API sub-class for load cell related commands.
     """
 
-    # Pressure/Occlusion message IDs
+    # Load cell message IDs
     MSG_ID_DG_LOAD_CELL_DATA = 0x000C
     MSG_ID_DG_LOAD_CELL_OVERRIDE = 0xA005
     MSG_ID_DG_LOAD_CELL_DATA_BROADCAST_INTERVAL_OVERRIDE = 0xA00D
Index: dialin/hd/air_trap.py
===================================================================
diff -u
--- dialin/hd/air_trap.py (revision 0)
+++ dialin/hd/air_trap.py (revision 615852271ed083494ef644c55536c5dad1b25c83)
@@ -0,0 +1,201 @@
+###########################################################################
+#
+# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
+#
+# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
+# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
+#
+# @file air_trap.py
+#
+# @author (last) Sean
+# @date (last) 21-Sep-2020
+# @author (original) Sean
+# @date (original) 21-Sep-2020
+#
+############################################################################
+import struct
+from ..utils.conversions import integer_to_bytearray, float_to_bytearray
+from .constants import RESET, NO_RESET
+from ..protocols.CAN import (DenaliMessage,
+                             DenaliChannels)
+from ..utils.base import _AbstractSubSystem, _publish
+from logging import Logger
+
+
+class HDAirTrap(_AbstractSubSystem):
+    """
+    HDAirTrap
+
+    Hemodialysis Delivery (HD) Dialin API sub-class for air trap related commands.
+
+    Keeps the lower/upper level readings from the latest air trap data
+    broadcast and sends the level sensor and broadcast interval overrides.
+    """
+
+    # Air trap message IDs
+    # NOTE(review): the two override IDs carry a DG prefix although this is the
+    # HD sub-class; the names are left unchanged because callers may use them.
+    MSG_ID_HD_AIR_TRAP_DATA = 0x003E
+    MSG_ID_DG_AIR_TRAP_LEVEL_SENSOR_OVERRIDE = 0x8033
+    MSG_ID_DG_AIR_TRAP_DATA_BROADCAST_INTERVAL_OVERRIDE = 0x8032
+
+    # Air trap broadcast message field positions (two consecutive 4-byte fields)
+    START_POS_LOWER_LEVEL = DenaliMessage.PAYLOAD_START_INDEX
+    END_POS_LOWER_LEVEL = START_POS_LOWER_LEVEL + 4
+    START_POS_UPPER_LEVEL = END_POS_LOWER_LEVEL
+    END_POS_UPPER_LEVEL = START_POS_UPPER_LEVEL + 4
+
+    # Air trap level sensor IDs
+    LOWER_LEVEL_SENSOR = 0
+    UPPER_LEVEL_SENSOR = 1
+
+    # Air trap level sensor levels
+    AIR_DETECTED_AT_LEVEL = 0
+    FLUID_DETECTED_AT_LEVEL = 1
+
+    def __init__(self, can_interface, logger: Logger):
+        """
+        HDAirTrap constructor
+
+        @param can_interface: Denali Can Messenger object; when None, no broadcast
+                              handler is registered
+        @param logger: logger used for the debug output of the command methods
+        """
+        super().__init__()
+        self.can_interface = can_interface
+        self.logger = logger
+
+        # Set the defaults before registering the handler so that a broadcast
+        # handled right after registration is not overwritten by them.
+        self.lower_level = self.AIR_DETECTED_AT_LEVEL
+        self.upper_level = self.AIR_DETECTED_AT_LEVEL
+
+        if self.can_interface is not None:
+            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
+            msg_id = self.MSG_ID_HD_AIR_TRAP_DATA
+            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
+                                                                       self._handler_air_trap_sync)
+
+    def get_air_trap_levels(self) -> list:
+        """
+        Gets the current air trap levels
+
+        @return: List containing air trap levels: [Lower, Upper]
+        """
+        return [self.lower_level, self.upper_level]
+
+    def get_air_trap_lower_level(self) -> int:
+        """
+        Gets the current air trap lower level reading
+
+        @return: 0 for air, 1 for fluid at lower level
+        """
+        return self.lower_level
+
+    def get_air_trap_upper_level(self) -> int:
+        """
+        Gets the current air trap upper level reading
+
+        @return: 0 for air, 1 for fluid at upper level
+        """
+        return self.upper_level
+
+    @_publish(["lower_level", "upper_level"])
+    def _handler_air_trap_sync(self, message):
+        """
+        Handles published air trap data messages. Air trap data are captured
+        for reference.
+
+        @param message: published air trap data message; its 'message' entry holds
+                        the raw bytes the level fields are sliced from
+        @return: None
+        """
+
+        raw = message['message']
+        # Each level is one 4-byte integer in the host's native byte order ('i')
+        lower = struct.unpack('i', bytearray(raw[self.START_POS_LOWER_LEVEL:self.END_POS_LOWER_LEVEL]))
+        upper = struct.unpack('i', bytearray(raw[self.START_POS_UPPER_LEVEL:self.END_POS_UPPER_LEVEL]))
+
+        self.lower_level = lower[0]
+        self.upper_level = upper[0]
+
+    def cmd_air_trap_level_sensor_override(self, detected, sensor, reset=NO_RESET):
+        """
+        Constructs and sends the air trap level sensor override command
+        Constraints:
+            Must be logged into HD.
+            Given sensor must be one of the sensors listed below.
+
+        @param detected: unsigned int - detected (0=air, 1=fluid) to override sensor with
+        @param sensor: unsigned int - sensor ID
+        @param reset: integer - 1 to reset a previous override, 0 to override
+        @return: response payload value (1 if successful, zero otherwise); False on timeout
+
+        Air trap sensor IDs: \n
+            0 = Lower level \n
+            1 = Upper level \n
+        """
+
+        rst = integer_to_bytearray(reset)
+        # The level is an integer code (0=air, 1=fluid), so pack it as an integer
+        # like the other fields; int() keeps callers that pass 0.0/1.0 working.
+        det = integer_to_bytearray(int(detected))
+        idx = integer_to_bytearray(sensor)
+        payload = rst + det + idx
+
+        # NOTE(review): the constraints above name the HD, yet the message goes out
+        # on the dialin-to-DG channel - confirm the intended channel.
+        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
+                                              message_id=self.MSG_ID_DG_AIR_TRAP_LEVEL_SENSOR_OVERRIDE,
+                                              payload=payload)
+
+        self.logger.debug("override air trap level sensor detection value for sensor %s", sensor)
+
+        # Send message
+        received_message = self.can_interface.send(message)
+
+        # No response came back
+        if received_message is None:
+            self.logger.debug("Timeout!!!!")
+            return False
+
+        # response payload is OK or not OK
+        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
+
+    def cmd_air_trap_data_broadcast_interval_override(self, ms, reset=NO_RESET):
+        """
+        Constructs and sends the air trap data broadcast interval override command
+        Constraints:
+            Must be logged into HD.
+            Given interval must be non-zero and a multiple of the HD general task interval (50 ms).
+
+        @param ms: integer - interval (in ms) to override with
+        @param reset: integer - 1 to reset a previous override, 0 to override
+        @return: response payload value (1 if successful, zero otherwise); False on timeout
+        """
+
+        rst = integer_to_bytearray(reset)
+        mis = integer_to_bytearray(ms)
+        payload = rst + mis
+
+        # NOTE(review): the constraints above name the HD, yet the message goes out
+        # on the dialin-to-DG channel - confirm the intended channel.
+        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
+                                              message_id=self.MSG_ID_DG_AIR_TRAP_DATA_BROADCAST_INTERVAL_OVERRIDE,
+                                              payload=payload)
+
+        self.logger.debug("override HD air trap data broadcast interval")
+
+        # Send message
+        received_message = self.can_interface.send(message)
+
+        # No response came back
+        if received_message is None:
+            self.logger.debug("Timeout!!!!")
+            return False
+
+        result = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
+        str_res = "reset back to normal: " if reset == RESET else str(ms) + " ms: "
+        self.logger.debug("Air trap data broadcast interval overridden to %s%s", str_res, result)
+        # response payload is OK or not OK
+        return result