########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file pressure_sensors.py # # @author (last) Zoltan Miskolci # @date (last) 04-May-2026 # @author (original) Sean # @date (original) 14-Apr-2020 # ############################################################################ # Module imports from logging import Logger # Project imports from leahi_dialin.common import dd_enum_repository from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.msg_defs import MsgIds from leahi_dialin.common.override_templates import cmd_generic_override from leahi_dialin.protocols.CAN import CanMessenger, CanChannels from leahi_dialin.utils.abstract_classes import AbstractSubSystem from leahi_dialin.utils.base import publish from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray, byte_to_bytearray class DDRecords(AbstractSubSystem): """ DD interface containing pressure related commands. """ def __init__(self, can_interface: CanMessenger, logger: Logger): """ @param can_interface: The CanMessenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_SYSTEM_RECORD.value, function = self._handler_system_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_SERVICE_RECORD.value, function = self._handler_service_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_CALIBRATION_RECORD.value, function = self._handler_calibration_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_INSTITUTIONAL_RECORD.value, function = self._handler_institutional_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_USAGE_INFO_RECORD.value, function = self._handler_usage_info_record_sync) self.system_records_timestamp = 0 #: The timestamp of the latest System Records message self.service_records_timestamp = 0 #: The timestamp of the latest Service Records message self.calibration_records_timestamp = 0 #: The timestamp of the latest Calibration Records message self.institutional_records_timestamp = 0 #: The timestamp of the latest Institutional Records message self.usage_info_records_timestamp = 0 #: The timestamp of the latest Usage Information Records message self.system_records = { } #: The System Records data in dictionary format self.service_records = { } #: The Service Records data in dictionary format self.calibration_records = { } #: The Calibration Records data in dictionary format self.institutional_records = { } #: The Institutional Records data in dictionary format self.usage_info_records = { } #: The Usage Information Records data in dictionary format self.pager_system_record = 0 self.pager_service_record = 0 self.pager_calibration_record = 0 self.pager_institutional_record = 0 self.pager_usage_info_record = 0 for sys_record in dd_enum_repository.SystemRecordFields: self.system_records[sys_record.name] = None for serv_record in dd_enum_repository.ServiceRecordFields: self.service_records[serv_record.name] = None for cal_record in dd_enum_repository.CalibrationRecordFields: self.calibration_records[cal_record.name] = None for inst_record in dd_enum_repository.InstitutionalRecordFields: self.institutional_records[inst_record.name] = None for usage_record in dd_enum_repository.UsageInformationRecordFields: self.usage_info_records[usage_record.name] = None @publish(["msg_id_dd_nvm_send_system_record", "system_records", "system_records_timestamp"]) def _handler_system_record_sync(self, message, timestamp = 0.0): """ Handles published DD System Record data messages. DD System Records are captured for reference. @param message: published data message @return: none """ # Get the payload related values msg_list =[] msg_list.append(('current_page', DataTypes.U32)) msg_list.append(('all_pages', DataTypes.U32)) msg_list.append(('payload_count', DataTypes.U32)) result = self.process_into_vars(decoder_list = msg_list, message = message) # Get the record data record_list = [] for i in range(0, result['payload_count']): sys_record = dd_enum_repository.SystemRecordFields(i + self.pager_system_record) if sys_record.multichar_length > 1: name = f'{sys_record.name}_1' else: name = sys_record.name record_list.append((name, sys_record.datatype)) self.process_into_dict(dict_to_update = self.system_records, decoder_list = record_list, message = message, start_from_byte = len(msg_list) * DataTypes.U32.size) # Increase the pager by the payload count to know where the next message should resume, or reset when it's the last message if result['current_page'] == result['all_pages']: self.pager_system_record = 0 else: self.pager_system_record =+ result['payload_count'] self.system_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_service_record", "service_records", "service_records_timestamp"]) def _handler_service_record_sync(self, message, timestamp = 0.0): """ Handles published DD Service Record data messages. DD Service Records are captured for reference. @param message: published data message @return: none """ _generic_handler(self = self, message = message, service_name = dd_enum_repository.ServiceRecordFields, pager_record = self.pager_service_record) # # Get the payload related values # msg_list =[] # msg_list.append(('current_page', DataTypes.U32)) # msg_list.append(('all_pages', DataTypes.U32)) # msg_list.append(('payload_count', DataTypes.U32)) # result = self.process_into_vars(decoder_list = msg_list, # message = message) # # Get the record data # record_list = [] # for i in range(0, result['payload_count']): # record = dd_enum_repository.ServiceRecordFields(i + self.pager_service_record) # record_list.append((record.name, record.datatype)) # self.process_into_dict(dict_to_update = self.service_records, # decoder_list = record_list, # message = message, # start_from_byte = len(msg_list) * DataTypes.U32.size) # # Increase the pager by the payload count to know where the next message should resume, or reset when it's the last message # if result['current_page'] == result['all_pages']: # self.pager_service_record = 0 # else: # self.pager_service_record =+ result['payload_count'] self.service_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_calibration_record", "calibration_records", "calibration_records_timestamp"]) def _handler_calibration_record_sync(self, message, timestamp = 0.0): """ Handles published DD Calibration Record data messages. DD Calibration Records are captured for reference. @param message: published data message @return: none """ dd_enum_repository.CalibrationRecordFields _generic_handler(self = self, message = message, service_name = dd_enum_repository.CalibrationRecordFields, pager_record = self.pager_calibration_record) self.calibration_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_institutional_record", "institutional_records", "institutional_records_timestamp"]) def _handler_institutional_record_sync(self, message, timestamp = 0.0): """ Handles published DD Institutional Record data messages. DD Institutional Records are captured for reference. @param message: published data message @return: none """ _generic_handler(self = self, message = message, service_name = dd_enum_repository.InstitutionalRecordFields, pager_record = self.pager_institutional_record) self.institutional_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_usage_info_record", "usage_info_records", "usage_info_records_timestamp"]) def _handler_usage_info_record_sync(self, message, timestamp = 0.0): """ Handles published DD Usage Information Record data messages. DD Usage Information Records are captured for reference. @param message: published data message @return: none """ _generic_handler(self = self, message = message, service_name = dd_enum_repository.ServiceRecordFields, pager_record = self.pager_usage_info_record) self.usage_info_records_timestamp = timestamp def cmd_initiate_service_mode(self) -> int: """ Constructs and sends a request to change to Service operation mode via CAN bus. Constraints: Must be logged into DD. Transition from current to requested op mode must be legal. @return: 1 if successful, zero otherwise """ service_mode = dd_enum_repository.DDOpModes.MODE_SERV payload = integer_to_bytearray(service_mode.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_SET_OPERATION_MODE_OVERRIDE_REQUEST, entity_name = 'DD Operation Mode', override_text = f'set to {service_mode.name}', logger = self.logger, can_interface = self.can_interface) def cmd_request_system_records(self) -> int: """ Constructs and sends a request for System Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.SYSTEM_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD System Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_service_records(self) -> int: """ Constructs and sends a request for Service Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.SERVICE_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Service Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_calibration_records(self) -> int: """ Constructs and sends a request for Calibration Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.CALIBRATION_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Calibration Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_institutional_records(self) -> int: """ Constructs and sends a request for Institutional Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.INSTITUTIONAL_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Institutional Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_usage_information_records(self) -> int: """ Constructs and sends a request for Usage Information Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.USAGE_INFORMATION_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Usage Information Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_set_system_records(self, part_number: chr[10], serial_number: chr[20], manufacturing_location: int, manufacturing_date: int) -> int: """ Constructs and sends a command for setting the System Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(1) # Current Page payload += integer_to_bytearray(1) # All Page Count payload += integer_to_bytearray(dd_enum_repository.SystemRecordFields.NUM_OF_SYSTEM_RECORD_FIELDS.value) # Payload count for c in part_number: payload += byte_to_bytearray(c) for c in serial_number: payload += byte_to_bytearray(c) payload += integer_to_bytearray(manufacturing_location) payload += integer_to_bytearray(manufacturing_date) crc = '' payload += integer_to_bytearray(crc) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_SET_SYSTEM_RECORD, entity_name = f'New DD System Record', override_text = 'being set', logger = self.logger, can_interface = self.can_interface) # ================================================= Private Methods ================================================= def _generic_handler(self: DDRecords, message, service_name, pager_record): # Get the payload related values msg_list =[] msg_list.append(('current_page', DataTypes.U32)) msg_list.append(('all_pages', DataTypes.U32)) msg_list.append(('payload_count', DataTypes.U32)) result = self.process_into_vars(decoder_list = msg_list, message = message) # Get the record data record_list = [] for i in range(0, result['payload_count']): record = service_name(i + pager_record) record_list.append((record.name, record.datatype)) self.process_into_dict(dict_to_update = self.service_records, decoder_list = record_list, message = message, start_from_byte = len(msg_list) * DataTypes.U32.size) # Increase the pager by the payload count to know where the next message should resume, or reset when it's the last message if result['current_page'] == result['all_pages']: pager_record = 0 else: pager_record =+ result['payload_count']