###########################################################################
#
#    Copyright (c) 2022-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   battery.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      08-Dec-2022
#    @author (original)  Steve Jarpe
#    @date   (original)  27-Sep-2022
#
############################################################################
import struct
from logging import Logger

from .constants import RESET, NO_RESET
from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish
from ..utils.checks import check_broadcast_interval_override_ms
from ..utils.conversions import integer_to_bytearray, float_to_bytearray


class HDBattery(AbstractSubSystem):
    """
    Hemodialysis Delivery (HD) Dialin API sub-class for battery subsystem.

    Every battery quantity is kept in two places that are updated together:
    an instance attribute (e.g. ``self.Voltage``), which is what the
    ``@publish`` decorators name, and a row ``[value, name, kind, units]`` in
    ``status_values`` / ``manager_values``, which is what the getters return.
    """

    _BATTERY_MAH_2_MWH_FACTOR = 14.7  # < Conversion factor for mAh to mWh.

    # Offset of the first data field inside message['message'].
    # NOTE(review): presumably equal to DenaliMessage.PAYLOAD_START_INDEX - confirm.
    _FIELDS_START_INDEX = 6
    # Every field occupies one 32 bit slot in the message.
    _FIELD_SIZE = 4
    # Field kinds that are decoded from the first 16 bit word of their slot.
    _WORD_KINDS = ("int", "temperature", "bool", "bits")

    def __init__(self, can_interface, logger: Logger):
        """
        HDBattery constructor

        @param can_interface: CAN messenger used to receive the battery broadcasts and to send
        the override command. May be None, in which case no handlers are registered.
        @param logger: logger used for debug output
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # values from battery status message
        self.RemainingCapacity = 0
        self.BatteryStatus = 0
        self.BatteryChargerStatus = 0

        # values from battery manager message
        self.RemainingCapacityAlarm = 0
        self.RemainingTimeAlarm = 0
        self.BatteryMode = 0
        self.AtRate = 0
        self.AtRateTimeToFull = 0
        self.AtRateTimeToEmpty = 0
        self.AtRateOK = 0
        self.Temperature = 0
        self.Voltage = 0
        self.Current = 0
        self.AverageCurrent = 0
        self.MaxError = 0
        self.RelativeStateOfCharge = 0
        self.AbsoluteStateOfCharge = 0
        self.FullChargeCapacity = 0
        self.RunTimeToEmpty = 0
        self.AverageTimeToEmpty = 0
        self.AverageTimeToFull = 0
        self.ChargingCurrent = 0
        self.ChargingVoltage = 0
        self.CycleCount = 0
        self.DesignCapacity = 0
        self.DesignVoltage = 0
        self.SpecificationInfo = 0
        self.ManufactureDate = 0
        self.SerialNumber = 0
        self.ManufacturerName = ""
        self.DeviceName = ""
        self.DeviceChemistry = ""

        # Each row is [current value, attribute name, field kind, units]. The row order is the
        # order in which the fields are read from the corresponding message.
        self.status_values = [
            [self.RemainingCapacity, "RemainingCapacity", "mah", "mWh"],
            [self.BatteryStatus, "BatteryStatus", "bits", None],
            [self.BatteryChargerStatus, "BatteryChargerStatus", "bits", None]]

        self.manager_values = [
            [self.RemainingCapacityAlarm, "RemainingCapacityAlarm", "int", "mAh"],
            [self.RemainingTimeAlarm, "RemainingTimeAlarm", "int", "minutes"],
            [self.BatteryMode, "BatteryMode", "bits", None],
            [self.AtRate, "AtRate", "int", "mA"],
            [self.AtRateTimeToFull, "AtRateTimeToFull", "int", "minutes"],
            [self.AtRateTimeToEmpty, "AtRateTimeToEmpty", "int", "minutes"],
            [self.AtRateOK, "AtRateOK", "bool", None],
            [self.Temperature, "Temperature", "temperature", "degC"],
            [self.Voltage, "Voltage", "int", "mV"],
            [self.Current, "Current", "int", "mA"],
            [self.AverageCurrent, "AverageCurrent", "int", "mA"],
            [self.MaxError, "MaxError", "int", "percent"],
            [self.RelativeStateOfCharge, "RelativeStateOfCharge", "int", "percent"],
            [self.AbsoluteStateOfCharge, "AbsoluteStateOfCharge", "int", "percent"],
            [self.FullChargeCapacity, "FullChargeCapacity", "int", "mAh"],
            [self.RunTimeToEmpty, "RunTimeToEmpty", "int", "minutes"],
            [self.AverageTimeToEmpty, "AverageTimeToEmpty", "int", "minutes"],
            [self.AverageTimeToFull, "AverageTimeToFull", "int", "minutes"],
            [self.ChargingCurrent, "ChargingCurrent", "int", "mA"],
            [self.ChargingVoltage, "ChargingVoltage", "int", "mV"],
            # A cycle count is a plain count, not a percentage.
            [self.CycleCount, "CycleCount", "int", "cycles"],
            [self.DesignCapacity, "DesignCapacity", "int", "mAh"],
            [self.DesignVoltage, "DesignVoltage", "int", "mV"],
            [self.SpecificationInfo, "SpecificationInfo", "int", None],
            # NOTE(review): reported as a raw integer (presumably the packed smart battery
            # date word - confirm); it carries no physical unit.
            [self.ManufactureDate, "ManufactureDate", "int", None],
            [self.SerialNumber, "SerialNumber", "int", None],
            [self.ManufacturerName, "ManufacturerName", "string", None],
            [self.DeviceName, "DeviceName", "string", None],
            [self.DeviceChemistry, "DeviceChemistry", "string", None]]

        # Register the handlers only once everything they touch exists, so that a message
        # arriving right after registration cannot hit a half-constructed object.
        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_HD_BATTERY_MANAGEMENT_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_battery_manager_sync)
            msg_id = MsgIds.MSG_ID_HD_BATTERY_STATUS_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_battery_status_sync)

    def get_battery_status_values(self) -> list:
        """
        Gets the battery status values

        @return: list of [value, name, kind, units] rows from the battery status message
        """
        return self.status_values

    def get_battery_remaining_capacity(self):
        """
        Gets the remaining capacity

        @return: battery remaining capacity (value of the first battery status row)
        """
        return self.status_values[0][0]

    def get_battery_manager_values(self) -> list:
        """
        Gets the battery manager values

        @return: list of [value, name, kind, units] rows from the battery manager message
        """
        return self.manager_values

    @staticmethod
    def _decode_field(raw, kind, mah_scale=None):
        """
        Converts the raw bytes of one message field into its presentable value.

        @param raw: bytearray holding the bytes of the field
        @param kind: field kind taken from the value table ("int", "mah", "string",
        "temperature", "bool" or "bits")
        @param mah_scale: for "mah" fields only. None reads the field as one native unsigned
        32 bit integer; a number reads the first native 16 bit word and multiplies it by
        that number.
        @return: the decoded value; raw is returned unchanged for an unknown kind
        """
        if kind == "string":
            # struct's 's' format only accepts a one byte buffer, so it cannot decode the
            # whole field; take the bytes directly, cut at the first NUL and hand back text
            # so the value has the same type as the "" attribute defaults.
            return bytes(raw).split(b'\x00', 1)[0].decode('ascii', errors='replace')

        if kind == "mah":
            if mah_scale is None:
                return struct.unpack('I', raw)[0]
            return struct.unpack('HH', raw)[0] * mah_scale

        if kind not in HDBattery._WORD_KINDS:
            return raw

        # All remaining kinds use the first 16 bit word of the slot (native byte order).
        word = struct.unpack('HH', raw)[0]
        if kind == "int":
            return word
        if kind == "temperature":
            # The word is divided by ten to obtain the value listed in degC.
            return word / 10.0
        if kind == "bool":
            return "True" if word > 0 else "False"
        # "bits": convert integer to string representing bits
        return format(word, 'b')

    def _update_values(self, message, values, mah_scale=None):
        """
        Decodes every field of a battery message and stores the results.

        All fields are decoded before anything is stored, so a message that is too short
        raises struct.error without leaving the table partially updated.

        @param message: published battery data message (its 'message' entry holds the bytes)
        @param values: value table to refresh (self.status_values or self.manager_values)
        @param mah_scale: passed through to _decode_field for "mah" fields
        @return: none
        """
        data = message['message']
        decoded = []
        start = self._FIELDS_START_INDEX
        for entry in values:
            raw = bytearray(data[start:start + self._FIELD_SIZE])
            decoded.append(self._decode_field(raw, entry[2], mah_scale))
            start += self._FIELD_SIZE

        for entry, value in zip(values, decoded):
            entry[0] = value
            # Keep the attribute named in @publish in step with the table; without this the
            # attribute would keep the default it was given in the constructor.
            setattr(self, entry[1], value)

    @publish(['RemainingCapacity', 'BatteryStatus', 'BatteryChargerStatus'])
    def _handler_battery_status_sync(self, message):
        """
        Handles published battery status messages.
        Battery data are captured for reference.

        @param message: published battery data message
        @return: none
        """
        self._update_values(message, self.status_values)

    @publish(['RemainingCapacityAlarm', 'RemainingTimeAlarm', 'BatteryMode', 'AtRate', 'AtRateTimeToFull',
              'AtRateTimeToEmpty', 'AtRateOK', 'Temperature', 'Voltage', 'Current', 'AverageCurrent', 'MaxError',
              'RelativeStateOfCharge', 'AbsoluteStateOfCharge', 'FullChargeCapacity', 'RunTimeToEmpty',
              'AverageTimeToEmpty', 'AverageTimeToFull', 'ChargingCurrent', 'ChargingVoltage', 'CycleCount',
              'DesignCapacity', 'DesignVoltage', 'SpecificationInfo', 'ManufactureDate', 'SerialNumber',
              'ManufacturerName', 'DeviceName', 'DeviceChemistry'])
    def _handler_battery_manager_sync(self, message):
        """
        Handles published battery manager messages.
        Battery data are captured for reference.

        @param message: published battery data message
        @return: none
        """
        self._update_values(message, self.manager_values, mah_scale=self._BATTERY_MAH_2_MWH_FACTOR)

    def cmd_battery_remaining_capacity_override(self, mWh: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the battery remaining capacity (in mWh) override command.
        Constraints:
            Must be logged into HD.

        @param mWh: float - remaining mWh of power in the HD battery to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # Payload layout: reset flag followed by the requested capacity.
        payload = integer_to_bytearray(reset) + float_to_bytearray(mWh)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_HD_BATTERY_REMAINING_CAP_MWH_OVERRIDE.value,
            payload=payload)

        self.logger.debug("override HD battery remaining capacity")

        # Send message
        received_message = self.can_interface.send(message)

        # No content means no response was received
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        # response payload is OK or not OK
        result = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        str_res = "reset back to normal" if reset == RESET else str(mWh)
        self.logger.debug("HD battery remaining capacity overridden to %s mWh: %s", str_res, result)
        return result