########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file helpers.py # # @author (last) Quang Nguyen # @date (last) 07-Jul-2021 # @author (original) Peter Lucia # @date (original) 29-Apr-2021 # ############################################################################ import logging import subprocess from datetime import datetime from .base import LogManager def setup_virtual_can_interface(): """ Convenience function to setup a virtual can interface using the most common settings See the setup_can function to setup a can interface with custom settings. @return: True if successful, false otherwise """ bring_interface_down(interface="can0", delete=True) is_present = is_interface_present("can0") if not is_present: setup_can(interface="can0", is_virtual=True) result = is_interface_up("can0") if result: print("Success") return True print("Failure") return False def setup_real_can_interface(): """ Convenience function to setup a real can interface using the most common settings See the setup_can function to setup a can interface with custom settings. @return: True if successful, False otherwise """ delete_virtual = input("Delete virtual can interface? (y,n)") if delete_virtual.upper() in ["Y", "YES"]: bring_interface_down(interface="can0", delete=True) if is_interface_present("can0"): print("Failure") return False else: bring_interface_down(interface="can0", delete=False) if not is_interface_present("can0") or is_interface_up("can0"): print("Failure") return False input("Press 'Enter' after plugging in the PCAN-USB.") setup_can(interface="can0", is_virtual=False, bitrate=250000, restart_ms=100, txqueue=10000) if is_interface_present("can0") and is_interface_up("can0"): print("Success") return True else: print("Failure") return False def bring_interface_down(interface: str = "can0", delete=False): """ Brings the specified can interface down @param interface: (str) the interface name @param delete: (bool) whether to delete the interface after bringing it down @return: None """ cmd_iface_down = "sudo ifconfig {0} down > /dev/null 2>&1".format(interface) cmd_iface_delete = "sudo ip link delete dev {0} > /dev/null 2>&1".format(interface) success, info = run_cmd(cmd_iface_down) if not success: print("Warning: Could not bring interface down: {0}".format(info)) if delete: run_cmd(cmd_iface_delete) if not success: print("Warning: Could not delete the interface: {0}".format(info)) def run_cmd(cmd: str): """ Runs the provided command, returns if it was successful and output or error information @param cmd: (str) the command to run @return: tuple(bool, info) Whether the command was successful. If true info is the output, otherwise info contains the error information """ try: result = subprocess \ .check_output(cmd, shell=True).decode("utf-8").strip() return True, result except subprocess.CalledProcessError as e: return False, str(e) def is_interface_up(interface: str = "can0"): """ Checks if the specified interface is listed and up @param interface: @return: (bool) True if up, False if not or if we can't tell """ cmd_check_interface = "ifconfig {0}".format(interface) success, info = run_cmd(cmd_check_interface) if success: if interface in info and "UP,RUNNING" in info: return True return False def is_interface_present(interface: str = "can0"): """ Checks if the specified interface is listed @param interface: @return: (bool) True if present, False if not or if we can't tell """ cmd_check_interface = "ip a" success, info = run_cmd(cmd_check_interface) if success: if interface in info: return True return False def setup_can(interface: str = "can0", is_virtual=False, bitrate=250000, restart_ms=100, txqueue=10000): """ Convenience function to setup a can interface @param interface: (str) The interface name @param is_virtual: (bool) If the interface is virtual or not @param bitrate: (int) The desired bitrate (applies to non-virtual interfaces only) @param restart_ms: (int) The desired restart_ms (applies to non-virtual interfaces only) @param txqueue: (int) The desired txqueue length @return: """ cmd_iface_up = "sudo ip link set {0} up type can bitrate {1} restart-ms {2}" \ .format(interface, bitrate, restart_ms) cmd_txqueue = "sudo ifconfig {0} txqueuelen {1}".format(interface, txqueue) cmd_iface_type_virtual = "sudo ip link add dev {0} type vcan".format(interface) cmd_iface_up_virtual = "sudo ip link set {0} up".format(interface) if is_virtual: success, info = run_cmd(cmd_iface_type_virtual) if not success: print("Warning: Could not set iface type to virtual: {0}".format(info)) success, info = run_cmd(cmd_iface_up_virtual) if not success: print("Warning: Could not bring up virtual interface: {0}".format(info)) else: success, info = run_cmd(cmd_iface_up) if not success: print("Warning: Could not bring interface up: {0}".format(info)) success, info = run_cmd(cmd_txqueue) if not success: print("Warning: Could not set txtqueue length: {0}".format(cmd_txqueue)) def create_logger(log_path: str = "/tmp/DialinScript.log", level: str = "ERROR", enable_metadata=True, clear_before_write=False): """ Convenience function to create a logger for external Dialin scripts @param log_path: (str) The full path to the output log file. @param level: (str) The logging level (e.g. INFO, WARN, DEBUG, ERROR, CRITICAL) @param enable_metadata: (bool) if True, include metadata, otherwise, log only the message @param clear_before_write: (bool) if True, clear log file before write to it @return: (logging.Logger) The logger object """ numeric_level = getattr(logging, level, logging.ERROR) current_time = datetime.now() # create a new unique logger using current date and time logger = logging.getLogger("DialinScript{0}{1}{2}".format(current_time.hour, current_time.minute, current_time.second)) logger.setLevel(numeric_level) if clear_before_write: fh = logging.FileHandler(log_path, mode='w') else: fh = logging.FileHandler(log_path) fh.setLevel(numeric_level) ch = logging.StreamHandler() ch.setLevel(numeric_level) if enable_metadata: formatter = logging.Formatter(fmt=LogManager.LOG_FMT, datefmt=LogManager.LOG_DT_FMT) else: formatter = logging.Formatter(fmt=LogManager.LOG_FMT_NO_METADATA) fh.setFormatter(formatter) ch.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) return logger def find_variables_in_object(obj, value, starts_with: str = ""): """ Returns a list of variable names that in the object that match the searched value @param obj: (object) The object to search through @param value: (object) The value to lookup @param starts_with: (str) The @return: (List[str]) A list of variable names matching the searched value """ result = [] for attr in dir(obj): if not callable(getattr(obj, attr)) and attr.startswith(starts_with): if value == getattr(obj, attr): result.append(attr) return result