###########################################################################
#
# Copyright (c) 2021-2024 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file fp_defs.py
#
# @author (last) Zoltan Miskolci
# @date (last) 28-Jan-2026
# @author (original) Peter Lucia
# @date (original) 22-Jun-2021
#
############################################################################
from enum import unique
from aenum import extend_enum
from typing import TYPE_CHECKING

from .defs_base import Defs_Base
from .dd_defs import dd_enum_repository
from .global_vars import GlobalVariables, test_config_change
from ..utils.base import DialinEnum


class FP_Defs(Defs_Base):
    """Repository of the FP enums.

    ``_properties`` maps each public enum attribute name to the name of the
    ``_create_fp_*`` method that builds it. How and when those creators are
    invoked is decided by ``Defs_Base`` (not visible in this file).
    """

    # IDE autocomplete support
    if TYPE_CHECKING:
        FPOpModes: DialinEnum
        FPFaultStates: DialinEnum
        FPServiceStates: DialinEnum
        FPPostStates: DialinEnum
        FPStandbyStates: DialinEnum
        FPPreGenPermeateStates: DialinEnum
        FPPreGenPDefStates: DialinEnum
        FPGenPermeateStates: DialinEnum
        FPGenPermeateDefStates: DialinEnum
        FPNotLegalStates: DialinEnum

        FPEventList: DialinEnum
        FPEventDataTypes: DialinEnum

        FPBoostPumpNames: DialinEnum
        FPConductivitySensorNames: DialinEnum
        FPRORejectionRatioNames: DialinEnum
        FPFlowSensorNames: DialinEnum
        FPFloaterLevelSensorNames: DialinEnum
        FPPressureSensorNames: DialinEnum
        FPROPumpNames: DialinEnum
        FPTemperatureSensorNames: DialinEnum
        FPValveNames: DialinEnum
        FPAllPumpNames: DialinEnum

        FPFloaterLevels: DialinEnum
        FPPumpAttributes: DialinEnum
        FPValveStates: DialinEnum

    # A Dictionary storing which Property(Enum) should be created by which function
    _properties = {
        # Operation / State enums
        'FPOpModes': '_create_fp_operation_modes',
        'FPFaultStates': '_create_fp_fault_states',
        'FPServiceStates': '_create_fp_service_states',
        'FPPostStates': '_create_fp_init_post_states',
        'FPStandbyStates': '_create_fp_standby_states',
        'FPPreGenPermeateStates': '_create_fp_pregen_states',
        'FPPreGenPDefStates': '_create_fp_pregen_defeatured_states',
        'FPGenPermeateStates': '_create_fp_gen_permeate_states',
        'FPGenPermeateDefStates': '_create_fp_gen_permeate_defeatured_states',
        'FPNotLegalStates': '_create_fp_not_legal_states',

        # Event enums
        'FPEventList': '_create_fp_eventlist',
        'FPEventDataTypes': '_create_fp_event_data_types',

        # Module Names enums
        'FPBoostPumpNames': '_create_fp_boost_pump_names',
        'FPConductivitySensorNames': '_create_fp_conductivity_sensor_names',
        'FPRORejectionRatioNames': '_create_fp_ro_rejection_ratio_names',
        'FPFlowSensorNames': '_create_fp_flow_sensor_names',
        'FPFloaterLevelSensorNames': '_create_fp_floater_sensor_names',
        'FPPressureSensorNames': '_create_fp_pressure_sensor_names',
        'FPROPumpNames': '_create_fp_ro_pump_names',
        'FPValveNames': '_create_fp_valve_names',
        'FPAllPumpNames': '_create_fp_all_pump_names',
        'FPTemperatureSensorNames': '_create_fp_temperature_sensor_names',

        # Support enums
        'FPFloaterLevels': '_create_fp_floater_levels',
        'FPPumpAttributes': '_create_fp_pump_attributes',
        'FPValveStates': '_create_fp_valve_states',
    }

    # ================================================== Enum Creators: Shared helper ==================================================
    def _build_fp_enum(self, enum_name, members, *, enforce_unique=True):
        """Build one DialinEnum from a ``{official name: accepted strings}`` dict.

        Single home for the construction steps every ``_create_fp_*`` method
        performs, so the creators only have to declare their member table.

        :param enum_name: Name given to the created enum class.
        :param members: Dict mapping each official member name to the list of
            strings accepted for it. It is passed to ``self.create_members``
            and also stored, unmodified, on the enum as ``_str_list``.
        :param enforce_unique: When True (default) the enum is passed through
            ``enum.unique``, which raises ``ValueError`` if two member names
            share a value. Pass False for enums that get aliases added later.
        :return: The created enum class, with ``_str_list`` and ``from_str``
            attached.
        """
        created_enum = DialinEnum(enum_name, self.create_members(members))
        if enforce_unique:
            created_enum = unique(created_enum)
        created_enum._str_list = members
        created_enum.from_str = classmethod(self.from_str)
        return created_enum

    # ================================================== Enum Creators: Operations ==================================================
    def _create_fp_operation_modes(self):
        """Build the FPOpModes enum (FP operation modes)."""
        members = {
            # Official Name : Accepted strings
            'MODE_FAUL': ['fault'],  # Fault mode
            'MODE_SERV': ['service'],  # Service mode
            'MODE_INIT': ['initialization'],  # Initialization & POST mode
            'MODE_STAN': ['standby'],  # Standby mode
            'MODE_PRE_GENP': ['pre-gen', 'pre-generate permeate'],  # Pre Generate Permeate mode
            'MODE_GENP': ['generate permeate'],  # Generate Permeate mode
            'MODE_DPGP': ['defeatured pre-generate permeate', 'defeatured pre-gen'],  # Defeatured Pre-Generate Permeate mode
            'MODE_DEGP': ['defeatured generate permeate'],  # Defeatured Generate Permeate mode
            'MODE_NLEG': ['not legal'],  # Not legal - an illegal mode transition occurred
            'NUM_OF_FP_MODES': [],  # Number of FP operation modes
        }
        return self._build_fp_enum('FPOpModes', members)

    def _create_fp_fault_states(self):
        """Build the FPFaultStates enum (fault mode states)."""
        members = {
            # Official Name : Accepted strings
            'FP_FAULT_STATE_START': ['fault start'],  # FP Fault Start State
            'FP_FAULT_DEENERGIZED_STATE': ['fault deenergized'],  # FP Fault De-energized State
            'FP_FAULT_ENERGIZED_STATE': ['fault energized'],  # FP Fault Energized State
            'NUM_OF_FP_FAULT_STATES': [],  # Number of Fault Mode State
        }
        return self._build_fp_enum('FPFaultStates', members)

    def _create_fp_service_states(self):
        """Build the FPServiceStates enum (service mode states)."""
        members = {
            # Official Name : Accepted strings
            'NUM_OF_FP_SERV_STATES': [],  # Number of FP service states
        }
        return self._build_fp_enum('FPServiceStates', members)

    def _create_fp_init_post_states(self):
        """Build the FPPostStates enum (initialize & POST mode states)."""
        members = {
            # Official Name : Accepted strings
            'FP_POST_STATE_START': ['start'],  # Start initialize & POST mode state
            'FP_POST_STATE_FW_INTEGRITY': ['fw integrity'],  # Run firmware integrity test state
            'FP_POST_STATE_NVDATAMGMT': ['nv data management', 'data management'],  # Run NV Data Mgmt. test state
            'FP_POST_STATE_WATCHDOG': ['watchdog'],  # Run watchdog test state
            'FP_POST_STATE_SAFETY_SHUTDOWN': ['safety shutdown'],  # Run safety shutdown test state
            'FP_POST_STATE_TEMPERATURES': ['temperatures', 'temperatures check'],  # Run temperatures POST state
            'FP_POST_STATE_FW_COMPATIBILITY': ['fw compatibility'],  # Run firmware compatibility test state
            'FP_POST_STATE_FPGA': ['fpga'],  # Run FPGA test state
            'FP_POST_STATE_COMPLETED': ['completed'],  # POST self-tests completed state
            'FP_POST_STATE_FAILED': ['failed'],  # POST self-tests failed state
            'NUM_OF_FP_POST_STATES': []  # Number of initialize & POST mode states
        }
        return self._build_fp_enum('FPPostStates', members)

    def _create_fp_standby_states(self):
        """Build the FPStandbyStates enum (standby mode states)."""
        members = {
            # Official Name : Accepted strings
            'FP_STANDBY_MODE_STATE_IDLE': ['idle'],  # Idle standby mode state
            'NUM_OF_FP_STANDBY_MODE_STATES': [],  # Number of standby mode states
        }
        return self._build_fp_enum('FPStandbyStates', members)

    def _create_fp_pregen_states(self):
        """Build the FPPreGenPermeateStates enum (Pre-Gen Permeate mode states)."""
        members = {
            # Official Name : Accepted strings
            'FP_PRE_GENP_INLET_PRES_CHECK': ['inlet pressure check', 'pressure check'],  # FP Pre Gen Permeate Inlet Pressure Check State
            'FP_PRE_GENP_FILTER_FLUSH': ['filter flush'],  # FP Pre Gen Permeate Filter Flush State
            'FP_PRE_GENP_PERMEATE_FLUSH': ['permeate flush'],  # FP Pre Gen Permeate Permeate Flush State
            'FP_PRE_GENP_CONCENTRATE_FLUSH': ['concentrate flush'],  # FP Pre Gen Permeate Concentrate Flush State
            'FP_PRE_GENP_VERIFY_WATER': ['verify water'],  # FP Pre Gen Permeate Verify Water State
            'FP_PRE_GENP_PAUSED': ['paused'],  # FP Pre Gen Permeate Paused State
            'NUM_OF_FP_PRE_GENP_MODE_STATES': [],  # Number of Pre-Gen Permeate mode states
        }
        return self._build_fp_enum('FPPreGenPermeateStates', members)

    def _create_fp_pregen_defeatured_states(self):
        """Build the FPPreGenPDefStates enum (Defeatured Pre Gen Permeate states)."""
        members = {
            # Official Name : Accepted strings
            'FP_PRE_GENP_DEF_FLUSH': ['flush', 'defeatured flush', 'def flush'],  # Pre Gen Permeate Defeatured Flush state
            'FP_PRE_GENP_DEF_INLET_WATER_CHECK': ['inlet water check', 'def inlet water check', 'defeatured inlet water check'],  # Pre Gen Permeate Defeatured Inlet Water Check state
            'FP_PRE_GENP_DEF_PAUSED': ['paused', 'defeatured paused', 'def paused'],  # Defeatured Pre Gen Permeate Paused state
            'NUM_OF_FP_PRE_GENP_DEF_MODE_STATES': [],  # Number of Defeatured Pre Gen Permeate states
        }
        return self._build_fp_enum('FPPreGenPDefStates', members)

    def _create_fp_gen_permeate_states(self):
        """Build the FPGenPermeateStates enum (Gen Permeate states)."""
        members = {
            # Official Name : Accepted strings
            'FP_GENP_TANK_FILL_STATE': ['fill', 'tank fill'],  # Gen Permeate Tank Fill low state
            'FP_GENP_TANK_FULL_STATE': ['full', 'tank full'],  # Gen Permeate Tank Full state
            'NUM_OF_FP_GENP_MODE_STATES': [],  # Number of Gen permeate states
        }
        return self._build_fp_enum('FPGenPermeateStates', members)

    def _create_fp_gen_permeate_defeatured_states(self):
        """Build the FPGenPermeateDefStates enum (Defeatured Gen Permeate states)."""
        members = {
            # Official Name : Accepted strings
            'FP_GENP_DEF_SUPPLY_WATER': ['supply water', 'def supply water', 'defeatured supply water'],  # Gen Permeate Defeatured Supply Water state
            'FP_GENP_DEF_PAUSED': ['paused', 'def paused', 'defeatured paused'],  # Gen Permeate Defeatured Paused state
            'NUM_OF_FP_GENP_DEF_MODE_STATES': [],  # Number of Defeatured Gen permeate states
        }
        return self._build_fp_enum('FPGenPermeateDefStates', members)

    def _create_fp_not_legal_states(self):
        """Build the FPNotLegalStates enum (Not Legal states)."""
        members = {
            # Official Name : Accepted strings
            'NUM_OF_NOT_LEGAL_STATES': [],  # Number of Not Legal states
        }
        return self._build_fp_enum('FPNotLegalStates', members)

    def _create_fp_eventlist(self):
        """Build the FPEventList enum (FP events)."""
        members = {
            # Official Name : Accepted strings
            'FP_EVENT_STARTUP': ['startup'],  # FP startup event
            'FP_EVENT_OP_MODE_CHANGE': ['op mode change'],  # FP Op mode change event
            'FP_EVENT_SUB_MODE_CHANGE': ['sub mode change'],  # FP Op sub-mode change event
            'FP_EVENT_PRE_GEN_RO_SET_PWM': ['ro pump set pwm'],  # FP gen permeate ro set pwm event
            'FP_EVENT_GENP_BOOST_SET_PWM': ['boost pump set pwm'],  # FP gen permeate boost set pwm event
            'FP_EVENT_GENP_CHANGE': ['generate permeate change', 'genp change'],  # FP gen permeate state change
            'FP_EVENT_PRE_GEN_CHANGE': ['pre-generate permeate change', 'pre-genp change'],  # FP pre gen state change
            'FP_EVENT_PRE_GEN_DEF_CHANGE': ['defeatured pre-generate permeate change', 'def pre-genp change'],  # FP defeatured pre gen state change
            'FP_EVENT_GENP_DEF_CHANGE': ['defeatured generate permeate change', 'def genp change'],  # FP defeatured gen permeate state change
            'FP_EVENT_FAULT_ALARM_TRIGGER': ['alarm', 'alarm trigger', 'fault alarm trigger'],  # FP event for alarms that would trigger
            'NUM_OF_FP_EVENT_IDS': [],  # Number of FP events
        }
        return self._build_fp_enum('FPEventList', members)

    def _create_fp_event_data_types(self):
        """Build the FPEventDataTypes enum (event data types)."""
        members = {
            # Official Name : Accepted strings
            'EVENT_DATA_TYPE_NONE': ['none'],  # No Event Data Type
            'EVENT_DATA_TYPE_U32': ['u32'],  # Unsigned 32bit Event Data Type
            'EVENT_DATA_TYPE_S32': ['s32'],  # Signed 32bit Event Data Type
            'EVENT_DATA_TYPE_F32': ['f32'],  # Float 32bit Event Data Type
            'EVENT_DATA_TYPE_BOOL': ['bool', 'boolean'],  # Boolean Event Data Type
            'NUM_OF_EVENT_DATA_TYPES': [],  # Number of Event Data Types
        }
        return self._build_fp_enum('FPEventDataTypes', members)

    # ================================================== Enum Creators: Names ==================================================
    def _create_fp_boost_pump_names(self):
        """Build the FPBoostPumpNames enum (FP boost pumps)."""
        members = {
            # Official Name : Accepted strings
            'P40_PUMP': ['p40'],  # FP Boost Pump
            'NUM_OF_BOOST_PUMPS': [],  # Number of FP Boost Pumps
        }
        return self._build_fp_enum('FPBoostPumpNames', members)

    def _create_fp_conductivity_sensor_names(self):
        """Build the FPConductivitySensorNames enum (conductivity sensors)."""
        members = {
            # Official Name : Accepted strings
            'P9_COND': ['p9'],  # CPi (P9) conductivity sensor value
            'P18_COND': ['p18'],  # CPo (P18) conductivity sensor value
            # NOTE(review): 'SENSRORS' is misspelt, but it is a public member name,
            # so it is kept as-is; renaming it would break existing references.
            'NUM_OF_FP_CONDUCTIVITY_SENSRORS': [],  # Number of Conductivity sensors
        }
        return self._build_fp_enum('FPConductivitySensorNames', members)

    def _create_fp_ro_rejection_ratio_names(self):
        """Build the FPRORejectionRatioNames enum (RO rejection ratio items)."""
        members = {
            # Official Name : Accepted strings
            'RAW_RO_REJECTION_RATIO': ['raw ro rejection ratio'],  # Raw RO Rejection Ratio value
            'RAW_RO_REJECTION_RATIO_TANK_FILL': ['raw ro tank fill rejection ratio'],  # Raw RO Rejection Ratio Tank Fill value
            'AVG_RO_REJECTION_RATIO': ['avg ro rejection ratio', 'average ro rejection ratio'],  # Average RO Rejection Ratio value
            'AVG_RO_REJECTION_RATIO_TANK_FILL': ['avg ro tank fill rejection ratio', 'average ro tank fill rejection ratio'],  # Average RO Rejection Ratio Tank Fill value
            'GEN_PERMEATE_STATE': ['generate permeate state', 'generate permeate'],  # Generate Permeate State value
            'NUM_OF_FP_RO_REJECTION_RATIO': [],  # Number of RO Rejection Ratio items
        }
        return self._build_fp_enum('FPRORejectionRatioNames', members)

    def _create_fp_flow_sensor_names(self):
        """Build the FPFlowSensorNames enum (FP flow sensors)."""
        members = {
            # Official Name : Accepted strings
            'P7_FLOW': ['p7'],  # Water inlet flow sensor (FMS P7)
            'P16_FLOW': ['p16'],  # RO outlet flow sensor (FMP P16)
            'NUM_OF_FP_FLOW_SENSORS': [],  # Number of FP Flow Sensors
        }
        return self._build_fp_enum('FPFlowSensorNames', members)

    def _create_fp_floater_sensor_names(self):
        """Build the FPFloaterLevelSensorNames enum (level sensors)."""
        members = {
            # Official Name : Accepted strings
            'P25_LEVEL': ['p25'],  # P25 Level sensor
            'NUM_OF_FP_LEVEL_SENSORS': [],  # Number of Level Sensors
        }
        return self._build_fp_enum('FPFloaterLevelSensorNames', members)

    def _create_fp_pressure_sensor_names(self):
        """Build the FPPressureSensorNames enum (pressure sensors)."""
        members = {
            # Official Name : Accepted strings
            'M3_PRES': ['m3'],  # Water inlet pressure before regulator
            'P8_PRES': ['p8'],  # Water inlet pressure after regulator
            'P13_PRES': ['p13'],  # Water inlet pressure before the conductivity sensor
            'P17_PRES': ['p17'],  # Pressure before the RO filter
            'P46_PRES': ['p46'],  # Pressure after the RO filter
            'NUM_OF_FP_PRES_SENSORS': [],  # Number of Pressure Sensors
        }
        return self._build_fp_enum('FPPressureSensorNames', members)

    def _create_fp_ro_pump_names(self):
        """Build the FPROPumpNames enum (RO pumps)."""
        members = {
            # Official Name : Accepted strings
            'P12_PUMP': ['p12'],  # P12 RO Pump
            'NUM_OF_RO_PUMPS': [],  # Number of RO Pumps
        }
        return self._build_fp_enum('FPROPumpNames', members)

    def _create_fp_temperature_sensor_names(self):
        """Build the FPTemperatureSensorNames enum, including its range aliases.

        After the regular members are created, six extra names
        (FIRST_/LAST_ PRESSURE, CONDUCTIVITY and FLOW sensor) are added with the
        value of an existing member, and their names are appended to
        ``_member_names_`` so they are listed on TAB completion.
        """
        members = {
            # Official Name : Accepted strings
            'M3_TEMP': ['m3'],  # Temperature after pressure regulator (M3)
            'P8_TEMP': ['p8'],  # Temperature before inlet conductivity sensor (P8)
            'P13_TEMP': ['p13'],  # Temperature before RO filter (P13)
            'P17_TEMP': ['p17'],  # Temperature after RO filter (P17)
            'P46_TEMP': ['p46'],  # Temperature after sediment filter (P46)
            'P10_TEMP': ['p10'],  # P10 temperature from inlet conductivity sensor.
            'P19_TEMP': ['p19'],  # P19 temperature from outlet conductivity sensor.
            'P7_TEMP': ['p7'],  # Temperature at p7 flow meter
            'P16_TEMP': ['p16'],  # Temperature at p16 flow meter
            'P7_INTERNAL_TEMP': ['p7 internal'],  # Internal Temperature at p7 flow meter
            'P16_INTERNAL_TEMP': ['p16 internal'],  # Internal Temperature at p16 flow meter
            'NUM_OF_TEMPERATURE_SENSORS': [],  # Number of Temperature Sensors
        }
        # Built without `unique`, as in the original code - presumably because
        # the aliases added below intentionally reuse existing values.
        created_enum = self._build_fp_enum('FPTemperatureSensorNames', members, enforce_unique=False)

        # Alias name -> existing member whose value it reuses
        aliases = (
            ('FIRST_PRESSURE_SENSOR', 'M3_TEMP'),
            ('LAST_PRESSURE_SENSOR', 'P46_TEMP'),
            ('FIRST_CONDUCTIVITY_SENSOR', 'P10_TEMP'),
            ('LAST_CONDUCTIVITY_SENSOR', 'P19_TEMP'),
            ('FIRST_FLOW_SENSOR', 'P7_TEMP'),
            ('LAST_FLOW_SENSOR', 'P16_TEMP'),
        )

        # Add aliases
        for alias_name, target_name in aliases:
            extend_enum(created_enum, alias_name, getattr(created_enum, target_name).value)

        # Add aliases to be searchable (listed when pressed TAB).
        # Done only after every alias is registered, keeping the original order of operations.
        for alias_name, _ in aliases:
            created_enum._member_names_.append(alias_name)

        return created_enum

    def _create_fp_valve_names(self):
        """Build the FPValveNames enum (FP valves)."""
        members = {
            # Official Name : Accepted strings
            'M4_VALV': ['m4'],  # Valve (M4 VWi)
            'M12_VALV': ['m12'],  # Valve (M12)
            'P11_VALV': ['p11'],  # Valve (P11 VPi)
            'P33_VALV': ['p33'],  # Valve (P33 VCr)
            'P34_VALV': ['p34'],  # Valve (P34 VCb)
            'P37_VALV': ['p37'],  # Valve (P37 VCd)
            'P39_VALV': ['p39'],  # Valve (P39 VROd)
            'P6_VALV': ['p6'],  # Valve (P6 VFF)
            'NUM_OF_FP_VALVES': [],  # Number of valves
        }
        return self._build_fp_enum('FPValveNames', members)

    def _create_fp_all_pump_names(self):
        """Build the FPAllPumpNames enum (RO and boost pumps together)."""
        members = {
            # Official Name : Accepted strings
            'P12_PUMP_RO': ['p12', 'ro pump'],  # P12 RO Pump
            'P40_PUMP_BOOSTER': ['p40', 'boost pump'],  # P40 Boost Pump
            'NUM_OF_PUMPS': [],  # Number of pumps
        }
        return self._build_fp_enum('FPAllPumpNames', members)

    # ================================================== Enum Creators: Support ==================================================
    def _create_fp_floater_levels(self):
        """Build the FPFloaterLevels enum (floater level states)."""
        members = {
            # Official Name : Accepted strings
            'LEVEL_STATE_EMPTY': ['empty'],  # Empty level for the FP Floater Level Sensors
            'LEVEL_STATE_LOW': ['low'],  # Low level for the FP Floater Level Sensors
            'LEVEL_STATE_MEDIUM': ['medium'],  # Medium level for the FP Floater Level Sensors
            'LEVEL_STATE_HIGH': ['high'],  # High level for the FP Floater Level Sensors
            'LEVEL_STATE_ILLEGAL': ['illegal'],  # Illegal level for FP Floater Level Sensor (indicates investigation)
            'NUM_OF_LEVEL_STATES': [],  # Number of Level States
        }
        return self._build_fp_enum('FPFloaterLevels', members)

    def _create_fp_pump_attributes(self):
        """Build the FPPumpAttributes enum (pump attributes)."""
        members = {
            # Official Name : Accepted strings
            'STATE': [],  # State to command attribute for Pumps
            'DUTY_CYCLE': [],  # Duty Cycle attribute for Pumps
            'FB_DUTY_CYCLE': ['feedback duty cycle'],  # Feedback Duty Cycle attribute for Pumps
            'SPEED': [],  # Current Speed attribute for pumps
            'TARGET_PRES': ['target pressure'],  # Target Pressure attribute for pumps
            'TARGET_FLOW': [],  # Target Flow attribute for pumps
            'TARGET_DUTY_CYCLE': [],  # Target Duty Cycle attribute for pumps
            'DUTY_CYCLE_PCT': ['duty cycle percentage'],  # Duty Cycle Percentage attribute for pumps
            'FB_DUTY_CYCLE_PCT': ['feedback duty cycle percentage'],  # Feedback Duty Cycle Percentage attribute for pumps
            'NUM_OF_RO_PUMP_ATTRIBUTES': [],  # Number of pump attributes
        }
        return self._build_fp_enum('FPPumpAttributes', members)

    def _create_fp_valve_states(self):
        """Build the FPValveStates enum (valve states)."""
        members = {
            # Official Name : Accepted strings
            'VALVE_STATE_CLOSED': ['closed'],  # Valve Closed State
            'VALVE_STATE_OPEN': ['open'],  # Valve Open State
            'NUM_OF_VALVE_STATES': [],  # Number of Valve States
        }
        return self._build_fp_enum('FPValveStates', members)


# ================================================== Creates a singleton ==================================================
fp_enum_repository = FP_Defs()

# Attach listener to update function
test_config_change.add_listener(fp_enum_repository.update_enums)

BETA_IOFP_COND_SENSOR_OFFSET = dd_enum_repository.DDConductivitySensorNames.NUM_OF_CONDUCTIVITY_SENSORS.value  # Offset to translate Dialin FPTemperaturesNames to DD firmware DD Conductivity Sensors enum
BETA_IOFP_PRES_SENSOR_OFFSET = dd_enum_repository.DDPressureSensorNames.NUM_OF_PRESSURE_SENSORS.value  # Offset to translate Dialin FPTemperaturesNames to DD firmware DD Pressure Sensors enum
BETA_IOFP_VALVE_OFFSET = dd_enum_repository.DDValveNames.NUM_OF_DD_VALVES.value  # Offset to translate Dialin FPValveNames to DD firmware DD_Valves