#!/bin/python3

from leahi_dialin.ui.utils import singleton_threadsafe
from leahi_dialin.utils import conversions
from leahi_dialin.protocols import CAN
from leahi_dialin.common import msg_ids


class _SilentLogger:
    """Logger stand-in handed to the CAN messenger; discards every record.

    Only ``error`` and ``info`` are provided, matching the stub this class
    replaces. They accept arbitrary arguments so a call that passes extra
    format arguments does not raise ``TypeError``.
    """

    def error(self, *args, **kwargs) -> None:
        pass

    def info(self, *args, **kwargs) -> None:
        pass


@singleton_threadsafe
class FP_Messaging:
    """Builds FP broadcast messages and sends them over a CAN interface.

    Every ``fp_*`` method packs its arguments into a payload (in the order
    of its parameters), wraps the payload in a message addressed to
    ``CAN.DenaliChannels.fp_sync_broadcast_ch_id`` and sends it through
    ``self.can_interface``.

    NOTE(review): the class is decorated with ``singleton_threadsafe``, so it
    is presumably instantiated once and shared - confirm in
    ``leahi_dialin.ui.utils``.
    """

    def __init__(self):
        """Create the CAN messenger on ``can0`` and start it."""
        self.can_enabled: bool = False
        self.can_Channel: str = "can0"

        self.can_interface = CAN.DenaliCanMessenger(
            can_interface=self.can_Channel,
            logger=_SilentLogger(),
        )
        self.can_interface.start()
        # Reached only if construction and start() did not raise, so the
        # interface exists at this point.
        self.can_enabled = True

    def _require_can_enabled(self) -> None:
        """Raise ``ValueError`` when the CAN interface is not enabled."""
        if not self.can_enabled:
            raise ValueError("CAN Interface is not enabled")

    def _broadcast(self, message_id, payload) -> None:
        """Wrap ``payload`` in an FP sync-broadcast message and send it.

        Args:
            message_id: Numeric message id (the ``.value`` of a
                ``msg_ids.MsgIds`` member).
            payload: Already-encoded message payload.
        """
        message = CAN.DenaliMessage.build_message(
            channel_id=CAN.DenaliChannels.fp_sync_broadcast_ch_id,
            message_id=message_id,
            payload=payload,
        )
        # The second positional argument is passed through as 0, unchanged
        # from the original calls. NOTE(review): presumably a timeout -
        # confirm against DenaliCanMessenger.send.
        self.can_interface.send(message, 0)

    def fp_versions(self,
                    major: int,
                    minor: int,
                    micro: int,
                    build: int,
                    fpga_id: int,
                    fpga_major: int,
                    fpga_minor: int,
                    fpga_lab: int,
                    compatibility_rev: int) -> None:
        """Broadcast the FP version data.

        Sent as ``MsgIds.MSG_ID_FP_VERSION_RESPONSE`` (previously noted as
        Msg ID 0x0F, 15 - confirm against ``msg_ids``).

        Args:
            major: Major version number, encoded as an unsigned byte.
            minor: Minor version number, encoded as an unsigned byte.
            micro: Micro version number, encoded as an unsigned byte.
            build: Build version number, encoded as an unsigned short.
            fpga_id: FPGA id version number, encoded as an unsigned byte.
            fpga_major: FPGA major version number, encoded as an unsigned byte.
            fpga_minor: FPGA minor version number, encoded as an unsigned byte.
            fpga_lab: FPGA lab version number, encoded as an unsigned byte.
            compatibility_rev: The FWs/UI compatibility revision, encoded as
                an unsigned integer.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_byte_to_bytearray(major)
        payload += conversions.unsigned_byte_to_bytearray(minor)
        payload += conversions.unsigned_byte_to_bytearray(micro)
        payload += conversions.unsigned_short_to_bytearray(build)
        payload += conversions.unsigned_byte_to_bytearray(fpga_id)
        payload += conversions.unsigned_byte_to_bytearray(fpga_major)
        payload += conversions.unsigned_byte_to_bytearray(fpga_minor)
        payload += conversions.unsigned_byte_to_bytearray(fpga_lab)
        payload += conversions.unsigned_integer_to_bytearray(compatibility_rev)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_VERSION_RESPONSE.value, payload)

    def fp_pressure(self,
                    m1Pressure: float,
                    m3Pressure: float,
                    p8Pressure: float,
                    p13Pressure: float,
                    p17Pressure: float,
                    x1Pressure: float,
                    x2Pressure: float,
                    x3Pressure: float,
                    x4Pressure: float) -> None:
        """Broadcast the FP pressure data.

        Sent as ``MsgIds.MSG_ID_FP_PRESSURES_DATA`` (previously noted as
        Msg ID 0x34, 52 - confirm against ``msg_ids``).

        Args:
            m1Pressure: Pressure before pressure regulator (M1).
            m3Pressure: Pressure after pressure regulator (M3).
            p8Pressure: Pressure before inlet conductivity sensor (P8).
            p13Pressure: Pressure before RO filter (P13).
            p17Pressure: Pressure after RO filter (P17).
            x1Pressure: Pressure before RO pump (X1).
            x2Pressure: Pressure RO Concentrate.
            x3Pressure: Pressure RO Concentrate drop.
            x4Pressure: Pressure drain drop.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.float_to_bytearray(m1Pressure)
        payload += conversions.float_to_bytearray(m3Pressure)
        payload += conversions.float_to_bytearray(p8Pressure)
        payload += conversions.float_to_bytearray(p13Pressure)
        payload += conversions.float_to_bytearray(p17Pressure)
        payload += conversions.float_to_bytearray(x1Pressure)
        payload += conversions.float_to_bytearray(x2Pressure)
        payload += conversions.float_to_bytearray(x3Pressure)
        payload += conversions.float_to_bytearray(x4Pressure)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_PRESSURES_DATA.value, payload)

    def fp_ro_pump(self,
                   p12PumpState: int,
                   p12PumpDutyCycle: int,
                   p12PumpFBDutyCycle: int,
                   p12PumpSpeed: float,
                   p12TargetPressure: float,
                   p12TargetFlow: float,
                   p12TargetDutyCycle: float,
                   p12PumpDutyCyclePct: float,
                   p12PumpFBDutyCyclePct: float) -> None:
        """Broadcast the FP RO pump data.

        Sent as ``MsgIds.MSG_ID_FP_RO_PUMP_DATA`` (previously noted as
        Msg ID 0x32, 50 - confirm against ``msg_ids``).

        Args:
            p12PumpState: RO pump current state.
            p12PumpDutyCycle: RO pump duty cycle at driver level.
            p12PumpFBDutyCycle: RO pump feedback duty cycle.
            p12PumpSpeed: RO pump speed (RPM).
            p12TargetPressure: RO pump target pressure for control.
            p12TargetFlow: RO pump target flow (in mL/min) for control.
            p12TargetDutyCycle: RO pump target duty cycle for open loop
                control.
            p12PumpDutyCyclePct: RO pump duty cycle as a percentage.
            p12PumpFBDutyCyclePct: RO pump feedback duty cycle as a
                percentage.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # Three unsigned integers followed by six floats.
        payload = conversions.unsigned_integer_to_bytearray(p12PumpState)
        payload += conversions.unsigned_integer_to_bytearray(p12PumpDutyCycle)
        payload += conversions.unsigned_integer_to_bytearray(p12PumpFBDutyCycle)
        payload += conversions.float_to_bytearray(p12PumpSpeed)
        payload += conversions.float_to_bytearray(p12TargetPressure)
        payload += conversions.float_to_bytearray(p12TargetFlow)
        payload += conversions.float_to_bytearray(p12TargetDutyCycle)
        payload += conversions.float_to_bytearray(p12PumpDutyCyclePct)
        payload += conversions.float_to_bytearray(p12PumpFBDutyCyclePct)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_RO_PUMP_DATA.value, payload)

    def fp_level(self, p25Level: int) -> None:
        """Broadcast the FP level data.

        Sent as ``MsgIds.MSG_ID_FP_LEVEL_DATA`` (previously noted as
        Msg ID 0x35, 53 - confirm against ``msg_ids``).

        Args:
            p25Level: P25 level from floater sensor, encoded as an unsigned
                integer.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(p25Level)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_LEVEL_DATA.value, payload)

    def fp_flow_sensors(self,
                        p7Flow: float,
                        p16Flow: float,
                        p7Temp: float,
                        p16Temp: float) -> None:
        """Broadcast the FP flow data.

        Sent as ``MsgIds.MSG_ID_FP_FLOW_DATA`` (previously noted as
        Msg ID 0x36, 54 - confirm against ``msg_ids``).

        Args:
            p7Flow: P7 flow rate at water inlet.
            p16Flow: P16 flow rate at RO filter outlet.
            p7Temp: P7 temperature from inlet flow sensor.
            p16Temp: P16 temperature from outlet flow sensor.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.float_to_bytearray(p7Flow)
        payload += conversions.float_to_bytearray(p16Flow)
        payload += conversions.float_to_bytearray(p7Temp)
        payload += conversions.float_to_bytearray(p16Temp)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_FLOW_DATA.value, payload)

    def fp_conductivity(self,
                        p9Conductivity: float,
                        p18Conductivity: float) -> None:
        """Broadcast the FP conductivity data.

        Sent as ``MsgIds.MSG_ID_FP_CONDUCTIVITY_DATA`` (previously noted as
        Msg ID 0x37, 55 - confirm against ``msg_ids``).

        Args:
            p9Conductivity: (P9) conductivity sensor value.
            p18Conductivity: (P18) conductivity sensor value.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.float_to_bytearray(p9Conductivity)
        payload += conversions.float_to_bytearray(p18Conductivity)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_CONDUCTIVITY_DATA.value, payload)

    def fp_temperature(self,
                       p10Temp: float,
                       p19Temp: float,
                       m1Temp: float,
                       m3Temp: float,
                       p8Temp: float,
                       p13Temp: float,
                       p17Temp: float,
                       x1Temp: float,
                       x2Temp: float,
                       x3Temp: float,
                       x4Temp: float,
                       p7Temp: float,
                       p16Temp: float) -> None:
        """Broadcast the FP temperature data.

        Sent as ``MsgIds.MSG_ID_FP_TEMPERATURE_DATA`` (previously noted as
        Msg ID 0x39, 57 - confirm against ``msg_ids``).

        Args:
            p10Temp: P10 temperature from inlet conductivity sensor.
            p19Temp: P19 temperature from outlet conductivity sensor.
            m1Temp: Temperature before pressure regulator (M1).
            m3Temp: Temperature after pressure regulator (M3).
            p8Temp: Temperature before inlet conductivity sensor (P8).
            p13Temp: Temperature before RO filter (P13).
            p17Temp: Temperature after RO filter (P17).
            x1Temp: Temperature before RO pump (X1).
            x2Temp: Temperature RO Concentrate.
            x3Temp: Temperature RO Concentrate drop.
            x4Temp: Temperature drain drop.
            p7Temp: Temperature RO inlet.
            p16Temp: Temperature RO outlet.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.float_to_bytearray(p10Temp)
        payload += conversions.float_to_bytearray(p19Temp)
        payload += conversions.float_to_bytearray(m1Temp)
        payload += conversions.float_to_bytearray(m3Temp)
        payload += conversions.float_to_bytearray(p8Temp)
        payload += conversions.float_to_bytearray(p13Temp)
        payload += conversions.float_to_bytearray(p17Temp)
        payload += conversions.float_to_bytearray(x1Temp)
        payload += conversions.float_to_bytearray(x2Temp)
        payload += conversions.float_to_bytearray(x3Temp)
        payload += conversions.float_to_bytearray(x4Temp)
        payload += conversions.float_to_bytearray(p7Temp)
        payload += conversions.float_to_bytearray(p16Temp)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_TEMPERATURE_DATA.value, payload)

    def fp_boost_pump(self,
                      p40PumpState: int,
                      p40PumpDutyCycle: int,
                      p40PumpFBDutyCycle: int,
                      p40PumpSpeed: float,
                      p40TargetPressure: float,
                      p40TargetFlow: float,
                      p40TargetDutyCycle: float,
                      p40PumpDutyCyclePct: float,
                      p40PumpFBDutyCyclePct: float) -> None:
        """Broadcast the FP boost pump data.

        Sent as ``MsgIds.MSG_ID_FP_BOOST_PUMP_DATA`` (previously noted as
        Msg ID 0x50, 80 - confirm against ``msg_ids``).

        Args:
            p40PumpState: Boost pump current state.
            p40PumpDutyCycle: Boost pump duty cycle at driver level.
            p40PumpFBDutyCycle: Boost pump feedback duty cycle.
            p40PumpSpeed: Boost pump speed (RPM).
            p40TargetPressure: Boost pump target pressure for control.
            p40TargetFlow: Boost pump target flow (in mL/min) for control.
            p40TargetDutyCycle: Boost pump target duty cycle for open loop
                control.
            p40PumpDutyCyclePct: Boost pump duty cycle as a percentage.
            p40PumpFBDutyCyclePct: Boost pump feedback duty cycle as a
                percentage.

        Raises:
            ValueError: If the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # Three unsigned integers followed by six floats.
        payload = conversions.unsigned_integer_to_bytearray(p40PumpState)
        payload += conversions.unsigned_integer_to_bytearray(p40PumpDutyCycle)
        payload += conversions.unsigned_integer_to_bytearray(p40PumpFBDutyCycle)
        payload += conversions.float_to_bytearray(p40PumpSpeed)
        payload += conversions.float_to_bytearray(p40TargetPressure)
        payload += conversions.float_to_bytearray(p40TargetFlow)
        payload += conversions.float_to_bytearray(p40TargetDutyCycle)
        payload += conversions.float_to_bytearray(p40PumpDutyCyclePct)
        payload += conversions.float_to_bytearray(p40PumpFBDutyCyclePct)

        self._broadcast(msg_ids.MsgIds.MSG_ID_FP_BOOST_PUMP_DATA.value, payload)