########################################################################### # # Copyright (c) 2022-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file battery.py # # @author (last) Sean Nash # @date (last) 17-Mar-2023 # @author (original) Micahel Garthwaite # @date (original) 28-Mar-2022 # ############################################################################ import struct from logging import Logger from .constants import RESET, NO_RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish from ..utils.checks import check_broadcast_interval_override_ms from ..utils.conversions import integer_to_bytearray, float_to_bytearray class HDBattery(AbstractSubSystem): """ Hemodialysis Delivery (HD) Dialin API sub-class for battery subsystem. """ _BATTERY_MAH_2_MWH_FACTOR = 14.7 # < Conversion factor for mAh to mWh. def __init__(self, can_interface, logger: Logger): """ HDBattery constructor """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.hd_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_HD_BATTERY_MANAGEMENT_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_battery_manager_sync) msg_id = MsgIds.MSG_ID_HD_BATTERY_STATUS_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_battery_status_sync) self.hd_battery_management_timestamp = 0.0 self.hd_battery_status_timestamp = 0.0 # values from battery status message self.RemainingCapacity = 0 self.BatteryStatus = 0 self.BatteryChargerStatus = 0 self.BatteryCommStatus = 0 # values from battery manager message self.RemainingCapacityAlarm = 0 self.RemainingTimeAlarm = 0 self.BatteryMode = 0 self.AtRate = 0 self.AtRateTimeToFull = 0 self.AtRateTimeToEmpty = 0 self.AtRateOK = 0 self.Temperature = 0 self.Voltage = 0 self.Current = 0 self.AverageCurrent = 0 self.MaxError = 0 self.RelativeStateOfCharge = 0 self.AbsoluteStateOfCharge = 0 self.FullChargeCapacity = 0 self.RunTimeToEmpty = 0 self.AverageTimeToEmpty = 0 self.AverageTimeToFull = 0 self.ChargingCurrent = 0 self.ChargingVoltage = 0 self.CycleCount = 0 self.DesignCapacity = 0 self.DesignVoltage = 0 self.SpecificationInfo = 0 self.ManufactureDate = 0 self.SerialNumber = 0 self.ManufacturerName = "" self.DeviceName = "" self.DeviceChemistry = "" def get_battery_remaining_capacity(self): """ Gets the remaining capacity @return: battery remaining capacity """ return self.RemainingCapacity @publish(["hd_battery_status_timestamp", 'RemainingCapacity', 'BatteryStatus', 'BatteryChargerStatus', 'BatteryCommStatus']) def _handler_battery_status_sync(self, message, timestamp=0.0): """ Handles published battery data messages. Battery data are captured for reference. @param message: published battery data message @return: none """ self.RemainingCapacity = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.BatteryStatus = struct.unpack('HH', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] self.BatteryChargerStatus = struct.unpack('HH', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] self.BatteryCommStatus = struct.unpack('HH', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] self.hd_battery_status_timestamp = timestamp @publish(['hd_battery_management_timestamp', 'RemainingCapacityAlarm', 'RemainingTimeAlarm', 'BatteryMode', 'AtRate', 'AtRateTimeToFull', 'AtRateTimeToEmpty', 'AtRateOK', 'Temperature', 'Voltage', 'Current', 'AverageCurrent', 'MaxError', 'RelativeStateOfCharge', 'AbsoluteStateOfCharge', 'FullChargeCapacity', 'RunTimeToEmpty', 'AverageTimeToEmpty', 'AverageTimeToFull', 'ChargingCurrent', 'ChargingVoltage', 'CycleCount', 'DesignCapacity', 'DesignVoltage', 'SpecificationInfo', 'ManufactureDate', 'SerialNumber', 'ManufacturerName', 'DeviceName', 'DeviceChemistry']) def _handler_battery_manager_sync(self, message, timestamp=0.0): """ Handles published battery data messages. Battery data are captured for reference. @param message: published battery data message @return: none """ self.RemainingCapacityAlarm = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.RemainingTimeAlarm = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] self.BatteryMode = struct.unpack('HH', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] self.AtRate = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] self.AtRateTimeToFull = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] self.AtRateTimeToEmpty = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6]))[0] self.AtRateOK = struct.unpack('HH', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7]))[0] self.Temperature = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8]))[0] self.Voltage = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_9:MsgFieldPositions.END_POS_FIELD_9]))[0] self.Current = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_10:MsgFieldPositions.END_POS_FIELD_10]))[0] self.AverageCurrent = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_11:MsgFieldPositions.END_POS_FIELD_11]))[0] self.MaxError = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_12:MsgFieldPositions.END_POS_FIELD_12]))[0] self.RelativeStateOfCharge = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_13:MsgFieldPositions.END_POS_FIELD_13]))[0] self.AbsoluteStateOfCharge = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_14:MsgFieldPositions.END_POS_FIELD_14]))[0] self.FullChargeCapacity = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_15:MsgFieldPositions.END_POS_FIELD_15]))[0] self.RunTimeToEmpty = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_16:MsgFieldPositions.END_POS_FIELD_16]))[0] self.AverageTimeToEmpty = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_17:MsgFieldPositions.END_POS_FIELD_17]))[0] self.AverageTimeToFull = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_18:MsgFieldPositions.END_POS_FIELD_18]))[0] self.ChargingCurrent = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_19:MsgFieldPositions.END_POS_FIELD_19]))[0] self.ChargingVoltage = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_20:MsgFieldPositions.END_POS_FIELD_20]))[0] self.CycleCount = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_21:MsgFieldPositions.END_POS_FIELD_21]))[0] self.DesignCapacity = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_22:MsgFieldPositions.END_POS_FIELD_22]))[0] self.DesignVoltage = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_23:MsgFieldPositions.END_POS_FIELD_23]))[0] self.SpecificationInfo = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_24:MsgFieldPositions.END_POS_FIELD_24]))[0] self.ManufactureDate = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_25:MsgFieldPositions.END_POS_FIELD_25]))[0] self.SerialNumber = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_26:MsgFieldPositions.END_POS_FIELD_26]))[0] self.ManufacturerName = struct.unpack('s', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_27:MsgFieldPositions.END_POS_FIELD_27]))[0] self.DeviceName = struct.unpack('s', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_28:MsgFieldPositions.END_POS_FIELD_28]))[0] self.DeviceChemistry = struct.unpack('s', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_29:MsgFieldPositions.END_POS_FIELD_29]))[0] self.hd_battery_management_timestamp = timestamp def cmd_battery_remaining_capacity_override(self, mWh: float, reset: int = NO_RESET) -> int: """ Constructs and sends the battery remaining capacity (in mWh) override \n command. Constraints: Must be logged into HD. @param mWh: float - remaining mWh of power in the HD battery to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) pwr = float_to_bytearray(mWh) payload = rst + pwr message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_BATTERY_REMAINING_CAP_MWH_OVERRIDE.value, payload=payload) self.logger.debug("override HD battery remaining capacity") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = str(mWh) self.logger.debug("HD battery remaining capacity overridden to " + str_res + " mWh: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_battery_status_override(self, status: int, reset: int = NO_RESET) -> int: """ Constructs and sends the battery remaining capacity (in mWh) override \n command. BATTERY_PACK_ERROR_BITS = 0x000F Constraints: Must be logged into HD. @param status: int - the battery comm status as a bit map. @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) sts = integer_to_bytearray(status) payload = rst + sts message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_BATTERY_STATUS_OVERRIDE.value, payload=payload) self.logger.debug("override HD battery status") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = str(sts) self.logger.debug("HD battery status overridden to " + str_res + " " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_battery_charger_status_override(self, status: int, reset: int = NO_RESET) -> int: """ Constructs and sends the battery charger status override command. Setting BATTERY_CHARGER_STATUS_AC_PRESENT_MASK to 0 causes the HD to assume battery power. Constraints: Must be logged into HD. @param status: int - the bit map for the charger status register. BATTERY_CHARGER_STATUS_AC_PRESENT_MASK = 0x8000 @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) sts = integer_to_bytearray(status) payload = rst + sts message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_BATTERY_CHARGER_STATUS_OVERRIDE.value, payload=payload) self.logger.debug("override HD battery charger status.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = str(sts) self.logger.debug("HD battery charger status overridden to " + str_res + " " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_battery_comm_status_override(self, status: int, reset: int = NO_RESET) -> int: """ Constructs and sends the battery comm status override command. \n This overrides the I2C status register. \n Constraints: Must be logged into HD. @param status: int - the bit map for the i2c status register. I2C_AL = 0x0001, /* arbitration lost */ I2C_NACK = 0x0002, /* no acknowledgement */ @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) sts = integer_to_bytearray(status) payload = rst + sts message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_BATTERY_COMM_STATUS_OVERRIDE.value, payload=payload) self.logger.debug("override HD battery comm status.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = str(sts) self.logger.debug("HD battery comm status overridden to " + str_res + " " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False