########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file pressure_sensors.py # # @author (last) Zoltan Miskolci # @date (last) 04-May-2026 # @author (original) Sean # @date (original) 14-Apr-2020 # ############################################################################ # Module imports from logging import Logger from time import sleep # Project imports from leahi_dialin.common import dd_enum_repository from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.msg_defs import MsgIds from leahi_dialin.common.override_templates import cmd_generic_override from leahi_dialin.protocols.CAN import CanMessenger, CanChannels from leahi_dialin.utils.abstract_classes import AbstractSubSystem from leahi_dialin.utils.base import publish from leahi_dialin.utils.enums import DialinEnum from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray, byte_to_bytearray class DDRecords(AbstractSubSystem): """ DD interface containing pressure related commands. """ def __init__(self, can_interface: CanMessenger, logger: Logger): """ @param can_interface: The CanMessenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_SYSTEM_RECORD.value, function = self._handler_system_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_SERVICE_RECORD.value, function = self._handler_service_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_CALIBRATION_RECORD.value, function = self._handler_calibration_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_INSTITUTIONAL_RECORD.value, function = self._handler_institutional_record_sync) self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_NVM_SEND_USAGE_INFO_RECORD.value, function = self._handler_usage_info_record_sync) self.system_records_timestamp = 0 #: The timestamp of the latest System Records message self.service_records_timestamp = 0 #: The timestamp of the latest Service Records message self.calibration_records_timestamp = 0 #: The timestamp of the latest Calibration Records message self.institutional_records_timestamp = 0 #: The timestamp of the latest Institutional Records message self.usage_info_records_timestamp = 0 #: The timestamp of the latest Usage Information Records message self.system_records = { } #: The System Records data in dictionary format self.service_records = { } #: The Service Records data in dictionary format self.calibration_records = { } #: The Calibration Records data in dictionary format self.institutional_records = { } #: The Institutional Records data in dictionary format self.usage_info_records = { } #: The Usage Information Records data in dictionary format self.pager_system_record = 0 self.pager_service_record = 0 self.pager_calibration_record = 0 self.pager_institutional_record = 0 self.pager_usage_info_record = 0 for sys_record in dd_enum_repository.SystemRecordFields: self.system_records[sys_record.name] = None for serv_record in dd_enum_repository.ServiceRecordFields: self.service_records[serv_record.name] = None for cal_record in dd_enum_repository.CalibrationRecordFields: self.calibration_records[cal_record.name] = None for inst_record in dd_enum_repository.InstitutionalRecordFields: self.institutional_records[inst_record.name] = None for usage_record in dd_enum_repository.UsageInformationRecordFields: self.usage_info_records[usage_record.name] = None @publish(["msg_id_dd_nvm_send_system_record", "system_records", "system_records_timestamp"]) def _handler_system_record_sync(self, message, timestamp = 0.0): """ Handles published DD System Record data messages. DD System Records are captured for reference. @param message: published data message @return: none """ # Get the payload related values msg_list =[] msg_list.append(('current_page', DataTypes.U32)) msg_list.append(('all_pages', DataTypes.U32)) msg_list.append(('payload_count', DataTypes.U32)) result = self.process_into_vars(decoder_list = msg_list, message = message) # Get the record data record_list = [] for i in range(0, result['payload_count']): sys_record = dd_enum_repository.SystemRecordFields(i + self.pager_system_record) if sys_record.multichar_length > 1: for j in range(0, sys_record.multichar_length - 1): name = f'{sys_record.name}_{j}' record_list.append((name, sys_record.datatype)) else: name = sys_record.name record_list.append((name, sys_record.datatype)) result_payload = self.process_into_vars(decoder_list = record_list, message = message, start_from_byte = len(msg_list) * DataTypes.U32.size) # Saving data into System Records self.system_records[dd_enum_repository.SystemRecordFields.TOP_LEVEL_PN.name] = '' self.system_records[dd_enum_repository.SystemRecordFields.TOP_LEVEL_SN.name] = '' self.system_records[dd_enum_repository.SystemRecordFields.MFG_LOCATION.name] = 0 self.system_records[dd_enum_repository.SystemRecordFields.MFG_DATE.name] = 0 self.system_records[dd_enum_repository.SystemRecordFields.CRC.name] = 0 for key in result_payload: if key.startswith(dd_enum_repository.SystemRecordFields.TOP_LEVEL_PN.name): value = result_payload[key] self.system_records[dd_enum_repository.SystemRecordFields.TOP_LEVEL_PN.name] += value elif key.startswith(dd_enum_repository.SystemRecordFields.TOP_LEVEL_SN.name): value = result_payload[key] self.system_records[dd_enum_repository.SystemRecordFields.TOP_LEVEL_SN.name] += value else: self.system_records[key] = result_payload[key] # Increase the pager by the payload count to know where the next message should resume, or reset when it's the last message if result['current_page'] == result['all_pages']: self.pager_system_record = 0 else: self.pager_system_record =+ result['payload_count'] self.system_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_service_record", "service_records", "service_records_timestamp"]) def _handler_service_record_sync(self, message, timestamp = 0.0): """ Handles published DD Service Record data messages. DD Service Records are captured for reference. @param message: published data message @return: none """ _generic_handler(self = self, message = message, service_name = dd_enum_repository.ServiceRecordFields, pager_record = self.pager_service_record) # # Get the payload related values # msg_list =[] # msg_list.append(('current_page', DataTypes.U32)) # msg_list.append(('all_pages', DataTypes.U32)) # msg_list.append(('payload_count', DataTypes.U32)) # result = self.process_into_vars(decoder_list = msg_list, # message = message) # # Get the record data # record_list = [] # for i in range(0, result['payload_count']): # record = dd_enum_repository.ServiceRecordFields(i + self.pager_service_record) # record_list.append((record.name, record.datatype)) # self.process_into_dict(dict_to_update = self.service_records, # decoder_list = record_list, # message = message, # start_from_byte = len(msg_list) * DataTypes.U32.size) # # Increase the pager by the payload count to know where the next message should resume, or reset when it's the last message # if result['current_page'] == result['all_pages']: # self.pager_service_record = 0 # else: # self.pager_service_record =+ result['payload_count'] self.service_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_calibration_record", "calibration_records", "calibration_records_timestamp"]) def _handler_calibration_record_sync(self, message, timestamp = 0.0): """ Handles published DD Calibration Record data messages. DD Calibration Records are captured for reference. @param message: published data message @return: none """ dd_enum_repository.CalibrationRecordFields _generic_handler(self = self, message = message, service_name = dd_enum_repository.CalibrationRecordFields, pager_record = self.pager_calibration_record) self.calibration_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_institutional_record", "institutional_records", "institutional_records_timestamp"]) def _handler_institutional_record_sync(self, message, timestamp = 0.0): """ Handles published DD Institutional Record data messages. DD Institutional Records are captured for reference. @param message: published data message @return: none """ _generic_handler(self = self, message = message, service_name = dd_enum_repository.InstitutionalRecordFields, pager_record = self.pager_institutional_record) self.institutional_records_timestamp = timestamp @publish(["msg_id_dd_nvm_send_usage_info_record", "usage_info_records", "usage_info_records_timestamp"]) def _handler_usage_info_record_sync(self, message, timestamp = 0.0): """ Handles published DD Usage Information Record data messages. DD Usage Information Records are captured for reference. @param message: published data message @return: none """ _generic_handler(self = self, message = message, service_name = dd_enum_repository.ServiceRecordFields, pager_record = self.pager_usage_info_record) self.usage_info_records_timestamp = timestamp def cmd_initiate_service_mode(self) -> int: """ Constructs and sends a request to change to Service operation mode via CAN bus. Constraints: Must be logged into DD. Transition from current to requested op mode must be legal. @return: 1 if successful, zero otherwise """ service_mode = dd_enum_repository.DDOpModes.MODE_SERV payload = integer_to_bytearray(service_mode.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_SET_OPERATION_MODE_OVERRIDE_REQUEST, entity_name = 'DD Operation Mode', override_text = f'set to {service_mode.name}', logger = self.logger, can_interface = self.can_interface) def cmd_request_system_records(self) -> int: """ Constructs and sends a request for System Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.SYSTEM_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD System Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_service_records(self) -> int: """ Constructs and sends a request for Service Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.SERVICE_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Service Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_calibration_records(self) -> int: """ Constructs and sends a request for Calibration Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.CALIBRATION_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Calibration Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_institutional_records(self) -> int: """ Constructs and sends a request for Institutional Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.INSTITUTIONAL_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Institutional Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_request_usage_information_records(self) -> int: """ Constructs and sends a request for Usage Information Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(dd_enum_repository.RecordTypes.USAGE_INFORMATION_RECORD.value) return cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD, entity_name = f'DD Usage Information Record request', override_text = '', logger = self.logger, can_interface = self.can_interface) def cmd_set_system_records(self, part_number: chr[10], serial_number: chr[20], manufacturing_location: int, manufacturing_date: int) -> int: """ Constructs and sends a command for setting the System Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ payload_count = 0 for e in dd_enum_repository.SystemRecordFields: if e != dd_enum_repository.SystemRecordFields.NUM_OF_SYSTEM_RECORD_FIELDS: payload_count += e.datatype().size() * e.multichar_length() payload_base = integer_to_bytearray(1) # Current Page payload_base += integer_to_bytearray(1) # All Page Count payload_base += integer_to_bytearray(payload_count) # Payload count in bytes payload = b'' for c in part_number: payload += byte_to_bytearray(c) for c in serial_number: payload += byte_to_bytearray(c) payload += integer_to_bytearray(manufacturing_location) payload += integer_to_bytearray(manufacturing_date) crc = crc16_modbus(payload) payload += integer_to_bytearray(crc) total_payload = payload_base + payload return cmd_generic_override( payload = total_payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_SET_SYSTEM_RECORD, entity_name = f'New DD System Record', override_text = 'being set', logger = self.logger, can_interface = self.can_interface) def cmd_set_calibration_records(self, calibration_records: dict) -> int: """ Constructs and sends a command for setting the Calibration Records. Constraints: Must be logged into DD. Must be in Service mode. @return: 1 if successful, zero otherwise """ MAX_MESSAGE_SIZE = 256 class CalibrationComponents(DialinEnum): PRES_SENSORS = 0 TEMP_SENSORS = 1 CalibrationComponents._from_str = { 'PRES_SENSORS': ['pres'], 'TEMP_SENSORS': ['temp'] } # Determine how many pages we will need to send the full data page_payload_count = [] payload_packet = [] current_enum = 0 max_page = 1 while current_enum < dd_enum_repository.CalibrationRecordFields.NUM_OF_CALIBRATION_RECORD_FIELDS.value: remaining_bytes = 256 - (3 * 4) packet = b'' for i in range(current_enum, dd_enum_repository.CalibrationRecordFields.NUM_OF_CALIBRATION_RECORD_FIELDS.value - 1): e = dd_enum_repository.CalibrationRecordFields(i) enum_split = e.split('__') if len(enum_split) > 1: module = CalibrationComponents.from_str(enum_split[0]) sensor = enum_split[1] if len(enum_split) == 2: value = calibration_records[module][sensor] else: field = enum_split[2] value = calibration_records[module][sensor][field] else: value = calibration_records[module] datatype = dd_enum_repository.CalibrationRecordFields(i).datatype() if remaining_bytes > datatype.size(): remaining_bytes -= datatype.size() if datatype in [ DataTypes.U32, DataTypes.BOOL ]: packet += integer_to_bytearray(value) elif datatype in [ DataTypes.F32 ]: packet += float_to_bytearray(value) elif datatype in [ DataTypes.U08 ]: packet += byte_to_bytearray(value) if i == dd_enum_repository.CalibrationRecordFields.NUM_OF_CALIBRATION_RECORD_FIELDS.value - 1: # page_payload_count.append(i - current_enum + 1) page_payload_count.append(MAX_MESSAGE_SIZE - remaining_bytes + (3 * 4)) payload_packet.append(packet) else: max_page += 1 # page_payload_count.append(i - current_enum + 1) page_payload_count.append(MAX_MESSAGE_SIZE - remaining_bytes + (3 * 4)) payload_packet.append(packet) break # Send the packets current_page = 1 while current_page <= max_page: payload = integer_to_bytearray(current_page) # Current Page payload += integer_to_bytearray(max_page) # All Page Count payload += integer_to_bytearray(page_payload_count[current_page - 1]) # Payload count payload.join(payload_packet[current_page - 1]) cmd_generic_override( payload = payload, reset = None, channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_NVM_SET_CALIBRATION_RECORD, entity_name = f'New DD Calibration Record', override_text = 'being set', logger = self.logger, can_interface = self.can_interface) current_page += 1 sleep(0.5) # ================================================= Private Methods ================================================= def _generic_handler(self: DDRecords, message, service_name, pager_record): # Get the payload related values msg_list =[] msg_list.append(('current_page', DataTypes.U32)) msg_list.append(('all_pages', DataTypes.U32)) msg_list.append(('payload_count', DataTypes.U32)) result = self.process_into_vars(decoder_list = msg_list, message = message) # Get the record data record_list = [] for i in range(0, result['payload_count']): record = service_name(i + pager_record) record_list.append((record.name, record.datatype)) self.process_into_dict(dict_to_update = self.service_records, decoder_list = record_list, message = message, start_from_byte = len(msg_list) * DataTypes.U32.size) # Increase the pager by the payload count to know where the next message should resume, or reset when it's the last message if result['current_page'] == result['all_pages']: pager_record = 0 else: pager_record =+ result['payload_count'] def crc16_modbus(data: bytes) -> int: """Calculate CRC-16 Modbus (Poly: 0xA001, Init: 0xFFFF, Reflected).""" crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 1: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return crc def crc16_xmodem(data: bytes) -> int: """Calculate CRC-16 XModem (Poly: 0x1021, Init: 0x0000, Not Reflected).""" crc = 0x0000 for byte in data: crc ^= (byte << 8) for _ in range(8): if crc & 0x8000: crc = ((crc << 1) ^ 0x1021) & 0xFFFF else: crc = (crc << 1) & 0xFFFF return crc