########################################################################### # # Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file dg_simulator.py # # @author (last) Behrouz NematiPour # @date (last) 29-Jun-2022 # @author (original) Peter Lucia # @date (original) 16-Mar-2021 # ############################################################################ from ..common import * from ..protocols.CAN import DenaliMessage, DenaliCanMessenger, DenaliChannels from ..utils import * from ..utils.base import AbstractSubSystem, LogManager from . import messageBuilder class DGSimulator(AbstractSubSystem): instance_count = 0 def __init__(self, can_interface: str = "can0", log_level: str = None, console_out: bool = False, passive_mode: bool = False, auto_response: bool = False): super().__init__() DGSimulator.instance_count = DGSimulator.instance_count + 1 self._log_manager = LogManager(log_level=log_level, log_filepath=self.__class__.__name__ + ".log") self.logger = self._log_manager.logger self.console_out = console_out self.can_interface = DenaliCanMessenger(can_interface=can_interface, logger=self.logger, console_out=console_out, passive_mode=passive_mode) self.can_interface.start() if self.can_interface is not None: channel_id = DenaliChannels.ui_to_dg_ch_id if auto_response: self.can_interface.register_receiving_publication_function(channel_id, MsgIds.MSG_ID_UI_DG_SET_RTC_REQUEST.value, self._handler_set_rtc_request) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIds.MSG_ID_FW_VERSIONS_REQUEST.value, self._handler_request_dg_version) self.can_interface.register_receiving_publication_function(DenaliChannels.ui_sync_broadcast_ch_id, MsgIds.MSG_ID_UI_SERVICE_INFO_REQUEST.value, self._handler_system_usage_response) def _handler_system_usage_response(self) -> None: """ Handles a request for system usage @return: None """ self.logger.debug("Handling request for system usage.") payload = integer_to_bytearray(1619628663) payload += integer_to_bytearray(1619887863) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_SERVICE_SCHEDULE_DATA.value, payload=payload) self.can_interface.send(message, 0) def _handler_set_rtc_request(self, message: dict) -> None: """ Handles a request to set the DG RTC @param message: (dict) the message content @return: None """ epoch = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.logger.debug("DG: Request to set the DG epoch to {0}".format(epoch)) self.cmd_send_set_rtc_response(YES, 0) def cmd_send_set_rtc_response(self, response: int, reason: int) -> None: """ Sends a set RTC response message @param response: integer - 0=NO, 1=YES @param reason: integer - the rejection reason @return: None """ self.logger.debug("DG: Sending response {0} reason {1}".format(response, reason)) payload = integer_to_bytearray(response) payload += integer_to_bytearray(reason) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_UI_SET_RTC_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_checkin_dg(self) -> None: """ check-in (keep alive) message from DG @return: none """ payload = ["A5", "01", "00", "06", "00", "00", "76", "00"] payload = [int(each, 16) for each in payload] message = {"channel_id": DenaliChannels.dg_to_hd_ch_id, "message": payload} self.can_interface.send(message, 0) def cmd_set_dg_ro_pump_data(self, set_pt_pressure: int, flow_rate: float, pwm: float) -> None: """ the DG RO Pump Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(F32) | #3:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x1F00| 0x080 | 8 | 1 Hz | N | DG | All | DG RO Pump Data | \ref Data::mPressure | \ref Data::mFlowRate | \ref Data::mPWM | @param vSetPtPressure: @param set_pt_pressure: (int) set Point Pressure @param flow_rate: float - Flow Rate @param pwm: float - PWM @return: none """ payload = integer_to_bytearray(set_pt_pressure) payload += float_to_bytearray(flow_rate) payload += float_to_bytearray(pwm) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_RO_PUMP_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_pressures_data(self, ro_inlet_pressure: float, ro_outlet_pressure: float, drain_inlet_pressure: float, drain_outlet_pressure: float) -> None: """ the DG Pressures Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(F32) | #4:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: | |0x2000| 0x080 | 8 | 1 Hz | N | DG | All | DG Pressures Data | \ref Data::mROInletPSI | \ref Data::mROOutletPSI | \ref Data::mDrainInletPSI | \ref Data::mDrainOutletPSI | @param ro_inlet_pressure: float - RO Inlet PSI @param ro_outlet_pressure: float - RO Outlet PSI @param drain_inlet_pressure: float - Drain Inlet PSI @param drain_outlet_pressure: float - Drain Outlet PSI @return: none """ payload = float_to_bytearray(ro_inlet_pressure) payload += float_to_bytearray(ro_outlet_pressure) payload += float_to_bytearray(drain_inlet_pressure) payload += float_to_bytearray(drain_outlet_pressure) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_PRESSURES_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_drain_pump_data(self, set_pt_pwm: int, dac_value: int) -> None: """ the DG Drain Pump Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: | |0x2400| 0x080 | 8 | 1 Hz | N | DG | All | DG Drain Pump Data | \ref Data::mRPM | \ref Data::mDAC | @param set_pt_pwm: integer - Set Point RPM @param dac_value: integer - DAC Value @return: none """ payload = integer_to_bytearray(set_pt_pwm) payload += integer_to_bytearray(dac_value) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DRAIN_PUMP_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_operation_mode(self, dg_op_mode: int) -> None: """ the DG Operation Mode Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: | |0x2700| 0x080 | 8 | 1 Hz | N | DG | All | DG Operation Mode Data | \ref Data::mOpMode | @param dg_op_mode: integer - DG Operation Mode @return: none """ payload = integer_to_bytearray(dg_op_mode) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_OP_MODE.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_reservoir_data(self, active_reservoir: int, fill_to_vol_ml: int, drain_to_vol_ml: int) -> None: """ the DG Reservoir Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x2800| 0x080 | 8 | 1 Hz | N | DG | All | DG Reservoir Data | \ref Data::mActiveReservoir | \ref Data::mFillToVol | \ref Data::mDrainToVol | @param active_reservoir: integer - Active Reservoir @param fill_to_vol_ml: integer - Fill To Volume ML @param drain_to_vol_ml: integer - Drain To Vol ML @return: none """ payload = integer_to_bytearray(active_reservoir) payload += integer_to_bytearray(fill_to_vol_ml) payload += integer_to_bytearray(drain_to_vol_ml) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_RESERVOIRS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_valves_states(self, valves_states): """ the DG Valves States Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U16) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: | |0x2A00| 0x080 | 8 | 2 Hz | N | DG | All | DG Valves States Data | \ref Data::mStates | @param valves_states: integer - Valves states @return: none """ payload = integer_to_bytearray(valves_states) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_VALVES_STATES.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_heaters_data(self, main_primary_dc: int, small_primary_dc: int, trimmer_dc: int) -> None: """ the DG Heaters Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(U32) | #2:(U32) | #3:(U32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x2C00| 0x080 | 8 | 2 Hz | N | DG | All | DG Heaters Data | \ref Data::mMainPrimaryDC | \ref Data::mSmallPrimaryDC | \ref Data::mTrimmerDC | @param main_primary_dc: integer - Main PriMary DC @param small_primary_dc: integer - Small Primary DC @param trimmer_dc: integer - Trimmer DC @return: none """ payload = integer_to_bytearray(main_primary_dc) payload += integer_to_bytearray(small_primary_dc) payload += integer_to_bytearray(trimmer_dc) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_HEATERS_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_load_cell_readings_data(self, reservoir1_primary: float, reservoir1_backup: float, reservoir2_primary: float, reservoir2_backup: float) -> None: """ The DG Load Cell Readings Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(F32) | #4:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: |:--: | |0x0C00| 0x080 | 8 | 10 Hz | N | DG | All | DG Load Cell Readings Data | \ref Data::mReservoir1Prim | \ref Data::mReservoir1Bkup | \ref Data::mReservoir2Prim | \ref Data::mReservoir2Bkup | @param reservoir1_primary: float - Reservoir 1 Primary @param reservoir1_backup: float - Reservoir 1 Backup @param reservoir2_primary: float - Reservoir 2 Primary @param reservoir2_backup: float - Reservoir 2 Backup @return: none """ payload = float_to_bytearray(reservoir1_primary) payload += float_to_bytearray(reservoir1_backup) payload += float_to_bytearray(reservoir2_primary) payload += float_to_bytearray(reservoir2_backup) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_LOAD_CELL_READINGS.value, payload=payload) self.can_interface.send(message, 0) def cmd_set_dg_temperatures_data(self, inlet_primary_heater: float, outlet_primary_heater: float, conductivity_sensor1: float, conductivity_sensor2: float, outlet_redundancy: float, inlet_dialysate: float, primary_heater_thermocouple: float, trimmer_heater_thermocouple: float, primary_heater_cold_junction: float, trimmer_heater_cold_junction: float, primary_heater_internal_temp: float, trimmer_heater_internal_temp: float) -> None: """ the DG Temperatures Data message setter/sender method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | #1:(F32) | #2:(F32) | #3:(F32) | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: |:--: |:--: |:--: | |0x2D00| 0x080 | 8 | 2 Hz | N | DG | All | DG Temperatures Data | \ref Data::mInletPrimaryHeater | \ref Data::mOutletPrimaryHeater | \ref Data::mConductivitySensor1 | | #4:(F32) | #5:(F32) | #6:(F32) | #7:(F32) | #8:(F32) | |:--: |:--: |:--: |:--: |:--: | | \ref Data::mConductivitySensor2 | \ref Data::mOutletRedundancy | \ref Data::mInletDialysate | \ref Data::mPrimaryHeaterThermoCouple | \ref Data::mTrimmerHeaterThermoCouple | | #9:(F32) | #10:(F32) | #11:(F32) | #12:(F32) | | :--: |:--: |:--: |:--: | | \ref Data::mPrimaryHeaterColdJunction | \ref Data::mTrimmerHeaterColdJunction | \ref Data::mPrimaryHeaterInternal | \ref Data::mTrimmerHeaterInternal | @param inlet_primary_heater: (float) Inlet Primary Heater @param outlet_primary_heater: (float) Outlet Primary Heater @param conductivity_sensor1: (float) Conductivity Sensor 1 @param conductivity_sensor2: (float) Conductivity Sensor 2 @param outlet_redundancy: (float) Outlet Redundancy @param inlet_dialysate: (float) Inlet Dialysate @param primary_heater_thermocouple: (float) Primary Heater Thermocouple @param trimmer_heater_thermocouple: (float) Trimmer Heater Thermocouple @param primary_heater_cold_junction: (float) Primary Heater ColdJunction @param trimmer_heater_cold_junction: (float) Trimmer Heater ColdJunction @param primary_heater_internal_temp: (float) Primary Heater Internal Temperature @param trimmer_heater_internal_temp: (float) Trimmer HeaterInternal Temperature @return: none """ payload = float_to_bytearray(inlet_primary_heater) payload += float_to_bytearray(outlet_primary_heater) payload += float_to_bytearray(conductivity_sensor1) payload += float_to_bytearray(conductivity_sensor2) payload += float_to_bytearray(outlet_redundancy) payload += float_to_bytearray(inlet_dialysate) payload += float_to_bytearray(primary_heater_thermocouple) payload += float_to_bytearray(trimmer_heater_thermocouple) payload += float_to_bytearray(primary_heater_cold_junction) payload += float_to_bytearray(trimmer_heater_cold_junction) payload += float_to_bytearray(primary_heater_internal_temp) payload += float_to_bytearray(trimmer_heater_internal_temp) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_TEMPERATURE_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_unknown_dg(self) -> None: """ the unknown message from DG setter/sender method @return: none """ payload = integer_to_bytearray(0) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_UNUSED.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_accelerometer_dg_data(self, x: float, y: float, z: float, x_max: float, y_max: float, z_max: float, x_tilt: float, y_tilt: float, z_tilt: float) -> None: """ the accelerometer hd data message method | MSG | CAN ID | Box | Type | Ack | Src | Dst | Description | |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------: | |0x3400| 0x080 | 8 | 1Hz | N | HD | UI | DG Accelerometer data | | #1:(F32) | #2:(F32) | #3:(U32) | |:--: |:--: |:--: | | \ref Data::mX | \ref Data::mY | \ref Data::mX | | #4:(F32) | #5:(F32) | #6:(U32) | |:--: |:--: |:--: | | \ref Data::mXMax | \ref Data::mYMax | \ref Data::mXMax | | #7:(F32) | #8:(F32) | #9:(U32) | |:--: |:--: |:--: | | \ref Data::mXTilt | \ref Data::mYTilt | \ref Data::mXTilt | @param x: float - x axis @param y: float - y axis @param z: float - z axis @param x_max: float - x axis max @param y_max: float - y axis max @param z_max: float - z axis max @param x_tilt: float - x axis tilt @param y_tilt: float - y axis tilt @param z_tilt: float - z axis tilt @return: None """ payload = float_to_bytearray(x) payload += float_to_bytearray(y) payload += float_to_bytearray(z) payload += float_to_bytearray(x_max) payload += float_to_bytearray(y_max) payload += float_to_bytearray(z_max) payload += float_to_bytearray(x_tilt) payload += float_to_bytearray(y_tilt) payload += float_to_bytearray(z_tilt) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_ACCELEROMETER_DATA.value, payload=payload) self.can_interface.send(message, 0) def _handler_request_dg_version(self) -> None: """ Handles a request for the HD version @return: None """ self.logger.debug("Handling request for dg version.") self.cmd_send_version_dg_data(9, 9, 9, 9, 9, 9, 9, 9) self.cmd_send_dg_serial_number() def cmd_send_dg_serial_number(self) -> None: """ Sends the dg serial number @return: None """ payload = b'0123456789\0' message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_SERIAL_NUMBER.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_version_dg_data(self, major: int, minor: int, micro: int, build: int, fpga_id: int, fpga_major: int, fpga_minor: int, fpga_lab: int, compatibility_rev: int) -> None: """ [0x1E00] # 30 0x080 Rsp Y DG All DG f/w version U08=Major U08=Minor U08=Micro U16=Build U08=FPGA ID U08=FPGA Major U08=FPGA Minor U08=FPGA Lab U32=compatibility rev --- the dg version response message method @param major: (uint) - Major version number @param minor: (uint) - Minor version number @param micro: (uint) - Micro version number @param build: (uint) - Build version number @param fpga_id: (int) - FPGA id version number @param fpga_major: (int) - FPGA Major version number @param fpga_minor: (int) - FPGA Minor version number @param fpga_lab: (int) - FPGA Lab version number @param compatibility_rev: (uint) - The FWs/UI compatibility revision @return: None """ payload = unsigned_byte_to_bytearray(major) payload += unsigned_byte_to_bytearray(minor) payload += unsigned_byte_to_bytearray(micro) payload += unsigned_short_to_bytearray(build) payload += unsigned_byte_to_bytearray(fpga_id) payload += unsigned_byte_to_bytearray(fpga_major) payload += unsigned_byte_to_bytearray(fpga_minor) payload += unsigned_byte_to_bytearray(fpga_lab) payload += unsigned_integer_to_bytearray(compatibility_rev) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_VERSION.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_serial_dg_data(self, serial: str): """ the dg version serial response message method @param serial: serial number @return: None """ payload = bytes(serial, 'ascii') + b'\x00' message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_SERIAL_NUMBER.value, payload=payload) self.can_interface.send(message, 0) @staticmethod def build_dg_debug_text(text: str) -> list: """ the debug text message from DG builder method @param text: string - the debug text @return: none """ message_length = 40 txt = messageBuilder.textToByte(text, message_length) # + 1 null term msg = messageBuilder.buildMessage(GuiActionType.DGDebugText, 1 * (message_length + 1), False, txt) return messageBuilder.toFrames(msg) def cmd_send_dg_pre_treatment_filter_flush_progress_data(self, total, countdown) -> None: """ send the pretreatment filter flush progress data @param total: (U32) Total time in second @param countdown: (U32) count down time in second @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_FILTER_FLUSH_PROGRESS.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_dg_disinfect_progress_time_flush(self, total: int, countdown: int) -> None: """ the broadcast progress water flush time @param total: the total time @param countdown: the gradual countdown time @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_FLUSH_TIME_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_dg_disinfect_progress_time_heat(self, total: int, countdown: int) -> None: """ the broadcast progress heat disinfect time @param total: the total time @param countdown: the gradual countdown time @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_HEAT_DISINFECT_TIME_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_dg_post(self, item: int, passed: bool, done: bool = False) -> None: """ send hd post message the single(item) or the final(done) @param item: the post state/item index @param passed: the post result single or final @param done: if this is the final post message this should be true @return: None """ payload = integer_to_bytearray(passed) if not done: payload += integer_to_bytearray(item) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_POST_FINAL_TEST_RESULT.value if done else MsgIds.MSG_ID_DG_POST_SINGLE_TEST_RESULT.value, payload=payload) self.can_interface.send(message, 0) # ------------------------------------------------ GENERAL MESSAGES ------------------------------------------------ def cmd_send_dg_disinfect_progress_time_checmical(self, total: int, countdown: int) -> None: """ the broadcast progress chemical disinfect time @param total: the total time @param countdown: the gradual countdown time @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=MsgIds.MSG_ID_DG_CHEM_DISINFECT_TIME_DATA.value, payload=payload) self.can_interface.send(message, 0) def cmd_send_dg_general_response(self, message_id: int, accepted: int, reason: int, is_pure_data: bool = False, has_parameters: bool = False, parameters_payload: any = 0x00, channel_id=DenaliChannels.dg_to_ui_ch_id) -> None: """ a general method to send any standard response message, by it's id and list of parameters. @param message_id: the id of the message @param accepted: the standard accepted parameter of any response message @param reason: the standard rejection reason parameter of any response message @param is_pure_data: The message only has data @param has_parameters: if the message has parameter this needs to be true. @param parameters_payload: the list of parameters pre-converted and ready to be concatenated to the payload. @param channel_id: (int) indicates the channel @return: None """ payload = "" if not is_pure_data: payload = integer_to_bytearray(accepted) payload += integer_to_bytearray(reason) if has_parameters: payload = parameters_payload message = DenaliMessage.build_message(channel_id=channel_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_send_sg_general_progress_data(self, message_id: int, total: int, countdown: int) -> None: """ a general method t send any standard progress data message, by it's id @param message_id: the id of the message @param total: the total value of the progress data @param countdown: the remaining or countdown value @return: None """ payload = integer_to_bytearray(total) payload += integer_to_bytearray(countdown) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=message_id, payload=payload) self.can_interface.send(message, 0) def cmd_send_dg_ack(self, seq: int) -> None: """ sending dg ack message by the sequence seq @param seq: the message sequence number @return: None """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, message_id=GuiActionType.Acknow, seq=seq) self.can_interface.send(message, 0) def cmd_send_dg_conductivity_data(self, ro_rejection_ratio: float, cpi_conductivity: float, cpo_conductivity: float, cd1_conductivity: float, cd2_conductivity: float) -> None: """ A simulated DG broadcast message of conductivity data. @param ro_rejection_ratio: (float) RO Pump rejection ratio @param cpi_conductivity: (float) CPi conductivity @param cpo_conductivity: (float) CPo conductivity @param cd1_conductivity: (float) CD1 conductivity @param cd2_conductivity: (float) CD2 conductivity @return: None """ payload = float_to_bytearray(ro_rejection_ratio) payload += float_to_bytearray(cpi_conductivity) payload += float_to_bytearray(cpo_conductivity) payload += float_to_bytearray(cd1_conductivity) payload += float_to_bytearray(cd2_conductivity) message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, message_id=MsgIds.MSG_ID_DG_CONDUCTIVITY_DATA.value, payload=payload) self.can_interface.send(message, 0)