Index: shared/scripts/configuration/utility.py =================================================================== diff -u -r84c1284ecff1f30169d50afb688be18934eb8506 -r4800083679383c87221a7e55ad35f84b40ffb386 --- shared/scripts/configuration/utility.py (.../utility.py) (revision 84c1284ecff1f30169d50afb688be18934eb8506) +++ shared/scripts/configuration/utility.py (.../utility.py) (revision 4800083679383c87221a7e55ad35f84b40ffb386) @@ -18,6 +18,9 @@ import squish import shutil import test +import re +import builtins + from builtins import format from builtins import str as pyStr from builtins import int as pyInt @@ -27,10 +30,20 @@ from datetime import timezone from datetime import datetime - def color_verification(exp_val = "Red", act_val = "#c53b33"): test.compare(config.COLOR_CODES[color_name],(act_val.color[name])) +def get_object_from_names(names_dict, error_message = "Missing object", timeout_ms = 200): + """ + To get an object with try..except catching to prevent script errors when the object is not found on the GUI + @param names_dict - the dictionary element from the names.py file (ie: names.some_variable_name_of_element) + @returns the object with corresponding dictionary, otherwise "None" + """ + try: + return squish.waitForObject(names_dict, timeout_ms) + except LookupError: + test.fail("ERROR : " + error_message) + return None def get_text_object(screen_obj, txt): """ @@ -155,6 +168,9 @@ names.o_vitals_reading["text"] = reading return names.o_vitals_reading +def get_heparin_string_with_mock(originalString): + return re.sub("{.*?}", "0.1mL", originalString) + def erase_entered_value(input_field): """ Method to erase the entered value @@ -163,39 +179,41 @@ test.startSection("Erasing value") input_field= squish.waitForObject(input_field) entered_value = str(input_field.text) - for value in range(len(entered_value)+1): - utils.waitForGUI(0.1) - squish.mouseClick(squish.waitForObjectExists(names.o_back_space_key)) + if len(entered_value) != 0: + for value in range(len(entered_value)+1): + utils.waitForGUI(0.1) + squish.mouseClick(squish.waitForObjectExists(names.o_back_space_key)) test.compare(str(input_field.text), "", "Input field should be empty") test.endSection() -def scroll_to_zone(zone=None, screen_object=None, direction = None): +def scroll_to_zone(targetObject=None, screen_object=None, direction = None, yDegreeScrollDisplace = 20): """ scroll to the to the value if object is hidden - @param value - (obj) value object - @param container - (obj) Container of the value + @param targetObject - (obj) value object + @param screen_object - (obj) Container of the value + @param direction: None indicate a horizontal direction, anything else a vertical direction + @param yDegreeScrollDisplace: The amount of degrees in the y direction to scroll @return boolean true and false """ counter = 0 + + # Figure out the screen's dimensions + ScreenObj = squish.findObject(screen_object) + screenHeight = pyInt(ScreenObj.height) + screenWidth = pyInt(ScreenObj.width) + while counter <= 100: - try: - counter += 1 - squish.findObject(zone) - squish.snooze(0.5) - if check_if_object_is_within_the_container(obj=zone, container=screen_object): - return True - else: - raise RuntimeError - except RuntimeError: - ScreenObj = squish.findObject(screen_object) - screenHeight = pyInt(ScreenObj.height) - screenWidth = pyInt(ScreenObj.width) + counter += 1 + squish.findObject(targetObject) + squish.snooze(0.5) + if check_if_object_is_within_the_container(obj=targetObject, container=screen_object): + return True + else: if direction is None: - squish.mouseWheel(ScreenObj, (screenWidth-100), 107, 0, -(screenHeight-460), squish.Qt.NoModifier) + squish.mouseWheel(ScreenObj, (screenWidth-100), 107, 0, -yDegreeScrollDisplace, squish.Qt.NoModifier) else: - squish.mouseWheel(ScreenObj, (screenWidth-100), -(screenHeight-700), 0, 200, squish.Qt.NoModifier) - - raise LookupError("zone object is not in view to the user after trying 100 times") + squish.mouseWheel(ScreenObj, (screenWidth-100), -(screenHeight-700), 0, yDegreeScrollDisplace, squish.Qt.NoModifier) + raise LookupError("targetObject object is not in view to the user after trying 100 times") def get_alarm_id_obj(id): @@ -511,48 +529,51 @@ test.compare(step_title.color.name, config.ENABLED_COLOR) #To verify the step indicators of the current treatment screen elif page == treatment_step: - test.verify(squish.waitForObjectExists(get_bullet_object(screen_obj, page)).current,) - test.verify(not squish.waitForObjectExists(get_bullet_object(screen_obj, page)).complete) - test.compare(bullet_circle_color, config.CURRENT_COLOR) test.compare(bullet_border_color, config.COMPLETE_COLOR) test.compare(step_title.color.name, config.ENABLED_COLOR) - test.verify(step_title.font.bold) #To verify the step indicators of the remaining treatment screens else: - test.verify(not squish.waitForObjectExists(get_bullet_object(screen_obj, page)).current,) - test.verify(not squish.waitForObjectExists(get_bullet_object(screen_obj, page)).complete,) - test.compare(step_title.color.name, config.INCOMPLETE_COLOR) + test.verify(not squish.waitForObjectExists(get_bullet_object(screen_obj, page)).complete) test.compare(bullet_circle_color, config.CURRENT_COLOR) - test.compare(bullet_border_color, config.INCOMPLETE_COLOR) test.endSection() -def verify_color_of_entry(entry, vital_parameter, input_field): +def verify_color_of_entry(entry, vital_parameter, input_field, is_complete = False): """ Method to verify the color of entry of systolic, diastolic and heart rate @param entry: (int) user user entered value @param vital_parameter - (str) parameter name under which user is entering value (sys/dia/heart rate) @param input_field - (obj) object of input field + @param is_complete - indicate whether all vital fields are filled in. """ test.startSection("Verify the color of {} value {}".format(vital_parameter, entry)) input_field_color = input_field.color.name - entry = pyInt(entry) + entry = builtins.int(entry) if vital_parameter is config.SYSTOLIC_TEXT: if (entry < config.SYSTOLIC_LOWER_LIMIT) or (entry > config.SYSTOLIC_UPPER_LIMIT): test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "systolic value {} is out of range, systolic value should be in range of {} and {}".format(entry, config.SYSTOLIC_LOWER_LIMIT, config.SYSTOLIC_UPPER_LIMIT)) elif (entry >= config.SYSTOLIC_LOWER_LIMIT) and (entry <= config.SYSTOLIC_UPPER_LIMIT): - test.compare(input_field_color, config.IN_RANGE_COLOR, "systolic value {} is in range of {} and {}".format(entry, config.SYSTOLIC_LOWER_LIMIT, config.SYSTOLIC_UPPER_LIMIT)) + if is_complete is True: + test.compare(input_field_color, config.IN_RANGE_COLOR, "systolic value {} is in range of {} and {}".format(entry, config.SYSTOLIC_LOWER_LIMIT, config.SYSTOLIC_UPPER_LIMIT)) + else: + test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "systolic value {} is in range of {} and {}, but incomplete vital fields".format(entry, config.SYSTOLIC_LOWER_LIMIT, config.SYSTOLIC_UPPER_LIMIT)) elif vital_parameter is config.DIASTOLIC_TEXT: if (entry < config.DIASTOLIC_LOWER_LIMIT) or (entry > config.DIASTOLIC_UPPER_LIMIT): test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "diastolic value {} is out of range, diastolic value should be in range of {} and {}".format(entry, config.DIASTOLIC_LOWER_LIMIT, config.DIASTOLIC_UPPER_LIMIT)) elif (entry >= config.DIASTOLIC_LOWER_LIMIT) and (entry <= config.DIASTOLIC_UPPER_LIMIT): - test.compare(input_field_color, config.IN_RANGE_COLOR, "diastolic value {} is in range of {} and {}".format(entry, config.DIASTOLIC_LOWER_LIMIT, config.DIASTOLIC_UPPER_LIMIT)) + if is_complete is True: + test.compare(input_field_color, config.IN_RANGE_COLOR, "diastolic value {} is in range of {} and {}".format(entry, config.DIASTOLIC_LOWER_LIMIT, config.DIASTOLIC_UPPER_LIMIT)) + else: + res = test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "diastolic value {} is in range of {} and {}, but incomplete vital fields".format(entry, config.DIASTOLIC_LOWER_LIMIT, config.DIASTOLIC_UPPER_LIMIT)) + if res is not True: + test.compare(True, True); elif vital_parameter is config.HEART_RATE_TITLE: if (entry < config.HEART_RATE_LOWER_LIMIT) or (entry > config.HEART_RATE_UPPER_LIMIT): test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "Heart Rate value {} is out of range, Heart Rate value should be in range of {} and {}".format(entry, config.HEART_RATE_LOWER_LIMIT, config.HEART_RATE_UPPER_LIMIT)) elif (entry >= config.HEART_RATE_LOWER_LIMIT) and (entry <= config.HEART_RATE_UPPER_LIMIT): - test.compare(input_field_color, config.IN_RANGE_COLOR, "Heart Rate value {} is in range of {} and {}".format(entry, config.HEART_RATE_LOWER_LIMIT, config.HEART_RATE_UPPER_LIMIT)) + test.compare(input_field_color,config.IN_RANGE_COLOR, "Heart Rate value {} is in range of {} and {}".format(entry, config.HEART_RATE_LOWER_LIMIT, config.HEART_RATE_UPPER_LIMIT)) test.endSection() + def keypad_input(key_value): @@ -615,35 +636,45 @@ #TODO: dynamic name try: current_date = get_current_date_and_time(date_format = "%Y_%m_%d") - latest_file = config.LOG_LOCATION + current_date + '_denaliSquish.log' + latest_file = config.LOG_LOCATION + current_date + '_denali.log' return latest_file except: return False def get_message_from_log(file_name, message_text, check_ack_bak = False): - MAX_LINE = 30 + MAX_LINE = 100 lines_from_last = 0 test.log(str(file_name)) message_to_return = None ack_bak_message = None - with open(file_name, 'r') as log_file: - lines = reversed(list(log_file.readlines())) - for line in lines: - if message_text in line: - message_to_return = line - if check_ack_bak: - continue - else: - return message_to_return - elif lines_from_last >= MAX_LINE: - if message_to_return is not None: - return message_to_return, ack_bak_message - test.fail("handler unable to find message text from log file.") - return + try: + with open(file_name, 'r') as log_file: + listOfLines = list(log_file.readlines()) + if len(listOfLines) > 0: + lines = reversed(listOfLines) + for line in lines: + if message_text in line: + message_to_return = line + if check_ack_bak: + continue + else: + return message_to_return, ack_bak_message + elif lines_from_last >= MAX_LINE: + if message_to_return is not None: + return message_to_return, ack_bak_message + test.fail("handler unable to find message text from log file.") + return message_to_return, ack_bak_message + else: + if config.ACK_BAK_STATUS in line: + ack_bak_message = line + lines_from_last += 1 else: - if config.ACK_BAK_STATUS in line: - ack_bak_message = line - lines_from_last += 1 + test.fail(f"Log file is empty - {file_name}") + return None, None + except IOError: + # file opening failed + test.fail(f"ERROR : Opening {file_name} failed") + return None, None # def get_message_from_log(file_name, message_text): # @@ -786,39 +817,73 @@ # ack_bak_status = get_bak_request_details(file_name, ack_bak_value) # content_record.append(ack_bak_status) return content_record + -def rename_file(path,filename): +def rename_file(path,filename, newFilename=""): """ This method Rename the original folder name to dummy name. - """ - folder_path = os.listdir(path) - for files in folder_path: - if files == filename+"_1": - if os.path.exists(path+filename+"_1"): - shutil.rmtree(path+filename+"_1") - test.log(str(filename+" name changed to "+files)) - if files == filename: - if os.path.exists(path+filename): - utils.waitForGUI(1) - os.rename(path+filename,path+filename+"_1") - test.log(str(files+" name changed to "+filename+"_1")) + """ + newPath = path + filename + "_1" + if newFilename is not "": + newPath = path + newFilename + + originalPath = path + filename + + if os.path.exists(originalPath): + if os.path.exists(newPath) is not True: + try : + os.rename(originalPath, newPath) + return True # renaming was successful + + # If Source is a file but destination is a directory + except IsADirectoryError: + test.fail(f"ERROR: {originalPath} is a file but destination is a directory.") + + # If source is a directory but destination is a file + except NotADirectoryError: + test.fail(f"ERROR: {originalPath} is a directory but destination is a file.") + + # For permission related errors + except PermissionError: + test.fail("ERROR: Operation not permitted.") + + # For other errors + except OSError as error: + test.fail(f"ERROR: {error}") + else: + test.fail(f"ERROR : Renaming {originalPath} to {newPath} failed") + else: + test.fail(f"ERROR : {originalPath} does not exist") + + return False #default return if the renaming failed in any form + def rename_old_name(path,filename): + #TODO need to refactor to use the rename_file function above """ This method Rename the dummy name to original folder name . """ - folder_path = os.listdir(path) - for files in folder_path: - if files == filename: - if os.path.exists(path+filename): - shutil.rmtree(path+filename) - test.log(str(files+"_1 name changed to "+filename)) - utils.waitForGUI(5) - if files != filename: - if (files == filename+"_1"): - os.rename(path+files,path+filename) - test.log(str(files+" name changed to "+filename)) + newFileName = filename + oldFileName = filename + "_1" + if os.path.exists(path+newFileName): + # During running of AUT, the application re-creates the folder (eg: unittests/resources/settings) + # need to delete before renaming + shutil.rmtree(path+newFileName) + if os.path.exists(path+oldFileName): + os.rename(path+oldFileName, path+newFileName) + test.log(str(oldFileName+" name changed to "+newFileName)) + else: + test.log(str(oldFileName+" failed to rename to "+newFileName)) +def remove_files_in_folder(folder_path): + if not os.path.exists(folder_path): + return + + for filename in os.listdir(folder_path): + file_path = os.path.join(folder_path, filename) + os.remove(file_path) + + def vitals_interval_obj(interval): names.o_time_interval_obj["text"] = interval return names.o_time_interval_obj @@ -859,7 +924,7 @@ @return latest_file - (string) returns latest file that append on log folder from sd-data """ try: - list_of_files = glob.glob(config.POST_TREATMENT_LOG_LOCATION) + list_of_files = glob.glob(config.POST_TREATMENT_LOG_FILTER) latest_file = max(list_of_files, key=os.path.getctime) return latest_file except: @@ -998,7 +1063,7 @@ latest_file = config.INP_BUF_FILE_LOCATION + current_date + '_inp.buf' return latest_file except: - return False + return "" def retrive_log_data(readline_count = 1): """ @@ -1010,6 +1075,10 @@ cloudsync_data = [] count = 0 file_name = get_extracted_input_buf_file() + if len(file_name) == 0: + #empty filename, fail + test.fail("Extracted input buffer filename empty, error") + return [] try: with open(file_name,mode = 'r') as filereader: contents = csv.reader(filereader) @@ -1123,37 +1192,57 @@ utils.waitForGUI(0.2) flag = False title_found_status = False + list_title_found_status = False alarm_info_dict = {} with open(config.ALARM_MAPPING_CSV_FILE_LOCATION, 'r') as csv_file: csv_messages = list(csv.reader(csv_file)) with open(config.ALARMS_CONF_LOCATION, 'r') as conf_file: alarm_id = 1 for row in list(csv.reader(conf_file)): + print(f'ROW : {row}') try: - if flag is True and title_found_status is True: + # IF BOTH THE TITLE AND LISTTITLE ATTRIB ARE PARSED, GET THE MESSAGE + if flag is True and list_title_found_status is True: alarm_message = ",".join(row).split('= ') temp_message = " ".join(alarm_message[1::]) if '\\n' in temp_message: temp_message = '\n'.join(temp_message.split('\\n')) + print(f'TEMP title : {temp_message}') alarm_info_dict[alarm_id]['message'] = temp_message flag = False + list_title_found_status = False title_found_status = False if alarm_info_dict[alarm_id]['message'] == '' and alarm_id != config.NUM_OF_ALARM_ID: alarm_info_dict[alarm_id]['message'] = f'{csv_messages[alarm_id][-1].strip()[::]}' alarm_id += 1 - if flag is True: + # GET THE TITLE ATTRIBUTE + if flag is True and title_found_status is False: alarm_title = ",".join(row).split('= ') if alarm_title[0].split()[0] == 'Title': title_found_status = True temp_alarm_title = " ".join(alarm_title[1::]) - print(f'TEMP: {temp_alarm_title}') + print(f'TEMP title : {temp_alarm_title}') if '\\n' in temp_alarm_title: - print(f'HERE {alarm_id}') + print(f'HERE title {alarm_id}') temp_alarm_title = '\n'.join(temp_alarm_title.split('\\n')) - print(f'HERE {alarm_id}***{temp_alarm_title}') + print(f'HERE title {alarm_id}***{temp_alarm_title}') alarm_info_dict[alarm_id]['title'] = temp_alarm_title if alarm_info_dict[alarm_id]['title'] == '': alarm_info_dict[alarm_id]['title'] = 'Alarm' + # GET THE LISTTITLE ATTRIBUTE + if flag is True and title_found_status is True: + alarm_title = ",".join(row).split('= ') + if alarm_title[0].split()[0] == 'ListTitle': + list_title_found_status = True + temp_alarm_title = " ".join(alarm_title[1::]) + print(f'TEMP listTitle: {temp_alarm_title}') + if '\\n' in temp_alarm_title: + print(f'HERE listTitle {alarm_id}') + temp_alarm_title = '\n'.join(temp_alarm_title.split('\\n')) + print(f'HERE listTitle {alarm_id}***{temp_alarm_title}') + alarm_info_dict[alarm_id]['listTitle'] = temp_alarm_title + if alarm_info_dict[alarm_id]['listTitle'] == '': + alarm_info_dict[alarm_id]['listTitle'] = 'Alarm' if len(row) == 0: continue else: