###########################################################################
#
#    Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file scheduled_runs_record.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      07-Nov-2022
#    @author (original)  Dara Navaei
#    @date   (original)  12-Feb-2021
#
############################################################################
import struct
import time
import math
from collections import OrderedDict

from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish
from ..utils.nv_ops_utils import NVOpsUtils
from logging import Logger


class DGScheduledRunsNVRecord(AbstractSubSystem):
    """
    Dialysate Generator (DG) Dialin API sub-class for scheduled runs commands.
    """

    # Offsets used to slice an incoming record message: the record starts at
    # SCHEDULED_RUNS_RECORD_START_INDEX and is preceded by
    # SCHEDULED_RECORD_SPECS_BYTES bytes of header (current message number,
    # total messages, payload length).
    SCHEDULED_RUNS_RECORD_START_INDEX = 6
    SCHEDULED_RECORD_SPECS_BYTES = 12
    SCHEDULED_RECORDS_SPECS_BYTE_ARRAY = 3

    MAX_PART_NUMBER_BYTES = 10
    MAX_SERIAL_NUMBER_BYTES = 15
    DEFAULT_SCHEDULED_RUNS_DATE_VALUE = 0
    DEFAULT_SCHEDULED_RUNS_CRC_VALUE = 0
    DEFAULT_SCHEDULED_RUNS_PADDING_VALUE = 0

    CURRENT_MESSAGE_NUM_INDEX = 0
    TOTAL_MESSAGES_NUM_INDEX = 4
    PAYLOAD_LENGTH_INDEX = 8
    PAYLOAD_START_INDEX = 12

    SCHEDULED_RUNS_DATA_TYPE_INDEX = 0
    SCHEDULED_RUNS_VALUE_INDEX = 1

    TARGET_BYTES_TO_SEND_TO_FW = 150
    MIN_PAYLOAD_BYTES_SPACE = 4
    RTC_RAM_MAX_BYTES_TO_WRITE = 64

    PAYLOAD_CURRENT_MSG_INDEX = 0
    PAYLOAD_TOTAL_MSG_INDEX = 1
    PAYLOAD_SCHEDULED_RUNS_BYTES_INDEX = 2

    # Delay in between each payload transfer
    _PAYLOAD_TRANSFER_DELAY_S = 0.2
    _DIALIN_RECORD_UPDATE_DELAY_S = 0.2

    def __init__(self, can_interface, logger: Logger):
        """
        Sets up the receive state and, when a CAN interface is supplied,
        registers the handler for published DG scheduled runs record messages.

        @param can_interface: Denali CAN Messenger object (may be None, in which case no handler is registered)
        @param logger: logger used for debug output and handed to NVOpsUtils
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger
        # Header fields of the most recently received record message
        self._current_message = 0
        self._total_messages = 0
        self._received_msg_length = 0
        # True from the moment a record is requested until its last message arrives
        self._is_getting_runs_in_progress = False
        self._write_fw_data_to_excel = True
        self._cal_data = 0
        # Accumulates the record bytes of all the messages of one record
        self._raw_scheduled_runs_record = []
        self._utilities = NVOpsUtils(logger=self.logger)
        # DG scheduled runs main record as an ordered dictionary
        self.dg_scheduled_runs_record = self._prepare_dg_scheduled_runs_record()

        if self.can_interface is not None:
            channel_id = DenaliChannels.dg_to_dialin_ch_id
            msg_id = MsgIds.MSG_ID_DG_SEND_SCHEDULED_RUNS_RECORD.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_dg_scheduled_runs_record)

    def cmd_reset_dg_calibration_record(self) -> bool:
        """
        Handles resetting DG scheduled services record: the record is rebuilt, passed through
        NVOpsUtils.reset_fw_record and then sent to firmware.

        @return: True if successful, False otherwise
        """
        # Rebuild the record the same way __init__ does. The result was previously discarded,
        # which made this call a no-op as far as this method is concerned.
        self.dg_scheduled_runs_record = self._prepare_dg_scheduled_runs_record()
        self.logger.debug("DG scheduled runs record before reset: %s", self.dg_scheduled_runs_record)
        self.dg_scheduled_runs_record = self._utilities.reset_fw_record(self.dg_scheduled_runs_record)
        status = self.cmd_set_dg_scheduled_runs_record(self.dg_scheduled_runs_record)

        return status

    def cmd_request_dg_scheduled_runs_record(self) -> int:
        """
        Handles getting DG scheduled runs record from firmware.

        Only one request may be outstanding at a time; a call made while a previous request is
        still in progress is refused.

        @return: the first payload byte of the firmware response when a response is received,
                 False on timeout or when a request is already in progress
        """
        if self._is_getting_runs_in_progress is True:
            self.logger.debug("Request cancelled: an existing request is in progress.")
            return False

        self._is_getting_runs_in_progress = True
        # Clear the list for the next call
        self._raw_scheduled_runs_record.clear()

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_DG_GET_SCHEDULED_RUNS_RECORD.value)

        self.logger.debug('Getting DG scheduled runs record')

        received_message = self.can_interface.send(message)

        # If there is no content...
        if received_message is None:
            # Nothing will be published for a request that timed out, so release the flag.
            # Leaving it set would refuse every later request.
            self._is_getting_runs_in_progress = False
            self.logger.debug("Timeout!!!!")
            return False

        # response payload is OK or not OK
        # NOTE(review): a "not OK" response also leaves the in-progress flag set - confirm
        # whether firmware still publishes the record in that case.
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def _handler_dg_scheduled_runs_record(self, message):
        """
        Handles published DG scheduled runs record messages. DG scheduled runs records are captured for
        processing and updating the DG scheduled runs record.

        @param message: published DG scheduled runs record data message
        @return: None
        """
        raw_message = message['message']

        curr = struct.unpack('i', bytearray(
            raw_message[MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0]
        total = struct.unpack('i', bytearray(
            raw_message[MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0]
        length = struct.unpack('i', bytearray(
            raw_message[MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0]

        self._current_message = curr
        self._total_messages = total
        self._received_msg_length = length

        # The end of scheduled runs record payload is from the start index + 12 bytes for the current message +
        # total messages + the length of scheduled runs record. The rest is the CAN messaging CRC and is not
        # needed.
        record_start_index = self.SCHEDULED_RUNS_RECORD_START_INDEX + self.SCHEDULED_RECORD_SPECS_BYTES
        end_of_data_index = record_start_index + self._received_msg_length

        # Get the scheduled runs data only (header fields included)
        self._cal_data = raw_message[self.SCHEDULED_RUNS_RECORD_START_INDEX:end_of_data_index]

        # Continue getting the scheduled runs records until all the messages are received. Concatenate the
        # scheduled runs records to each other
        if self._current_message <= self._total_messages:
            self._raw_scheduled_runs_record += raw_message[record_start_index:end_of_data_index]

            if self._current_message == self._total_messages:
                # Done receiving the messages
                self._is_getting_runs_in_progress = False
                # If all the messages have been received, call another function to process the raw data
                self._update_dg_scheduled_record_from_fw()
                self._handler_received_complete_dg_scheduled_runs_record()

    @publish(["dg_scheduled_runs_record"])
    def _handler_received_complete_dg_scheduled_runs_record(self):
        """
        Publishes the received scheduled runs record

        @return: None
        """
        self.logger.debug("Received a complete dg scheduled runs record.")

    def _update_dg_scheduled_record_from_fw(self):
        """
        Handles parsing the scheduled runs messages that were received from DG firmware.

        @return: None
        """
        raw_payload_temp_start_index = 0
        # Convert the concatenated raw data into a byte array since the struct library requires byte arrays.
        self._raw_scheduled_runs_record = bytearray(self._raw_scheduled_runs_record)

        # Loop through the keys for the main calibration_record dictionary
        # DG_Calibration : {pressure_sensors : { ppi : { gain: ['
        #
        # NOTE(review): the source text available for this review is truncated at this point.
        # The remainder of this method (the loop that unpacks the raw bytes into
        # self.dg_scheduled_runs_record) and the whole definition of
        # _prepare_dg_scheduled_runs_record() are missing and MUST be restored from version
        # control. They have deliberately not been re-invented here.

    def cmd_set_dg_scheduled_runs_record(self, dg_scheduled_runs_record: dict) -> bool:
        """
        Handles updating the DG scheduled runs record and sends it to FW. The record is split into packets by
        NVOpsUtils.prepare_record_to_send_to_fw and the packets are sent one at a time.

        @param dg_scheduled_runs_record: the DG scheduled runs record to be sent to firmware
        @return: True upon success, False otherwise
        """
        record_packets = self._utilities.prepare_record_to_send_to_fw(dg_scheduled_runs_record)

        self.logger.debug('Setting DG scheduled runs started')

        # Update all the data packets with the last message count since is the number of messages that firmware
        # should receive
        for packet in record_packets:
            # Sleep to let the firmware receive and process the data
            time.sleep(self._PAYLOAD_TRANSFER_DELAY_S)

            # Convert the list packet to a bytearray
            payload = b''.join(packet)

            message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                                  message_id=MsgIds.MSG_ID_DG_SET_SCHEDULED_RUNS_RECORD.value,
                                                  payload=payload)

            received_message = self.can_interface.send(message)

            # If there is no content...
            if received_message is None:
                # Abort on the first packet that is not acknowledged in time
                self.logger.debug("Timeout!!!!")
                return False

        self.logger.debug("Finished sending DG scheduled runs record.")
        return True