###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   pressure_sensors.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      04-May-2026
#    @author (original)  Sean
#    @date   (original)  14-Apr-2020
#
############################################################################

# Module imports
from logging import Logger
from functools import partial

# Project imports
from leahi_dialin.common import dd_enum_repository
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.msg_defs import MsgIds
from leahi_dialin.common.override_templates import cmd_generic_override
from leahi_dialin.protocols.CAN import CanMessenger, CanChannels
from leahi_dialin.utils.abstract_classes import AbstractSubSystem
from leahi_dialin.utils.base import publish
from leahi_dialin.utils.enums import DialinEnum
from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray, byte_to_bytearray, short_to_bytearray


class DDRecords(AbstractSubSystem):
    """
    DD interface containing the NVM record related commands.

    Captures the System, Service, Calibration, Institutional and Usage Information
    records published by the DD, and provides commands to request or set them.
    """

    def __init__(self, can_interface: CanMessenger, logger: Logger):
        """
        @param can_interface: The CanMessenger object
        @param logger: The Logger object handed to the command helpers
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # Build all state BEFORE registering the handlers, so a message arriving right
        # after registration never finds a partially initialised object.
        self.system_records_timestamp = 0           #: The timestamp of the latest System Records message
        self.service_records_timestamp = 0          #: The timestamp of the latest Service Records message
        self.calibration_records_timestamp = 0      #: The timestamp of the latest Calibration Records message
        self.institutional_records_timestamp = 0    #: The timestamp of the latest Institutional Records message
        self.usage_info_records_timestamp = 0       #: The timestamp of the latest Usage Information Records message

        #: The System Records data in dictionary format
        self.system_records = {field.name: None for field in dd_enum_repository.SystemRecordFields}
        #: The Service Records data in dictionary format
        self.service_records = {field.name: None for field in dd_enum_repository.ServiceRecordFields}
        #: The Calibration Records data in dictionary format
        self.calibration_records = {field.name: None for field in dd_enum_repository.CalibrationRecordFields}
        #: The Institutional Records data in dictionary format
        self.institutional_records = {field.name: None for field in dd_enum_repository.InstitutionalRecordFields}
        #: The Usage Information Records data in dictionary format
        self.usage_info_records = {field.name: None for field in dd_enum_repository.UsageInformationRecordFields}

        #: Index of the first System Record field expected in the next paged message
        self.pager_system_record = 0

        if self.can_interface is not None:
            handlers = (
                (MsgIds.MSG_ID_DD_NVM_SEND_SYSTEM_RECORD, self._handler_system_record_sync),
                (MsgIds.MSG_ID_DD_NVM_SEND_SERVICE_RECORD, self._handler_service_record_sync),
                (MsgIds.MSG_ID_DD_NVM_SEND_CALIBRATION_RECORD, self._handler_calibration_record_sync),
                (MsgIds.MSG_ID_DD_NVM_SEND_INSTITUTIONAL_RECORD, self._handler_institutional_record_sync),
                (MsgIds.MSG_ID_DD_NVM_SEND_USAGE_INFO_RECORD, self._handler_usage_info_record_sync),
            )
            for msg_id, handler in handlers:
                self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id,
                                                                           message_id = msg_id.value,
                                                                           function = handler)


    # ================================================= CAN Message Handler Methods =================================================
    @publish(["msg_id_dd_nvm_send_system_record", "system_records", "system_records_timestamp"])
    def _handler_system_record_sync(self, message, timestamp = 0.0):
        """
        Handles published DD System Record data messages. DD System Records are captured for reference.

        The message starts with three U32 values (current page, number of pages, payload count)
        followed by the record fields. Fields are decoded starting at the enum index stored in
        self.pager_system_record, which is advanced after every page and reset after the last one.

        @param message: published data message
        @param timestamp: stored as the timestamp of the latest System Records message
        @return: none
        """
        fields = dd_enum_repository.SystemRecordFields

        # Get the payload related values
        msg_list = [('current_page', DataTypes.U32),
                    ('all_pages', DataTypes.U32),
                    ('payload_count', DataTypes.U32)]
        result = self.process_into_vars(decoder_list = msg_list, message = message)

        # Get the record data
        record_list = []
        for i in range(result['payload_count']):
            sys_record = fields(i + self.pager_system_record)
            if sys_record.multichar_length > 1:
                # NOTE(review): this decodes multichar_length - 1 elements; confirm against the
                # SystemRecordFields definition whether the last element is intentionally skipped.
                for j in range(0, sys_record.multichar_length - 1):
                    record_list.append((f'{sys_record.name}_{j}', sys_record.datatype))
            else:
                record_list.append((sys_record.name, sys_record.datatype))
        result_payload = self.process_into_vars(decoder_list = record_list,
                                                message = message,
                                                start_from_byte = len(msg_list) * DataTypes.U32.size)

        # Saving data into System Records.
        # Only clear the stored values at the start of a sequence; clearing on every page
        # would wipe the fields received in the earlier pages.
        if self.pager_system_record == 0:
            self.system_records[fields.TOP_LEVEL_PN.name] = ''
            self.system_records[fields.TOP_LEVEL_SN.name] = ''
            self.system_records[fields.MFG_LOCATION.name] = 0
            self.system_records[fields.MFG_DATE.name] = 0
            self.system_records[fields.CRC.name] = 0

        for key, value in result_payload.items():
            if key.startswith(fields.TOP_LEVEL_PN.name):
                self.system_records[fields.TOP_LEVEL_PN.name] += value
            elif key.startswith(fields.TOP_LEVEL_SN.name):
                self.system_records[fields.TOP_LEVEL_SN.name] += value
            else:
                self.system_records[key] = value

        # Increase the pager by the payload count to know where the next message should resume,
        # or reset when it's the last message
        if result['current_page'] == result['all_pages']:
            self.pager_system_record = 0
        else:
            self.pager_system_record += result['payload_count']

        self.system_records_timestamp = timestamp


    @publish(["msg_id_dd_nvm_send_service_record", "service_records", "service_records_timestamp"])
    def _handler_service_record_sync(self, message, timestamp = 0.0):
        """
        Handles published DD Service Record data messages. DD Service Records are captured for reference.

        @param message: published data message
        @param timestamp: stored as the timestamp of the latest Service Records message
        @return: none
        """
        record_list = [(member.name, member.datatype) for member in dd_enum_repository.ServiceRecordFields]

        self.process_into_dict(dict_to_update = self.service_records, decoder_list = record_list, message = message)
        self.service_records_timestamp = timestamp


    @publish(["msg_id_dd_nvm_send_calibration_record", "calibration_records", "calibration_records_timestamp"])
    def _handler_calibration_record_sync(self, message, timestamp = 0.0):
        """
        Handles published DD Calibration Record data messages. DD Calibration Records are captured for reference.

        @param message: published data message
        @param timestamp: stored as the timestamp of the latest Calibration Records message
        @return: none
        """
        record_list = [(member.name, member.datatype) for member in dd_enum_repository.CalibrationRecordFields]

        self.process_into_dict(dict_to_update = self.calibration_records, decoder_list = record_list, message = message)
        self.calibration_records_timestamp = timestamp


    @publish(["msg_id_dd_nvm_send_institutional_record", "institutional_records", "institutional_records_timestamp"])
    def _handler_institutional_record_sync(self, message, timestamp = 0.0):
        """
        Handles published DD Institutional Record data messages. DD Institutional Records are captured for reference.

        The message starts with a U08 record id selecting the InstitutionalRecordFields member,
        followed by that member's value and the calibration time.

        @param message: published data message
        @param timestamp: stored as the timestamp of the latest Institutional Records message
        @return: none
        """
        fields = dd_enum_repository.InstitutionalRecordFields

        header_list = [('record_id', DataTypes.U08)]
        results = self.process_into_vars(decoder_list = header_list, message = message)
        member = fields(int(results['record_id']))

        record_list = [(member.name, member.datatype),
                       (fields.CALIBRATION_TIME.name, fields.CALIBRATION_TIME.datatype)]

        # The record data starts right after the header, so the offset is derived from the header list
        self.process_into_dict(dict_to_update = self.institutional_records,
                               decoder_list = record_list,
                               message = message,
                               start_from_byte = len(header_list) * DataTypes.U08.size)
        self.institutional_records_timestamp = timestamp


    @publish(["msg_id_dd_nvm_send_usage_info_record", "usage_info_records", "usage_info_records_timestamp"])
    def _handler_usage_info_record_sync(self, message, timestamp = 0.0):
        """
        Handles published DD Usage Information Record data messages. DD Usage Information Records are captured for reference.

        @param message: published data message
        @param timestamp: stored as the timestamp of the latest Usage Information Records message
        @return: none
        """
        record_list = [(member.name, member.datatype) for member in dd_enum_repository.UsageInformationRecordFields]

        self.process_into_dict(dict_to_update = self.usage_info_records, decoder_list = record_list, message = message)
        self.usage_info_records_timestamp = timestamp


    # ================================================= Go to Service Mode Method =================================================
    def cmd_initiate_service_mode(self) -> int:
        """
        Constructs and sends a request to change to Service operation mode via CAN bus.
        Constraints:
            Must be logged into DD.
            Transition from current to requested op mode must be legal.

        @return: 1 if successful, zero otherwise
        """
        service_mode = dd_enum_repository.DDOpModes.MODE_SERV
        payload = integer_to_bytearray(service_mode.value)

        return cmd_generic_override(
            payload = payload,
            reset = None,
            channel_id = CanChannels.dialin_to_dd_ch_id,
            msg_id = MsgIds.MSG_ID_DD_SET_OPERATION_MODE_OVERRIDE_REQUEST,
            entity_name = 'DD Operation Mode',
            override_text = f'set to {service_mode.name}',
            logger = self.logger,
            can_interface = self.can_interface)


    # ================================================= Request Records Methods =================================================
    def _cmd_request_record(self, record_type, record_label: str) -> int:
        """
        Constructs and sends a MSG_ID_DD_NVM_GET_RECORD request for one record type.

        @param record_type: The dd_enum_repository.RecordTypes member to request
        @param record_label: Human readable record name used in the entity name
        @return: 1 if successful, zero otherwise
        """
        payload = byte_to_bytearray(record_type.value)

        return cmd_generic_override(
            payload = payload,
            reset = None,
            channel_id = CanChannels.dialin_to_dd_ch_id,
            msg_id = MsgIds.MSG_ID_DD_NVM_GET_RECORD,
            entity_name = f'DD {record_label} Record request',
            override_text = '',
            logger = self.logger,
            can_interface = self.can_interface)


    def cmd_request_system_records(self) -> int:
        """
        Constructs and sends a request for System Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        @return: 1 if successful, zero otherwise
        """
        return self._cmd_request_record(dd_enum_repository.RecordTypes.SYSTEM_RECORD, 'System')


    def cmd_request_service_records(self) -> int:
        """
        Constructs and sends a request for Service Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        @return: 1 if successful, zero otherwise
        """
        return self._cmd_request_record(dd_enum_repository.RecordTypes.SERVICE_RECORD, 'Service')


    def cmd_request_calibration_records(self) -> int:
        """
        Constructs and sends a request for Calibration Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        @return: 1 if successful, zero otherwise
        """
        return self._cmd_request_record(dd_enum_repository.RecordTypes.CALIBRATION_RECORD, 'Calibration')


    def cmd_request_institutional_records(self) -> int:
        """
        Constructs and sends a request for Institutional Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        @return: 1 if successful, zero otherwise
        """
        return self._cmd_request_record(dd_enum_repository.RecordTypes.INSTITUTIONAL_RECORD, 'Institutional')


    def cmd_request_usage_information_records(self) -> int:
        """
        Constructs and sends a request for Usage Information Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        @return: 1 if successful, zero otherwise
        """
        return self._cmd_request_record(dd_enum_repository.RecordTypes.USAGE_INFORMATION_RECORD, 'Usage Information')


    # ================================================= Set Records Main Methods =================================================
    def cmd_set_system_records(self, part_number: str='0000000000', serial_number: str='00000000000000000000', manufacturing_location: int=0, manufacturing_date: int=0) -> int:
        """
        Constructs and sends a command for setting the System Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        @param part_number: The top level part number, exactly 10 characters
        @param serial_number: The top level serial number, exactly 20 characters
        @param manufacturing_location: The manufacturing location value
        @param manufacturing_date: The manufacturing date value
        @return: 1 if successful, zero otherwise
        @raise ValueError: when part_number or serial_number has the wrong length
        """
        if len(part_number) != 10:
            raise ValueError("part_number must be 10 characters")
        if len(serial_number) != 20:
            raise ValueError("serial must be 20 characters")

        # NOTE(review): byte_to_bytearray receives single-character strings here; confirm it
        # converts them to the byte value the firmware expects.
        payload = b''
        for c in part_number:
            payload += byte_to_bytearray(c)
        for c in serial_number:
            payload += byte_to_bytearray(c)
        payload += integer_to_bytearray(manufacturing_location)
        payload += integer_to_bytearray(manufacturing_date)
        payload += short_to_bytearray(crc16_modbus(payload))

        return cmd_generic_override(
            payload = payload,
            reset = None,
            channel_id = CanChannels.dialin_to_dd_ch_id,
            msg_id = MsgIds.MSG_ID_DD_NVM_SET_SYSTEM_RECORD,
            entity_name = 'New DD System Record',
            override_text = 'being set',
            logger = self.logger,
            can_interface = self.can_interface)


    def cmd_set_service_records(self, service_loc: int=0, last_service_date: int=0, service_interval_sec: int=0, last_reset_time: int=0) -> int:
        """
        Constructs and sends a command for setting the Service Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        @param service_loc: The service location value (sent as one byte)
        @param last_service_date: The last service date value
        @param service_interval_sec: The service interval in seconds
        @param last_reset_time: The last reset time value
        @return: 1 if successful, zero otherwise
        """
        payload = byte_to_bytearray(service_loc)
        payload += integer_to_bytearray(last_service_date)
        payload += integer_to_bytearray(service_interval_sec)
        payload += integer_to_bytearray(last_reset_time)
        payload += short_to_bytearray(crc16_modbus(payload))

        return cmd_generic_override(
            payload = payload,
            reset = None,
            channel_id = CanChannels.dialin_to_dd_ch_id,
            msg_id = MsgIds.MSG_ID_DD_NVM_SET_SERVICE_RECORD,
            entity_name = 'New DD Service Record',
            override_text = 'being set',
            logger = self.logger,
            can_interface = self.can_interface)


    def cmd_set_calibration_records(self, calibration_records: dict) -> int:
        """
        Constructs and sends the commands for setting the Calibration Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        Every entry of every known group is sent as its own message. Messages that do not
        get 1 as response are retried up to 2 more times. Unknown groups are ignored.

        @param calibration_records: (Dictionary) The stored and structured Calibration Records,
                                    keyed by group name, then by sensor
        @return: 1 if every message was sent successfully, zero otherwise
        """
        # TODO: replace the MsgIds.TBD placeholders once the message ids are defined
        msg_id_pairing = {
            'pressure_sensors': MsgIds.TBD,
            'temperature_sensors': MsgIds.TBD,
            'concentrate_pumps': MsgIds.TBD,
            'dialysate_pumps': MsgIds.TBD,
            'acid': MsgIds.TBD,
            'bicarb': MsgIds.TBD,
            'accelerometer': MsgIds.TBD,
            'blood_leak': MsgIds.TBD,
        }
        polynomial_groups = ('pressure_sensors', 'temperature_sensors', 'concentrate_pumps')
        concentrate_groups = ('acid', 'bicarb')

        # Make a list to store all the send functions for later send.
        # partial() stores the function and its arguments without executing it.
        send_data = []
        for key in calibration_records:
            group_name = key.lower()
            if group_name not in msg_id_pairing:
                continue
            msg_id = msg_id_pairing[group_name]

            for sensor in calibration_records[key]:
                values = calibration_records[key][sensor]

                if group_name in polynomial_groups:
                    send_data.append(partial(self.cmd_set_calibration_records_sensor,
                                             sensor_enum = sensor,
                                             msg_id = msg_id,
                                             forth_order_coeff = values['forth_order_coeff'],
                                             third_order_coeff = values['third_order_coeff'],
                                             second_order_coeff = values['second_order_coeff'],
                                             gain = values['gain'],
                                             offset = values['offset'],
                                             calibration_time = values['calibration_time']))

                elif group_name == 'dialysate_pumps':
                    send_data.append(partial(self.cmd_set_calibration_records_sensor,
                                             sensor_enum = sensor,
                                             msg_id = msg_id,
                                             dialysate_pump_target_speed = values['target_speed'],
                                             forth_order_coeff = values['forth_order_coeff'],
                                             third_order_coeff = values['third_order_coeff'],
                                             second_order_coeff = values['second_order_coeff'],
                                             gain = values['gain'],
                                             offset = values['offset'],
                                             calibration_time = values['calibration_time']))

                elif group_name in concentrate_groups:
                    send_data.append(partial(self.cmd_set_calibration_records_concentrate,
                                             msg_id = msg_id,
                                             concentrate_mix_ratio = values['concentrate_mix_ratio'],
                                             volume_ml = values['volume_ml'],
                                             conductivity_uspcm = values['conductivity_uspcm'],
                                             temperature_c = values['temperature_c'],
                                             calibration_time = values['calibration_time']))

                elif group_name == 'accelerometer':
                    send_data.append(partial(self.cmd_set_calibration_records_accelerometer,
                                             msg_id = msg_id,
                                             accel_x_offset = values['accel_x_offset'],
                                             accel_y_offset = values['accel_y_offset'],
                                             accel_z_offset = values['accel_z_offset'],
                                             calibration_time = values['calibration_time']))

                elif group_name == 'blood_leak':
                    send_data.append(partial(self.cmd_set_calibration_records_blood_leak,
                                             msg_id = msg_id,
                                             set_point = values['set_point'],
                                             calibration_time = values['calibration_time']))

        return self._send_with_retries(send_data)


    def cmd_set_institutional_records(self, institutional_records: dict) -> int:
        """
        Constructs and sends the commands for setting the Institutional Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        Every record is sent as its own message. Messages that do not get 1 as response are
        retried up to 2 more times.

        @param institutional_records: (Dictionary) The stored and structured Institutional Records,
                                      keyed by the record's enum member
        @return: 1 if every message was sent successfully, zero otherwise
        """
        # Make a list to store all the send functions for later send
        send_data = []
        for key, value in institutional_records.items():
            payload = integer_to_bytearray(key.value)
            # datatype is read as an attribute, matching its use in the message handlers
            datatype = key.datatype
            if datatype in (DataTypes.U32, DataTypes.BOOL, DataTypes.S32):
                payload += integer_to_bytearray(value)
            elif datatype in (DataTypes.F32,):
                payload += float_to_bytearray(value)
            elif datatype in (DataTypes.U08, DataTypes.BOOL_U08):
                payload += byte_to_bytearray(value)
            payload += short_to_bytearray(crc16_modbus(payload))

            # partial() stores the function and its arguments without executing it
            send_data.append(partial(cmd_generic_override,
                                     payload = payload,
                                     reset = None,
                                     channel_id = CanChannels.dialin_to_dd_ch_id,
                                     msg_id = MsgIds.MSG_ID_DD_NVM_SET_INSTITUTIONAL_RECORD,
                                     entity_name = 'DD Institutional Record',
                                     override_text = 'being set',
                                     logger = self.logger,
                                     can_interface = self.can_interface))

        return self._send_with_retries(send_data)


    def cmd_set_usage_info_records(self, ro_water_gen_total_l: float=0.0, ro_water_gen_since_last_serv_l: float=0.0, last_basic_flush_date: int=0,
                                   last_heat_disinfect_date: int=0, last_heat_active_cool_date: int=0, last_filter_flush_date: int=0, last_reset_time: int=0) -> int:
        """
        Constructs and sends a command for setting the Usage Information Records.
        Constraints:
            Must be logged into DD.
            Must be in Service mode.

        The parameters are packed into the payload in the order of the signature
        (two floats followed by five integers), followed by the CRC of the payload.

        @return: 1 if successful, zero otherwise
        """
        payload = float_to_bytearray(ro_water_gen_total_l)
        payload += float_to_bytearray(ro_water_gen_since_last_serv_l)
        payload += integer_to_bytearray(last_basic_flush_date)
        payload += integer_to_bytearray(last_heat_disinfect_date)
        payload += integer_to_bytearray(last_heat_active_cool_date)
        payload += integer_to_bytearray(last_filter_flush_date)
        payload += integer_to_bytearray(last_reset_time)
        payload += short_to_bytearray(crc16_modbus(payload))

        return cmd_generic_override(
            payload = payload,
            reset = None,
            channel_id = CanChannels.dialin_to_dd_ch_id,
            msg_id = MsgIds.MSG_ID_DD_NVM_SET_USAGE_INFO_RECORD,
            entity_name = 'New DD Usage Information Record',
            override_text = 'being set',
            logger = self.logger,
            can_interface = self.can_interface)


    # ================================================= Set Records Support Methods =================================================
    @staticmethod
    def _send_with_retries(send_data: list, max_attempts: int = 3) -> int:
        """
        Executes the stored send functions one by one, retrying the failed ones.

        @param send_data: List of zero-argument callables, each returning 1 on success
        @param max_attempts: Maximum number of times each callable is executed
        @return: 1 if every callable eventually returned 1, zero otherwise
        """
        pending = list(send_data)
        attempt = 0
        while pending and attempt < max_attempts:
            # Keep only the ones that did NOT get 1 (successfully received) as response
            pending = [func for func in pending if func() != 1]
            attempt += 1
        return 0 if pending else 1


    def cmd_set_calibration_records_sensor(self, sensor_enum: DialinEnum, msg_id: MsgIds, dialysate_pump_target_speed: float=0.0, forth_order_coeff: float=0.0,
                                           third_order_coeff: float=0.0, second_order_coeff: float=0.0, gain: float=0.0, offset: float=0.0,
                                           calibration_time: int = 0) -> int:
        """
        Constructs and sends a command for setting one sensor or pump Calibration Record.

        @param sensor_enum: The enum member identifying the sensor or pump
        @param msg_id: The message id to send the record with
        @param dialysate_pump_target_speed: Target speed, only added to the payload for the dialysate pump message id
        @param forth_order_coeff: Fourth order coefficient
        @param third_order_coeff: Third order coefficient
        @param second_order_coeff: Second order coefficient
        @param gain: Gain
        @param offset: Offset
        @param calibration_time: The calibration time value
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(sensor_enum.value)
        # TODO: compare against the dialysate pump message id once defined; while every id is
        # the same MsgIds.TBD placeholder this branch is taken for every sensor type.
        if msg_id == MsgIds.TBD:
            payload += float_to_bytearray(dialysate_pump_target_speed)
        payload += float_to_bytearray(forth_order_coeff)
        payload += float_to_bytearray(third_order_coeff)
        payload += float_to_bytearray(second_order_coeff)
        payload += float_to_bytearray(gain)
        payload += float_to_bytearray(offset)
        payload += integer_to_bytearray(calibration_time)
        payload += short_to_bytearray(crc16_modbus(payload))

        # TODO: replace the placeholders with the real message ids; while they are all the same
        # MsgIds.TBD placeholder only the first branch can match.
        sensor_type = ''
        if msg_id == MsgIds.TBD:
            sensor_type = 'Pressure Sensor'
        elif msg_id == MsgIds.TBD:
            sensor_type = 'Temperature Sensor'
        elif msg_id == MsgIds.TBD:
            sensor_type = 'Concentrate Pump'
        elif msg_id == MsgIds.TBD:
            sensor_type = 'Dialysate Pump'

        return cmd_generic_override(payload = payload,
                                    reset = None,
                                    channel_id = CanChannels.dialin_to_dd_ch_id,
                                    msg_id = msg_id,
                                    entity_name = f'DD {sensor_type} {sensor_enum.name} Calibration Record',
                                    override_text = 'being set',
                                    logger = self.logger,
                                    can_interface = self.can_interface)


    def cmd_set_calibration_records_concentrate(self, msg_id: MsgIds, concentrate_mix_ratio: float=0.0, volume_ml: float=0.0, conductivity_uspcm: float=0.0,
                                                temperature_c: float=0.0, calibration_time: int = 0) -> int:
        """
        Constructs and sends a command for setting one concentrate Calibration Record.

        @param msg_id: The message id to send the record with
        @param concentrate_mix_ratio: Concentrate mix ratio
        @param volume_ml: Volume
        @param conductivity_uspcm: Conductivity
        @param temperature_c: Temperature
        @param calibration_time: The calibration time value
        @return: 1 if successful, zero otherwise
        """
        payload = float_to_bytearray(concentrate_mix_ratio)
        payload += float_to_bytearray(volume_ml)
        payload += float_to_bytearray(conductivity_uspcm)
        payload += float_to_bytearray(temperature_c)
        payload += integer_to_bytearray(calibration_time)
        payload += short_to_bytearray(crc16_modbus(payload))

        # TODO: replace the placeholders with the real message ids; while they are both the same
        # MsgIds.TBD placeholder only the first branch can match.
        conc_type = ''
        if msg_id == MsgIds.TBD:
            conc_type = 'Acid'
        elif msg_id == MsgIds.TBD:
            conc_type = 'Bicarb'

        return cmd_generic_override(payload = payload,
                                    reset = None,
                                    channel_id = CanChannels.dialin_to_dd_ch_id,
                                    msg_id = msg_id,
                                    entity_name = f'DD {conc_type} Concentrate Calibration Record',
                                    override_text = 'being set',
                                    logger = self.logger,
                                    can_interface = self.can_interface)


    def cmd_set_calibration_records_accelerometer(self, msg_id: MsgIds, accel_x_offset: float=0.0, accel_y_offset: float=0.0, accel_z_offset: float=0.0,
                                                  calibration_time: int = 0) -> int:
        """
        Constructs and sends a command for setting the accelerometer Calibration Record.

        @param msg_id: The message id to send the record with
        @param accel_x_offset: Accelerometer X axis offset
        @param accel_y_offset: Accelerometer Y axis offset
        @param accel_z_offset: Accelerometer Z axis offset
        @param calibration_time: The calibration time value
        @return: 1 if successful, zero otherwise
        """
        payload = float_to_bytearray(accel_x_offset)
        payload += float_to_bytearray(accel_y_offset)
        payload += float_to_bytearray(accel_z_offset)
        payload += integer_to_bytearray(calibration_time)
        payload += short_to_bytearray(crc16_modbus(payload))

        return cmd_generic_override(payload = payload,
                                    reset = None,
                                    channel_id = CanChannels.dialin_to_dd_ch_id,
                                    msg_id = msg_id,
                                    entity_name = 'DD Accelerometer Calibration Record',
                                    override_text = 'being set',
                                    logger = self.logger,
                                    can_interface = self.can_interface)


    def cmd_set_calibration_records_blood_leak(self, msg_id: MsgIds, set_point: float=0.0, calibration_time: int = 0) -> int:
        """
        Constructs and sends a command for setting the blood leak Calibration Record.

        @param msg_id: The message id to send the record with
        @param set_point: Blood leak set point
        @param calibration_time: The calibration time value
        @return: 1 if successful, zero otherwise
        """
        payload = float_to_bytearray(set_point)
        payload += integer_to_bytearray(calibration_time)
        payload += short_to_bytearray(crc16_modbus(payload))

        return cmd_generic_override(payload = payload,
                                    reset = None,
                                    channel_id = CanChannels.dialin_to_dd_ch_id,
                                    msg_id = msg_id,
                                    entity_name = 'DD Blood Leak Calibration Record',
                                    override_text = 'being set',
                                    logger = self.logger,
                                    can_interface = self.can_interface)


# ================================================= Private Methods =================================================
def crc16_modbus(data: bytes) -> int:
    """
    Calculate CRC-16 Modbus (Poly: 0xA001, Init: 0xFFFF, Reflected).

    @param data: The bytes to calculate the checksum of
    @return: The 16 bit checksum (0xFFFF for empty data)
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        # Process the byte bit by bit, least significant bit first
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def crc16_xmodem(data: bytes) -> int:
    """
    Calculate CRC-16 XModem (Poly: 0x1021, Init: 0x0000, Not Reflected).

    @param data: The bytes to calculate the checksum of
    @return: The 16 bit checksum (0 for empty data)
    """
    crc = 0x0000
    for byte in data:
        crc ^= (byte << 8)
        # Process the byte bit by bit, most significant bit first, keeping the value in 16 bits
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc