###########################################################################
#
#    Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   voltages.py
#
#    @author (last)      Quang Nguyen
#    @date   (last)      30-Aug-2021
#    @author (original)  Sean Nash
#    @date   (original)  15-Apr-2021
#
############################################################################
import struct
from enum import unique
from logging import Logger

from .constants import RESET, NO_RESET
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish, DialinEnum
from ..utils.conversions import integer_to_bytearray, float_to_bytearray
from ..utils.checks import check_broadcast_interval_override_ms
from ..common.msg_defs import MsgIds, MsgFieldPositions


# Monitored voltages
@unique
class HDMonitoredVoltages(DialinEnum):
    MONITORED_LINE_1_2V = 0  # Processor voltage (1.2V)
    MONITORED_LINE_3_3V = 1  # Logic voltage (3.3V)
    MONITORED_LINE_5V_LOGIC = 2  # Logic voltage (5V)
    MONITORED_LINE_5V_SENSORS = 3  # Sensors voltage (5V)
    MONITORED_LINE_24V = 4  # Actuators voltage (24V)
    MONITORED_LINE_24V_REGEN = 5  # Actuators regen voltage (24V)
    MONITORED_LINE_FPGA_REF_V = 6  # FPGA ADC reference voltage (1V)
    MONITORED_LINE_PBA_REF_V = 7  # PBA ADC reference voltage (3V)
    MONITORED_LINE_FPGA_VCC_V = 8  # FPGA Vcc (3V)
    MONITORED_LINE_FPGA_AUX_V = 9  # FPGA Vaux (3V)
    MONITORED_LINE_FPGA_PVN_V = 10  # FPGA Vpvn (1V)
    NUM_OF_MONITORED_VOLTAGE_LINES = 11  # Number of monitored voltage lines


# Start/end payload positions of each voltage field in the HD voltages data
# message, paired with the monitored line the field is stored under.
_VOLTAGE_FIELD_BOUNDS = (
    (HDMonitoredVoltages.MONITORED_LINE_1_2V,
     MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1),
    (HDMonitoredVoltages.MONITORED_LINE_3_3V,
     MsgFieldPositions.START_POS_FIELD_2, MsgFieldPositions.END_POS_FIELD_2),
    (HDMonitoredVoltages.MONITORED_LINE_5V_LOGIC,
     MsgFieldPositions.START_POS_FIELD_3, MsgFieldPositions.END_POS_FIELD_3),
    (HDMonitoredVoltages.MONITORED_LINE_5V_SENSORS,
     MsgFieldPositions.START_POS_FIELD_4, MsgFieldPositions.END_POS_FIELD_4),
    (HDMonitoredVoltages.MONITORED_LINE_24V,
     MsgFieldPositions.START_POS_FIELD_5, MsgFieldPositions.END_POS_FIELD_5),
    (HDMonitoredVoltages.MONITORED_LINE_24V_REGEN,
     MsgFieldPositions.START_POS_FIELD_6, MsgFieldPositions.END_POS_FIELD_6),
    (HDMonitoredVoltages.MONITORED_LINE_FPGA_REF_V,
     MsgFieldPositions.START_POS_FIELD_7, MsgFieldPositions.END_POS_FIELD_7),
    (HDMonitoredVoltages.MONITORED_LINE_PBA_REF_V,
     MsgFieldPositions.START_POS_FIELD_8, MsgFieldPositions.END_POS_FIELD_8),
    (HDMonitoredVoltages.MONITORED_LINE_FPGA_VCC_V,
     MsgFieldPositions.START_POS_FIELD_9, MsgFieldPositions.END_POS_FIELD_9),
    (HDMonitoredVoltages.MONITORED_LINE_FPGA_AUX_V,
     MsgFieldPositions.START_POS_FIELD_10, MsgFieldPositions.END_POS_FIELD_10),
    (HDMonitoredVoltages.MONITORED_LINE_FPGA_PVN_V,
     MsgFieldPositions.START_POS_FIELD_11, MsgFieldPositions.END_POS_FIELD_11),
)


class HDVoltages(AbstractSubSystem):
    """
    Hemodialysis Delivery (HD) Dialin API sub-class for voltage monitor related commands and data.
    """

    def __init__(self, can_interface, logger: Logger):
        """
        HDVoltages constructor

        @param can_interface: CAN interface used to send commands and receive broadcasts;
                              when None, no broadcast handler is registered
        @param logger: logger used for debug output
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # Create the storage before registering the handler so that a broadcast
        # delivered right after registration never finds the attribute missing.
        self.monitored_voltages = [0.0] * HDMonitoredVoltages.NUM_OF_MONITORED_VOLTAGE_LINES.value

        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_HD_VOLTAGES_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_monitored_voltages_sync)

    def get_monitored_voltages(self):
        """
        Gets all HD monitored voltages

        @return: List of voltages of size NUM_OF_MONITORED_VOLTAGE_LINES (the list held by
                 this object, not a copy), indexed by HDMonitoredVoltages values
        """
        return self.monitored_voltages

    @publish(["monitored_voltages"])
    def _handler_monitored_voltages_sync(self, message):
        """
        Handles published HD monitored voltages data messages. Voltage data are captured
        for reference.

        @param message: published monitored voltages data message
        @return: none
        """
        payload = message['message']

        # Decode every field before storing anything, so a message that fails to
        # unpack part-way through leaves the previously captured values untouched.
        decoded = [
            (line.value, struct.unpack('f', bytearray(payload[start:end]))[0])
            for line, start, end in _VOLTAGE_FIELD_BOUNDS
        ]

        # Update in place: callers may be holding the list from get_monitored_voltages().
        for index, volts in decoded:
            self.monitored_voltages[index] = volts

    def cmd_monitored_voltage_override(self, signal: int = 0, volts: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the HD monitored voltage override command
        Constraints:
        Must be logged into HD.
        Given signal must be valid member of HDMonitoredVoltages enum

        @param signal: integer - ID of signal to override
        @param volts: float - value (in volts) to override signal with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        # Payload layout: reset flag, override value, signal ID.
        payload = integer_to_bytearray(reset) + float_to_bytearray(volts) + integer_to_bytearray(signal)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_MONITORED_VOLTAGES_OVERRIDE.value,
                                              payload=payload)

        self.logger.debug("override monitored HD voltage for signal %s", signal)

        # Send message
        received_message = self.can_interface.send(message)

        # No response at all is reported as a timeout.
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        # response payload is OK or not OK
        result = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        str_res = "reset back to normal. " if reset == RESET else str(volts) + " V. "
        self.logger.debug("Monitored HD voltage overridden to %s%s", str_res, result)
        return result

    def cmd_monitored_voltages_broadcast_interval_override(self, ms: int = 1000, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the monitored HD voltages broadcast interval override command
        Constraints:
        Must be logged into HD.
        Given interval must be non-zero and a multiple of the HD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if the interval is rejected by
                 check_broadcast_interval_override_ms or no response was received)
        """
        # Nothing is sent when the requested interval does not pass validation.
        if not check_broadcast_interval_override_ms(ms):
            return False

        # Payload layout: reset flag, interval in ms.
        payload = integer_to_bytearray(reset) + integer_to_bytearray(ms)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_HD_MONITORED_VOLTAGES_SEND_INTERVAL_OVERRIDE.value,
            payload=payload)

        self.logger.debug("override monitored HD voltages broadcast interval")

        # Send message
        received_message = self.can_interface.send(message)

        # No response at all is reported as a timeout.
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        # response payload is OK or not OK
        result = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        str_res = "reset back to normal: " if reset == RESET else str(ms) + " ms: "
        self.logger.debug("HD monitored voltages broadcast interval overridden to %s%s", str_res, result)
        return result