#!/bin/python3 from leahi_dialin.ui.utils import singleton_threadsafe from leahi_dialin.utils import conversions from leahi_dialin.protocols import CAN from leahi_dialin.common import msg_ids @singleton_threadsafe class DD_Messaging(): def __init__(self): self.can_enabled: bool=False self.can_Channel: str = "can0" class fakeLogger(): def __init__(self): pass def error(a1, a2): pass def info(a1, a2): pass self.can_interface = CAN.DenaliCanMessenger(can_interface=self.can_Channel, logger=fakeLogger() ) self.can_interface.start() if self.can_interface is not None: self.can_enabled = True def dd_conductivity(self, D17: float, D27: float, D29: float, D43: float, D74: float): """ Broadcasts DD generate conductivity data @param D17 (float): Bicarb only conductivity sensor 1 @param D27 (float): Acid and Bicarb mix conductivity sensor 1 @param D29 (float): Acid and Bicarb mix conductivity sensor 2 @param D43 (float): Spent dialysate conductivity sensor @param D74 (float): Bicarb only conductivity sensor 2 @return: None """ payload = conversions.float_to_bytearray(D17 ) payload += conversions.float_to_bytearray(D27 ) payload += conversions.float_to_bytearray(D29 ) payload += conversions.float_to_bytearray(D43 ) payload += conversions.float_to_bytearray(D74 ) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.dd_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_DD_CONDUCTIVITY_DATA.value, payload=payload) self.can_interface.send(message, 0) def dd_gen_dialysate(self, GenDExec : int , isDDInPrg : int , FltLevel1 : int , BiCarbLvl : int , SpentLvl : int , HydNegPres : float , HydPosPres : float , SpentPress : float , IsDialGood : int ): """ Broadcasts DD generate dialysate data @param GenDExec (int ): Generate dialysate execution state @param isDDInPrg (int ): Dialysate Delivery in Progress Bool @param FltLevel1 (int ): Floater level (low, medium and high) @param BiCarbLvl (int ): BiCarb Chamber level @param SpentLvl (int ): Spent dialysate chamber level @param HydNegPres (float): Hydraulics chamber negative pressure @param HydPosPres (float): Hydraulics chamber positive pressure @param SpentPress (float): Spent Dialysate positive pressure @param IsDialGood (int ): Ready to deliver dialysate or not @return: None """ payload = conversions.unsigned_integer_to_bytearray(GenDExec ) payload += conversions.unsigned_integer_to_bytearray(isDDInPrg ) payload += conversions.unsigned_integer_to_bytearray(FltLevel1 ) payload += conversions.unsigned_integer_to_bytearray(BiCarbLvl ) payload += conversions.unsigned_integer_to_bytearray(SpentLvl ) payload += conversions.float_to_bytearray (HydNegPres ) payload += conversions.float_to_bytearray (HydPosPres ) payload += conversions.float_to_bytearray (SpentPress ) payload += conversions.unsigned_integer_to_bytearray(IsDialGood ) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.dd_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_DD_GEN_DIALYSATE_MODE_DATA.value, payload=payload) self.can_interface.send(message, 0) def dd_versions(self, major: int, minor: int, micro: int, build: int, fpga_id: int, fpga_major: int, fpga_minor: int, fpga_lab: int, compatibility_rev: int): """ Broadcasts the current dd Version Data @param major: (uint) - Major version number @param minor: (uint) - Minor version number @param micro: (uint) - Micro version number @param build: (uint) - Build version number @param fpga_id: (int) - FPGA id version number @param fpga_major: (int) - FPGA Major version number @param fpga_minor: (int) - FPGA Minor version number @param fpga_lab: (int) - FPGA Lab version number @param compatibility_rev: (uint) - The FWs/UI compatibility revision @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.unsigned_byte_to_bytearray (major ) payload += conversions.unsigned_byte_to_bytearray (minor ) payload += conversions.unsigned_byte_to_bytearray (micro ) payload += conversions.unsigned_short_to_bytearray (build ) payload += conversions.unsigned_byte_to_bytearray (fpga_id ) payload += conversions.unsigned_byte_to_bytearray (fpga_major ) payload += conversions.unsigned_byte_to_bytearray (fpga_minor ) payload += conversions.unsigned_byte_to_bytearray (fpga_lab ) payload += conversions.unsigned_integer_to_bytearray(compatibility_rev) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.dd_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_DD_VERSION_REPONSE.value, payload=payload) self.can_interface.send(message, 0) def dd_serial(self, serial: str): """ the dd version serial response message method @param serial: serial number @return: None """ # TODO: replace with proper payload and message ID once message is defined payload = bytes(serial, 'ascii') + b'\x00' message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.dd_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_DD_SERIAL_RESPONSE.value, payload=payload) self.can_interface.send(message, 0)