########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file utils.py # # @author (last) Quang Nguyen # @date (last) 22-Jul-2021 # @author (original) Peter Lucia # @date (original) 11-Nov-2020 # ############################################################################ import time import os import squish import struct from subprocess import check_output RainierMonitorApplication = "denaliSquish" COMMON_PATH = f"{os.environ['HOME']}/denali/Projects/dialin" def srsui(vstr=""): """ for user convenience if wanted to put a note for the SRSUI it makes sure all the numbers are 3 digit aligned to easily read on test results. """ return "SRSUI " + "{}".format(vstr).rjust(3, '0') def isVisible(object, vobject): """ checks if the object is visible. Note: in SquishQt it's a little different since usual check is wait Note : object is the squish's builtin variable which should be available within the test I don't thing importing would be a good idea here since it might be a runtime assigned value and it may have different/undefined value when imported out of test """ if object.exists(vobject): o_object = findObject(vobject) if o_object is None: return False else: return o_object.visible else: return False def waitForGUI(delay_s: int = 2): """ a global 2 seconds default wait method which is used to wait for GUI animations mostly to make sure the object is available. @param delay_s: integer - amount of second for delay @return: none """ time.sleep(delay_s) def toUXX(value: int, byte_count: int, delimiter: str) -> str: """ converts the value to hex @param value: (int) the value @param byte_count: (int) number of bytes the value will take eg. U32=4, ... @param delimiter: (str) the output delimiter @return: hex formated conversion of the value """ x = '{0:0{1}X}'.format(int(value) & (2 ** (4 * byte_count) - 1), byte_count, 'x') byte_arr = partition(x, 2) return delimiter.join(byte_arr) def toI32(value: int, delimiter: str = "") -> str: """ a convenient method for toUXX to convert the value to integer 4 bytes @param value: (int) the value @param delimiter: (str) the output delimiter @return: hex formated conversion of the vValue to int 4 bytes """ return toUXX(value, 8, delimiter) def toI16(value: int, delimiter: str = "") -> str: """ a convenient method for toUXX to convert the value to integer 2 bytes @param value: (int) the value @param delimiter: (str) the output delimiter @return: hex formated conversion of the vValue to int 2 bytes """ return toUXX(value, 4, delimiter) def toI08(value: int, delimiter: str = "") -> str: """ a convenient method for toUXX to convert the value vValue to integer 1 byte @param value: (int) the value @param delimiter: (str) the output delimiter @return: hex formated conversion of the vValue to int 1 bytes """ return toUXX(value, 2, delimiter) def toF32(value: float) -> str: """ converts value to floating point 4 bytes. @param value: (float) the value @return: hex formated conversion of the vValue to float 4 bytes """ return '{:08X}'.format(struct.unpack('f', value))[0], 'X') def partition(string: str, part: int, right_rirection: bool = True) -> list: """ splits the given string into sections of vPart long and puts the sections from left to right if right_rirection is False and puts the sections from right to left if right_rirection is True after the split is done @param string: (str) the given string @param part: (int) length of the section @param right_rirection: (bool) order of sections in the output list @return: (list) the section of the string in list """ return [string[i: i + part] for i in range(0, len(string), part)][::-1 if right_rirection else 1] def padding(string: str, length: int) -> any: """ added zero at the right side of the string to be of length of vLen @param string: (str) the string to add trailing zero to @param length: (int) the entire length of the string @return: (str) padded string """ str_len = len(string) pad_len = int(str_len / length) * length + (length * (1 if str_len % length else 0)) return string.ljust(pad_len, "0") def tstStart(test_name) -> None: """ test case start print out with time @param test_name: (str) name of the test case @return: none - prints out on the console """ print(time.strftime("%H:%M:%S Start", time.localtime()) + " - " + test_name) def tstDone() -> None: """ test case end print out with time @return: none - prints out on the console """ print(time.strftime("%H:%M:%S Done ", time.localtime())) def l2ml(value) -> int: """ converts liter to milliliter @param (int) value: the value in liter. @return: (int) value converted to milliliter. """ return int(round(value, 3) * 1000) def ml2l(value): """ converts milliliter to liter @param (int) value: the value in milliliter. @return: (int) value converted to liter. """ return value / 1000 def dict_update(obj: dict, key: str, value): """ Adds a key and value to a dictionary object without updating the original dictionary @param obj: (dict) the object to update @param key: (str) the key name @param value: the new value of the field @return: (dict) the updated dictionary object """ obj = obj.copy() obj[key] = value return obj def check_can0(): """ check if the can0 bus driver presents @return: (list) false if can0 not exists, msg as a message """ canid = "can0" ipa = "ip a" ipa = check_output(ipa, shell=True) loc = str(ipa).find(canid) if loc >= 0: msg = "can device '{}' found".format(canid) else: msg = "No can device registered as '{}'".format(canid) return loc >= 0, msg def cleanup(): """ Cleanup for simulator @return: None """ os.chdir(f"{COMMON_PATH}/ui") try: res = os.system("sudo ./cleanup.sh") waitForGUI(2) if (res != 0) and (res != 256): raise Exception("Incorrect command to kill the running simulator processes. " + \ "Please check the cleanup.sh and provide required permissions") except Exception as msg: test.log(pyStr(msg)) test.log("Cleaned up running simulator") def startApplication(app_executable, app_name): """ Function to start application and verify application status [running] If application does not start or running status is false, test stops Argument: @param app_name : (str) - Name of the application @param app_executable : (str) - Actual application @return: handle for the application if the application is in running state, or error (exist the application) """ counter = 0 while True: try: counter += 1 test.log("Starting Application {}".format(app_name)) startApplication(app_executable) # if app.isRunning == True: # test.passes("{} application is running".format(app_name)) if counter == 1: test.log(f"Application launched at the {counter}'st try.") elif counter == 2: test.log(f"Application launched at the {counter}'nd try.") elif counter == 3: test.log(f"Application launched at the {counter}'rd try.") else: test.log(f"Application launched at the {counter}'th try.") squish.snooze(20) break # else: # test.fail("{} startup failure or the application is hung".format(app_name)) except RuntimeError: if counter == 1: test.log(f"Application failed to launch after {counter} try - Please refer logs") elif counter == 20: test.log(f"Exiting after {counter} tries..") sys.exit(1) else: test.log(f"Application failed to launch after {counter} tries - Please refer logs") except: logErrorDetails("Failed to start the application") sys.exit(1) def launch_application(test_name ): """ Method to enables simulator and launch application @param test_name: (str) name of the test case @return: None, print out in the console """ cleanup() # cleanup before setup test.log("Launching a new instance of File Simulator") try: os.chdir(f"{COMMON_PATH}/ui") res = os.system("./sim_setup.sh") waitForGUI(delay_s = 5) if (res != 0) and (res != 256): raise Exception("Wrong path to the simulator executable was given. Script not executed") except Exception as msg: test.log(pyStr(msg)) test.log("Launched a new instance of Simulator") startApplication(RainierMonitorApplication, "Denali Application")