#!/bin/python3 from leahi_dialin.ui.utils import singleton_threadsafe from leahi_dialin.utils import conversions from leahi_dialin.protocols import CAN from leahi_dialin.common import msg_ids @singleton_threadsafe class TD_Messaging(): def __init__(self): self.can_enabled: bool=False self.can_Channel: str = "can0" class fakeLogger(): def __init__(self): pass def error(a1, a2): pass def info(a1, a2): pass self.can_interface = CAN.DenaliCanMessenger(can_interface=self.can_Channel, logger=fakeLogger() ) self.can_interface.start() if self.can_interface is not None: self.can_enabled = True def td_operation_mode(self, op_mode: int, sub_mode: int = 0): """ Broadcasts the current TD operation mode @param op_mode : operation mode @param sub_mode : operation sub-mode @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.integer_to_bytearray(op_mode ) payload += conversions.integer_to_bytearray(sub_mode) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_OP_MODE_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_temperatures(self, board_temp: float): """ Broadcasts the current TD Temperatures Data @param board_temp : TD Board temperature @return: None """ # TODO: replace with proper payload and message ID once message is defined if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.float_to_bytearray(board_temp) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_TEMPERATURE_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_blood_pump(self, set_flow: int, meas_flow: float, rot_speed: float, mot_speed: float, curr_motor: float, set_rpm: float, rot_count: int, pres_flow: int, rot_hall_state: int): """ Broadcasts the current TD Blood Pump Data @param set_flow : set flow rate @param meas_flow : meaured speed @param rot_speed : measured rotor speed @param mot_speed : measured blood pump speed @param curr_motor : current motor @param set_rpm : set rpm @param pres_flow : pres_flow @param rot_count : rotation count @param rot_hall_state : rotor hall state @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.integer_to_bytearray (set_flow ) payload += conversions.float_to_bytearray (meas_flow ) payload += conversions.float_to_bytearray (rot_speed ) payload += conversions.float_to_bytearray (mot_speed ) payload += conversions.float_to_bytearray (curr_motor ) payload += conversions.float_to_bytearray (set_rpm ) payload += conversions.unsigned_integer_to_bytearray(rot_count ) payload += conversions.unsigned_integer_to_bytearray(pres_flow ) payload += conversions.unsigned_integer_to_bytearray(rot_hall_state ) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_BLOOD_PUMP_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_air_trap(self, low_level: int, upper_level: int, lower_level_raw: int, upper_level_raw: int, valve_state: int, controlling: int): """ Broadcasts the current TD air trap data @param low_level : lower level value @param upper_level : upper level value @param low_level_raw : lower level raw value @param upper_level_raw : upper level raw value @param valve_state : air valve state @param controlling : air control @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.unsigned_integer_to_bytearray(low_level ) payload += conversions.unsigned_integer_to_bytearray(upper_level ) payload += conversions.unsigned_integer_to_bytearray(lower_level_raw ) payload += conversions.unsigned_integer_to_bytearray(upper_level_raw ) payload += conversions.unsigned_integer_to_bytearray(valve_state ) payload += conversions.unsigned_integer_to_bytearray(controlling ) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_AIR_TRAP_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_ejector(self, state: int, set_speed: float): """ Broadcasts the current TD operation mode @param state : ejector state @param set_speed : ejector set speed @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.integer_to_bytearray(state ) payload += conversions.float_to_bytearray (set_speed) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_EJECTOR_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_switches(self, door: int): """ Broadcasts the current TD switch Data @param door : door status @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.integer_to_bytearray(door ) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_SWITCHES_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_battery(self, capacity: int, ac_power: int): """ Broadcasts the current TD Battery Data @param capacity : battery capacity @param ac_power : a/c power status @return: None """ # TODO: replace with proper payload and message ID once message is defined if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.integer_to_bytearray(capacity) payload += conversions.integer_to_bytearray(ac_power) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_BATTERY_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_valves(self, valve_id: int, state: int, pos_name: int, pos_count: int, next_pos: int): """ Broadcasts the current TD Battery Data @param valve_id : Valve ID @param state : Valve State @param pos_name : Position Name @param pos_count : Position Count @param next_pos : Next Position @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.unsigned_integer_to_bytearray(valve_id ) payload += conversions.unsigned_integer_to_bytearray(state ) payload += conversions.unsigned_integer_to_bytearray(pos_name ) payload += conversions.short_to_bytearray (pos_count ) payload += conversions.short_to_bytearray (next_pos ) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_VALVES_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_voltages(self, line_1_2v: float, line_3_3v: float, line_logic_5v: float, line_sensors_5v: float, line_24v: float, line_regen_24v: float, fpga_adc_ref: float, res_ref: float, fpga_vcc: float, fpga_vaux: float, fpga_vpvn: float): """ Broadcasts the current TD voltage data @param line_1_2v : 1.2V line @param line_3_3v : 3.3V line @param line_logic_5v : Logic voltage (5V) @param line_sensors_5v : Sensors voltage (5V) @param line_24v : Actuators voltage (24V) @param line_regen_24v : Actuators regen voltage (24V) @param fpga_adc_ref : FPGA ADC reference voltage (1V) @param res_ref : PBA ADC reference voltage (3V) @param fpga_vcc : FPGA Vcc (3V) @param fpga_vaux : FPGA Vaux (3V) @param fpga_vpvn : FPGA Vpvn (1V) @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.float_to_bytearray(line_1_2v ) payload += conversions.float_to_bytearray(line_3_3v ) payload += conversions.float_to_bytearray(line_logic_5v ) payload += conversions.float_to_bytearray(line_sensors_5v ) payload += conversions.float_to_bytearray(line_24v ) payload += conversions.float_to_bytearray(line_regen_24v ) payload += conversions.float_to_bytearray(fpga_adc_ref ) payload += conversions.float_to_bytearray(res_ref ) payload += conversions.float_to_bytearray(fpga_vcc ) payload += conversions.float_to_bytearray(fpga_vaux ) payload += conversions.float_to_bytearray(fpga_vpvn ) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_VOLTAGES_DATA.value, payload=payload) self.can_interface.send(message, 0) def td_versions(self, major: int, minor: int, micro: int, build: int, fpga_id: int, fpga_major: int, fpga_minor: int, fpga_lab: int, compatibility_rev: int): """ Broadcasts the current TD Version Data @param major: (uint) - Major version number @param minor: (uint) - Minor version number @param micro: (uint) - Micro version number @param build: (uint) - Build version number @param fpga_id: (int) - FPGA id version number @param fpga_major: (int) - FPGA Major version number @param fpga_minor: (int) - FPGA Minor version number @param fpga_lab: (int) - FPGA Lab version number @param compatibility_rev: (uint) - The FWs/UI compatibility revision @return: None """ if not self.can_enabled: raise ValueError("CAN Interface is not enabled") payload = conversions.unsigned_byte_to_bytearray (major ) payload += conversions.unsigned_byte_to_bytearray (minor ) payload += conversions.unsigned_byte_to_bytearray (micro ) payload += conversions.unsigned_short_to_bytearray (build ) payload += conversions.unsigned_byte_to_bytearray (fpga_id ) payload += conversions.unsigned_byte_to_bytearray (fpga_major ) payload += conversions.unsigned_byte_to_bytearray (fpga_minor ) payload += conversions.unsigned_byte_to_bytearray (fpga_lab ) payload += conversions.unsigned_integer_to_bytearray(compatibility_rev) message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_sync_broadcast_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_VERSION_REPONSE.value, payload=payload) self.can_interface.send(message, 0) def td_serial(self, serial: str): """ the td version serial response message method @param serial: serial number @return: None """ # TODO: replace with proper payload and message ID once message is defined payload = bytes(serial, 'ascii') + b'\x00' message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_to_ui_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_SERIAL_RESPONSE.value, payload=payload) self.can_interface.send(message, 0) def td_request_ui_versions(self): """ the td request UI versions @return: None """ message = CAN.DenaliMessage.build_message( channel_id=CAN.DenaliChannels.td_to_ui_ch_id, message_id=msg_ids.MsgIds.MSG_ID_TD_UI_VERSION_INFO_REQUEST.value) self.can_interface.send(message, 0)