Index: dialin/hd/air_trap.py =================================================================== diff -u -rf994f3ff2163edac03c480e1112a7c4b3ade7b76 -r56ebe87f02ab94a41349c8649c1bbd3cf5b11103 --- dialin/hd/air_trap.py (.../air_trap.py) (revision f994f3ff2163edac03c480e1112a7c4b3ade7b76) +++ dialin/hd/air_trap.py (.../air_trap.py) (revision 56ebe87f02ab94a41349c8649c1bbd3cf5b11103) @@ -1,41 +1,36 @@ ########################################################################### # -# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # -# @file air_trap.py +# @file air_trap.py # -# @author (last) Sean -# @date (last) 21-Sep-2020 -# @author (original) Sean -# @date (original) 21-Sep-2020 +# @author (last) Michael Garthwaite +# @date (last) 26-Jun-2024 +# @author (original) Sean Nash +# @date (original) 21-Sep-2020 # ############################################################################ import struct -from ..utils.conversions import integer_to_bytearray, float_to_bytearray -from .constants import RESET,NO_RESET -from ..common.msg_defs import MsgIds -from ..protocols.CAN import (DenaliMessage, - DenaliChannels) -from ..utils.base import _AbstractSubSystem, _publish from logging import Logger +from .constants import RESET, NO_RESET +from ..common.msg_defs import MsgIds, MsgFieldPositions +from ..protocols.CAN import DenaliMessage, DenaliChannels +from ..utils.base import AbstractSubSystem, publish +from ..utils.checks import check_broadcast_interval_override_ms +from ..utils.conversions import integer_to_bytearray, float_to_bytearray -class HDAirTrap(_AbstractSubSystem): + +class HDAirTrap(AbstractSubSystem): """ HDAirTrap Hemodialysis Delivery (HD) Dialin API sub-class for air trap related commands. """ - # Air trap broadcast message field positions - START_POS_LOWER_LEVEL = DenaliMessage.PAYLOAD_START_INDEX - END_POS_LOWER_LEVEL = START_POS_LOWER_LEVEL + 4 - START_POS_UPPER_LEVEL = END_POS_LOWER_LEVEL - END_POS_UPPER_LEVEL = START_POS_UPPER_LEVEL + 4 - # Air trap level sensor IDs LOWER_LEVEL_SENSOR = 0 UPPER_LEVEL_SENSOR = 1 @@ -61,14 +56,17 @@ self.lower_level = self.AIR_DETECTED_AT_LEVEL self.upper_level = self.AIR_DETECTED_AT_LEVEL + self.lower_level_raw = self.AIR_DETECTED_AT_LEVEL + self.upper_level_raw = self.AIR_DETECTED_AT_LEVEL + self.hd_air_trap_timestamp = 0.0 def get_air_trap_levels(self): """ Gets the current air trap levels - @return: List containing air trap levels: [Lower, Upper] + @return: List containing air trap levels: [Lower, Upper, RawLower, RawUpper] """ - return [self.lower_level, self.upper_level] + return [self.lower_level, self.upper_level, self.lower_level_raw,self.upper_level_raw] def get_air_trap_lower_level(self): """ @@ -86,9 +84,25 @@ """ return self.upper_level - @_publish(["lower_level", "upper_level"]) - def _handler_air_trap_sync(self, message): + def get_raw_air_trap_lower_level(self): """ + Gets the current air trap raw lower level reading + + @return: 0 for air, 1 for fluid at lower level + """ + return self.lower_level_raw + + def get_raw_air_trap_upper_level(self): + """ + Gets the current air trap raw upper level reading + + @return: 0 for air, 1 for fluid at upper level + """ + return self.upper_level_raw + + @publish(["hd_air_trap_timestamp", "lower_level", "upper_level", "lower_level_raw", "upper_level_raw"]) + def _handler_air_trap_sync(self, message, timestamp=0.0): + """ Handles published air trap data messages. Air trap data are captured for reference. @@ -97,22 +111,68 @@ """ lower = struct.unpack('i', bytearray( - message['message'][self.START_POS_LOWER_LEVEL:self.END_POS_LOWER_LEVEL])) + message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) upper = struct.unpack('i', bytearray( - message['message'][self.START_POS_UPPER_LEVEL:self.END_POS_UPPER_LEVEL])) + message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) + raw_lower = struct.unpack('i', bytearray( + message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) + raw_upper = struct.unpack('i', bytearray( + message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) self.lower_level = lower[0] self.upper_level = upper[0] + self.lower_level_raw = raw_lower[0] + self.upper_level_raw = raw_upper[0] + self.hd_air_trap_timestamp = timestamp - def cmd_air_trap_level_sensor_override(self, detected, sensor, reset=NO_RESET): + def cmd_air_trap_level_sensor_override(self, sensor: int, detected: int, reset: int = NO_RESET) -> int: """ Constructs and sends the air trap level sensor override command Constraints: Must be logged into HD. Given sensor must be one of the sensors listed below. + @param sensor: unsigned int - sensor ID @param detected: unsigned int - detected (0=air, 1=fluid) to override sensor with + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + + Air trap sensor IDs: \n + 0 = Lower level \n + 1 = Upper level \n + """ + + rst = integer_to_bytearray(reset) + det = integer_to_bytearray(detected) + idx = integer_to_bytearray(sensor) + payload = rst + det + idx + + message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, + message_id=MsgIds.MSG_ID_HD_AIR_TRAP_LEVEL_SENSOR_OVERRIDE.value, + payload=payload) + + self.logger.debug("override air trap level sensor detection value for sensor " + str(sensor) + " ,Level: " + str(detected)) + + # Send message + received_message = self.can_interface.send(message) + + # If there is content... + if received_message is not None: + # response payload is OK or not OK + return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] + else: + self.logger.debug("Timeout!!!!") + return False + + def cmd_raw_air_trap_level_sensor_override(self, sensor: int, detected: int, reset: int = NO_RESET) -> int: + """ + Constructs and sends the raw air trap level sensor override command + Constraints: + Must be logged into HD. + Given sensor must be one of the sensors listed below. + @param sensor: unsigned int - sensor ID + @param detected: unsigned int - detected (0=air, 1=fluid) to override sensor with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise @@ -122,15 +182,15 @@ """ rst = integer_to_bytearray(reset) - det = float_to_bytearray(detected) + det = integer_to_bytearray(detected) idx = integer_to_bytearray(sensor) payload = rst + det + idx message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, - message_id=MsgIds.MSG_ID_DG_AIR_TRAP_LEVEL_SENSOR_OVERRIDE.value, + message_id=MsgIds.MSG_ID_HD_RAW_AIR_TRAP_LEVEL_SENSOR_OVERRIDE.value, payload=payload) - self.logger.debug("override air trap level sensor detection value for sensor " + str(sensor)) + self.logger.debug("override raw air trap level sensor detection value for sensor " + str(sensor) + " ,Level: " + str(detected)) # Send message received_message = self.can_interface.send(message) @@ -143,7 +203,7 @@ self.logger.debug("Timeout!!!!") return False - def cmd_air_trap_data_broadcast_interval_override(self, ms, reset=NO_RESET): + def cmd_air_trap_data_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the air trap data broadcast interval override command Constraints: @@ -155,12 +215,15 @@ @return: 1 if successful, zero otherwise """ + if not check_broadcast_interval_override_ms(ms): + return False + rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, - message_id=MsgIds.MSG_ID_DG_AIR_TRAP_DATA_BROADCAST_INTERVAL_OVERRIDE.value, + message_id=MsgIds.MSG_ID_HD_AIR_TRAP_SEND_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override HD air trap data broadcast interval") @@ -175,7 +238,7 @@ else: str_res = str(ms) + " ms: " self.logger.debug("Air trap data broadcast interval overridden to " + str_res + - str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: