########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file utils.py # @author (last) LTTS # @date (last) 15-Jan-2022 # ############################################################################ import csv import glob import names import object import squish import shutil import test import re import builtins from builtins import format from builtins import str as pyStr from builtins import int as pyInt from dialin.ui import utils from configuration import config from dialin.utils import * from datetime import timezone from datetime import datetime def color_verification(exp_val = "Red", act_val = "#c53b33"): test.compare(config.COLOR_CODES[color_name],(act_val.color[name])) def get_object_from_names(names_dict, error_message = "Missing object", timeout_ms = 200): """ To get an object with try..except catching to prevent script errors when the object is not found on the GUI @param names_dict - the dictionary element from the names.py file (ie: names.some_variable_name_of_element) @returns the object with corresponding dictionary, otherwise "None" """ try: return squish.waitForObject(names_dict, timeout_ms) except LookupError: test.fail("ERROR : " + error_message) return None def get_text_object(screen_obj, txt): """ To obtain a text object based on text provided @param screen_obj: provides the container on which the txt must be present @returns a real name object """ names.o_text_object["container"] = screen_obj names.o_text_object["text"] = txt return names.o_text_object def get_bullet_object(screen_obj, num): """ To obtain a bullet object based on occurrence provided. @param screen_obj: provides the container on which the bullet must be present @param num: provides the occurrence value @returns a real name object """ names.o_bullet_object["container"] = screen_obj names.o_bullet_object["occurrence"] = num + 1 return names.o_bullet_object def get_indicators(screen_obj, txt): """ Verifying the busy indicators for BiCarb Pump Check and Acid Pump Check. indicator object of expected text @param step - (str) expected text @return indicators - (obj) list of busy and check indicator """ parent_obj = object.parent(squish.waitForObjectExists(get_text_object(screen_obj,txt))) children_obj = object.children(parent_obj) indicator_parent = children_obj[2] indicators = object.children(indicator_parent) return indicators def check_if_object_is_within_the_container(obj=None, container=None): """ check if an object is inside a container @param obj - child UI object @param container - container UI object @return boolean True/False """ container = squish.findObject(container) containerPos = container.mapToGlobal(squish.QPoint(0, 0)) container_x, container_y = pyInt(containerPos.x), pyInt(containerPos.y) container_width, container_height = pyInt(container.width), pyInt(container.height) obj = squish.findObject(obj) objPos = obj.mapToGlobal(squish.QPoint(0, 0)) obj_x, obj_y = pyInt(objPos.x), pyInt(objPos.y) obj_width, obj_height = pyInt(obj.width), pyInt(obj.height) if obj_x >= container_x and obj_y >= container_y: if (obj_x + obj_width) <= (container_x + container_width) and (obj_y + obj_height) <= (container_y + container_height): return True return False def verify_bullet_navigation(num, num_of_instructions, screen_obj): """ Method to verify status of bullets based on number of instruction screen @param num - (int) number of indicator @param num_of_instructions- (int) count the number of instructions @param object - screen_obj - (str) Screen object """ test.startSection("instruction bullet verification for screens") for instruction in range(1, num_of_instructions): bullet_children = object.children(squish.waitForObjectExists(get_bullet_object(screen_obj,(0 + instruction) - 1))) bullet_circle_color = bullet_children[0].color.name bullet_border_color = bullet_children[0].border.color.name if instruction <= num: test.compare(bullet_circle_color, config.COMPLETE_COLOR) test.compare(bullet_border_color,config.COMPLETE_COLOR) test.log(str(instruction) + " Complete bullet") else: test.compare(bullet_circle_color, config.CURRENT_COLOR) test.compare(bullet_border_color,config.INCOMPLETE_COLOR) test.log(str(instruction) + " Incomplete bullet") test.endSection() def convert_seconds_into_min_and_sec(seconds, time_format="%M:%S"): """ Method to convert seconds into minute format. @param seconds - time in seconds. @param time_format (str) - time format. @return (int) - minute time """ seconds = int(seconds) min_and_sec = time.strftime(time_format, time.gmtime(seconds)) return min_and_sec def verify_missing_object(object_to_check): """ Method to verify the given object is invisible or is not present on the screen @param object_to_check: the object whose invisibility must be verified """ try: squish.testSettings.objectNotFoundDebugging = False squish.waitForObject(object_to_check,3000) test.fail("Given object should not be present initially") except LookupError as _: test.passes("object is not present as expected") squish.testSettings.objectNotFoundDebugging = True def pressure_pop_up_text_obj(text): names.o_pop_up_pressure_text_obj["text"] = text return names.o_pop_up_pressure_text_obj def pressure_text_obj(text): names.o_pressure_text_obj["text"] = text return names.o_pressure_text_obj def get_current_date_and_time(date_format='%Y/%b/%d - %H:%M'): date = datetime.now() return str(date.strftime(date_format)) def vitals_reading_obj(reading): names.o_vitals_reading["text"] = reading return names.o_vitals_reading def get_heparin_string_with_mock(originalString): return re.sub("{.*?}", "0.1mL", originalString) def erase_entered_value(input_field): """ Method to erase the entered value @param input_field - (obj) object of input field """ test.startSection("Erasing value") input_field= squish.waitForObject(input_field) entered_value = str(input_field.text) if len(entered_value) != 0: for value in range(len(entered_value)+1): utils.waitForGUI(0.1) squish.mouseClick(squish.waitForObjectExists(names.o_back_space_key)) test.compare(str(input_field.text), "", "Input field should be empty") test.endSection() def scroll_to_zone(targetObject=None, screen_object=None, direction = None, yDegreeScrollDisplace = 20): """ scroll to the to the value if object is hidden @param targetObject - (obj) value object @param screen_object - (obj) Container of the value @param direction: None indicate a horizontal direction, anything else a vertical direction @param yDegreeScrollDisplace: The amount of degrees in the y direction to scroll @return boolean true and false """ counter = 0 # Figure out the screen's dimensions ScreenObj = squish.findObject(screen_object) screenHeight = pyInt(ScreenObj.height) screenWidth = pyInt(ScreenObj.width) while counter <= 100: counter += 1 squish.findObject(targetObject) squish.snooze(0.5) if check_if_object_is_within_the_container(obj=targetObject, container=screen_object): return True else: if direction is None: squish.mouseWheel(ScreenObj, (screenWidth-100), 107, 0, -yDegreeScrollDisplace, squish.Qt.NoModifier) else: squish.mouseWheel(ScreenObj, (screenWidth-100), -(screenHeight-700), 0, yDegreeScrollDisplace, squish.Qt.NoModifier) raise LookupError("targetObject object is not in view to the user after trying 100 times") def get_alarm_id_obj(id): names.o_alarm_id_text["text"] = id return names.o_alarm_id_text def get_alarm_msg_obj(msg): names.o_alarm_message["text"] = msg return names.o_alarm_message def rejection_msg(text): names.o_notificationbar_response["text"] = text return names.o_notificationbar_response def rejection_msg_recirc(text): names.o_notification_bar_recirc["text"] = text return names.o_notification_bar_recirc def set_arterial_ranges_min_val(art_low): """ Method to set the Arterial range maximum value to user expected value @param art_low - (int) user expected value """ test.startSection("Set Arterial range minimum value to {}".format(art_low)) arterial_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_min = pyInt(arterial_min.minimum) arterial_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_max = pyInt(arterial_max.maximum) low_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery)) low_handler_children = object.children(low_handler_parent) low_handler = low_handler_children[-2] width = pyInt(low_handler.width) - 8 height = pyInt(low_handler.height)- 10 if arterial_min == art_low: test.passes("Arterial range minimum is already set to {}".format(art_low)) elif arterial_min < art_low: while arterial_min != art_low: squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_min += 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_min == arterial_max - config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range minimum value cannot be moved beyond {}".format(arterial_min)) break else: continue elif arterial_min > art_low: while arterial_min != art_low: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_min -= 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_min == arterial_max - config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range minimum value cannot be moved beyond {}".format(arterial_min)) break else: continue # arterial blood pressure low limit should be lower than the high limit by atleast 30mmHg if arterial_min == arterial_max - config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(low_handler_parent.minValue, arterial_min, "Arterial range minimum value cannot be moved beyond {}".format(arterial_min)) else: test.compare(arterial_min, art_low, "Actual Arterial range minimum value: {} is equal to Expected value: {}".format(arterial_min, art_low)) test.endSection() def set_arterial_ranges_max_val(art_high): """ Method to set the Arterial range maximum value to user expected value @param art_high - (int) user expected value """ test.startSection("Set Arterial range maximum value to {}".format(art_high)) arterial_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_max = pyInt(arterial_max.maximum) arterial_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_min = pyInt(arterial_min.minimum) high_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery)) high_handler_children = object.children(high_handler_parent) high_handler = high_handler_children[-1] width = pyInt(high_handler.width) - 20 height = pyInt(high_handler.height) - 25 if arterial_max == art_high: test.passes("Arterial range maximum is already set to {}".format(art_high)) elif arterial_max < art_high: while arterial_max != art_high: squish.mouseDrag(high_handler, -1, height, width, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_max += 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_max == arterial_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range maximum value cannot be moved beyond {}".format(arterial_max)) break else: continue elif arterial_max > art_high: while arterial_max != art_high: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_max -= 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_max == arterial_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range maximum value cannot be moved beyond {}".format(arterial_max)) break else: continue # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_max == arterial_min + config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(high_handler_parent.maxValue, arterial_max, "Arterial range maximum value cannot be moved beyond {}".format(arterial_max)) else: test.compare(arterial_max, art_high, "Actual Arterial range maximum value: {} is equal to Expected value: {}".format(arterial_max, art_high)) test.endSection() def set_venous_ranges_max_val(ven_high): """ Method to set the Venous range maximum value to user expected value @param ven_high - (int) user expected value """ test.startSection("Set Venous range maximum value to {}".format(ven_high)) ven_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_max = pyInt(ven_max.maximum) ven_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_min = pyInt(ven_min.minimum) high_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous)) high_handler_children = object.children(high_handler_parent) high_handler = high_handler_children[-1] width = pyInt(high_handler.width) - 15 height = pyInt(high_handler.height) - 10 if ven_max == ven_high: test.passes("Venous range maximum is already set to {}".format(ven_high)) elif ven_max < ven_high: while ven_max != ven_high: squish.mouseDrag(high_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_max += 10 # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_max == ven_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range maximum value cannot be moved beyond {}".format(ven_max)) break else: continue elif ven_max > ven_high: while ven_max != ven_high: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_max -= 10 # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_max == ven_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range maximum value cannot be moved beyond {}".format(ven_max)) break else: continue # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_max == ven_min + config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(high_handler_parent.maxValue, ven_max, "Venous range maximum value cannot be moved beyond {}".format(ven_max)) else: test.compare(ven_max, ven_high, "Actual Venous range maximum value: {} is equal to Expected value: {}".format(ven_max, ven_high)) test.endSection() #Methods for create custom treatment def set_venous_ranges_min_val(ven_low): """ Method to set the Venous range maximum value to user expected value @param ven_low - (int) user expected value """ test.startSection("set Venous range minimum value to {}".format(ven_low)) ven_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_min = pyInt(ven_min.minimum) ven_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_max = pyInt(ven_max.maximum) low_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous)) low_handler_children = object.children(low_handler_parent) low_handler = low_handler_children[-2] width = pyInt(low_handler.width) - 15 height = pyInt(low_handler.height) - 10 if ven_min == ven_low: test.passes("Venous range minimum is already set to {}".format(ven_low)) elif ven_min < ven_low: while ven_min != ven_low: squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_min += 10 if ven_min == ven_max - config.BUFFER_LOW_AND_HIGH_LIMITS: # venous blood pressure low limit should be lower than the high limit by at least 30mmHg squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range minimum value cannot be moved beyond {}".format(ven_min)) break else: continue elif ven_min > ven_low: while ven_min != ven_low: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_min -= 10 # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_min == ven_max - config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range minimum value cannot be moved beyond {}".format(ven_min)) break else: continue # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_min == ven_max - config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(low_handler_parent.minValue, ven_min, "Venous range minimum value cannot be moved beyond {}".format(ven_min)) else: test.compare(ven_min, ven_low, "Actual Venous range minimum value: {} is equal to Expected value: {}".format(ven_min, ven_low)) test.endSection() def expected_heparin_value(val): names.o_heparin_value["text"] = val return names.o_heparin_value def msg(string): """ Added ### at the right side of the string to make sure that it is a message. @param string: (str) the string to add trailing ### to @return pad_str: (str) padded string """ padded_str = "###"+string return padded_str def navigate_to_pretreatment_screen(mode): """ Method to navigate to sub mode under pre-treatment screen @param mode - (int) pre treatment state """ hd_simulator.cmd_set_hd_operation_mode_data(HDOpModes.MODE_PRET.value,0) hd_simulator.cmd_send_pre_treatment_state_data(sub_mode=mode, water_sample_state=0, consumables_self_test_state=0, no_cartridge_self_test_state=0, installation_state=0, dry_self_test_state=0, prime_state=0, recirculate_state=0, patient_connection_state=0) def self_test_dry_check_list_text(text): names.o_self_test_dry_check_list_text["text"] = text return names.o_self_test_dry_check_list_text def get_time(screen_title): """ Method to return the current count down in the application @param screen_title - (str) current title of the screen @return time_text - (str) count down in the application """ if screen_title == config.BEGIN_PRIME_TITLE or screen_title == config.PRIMING_TITLE: parent_object = object.parent(squish.waitForObjectExists(self_test_dry_check_list_text(screen_title))) elif screen_title == config.SYSTEM_SELF_TEST_TITLE: parent_object = object.parent(squish.waitForObjectExists(names.o_system_self_test)) else: parent_object = object.parent(squish.waitForObjectExists(names.o_PreTreatmentBase_Filter_Flush_Text)) time_parent_children = object.children(parent_object) progress_circle_parent = time_parent_children[4] progress_circle_parent = object.children(progress_circle_parent) progress_circle_parent = progress_circle_parent[0] progress_circle_parent = object.children(progress_circle_parent) progress_circle_children = object.children(progress_circle_parent[0]) time_text = progress_circle_children[1] return time_text.time def verify_countdown(screen_title, time_out, hd_simulator, dg_simulator): """ Method to verify the count down time in application @param screen_title - (str) current title of the screen @param time_out - (int) time out duration in secs @Param hd_simulator - Instance of class HDSimulator @Param dg_simulator - Instance of class DGSimulator """ test.startSection("Verify the count down time in application") for count_down in range(config.COUNT_DOWN_TIME_100, config.MINIMUM_COUNTDOWN_TIME-1, -1): if screen_title == config.BEGIN_PRIME_TITLE: hd_simulator.cmd_send_pre_treatment_self_test_dry_progress_data(time_out, count_down) elif screen_title == config.PRIMING_TITLE: hd_simulator.cmd_send_pre_treatment_disposables_prime_progress_data(time_out, count_down) elif screen_title == config.SYSTEM_SELF_TEST_TITLE: hd_simulator.cmd_send_pre_treatment_self_test_no_cartridge_progress_data(time_out, count_down) else: dg_simulator.cmd_send_dg_pre_treatment_filter_flush_progress_data(time_out, count_down) actual_time = get_time(screen_title) expected_time = convert_seconds_into_min_and_sec(count_down) test.compare(actual_time, expected_time, "Actual count down time: {} should be equal to expected count down time {}".format(actual_time, expected_time)) verify_the_progress(count_down, screen_title, time_out) test.endSection() def verify_the_progress(count_down, screen_title, time_out): """ Method to verify the current progress @param count_down - (int) current count down time @param screen_title - (str) current title of the screen @param time_out - (int) time out duration in secs """ test.startSection("Verifying the current progress") if screen_title == config.BEGIN_PRIME_TITLE or screen_title == config.PRIMING_TITLE: current_progress = (squish.waitForObjectExists(names.o_self_test_dry_progress)).progressValue elif screen_title == config.SYSTEM_SELF_TEST_TITLE: current_progress = (squish.waitForObjectExists(names.o_system_self_test_progress)).progressValue else: current_progress = (squish.waitForObjectExists(names.o_filter_flush_progress)).progressValue #Since progress value is equal maximum count down value - current count down value expected_progress = time_out - count_down test.compare(current_progress, expected_progress, "{} should be the current progress".format(expected_progress)) test.endSection() def verify_page_step_indicator(screen_obj, treatment_step, treatment_screens): """ Method to verify the Page Step indicators [the object on top of the screen which indicates the steps passed, current, remained] @param treatment_step : (int) indicates the Current treatment step """ test.startSection("verification of page step indicators") for page in range(len(treatment_screens)): bullet_children = object.children(squish.waitForObjectExists(get_bullet_object(screen_obj, page))) bullet_circle_color = bullet_children[0].color.name bullet_border_color = bullet_children[0].border.color.name step_title = squish.waitForObjectExists(get_text_object(screen_obj, treatment_screens[page])) #To verify the step indicators of the completed treatment screens if page < treatment_step: test.verify(squish.waitForObjectExists(get_bullet_object(screen_obj, page)).complete) test.verify(not squish.waitForObjectExists(get_bullet_object(screen_obj, page)).current) test.compare(bullet_circle_color, config.COMPLETE_COLOR) test.compare(bullet_border_color, config.COMPLETE_COLOR) test.compare(step_title.color.name, config.ENABLED_COLOR) #To verify the step indicators of the current treatment screen elif page == treatment_step: test.compare(bullet_border_color, config.COMPLETE_COLOR) test.compare(step_title.color.name, config.ENABLED_COLOR) #To verify the step indicators of the remaining treatment screens else: test.verify(not squish.waitForObjectExists(get_bullet_object(screen_obj, page)).complete) test.compare(bullet_circle_color, config.CURRENT_COLOR) test.endSection() def verify_color_of_entry(entry, vital_parameter, input_field, is_complete = False): """ Method to verify the color of entry of systolic, diastolic and heart rate @param entry: (int) user user entered value @param vital_parameter - (str) parameter name under which user is entering value (sys/dia/heart rate) @param input_field - (obj) object of input field @param is_complete - indicate whether all vital fields are filled in. """ test.startSection("Verify the color of {} value {}".format(vital_parameter, entry)) input_field_color = input_field.color.name entry = builtins.int(entry) if vital_parameter is config.SYSTOLIC_TEXT: if (entry < config.SYSTOLIC_LOWER_LIMIT) or (entry > config.SYSTOLIC_UPPER_LIMIT): test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "systolic value {} is out of range, systolic value should be in range of {} and {}".format(entry, config.SYSTOLIC_LOWER_LIMIT, config.SYSTOLIC_UPPER_LIMIT)) elif (entry >= config.SYSTOLIC_LOWER_LIMIT) and (entry <= config.SYSTOLIC_UPPER_LIMIT): if is_complete is True: test.compare(input_field_color, config.IN_RANGE_COLOR, "systolic value {} is in range of {} and {}".format(entry, config.SYSTOLIC_LOWER_LIMIT, config.SYSTOLIC_UPPER_LIMIT)) else: test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "systolic value {} is in range of {} and {}, but incomplete vital fields".format(entry, config.SYSTOLIC_LOWER_LIMIT, config.SYSTOLIC_UPPER_LIMIT)) elif vital_parameter is config.DIASTOLIC_TEXT: if (entry < config.DIASTOLIC_LOWER_LIMIT) or (entry > config.DIASTOLIC_UPPER_LIMIT): test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "diastolic value {} is out of range, diastolic value should be in range of {} and {}".format(entry, config.DIASTOLIC_LOWER_LIMIT, config.DIASTOLIC_UPPER_LIMIT)) elif (entry >= config.DIASTOLIC_LOWER_LIMIT) and (entry <= config.DIASTOLIC_UPPER_LIMIT): if is_complete is True: test.compare(input_field_color, config.IN_RANGE_COLOR, "diastolic value {} is in range of {} and {}".format(entry, config.DIASTOLIC_LOWER_LIMIT, config.DIASTOLIC_UPPER_LIMIT)) else: res = test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "diastolic value {} is in range of {} and {}, but incomplete vital fields".format(entry, config.DIASTOLIC_LOWER_LIMIT, config.DIASTOLIC_UPPER_LIMIT)) if res is not True: test.compare(True, True); elif vital_parameter is config.HEART_RATE_TITLE: if (entry < config.HEART_RATE_LOWER_LIMIT) or (entry > config.HEART_RATE_UPPER_LIMIT): test.compare(input_field_color, config.OUT_OF_RANGE_COLOR, "Heart Rate value {} is out of range, Heart Rate value should be in range of {} and {}".format(entry, config.HEART_RATE_LOWER_LIMIT, config.HEART_RATE_UPPER_LIMIT)) elif (entry >= config.HEART_RATE_LOWER_LIMIT) and (entry <= config.HEART_RATE_UPPER_LIMIT): test.compare(input_field_color,config.IN_RANGE_COLOR, "Heart Rate value {} is in range of {} and {}".format(entry, config.HEART_RATE_LOWER_LIMIT, config.HEART_RATE_UPPER_LIMIT)) test.endSection() def keypad_input(key_value): """ Method to enter values using application UI keyboard @param key_value: (str) User expected value """ if key_value is not None: names.o_keypad_input["text"] = key_value return names.o_keypad_input else: test.log("Invalid text for object.") names.o_keypad_input["text"] = None def enter_keypad_value(entry): """ Method to enter user desired value using keypad @param entry: (str) User expected value """ test.startSection("Entering {}".format(entry)) entry = pyStr(entry) #type casted into string format for value in entry: squish.mouseClick(squish.waitForObject(keypad_input(value))) test.endSection() def is_float(num): """ This function checks the value is adaptable for float conversion. @param num - (string) input value for conversion. @return True/False- (bool) returns True if the value can type casted into float, else False """ try: if '.' in num: float(num) return True except ValueError: return False def is_intiger(num): """ This function checks the value is adaptable for integer conversion. @param num - (string) (string) input value for conversion. @return True/False- (bool) returns True if the value can type casted into int, else False """ try: if num.isdigit(): return True except ValueError: return False def get_extracted_file(): """ This function is the handler for getting file from log folder. This handler will go inside log folder and looks for newly added log based on current time. if it satisfied that condition, it will return the exact path of newly created log. Application log file is automatically created on '/home/denali/Desktop/sd-card/log/ {current_date}_denaliSquish.log' @return latest_file - (string) returns latest file that append on log folder from sd-data """ #TODO: dynamic name try: current_date = get_current_date_and_time(date_format = "%Y_%m_%d") latest_file = config.LOG_LOCATION + current_date + '_SystemSetup.log' return latest_file except: return False def get_message_from_log(file_name, message_text, check_ack_bak = False): MAX_LINE = 100 lines_from_last = 0 test.log(str(file_name)) message_to_return = None ack_bak_message = None try: with open(file_name, 'r') as log_file: listOfLines = list(log_file.readlines()) if len(listOfLines) > 0: lines = reversed(listOfLines) for line in lines: if message_text in line: message_to_return = line if check_ack_bak: continue else: return message_to_return, ack_bak_message elif lines_from_last >= MAX_LINE: if message_to_return is not None: return message_to_return, ack_bak_message test.fail("handler unable to find message text from log file.") return message_to_return, ack_bak_message else: if config.ACK_BAK_STATUS in line: ack_bak_message = line lines_from_last += 1 else: test.fail(f"Log file is empty - {file_name}") return None, None except IOError: # file opening failed test.fail(f"ERROR : Opening {file_name} failed") return None, None # def get_message_from_log(file_name, message_text): # # """ # This method intended to extract the message from application log. # For row[index], index represent column to be extracted. # @param file_name - (string) path of the latest log file created. # @param message_text - (string) message text to be extracted. # @return message - (list) API arguments displayed in log. # """ # message = [] # count = 0 # try: # with open(file_name, 'r') as csv_file: # try: # for row in reversed(list(csv.reader(csv_file))): # if row[0].isalpha(): # pass # else: # row_length = sum(1 for values in row) # if row_length >= 4: # if row[3]!= None and row[3] == message_text: # if count == 30: # test.fail("handler unable to find message text from log file.") # message_length = sum(1 for values in row) # for column in range(4, message_length, 1): # message.append(row[column]) # count +=1 # for value in range(len(message)): # float_status = is_float(message[value]) # int_status = is_intiger(message[value]) # if float_status is True: # message[value] = float(message[value]) # if int_status is True: # message[value] = int(message[value]) # test.log(str(message)) # return message # else: # pass # except: # test.fail("application log data is corrupted") # except: # test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_ack_request_details(file_name, message_text): """ This method intended to extract acknowledgement request status, negative requested acknowledgement and message id from application log. For row[index], were index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param message_text - (string) message text to be extracted from log. @return row[3] - (string) acknowledgement request status. @return row[4] - (string) Negative requested acknowledgement value (Sq). @return message_id - (string) formatted message id from log. """ try: message_id = " ID" with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length == 6 and row[3] != message_text: if row[5].split(':')[0] == message_id: extracted_message_id = pyInt(row[5].split(':')[1], 16) formatted_message_id = format(extracted_message_id, '#0x') # MSG_ID_HD_DEBUG_EVENT (0xFFF1) and MSG_ID_DG_DEBUG_EVENT (0xFFF2) hex values are reversed in log. string_format_id = str(formatted_message_id) first_two_char = string_format_id[2:4] last_two_char = string_format_id[-2:] if last_two_char != '00': return row[3], row[4], ('0x'+last_two_char+first_two_char) else: return row[3], row[4], formatted_message_id.replace("00", "") else: pass except: test.fail("application log data is corrupted - unable to fetch data") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_bak_request_details(file_name, ack_bak_value): """ This method intended to extract the acknowledgement back status from application log. For row[index], were index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param ack_bak_value - (string) Positive back acknowledgement value (Sq). @return row[3] - (string) acknowledgement back status. """ try: with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length >= 5: if row[4] == ack_bak_value: return row[3] else: pass else: pass except: test.fail("application log data is corrupted") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_current_log_details(message_ack = False, message_text = None): """ This function is capable to perform data analysis from application log folder. logs are automatically created in path :"/home/denali/Desktop/sd-card/log/*.log". In row[index], index represent column to be extracted. @param message_ack - (bool) 'True' - if ack is satisfied in log / 'False' - if ack condition is not satisfied @param message_text - (string) message text to be extracted from log. @return content_record - (list) list contains extracted data (ack_req, ack_bak, message, message_id). """ content_record = [] file_name = get_extracted_file() if message_text != None: message = get_message_from_log(file_name, message_text) content_record.append(message) if message_ack != False: message_ack, message_bak = get_message_from_log(file_name, config.ACK_REQ_STATUS, check_ack_bak=True) content_record.append(message_ack) if message_bak is None: test.log("Not Ack Bak message found") else: content_record.append(message_bak) # ack_req_status, ack_req_value, message_id = get_ack_request_details(file_name, message_text) # ack_bak_value = ack_req_value.replace(":-", ":") # getting negative requested ack from positive requested ack # content_record.append(ack_req_status) # content_record.append(message_id.lower()) # if message_ack != False and ack_bak_value != None: # message_bak = get_message_from_log(file_name, config.ACK_BAK_STATUS) # content_record.append(message_bak) # ack_bak_status = get_bak_request_details(file_name, ack_bak_value) # content_record.append(ack_bak_status) return content_record def rename_file(path,filename, newFilename=""): """ This method Rename the original folder name to dummy name. """ newPath = path + filename + "_1" if newFilename is not "": newPath = path + newFilename originalPath = path + filename if os.path.exists(originalPath): if os.path.exists(newPath) is not True: try : os.rename(originalPath, newPath) return True # renaming was successful # If Source is a file but destination is a directory except IsADirectoryError: test.fail(f"ERROR: {originalPath} is a file but destination is a directory.") # If source is a directory but destination is a file except NotADirectoryError: test.fail(f"ERROR: {originalPath} is a directory but destination is a file.") # For permission related errors except PermissionError: test.fail("ERROR: Operation not permitted.") # For other errors except OSError as error: test.fail(f"ERROR: {error}") else: test.fail(f"ERROR : Renaming {originalPath} to {newPath} failed") else: test.fail(f"ERROR : {originalPath} does not exist") return False #default return if the renaming failed in any form def rename_old_name(path,filename): #TODO need to refactor to use the rename_file function above """ This method Rename the dummy name to original folder name . """ newFileName = filename oldFileName = filename + "_1" if os.path.exists(path+newFileName): # During running of AUT, the application re-creates the folder (eg: unittests/resources/settings) # need to delete before renaming shutil.rmtree(path+newFileName) if os.path.exists(path+oldFileName): os.rename(path+oldFileName, path+newFileName) test.log(str(oldFileName+" name changed to "+newFileName)) else: test.log(str(oldFileName+" failed to rename to "+newFileName)) def remove_files_in_folder(folder_path): if not os.path.exists(folder_path): return for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) os.remove(file_path) def vitals_interval_obj(interval): names.o_time_interval_obj["text"] = interval return names.o_time_interval_obj def end_treatment_states_rejection_msg(text): names.o_end_treatment_state_rejection_msg["text"] = text return names.o_end_treatment_state_rejection_msg def verify_parameter_from_post_treatment_log(msg_text): """ To obtain the details of parameter from post treatment log file. @param msg_text: parameter to be extracted. @returns message value and message unit """ try: log_location = str(get_extracted_file_from_post_treatment()) with open(log_location, 'r') as csv_file: try: for row in csv_file: reader = csv.reader(csv_file) for row in reader: row_length = sum(1 for values in row) for row1 in row: if row[0]!= None and row[0] == msg_text and row_length == 3: return (row[1],row[2]) else: pass except: test.fail("Treatment log data is corrupted") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_extracted_file_from_post_treatment(): """ This function is the handler for getting file from log folder. This handler will go inside log folder and looks for newly added log based on current time. if it satisfied that condition, it will return the exact path of newly created log. @return latest_file - (string) returns latest file that append on log folder from sd-data """ try: list_of_files = glob.glob(config.POST_TREATMENT_LOG_FILTER) latest_file = max(list_of_files, key=os.path.getctime) return latest_file except: test.fail("log file is not created during application interaction") return False def rinseback_rejection_msg(text): names.o_rinseback_rejection_msg["text"] = text return names.o_rinseback_rejection_msg squish.testSettings.objectNotFoundDebugging = True def recirculate_rejection_msg(text): names.o_recirculate_rejection_msg["text"] = text return names.o_recirculate_rejection_msg def get_disinfect_bullet_object(id, screen_obj): """ To set a custom object based on item from parameters @param screen_obj (String): provides the custom container for the object @param id (String): provides the custom id for the object @returns a real name object """ names.o_disinfect_dynamic_StepBullet["container"] = screen_obj names.o_disinfect_dynamic_StepBullet["id"] = id return names.o_disinfect_dynamic_StepBullet def verify_page_step_indicator_for_disinfect(screen_obj, treatment_id, treatment_step, treatment_screens): """ Method to verify the Page Step indicators [the object on top of the screen which indicates the steps passed, current, remained] @param screen_obj (Dict) : Parent objject of page step indicator. @param treatment_id (List) : disinfection id's for disinfection types. @param treatment_step (Int) : indicates the Current treatment step. @param treatment_screens(List): Type of disinfection. """ test.startSection("verification of page step indicators") for page in range(len(treatment_screens)): bullet_children = object.children(squish.waitForObjectExists(get_disinfect_bullet_object(treatment_id[page], screen_obj))) bullet_circle_color = bullet_children[0].color.name bullet_border_color = bullet_children[0].border.color.name step_title = squish.waitForObjectExists(get_disinfect_text_object(treatment_screens[page], screen_obj)) #To verify the step indicators of the completed treatment screens if page < treatment_step: test.verify(squish.waitForObjectExists(get_disinfect_bullet_object(treatment_id[page], screen_obj)).complete) test.verify(not squish.waitForObjectExists(get_disinfect_bullet_object(treatment_id[page], screen_obj)).current) test.compare(bullet_circle_color, config.COMPLETE_COLOR) test.compare(bullet_border_color,config.COMPLETE_COLOR) test.compare(step_title.color.name,config.ENABLED_COLOR) #To verify the step indicators of the current treatment screen elif page == treatment_step: test.verify(squish.waitForObjectExists(get_disinfect_bullet_object(treatment_id[page], screen_obj)).current) test.verify(not squish.waitForObjectExists(get_disinfect_bullet_object(treatment_id[page], screen_obj)).complete) test.compare(bullet_circle_color, config.CURRENT_COLOR) test.compare(bullet_border_color, config.COMPLETE_COLOR) test.compare(step_title.color.name, config.ENABLED_COLOR) test.verify(step_title.font.bold) #To verify the step indicators of the remaining treatment screens else: test.verify(not squish.waitForObjectExists(get_disinfect_bullet_object(treatment_id[page], screen_obj)).current) test.verify(not squish.waitForObjectExists(get_disinfect_bullet_object(treatment_id[page], screen_obj)).complete) test.compare(step_title.color.name, config.INCOMPLETE_COLOR) test.compare(bullet_circle_color, config.CURRENT_COLOR) test.compare(bullet_border_color, config.INCOMPLETE_COLOR) test.endSection() def get_disinfect_text_object(text, screen_obj): """ To set a custom object based on item from parameters @param screen_obj (String): provides the custom container for the object @param text (String) : provides the custom text for the object @returns a real name object """ names.o_disinfect_dynamic_StepBullet_Text["container"] = screen_obj names.o_disinfect_dynamic_StepBullet_Text["text"] = text return names.o_disinfect_dynamic_StepBullet_Text def append_cloudsync_credentials_file(): """ This function is used for creating .pem files in the cloudsync folder. pem files is created on '/home/denali/Desktop/cloudsync/credentials' """ try: os.makedirs(config.CLOUD_CREDENTIALS_LOCATION, exist_ok = True) test.log("Directory created successfully") for file_handler in range(len(config.PEM_FILES)): path = os.path.join(config.CLOUD_CREDENTIALS_LOCATION, config.PEM_FILES[file_handler]) with open(path, 'w') as file_reader: pass except OSError: test.log("Directory can not be created") def get_cloud_sync_input_file(): """ This function is the handler for getting file from log folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/cloudsync/ {current_date}_inp.log' @return latest_file - (string) returns latest input log file path from sd-data """ try: current_date = get_current_date_and_time_in_utc(date_format = "%Y_%m_%d") latest_file = config.INP_BUF_FILE_LOCATION+current_date+'_inp.buf' return latest_file except: return False def get_cloud_sync_output_file(): """ This function is the handler for getting file from log folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/cloudsync/ {current_date}_out.log' @return latest_file - (string) returns latest output log file path from sd-data """ try: current_date = get_current_date_and_time_in_utc(date_format = "%Y_%m_%d") latest_file = config.INP_BUF_FILE_LOCATION+current_date+'_out.buf' return latest_file except: return False def get_current_date_and_time_in_utc(date_format='%Y/%m/%d - %H:%M:%S'): """ Method to get current date and time. @input date_format (str) - format of date to be retrieved. @return (str) - date in specified format. """ date = datetime.now(timezone.utc) return str(date.strftime(date_format)) def get_extracted_input_buf_file(): """ This function is the handler for getting file from cloudsync folder. This handler will go inside cloudsync folder and looks for newly added input buf file based on current time. if it satisfied that condition, it will return the exact path of newly created input buf file. Application input buf file is automatically created in '/home/denali/Desktop/sd-card/cloudsync/ {current_date}_inp.buf' @return latest_file - (string) returns latest file that append in cloudsync folder """ try: current_date = get_current_date_and_time_in_utc(date_format = "%Y_%m_%d") latest_file = config.INP_BUF_FILE_LOCATION + current_date + '_inp.buf' return latest_file except: return "" def retrive_log_data(readline_count = 1): """ This function is the handler for getting file from cloudsync folder. Application log data is automatically appended on '/home/denali/Desktop/sd-card/cloudsync/ {current_date}_inp.buf' @param: (int) readline_count @return: (list) cloudsync_data returns latest file that append in cloudsync folder """ cloudsync_data = [] count = 0 file_name = get_extracted_input_buf_file() if len(file_name) == 0: #empty filename, fail test.fail("Extracted input buffer filename empty, error") return [] try: with open(file_name,mode = 'r') as filereader: contents = csv.reader(filereader) try: for reader in reversed(list(contents)): if readline_count == count: return cloudsync_data cloudsync_data.append(reader) count = count + 1 except: test.fail("application log data is corrupted") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_epoch_value_consistancy(current_epoch_value, expected_epoch_value): """ This function is verify consistancy of epoch value. @input current_epoch_value (float) - current epoch time @input expected_epoch_value (str) - epoch time from the cloud-sync log. @return (bool) - True/False """ expected_epoch_value = int(expected_epoch_value) maximum_epoch_value = int(current_epoch_value + 25) minimum_epoch_value = int(current_epoch_value - 25) if expected_epoch_value > minimum_epoch_value and expected_epoch_value < maximum_epoch_value : return True return False def keyboard_input(key_value): names.o_keyboard_object["text"] = key_value return names.o_keyboard_object def enter_keyboard_numeric_value(entry): """ Method to enter user desired value using keypad @param entry: (str) User expected value """ test.startSection("Entering {}".format(entry)) for value in entry: if value.isalpha(): value = pyStr(value) else: value = pyInt(value) key_val = squish.waitForObject(keyboard_input(value)) squish.mouseClick(key_val) utils.waitForGUI(0.1) test.endSection() def get_extracted_alarm_mapping_file(): """ This function is the handler for getting error file from service folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/service/ {current_date}_denaliSquish.err ' @return latest_file - (string) returns latest file that append on log folder from service """ try: latest_file = config.ALARM_MAPPING_CPP_FILE_LOCATION return latest_file except: return False def scroll_to_value_on_pop_up(value=None, container=None): """ scroll to the to the value if object is hidden @param value - (obj) value object @param container - (obj) Container of the value @return boolean true and false """ counter = 0 while counter <= 100: try: counter += 1 squish.findObject(value) squish.snooze(0.5) if check_if_object_is_within_the_container(obj=value, container=container): return True else: raise RuntimeError except RuntimeError: ScreenObj = squish.waitForObject(container) screenHeight = pyInt(ScreenObj.height) screenWidth = pyInt(ScreenObj.width) squish.mouseWheel(ScreenObj, screenWidth//2, screenHeight//2, 0, -50, squish.Qt.NoModifier) raise LookupError("value object is not in view to the user after trying 100 times") def get_extracted_error_file(): """ This function is the handler for getting error file from service folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/service/ {current_date}_denaliSquish.err ' @return latest_file - (string) returns latest file that append on log folder from service """ try: current_date = get_current_date_and_time(date_format = "%Y_%m_%d") latest_file = config.ERROR_FILE_LOCATION + current_date + '_denaliSquish.err' return latest_file except: return False raise LookupError("value object is not in view to the user after trying 100 times") def extract_alarm_info_from_alarms_conf_file() -> dict: """ This method is used to extract data from Alarms.conf file. @param (int) alarm_id: ID for specified alarm message. @return (dict) alarm info dict """ utils.waitForGUI(0.2) flag = False title_found_status = False list_title_found_status = False alarm_info_dict = {} with open(config.ALARM_MAPPING_CSV_FILE_LOCATION, 'r') as csv_file: csv_messages = list(csv.reader(csv_file)) with open(config.ALARMS_CONF_LOCATION, 'r') as conf_file: alarm_id = 1 for row in list(csv.reader(conf_file)): print(f'ROW : {row}') try: # IF BOTH THE TITLE AND LISTTITLE ATTRIB ARE PARSED, GET THE MESSAGE if flag is True and list_title_found_status is True: alarm_message = ",".join(row).split('= ') temp_message = " ".join(alarm_message[1::]) if '\\n' in temp_message: temp_message = '\n'.join(temp_message.split('\\n')) print(f'TEMP title : {temp_message}') alarm_info_dict[alarm_id]['message'] = temp_message flag = False list_title_found_status = False title_found_status = False if alarm_info_dict[alarm_id]['message'] == '' and alarm_id != config.NUM_OF_ALARM_ID: alarm_info_dict[alarm_id]['message'] = f'{csv_messages[alarm_id][-1].strip()[::]}' alarm_id += 1 # GET THE TITLE ATTRIBUTE if flag is True and title_found_status is False: alarm_title = ",".join(row).split('= ') if alarm_title[0].split()[0] == 'Title': title_found_status = True temp_alarm_title = " ".join(alarm_title[1::]) print(f'TEMP title : {temp_alarm_title}') if '\\n' in temp_alarm_title: print(f'HERE title {alarm_id}') temp_alarm_title = '\n'.join(temp_alarm_title.split('\\n')) print(f'HERE title {alarm_id}***{temp_alarm_title}') alarm_info_dict[alarm_id]['title'] = temp_alarm_title if alarm_info_dict[alarm_id]['title'] == '': alarm_info_dict[alarm_id]['title'] = 'Alarm' # GET THE LISTTITLE ATTRIBUTE if flag is True and title_found_status is True: alarm_title = ",".join(row).split('= ') if alarm_title[0].split()[0] == 'ListTitle': list_title_found_status = True temp_alarm_title = " ".join(alarm_title[1::]) print(f'TEMP listTitle: {temp_alarm_title}') if '\\n' in temp_alarm_title: print(f'HERE listTitle {alarm_id}') temp_alarm_title = '\n'.join(temp_alarm_title.split('\\n')) print(f'HERE listTitle {alarm_id}***{temp_alarm_title}') alarm_info_dict[alarm_id]['listTitle'] = temp_alarm_title if alarm_info_dict[alarm_id]['listTitle'] == '': alarm_info_dict[alarm_id]['listTitle'] = 'Alarm' if len(row) == 0: continue else: if row[0] == '['+str(alarm_id)+']': alarm_info_dict[alarm_id] = {} flag = True except IndexError: test.fail("alarm message is not mapped with alarm id {} from Alarm.conf file".format(alarm_id)) return alarm_info_dict