#!/bin/python3
"""Build TD messages and send them over the Denali CAN interface."""

from typing import Any

from leahi_dialin.ui.utils import singleton_threadsafe
from leahi_dialin.utils import conversions
from leahi_dialin.protocols import CAN
from leahi_dialin.common import msg_ids


class _NullLogger:
    """Logger stand-in that silently discards everything it is given.

    Every method accepts arbitrary positional and keyword arguments so that
    calls such as ``logger.error("failed: %s", reason)`` do not raise.
    """

    def _discard(self, *args, **kwargs) -> None:
        pass

    debug = info = warning = error = critical = exception = _discard


@singleton_threadsafe
class TD_Messaging():
    """Sends TD messages through a ``CAN.DenaliCanMessenger`` on channel ``can0``."""

    def __init__(self):
        self.can_enabled: bool = False
        self.can_Channel: str = "can0"

        self.can_interface = CAN.DenaliCanMessenger(
            can_interface=self.can_Channel,
            logger=_NullLogger(),
        )
        self.can_interface.start()
        # Only reached when the messenger was created and started without raising.
        self.can_enabled = self.can_interface is not None

    def _ensure_can_enabled(self) -> None:
        """Raise ``ValueError`` when the CAN interface has not been enabled."""
        if not self.can_enabled:
            raise ValueError("CAN Interface is not enabled")

    @staticmethod
    def _pack_ints(*values: int):
        """Convert each integer with ``integer_to_bytearray`` and join them in order."""
        chunks = iter([conversions.integer_to_bytearray(value) for value in values])
        payload = next(chunks)
        for chunk in chunks:
            payload += chunk
        return payload

    def _send(self, channel_id, message_id, payload) -> None:
        """Build a Denali message from the given parts and send it on the CAN interface."""
        message = CAN.DenaliMessage.build_message(
            channel_id=channel_id,
            message_id=message_id,
            payload=payload,
        )
        self.can_interface.send(message, 0)

    def td_operation_mode(self, op_mode: int, sub_mode: int = 0) -> None:
        """Broadcasts the current TD operation mode

        Args:
            op_mode  (int          ): operation mode
            sub_mode (int, optional): operation sub-mode. Defaults to 0.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._ensure_can_enabled()
        self._send(
            CAN.DenaliChannels.td_sync_broadcast_ch_id,
            msg_ids.MsgIds.MSG_ID_TD_OP_MODE_DATA.value,
            self._pack_ints(op_mode, sub_mode),
        )

    def td_Treatment_Parameters_CreateRx(self, vRejectionReason: int) -> None:
        """Sends the TD response to the initiate-treatment-workflow request.

        The payload carries an accepted flag followed by the rejection reason;
        the flag is 1 when the reason is 0 and 0 otherwise.

        Args:
            vRejectionReason (int): Non-zero value of rejection reason, 0 when accepted.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._ensure_can_enabled()
        accepted = 1 if vRejectionReason == 0 else 0
        self._send(
            CAN.DenaliChannels.td_to_ui_ch_id,
            msg_ids.MsgIds.MSG_ID_TD_RESP_INITIATE_TREATMENT_WORKFLOW.value,
            self._pack_ints(accepted, vRejectionReason),
        )

    def td_Treatment_Parameters_Validation(
            self,
            vAccepted: int = 1,
            vBloodFlowRateRejectReason: int = 0,
            vDialysateFlowRateRejectReason: int = 0,
            vTreatmentDurationRejectReason: int = 0,
            vSalineBolusVolumeRejectReason: int = 0,
            vHeparinStopTimeRejectReason: int = 0,
            vHeparinTypeRejectReason: int = 0,
            vAcidConcentrateRejectReason: int = 0,
            vBicarbonateConcentrateRejectReason: int = 0,
            vDialyzerTypeRejectReason: int = 0,
            vBloodPressureMeasureIntervalRejectReason: int = 0,
            vRinsebackFlowRateRejectReason: int = 0,
            vRinsebackVolumeRejectReason: int = 0,
            vArterialPressureLimitWindowRejectReason: int = 0,
            vVenousPressureLimitWindowRejectReason: int = 0,
            vVenousPressureLimitAsymtrcRejectReason: int = 0,
            vTrancembrncPressureLimitWindowRejectReason: int = 0,
            vDialysateTempRejectReason: int = 0,
            vHeparinDispensingRateRejectReason: int = 0,
            vHeparinBolusVolumeRejectReason: int = 0,
    ) -> None:
        """TD Treatment Parameter Validation Response to the SW validation request
        with sent values of the same parameters.

        The values are packed into the payload in the order of the arguments.

        Args:
            vAccepted                                   (int, optional): Zero value if rejected. Defaults to 1.
            vBloodFlowRateRejectReason                  (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vDialysateFlowRateRejectReason              (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vTreatmentDurationRejectReason              (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vSalineBolusVolumeRejectReason              (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vHeparinStopTimeRejectReason                (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vHeparinTypeRejectReason                    (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vAcidConcentrateRejectReason                (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vBicarbonateConcentrateRejectReason         (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vDialyzerTypeRejectReason                   (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vBloodPressureMeasureIntervalRejectReason   (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vRinsebackFlowRateRejectReason              (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vRinsebackVolumeRejectReason                (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vArterialPressureLimitWindowRejectReason    (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vVenousPressureLimitWindowRejectReason      (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vVenousPressureLimitAsymtrcRejectReason     (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vTrancembrncPressureLimitWindowRejectReason (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vDialysateTempRejectReason                  (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vHeparinDispensingRateRejectReason          (int, optional): Non-zero value of rejection reason. Defaults to 0.
            vHeparinBolusVolumeRejectReason             (int, optional): Non-zero value of rejection reason. Defaults to 0.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._ensure_can_enabled()

        # Field order defines the wire layout; keep it in sync with the signature.
        payload = self._pack_ints(
            vAccepted,
            vBloodFlowRateRejectReason,
            vDialysateFlowRateRejectReason,
            vTreatmentDurationRejectReason,
            vSalineBolusVolumeRejectReason,
            vHeparinStopTimeRejectReason,
            vHeparinTypeRejectReason,
            vAcidConcentrateRejectReason,
            vBicarbonateConcentrateRejectReason,
            vDialyzerTypeRejectReason,
            vBloodPressureMeasureIntervalRejectReason,
            vRinsebackFlowRateRejectReason,
            vRinsebackVolumeRejectReason,
            vArterialPressureLimitWindowRejectReason,
            vVenousPressureLimitWindowRejectReason,
            vVenousPressureLimitAsymtrcRejectReason,
            vTrancembrncPressureLimitWindowRejectReason,
            vDialysateTempRejectReason,
            vHeparinDispensingRateRejectReason,
            vHeparinBolusVolumeRejectReason,
        )
        self._send(
            CAN.DenaliChannels.td_to_ui_ch_id,
            msg_ids.MsgIds.MSG_ID_TD_RESP_TREATMENT_PARAMS_TO_VALIDATE.value,
            payload,
        )

    def cmd_send_general_response(self, message_id: int, accepted: int, reason: int,
                                  is_pure_data: bool = False,
                                  has_parameters: bool = False,
                                  parameters_payload: Any = 0x00,
                                  channel_id=CAN.DenaliChannels.td_to_ui_ch_id) -> None:
        """A general method to send any standard response message, by its id and list of parameters.

        Payload layout:
            * ``is_pure_data`` False: accepted, reason, then the parameters (if any).
            * ``is_pure_data`` True : only the parameters (empty when there are none).

        Args:
            message_id         (int            ): the id of the message
            accepted           (int            ): the standard accepted parameter of any response message
            reason             (int            ): the standard rejection reason parameter of any response message
            is_pure_data       (bool, optional ): The message only has data. Defaults to False.
            has_parameters     (bool, optional ): if the message has parameter this needs to be true.
                                                  Defaults to False.
            parameters_payload (any, optional  ): the list of parameters pre-converted and ready to be
                                                  concatenated to the payload. Must be a bytes-like object
                                                  or an iterable of byte values when it is appended after
                                                  accepted/reason. Defaults to 0x00.
            channel_id         (_type_, optional): indicates the channel.
                                                  Defaults to CAN.DenaliChannels.td_to_ui_ch_id.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._ensure_can_enabled()

        if is_pure_data:
            # No accepted/reason header: the payload is exactly the parameters, if any.
            payload = parameters_payload if has_parameters else bytearray()
        else:
            payload = bytearray(self._pack_ints(accepted, reason))
            if has_parameters:
                # Append after the header rather than replacing it, so the
                # accepted/reason fields are not lost.
                payload.extend(parameters_payload)

        self._send(channel_id, message_id, payload)