########################################################################### # # Copyright (c) 2021-2022 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file voltages.py # # @author (last) Quang Nguyen # @date (last) 30-Aug-2021 # @author (original) Sean Nash # @date (original) 22-Apr-2021 # ############################################################################ import struct from enum import unique from logging import Logger from .constants import RESET, NO_RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish from ..utils.base import DialinEnum from ..utils.checks import check_broadcast_interval_override_ms from ..utils.conversions import integer_to_bytearray, float_to_bytearray # Monitored voltages @unique class DGMonitoredVoltages(DialinEnum): MONITORED_LINE_24V_MAIN = 0 # Main voltage (24V) MONITORED_LINE_1_8V_FPGA = 1 # FPGA logic voltage (1.8V) MONITORED_LINE_1V_FPGA = 2 # FPGA reference voltage (1V) MONITORED_LINE_3_3V_SENSORS = 3 # Sensors voltage (3.3V) MONITORED_LINE_1_8V_PROC = 4 # Processor voltage (1.8V) MONITORED_LINE_5V_SENSORS = 5 # Sensors voltage (5V) MONITORED_LINE_5V_LOGIC = 6 # Logic voltage (5V) MONITORED_LINE_3_3V = 7 # Logic voltage (3.3V) MONITORED_LINE_1_2V_PROC = 8 # Processor voltage (1.2V) MONITORED_LINE_V_REF = 9 # Reference voltage (3V) MONITORED_LINE_EXT_ADC_1_REF_V = 10 # External ADC 1 reference voltage (3V) MONITORED_LINE_EXT_ADC_2_REF_V = 11 # External ADC 2 reference voltage (3V) MONITORED_LINE_PS_GATE_DRIVER_V = 12 # P/S gate driver (5V) MONITORED_LINE_24V_PRIM_HTR_V = 13 # Primary heater (0..24V) MONITORED_LINE_24V_PRIM_HTR_GND_V = 14 # Primary heater ground (0..24V) MONITORED_LINE_24V_SEC_HTR_V = 15 # Secondary element voltage (0..24V) MONITORED_LINE_24V_TRIM_HTR_V = 16 # Trimmer heater voltage (0..24V) NUM_OF_MONITORED_LINES = 17 # Number of monitored voltages class DGVoltages(AbstractSubSystem): """ Hemodialysis Delivery (DG) Dialin API sub-class for voltage monitor related commands and data. """ def __init__(self, can_interface, logger: Logger): """ DGVoltages constructor """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.dg_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_DG_VOLTAGES_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_monitored_voltages_sync) self.monitored_voltages = [0.0] * DGMonitoredVoltages.NUM_OF_MONITORED_LINES.value def get_monitored_voltages(self): """ Gets all DG monitored voltages @return: List of voltages of size NUM_OF_MONITORED_VOLTAGE_LINES """ return self.monitored_voltages @publish([ "monitored_voltages" ]) def _handler_monitored_voltages_sync(self, message): """ Handles published DG monitored voltages data messages. Voltage data are captured for reference. @param message: published monitored voltages data message @return: none """ v1 = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) v12 = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) v18p = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) v18f = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) v3r = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) ve1 = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6])) ve2 = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7])) v33 = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8])) v33s = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_9:MsgFieldPositions.END_POS_FIELD_9])) v5l = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_10:MsgFieldPositions.END_POS_FIELD_10])) v5s = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_11:MsgFieldPositions.END_POS_FIELD_11])) v5g = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_12:MsgFieldPositions.END_POS_FIELD_12])) v24 = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_13:MsgFieldPositions.END_POS_FIELD_13])) v24p = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_14:MsgFieldPositions.END_POS_FIELD_14])) v24pg = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_15:MsgFieldPositions.END_POS_FIELD_15])) v24s = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_16:MsgFieldPositions.END_POS_FIELD_16])) v24t = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_17:MsgFieldPositions.END_POS_FIELD_17])) self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_1V_FPGA.value] = v1[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_1_2V_PROC.value] = v12[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_1_8V_PROC.value] = v18p[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_1_8V_FPGA.value] = v18f[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_V_REF.value] = v3r[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_EXT_ADC_1_REF_V.value] = ve1[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_EXT_ADC_2_REF_V.value] = ve2[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_3_3V.value] = v33[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_3_3V_SENSORS.value] = v33s[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_5V_LOGIC.value] = v5l[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_5V_SENSORS.value] = v5s[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_PS_GATE_DRIVER_V.value] = v5g[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_24V_MAIN.value] = v24[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_24V_PRIM_HTR_V.value] = v24p[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_24V_PRIM_HTR_GND_V.value] = v24pg[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_24V_SEC_HTR_V.value] = v24s[0] self.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_24V_TRIM_HTR_V.value] = v24t[0] def cmd_monitored_voltage_override(self, signal: int = 0, volts: float = 0.0, reset: int = NO_RESET) -> int: """ Constructs and sends the DG monitored voltage override command Constraints: Must be logged into DG. Given signal must be valid member of DGMonitoredVoltages enum @param signal: integer - ID of signal to override @param volts: float - value (in volts) to override signal with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) vlt = float_to_bytearray(volts) idx = integer_to_bytearray(signal) payload = rst + vlt + idx message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_MONITORED_VOLTAGES_OVERRIDE.value, payload=payload) self.logger.debug("override monitored DG voltage for signal " + str(signal)) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal. " else: str_res = str(volts) + " V. " self.logger.debug("Monitored DG voltage overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_monitored_voltages_broadcast_interval_override(self, ms: int = 1000, reset: int = NO_RESET) -> int: """ Constructs and sends the monitored DG voltages broadcast interval override command Constraints: Must be logged into DG. Given interval must be non-zero and a multiple of the DG general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_MONITORED_VOLTAGES_SEND_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override monitored DG voltages broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " self.logger.debug("DG monitored voltages broadcast interval overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False