########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file utils.py # # @author (last) Joseph varghese # @date (last) 15-Jan-2022 # ############################################################################ import object import names import sys import csv import os import squish import time import test from builtins import str as pyStr from dialin.ui.hd_simulator import HDSimulator from builtins import int as pyInt from configuration import config from dialin.ui import utils from datetime import datetime from dialin.common.msg_ids import MsgIds from dialin.ui import utils hd_simulator = HDSimulator() LOG_LOCATION = "/home/denali/Desktop/sd-card/log/*.log" def start_application(app_name): """ Function to start application and verify application status [running] If application does not start or running status is false, test stops Argument: @param app_name : (str) - Name of the application @param app_executable : (str) - Actual application @return: handle for the application if the application is in running state, or error (exist the application) """ counter = 0 while True: try: counter += 1 test.log("Starting {}".format(app_name)) squish.startApplication(app_name) if counter == 1: test.log("Application launched at the "+str(counter)+" st try.") elif counter == 2: test.log("Application launched at the "+str(counter)+" nd try.") elif counter == 3: test.log("Application launched at the "+str(counter)+" rd try.") else: test.log("Application launched at the "+str(counter)+" th try.") break except RuntimeError: if counter == 1: test.log("Application failed to launch after "+str(counter)+" try - Please refer logs") elif counter == 20: test.log("Exiting after "+str(counter)+ " tries..") sys.exit(1) else: test.log("Application failed to launch after "+str(counter)+ " tries - Please refer logs") except: logErrorDetails("Failed to start the application") sys.exit(1) def check_if_object_is_within_the_container(obj=None, container=None): """ check if an object is inside a container @param obj - child UI object @param container - container UI object @return boolean true/false """ container = squish.findObject(container) containerPos = container.mapToGlobal(squish.QPoint(0, 0)) container_x, container_y = pyInt(containerPos.x), pyInt(containerPos.y) container_width, container_height = pyInt(container.width), pyInt(container.height) obj = squish.findObject(obj) objPos = obj.mapToGlobal(squish.QPoint(0, 0)) obj_x, obj_y = pyInt(objPos.x), pyInt(objPos.y) obj_width, obj_height = pyInt(obj.width), pyInt(obj.height) if obj_x >= container_x and obj_y >= container_y: if (obj_x + obj_width) <= (container_x + container_width) and (obj_y + obj_height) <= (container_y + container_height): return True return False def scroll_to_zone(zone=None, screen_object=None): """ scroll to the numeric if object is hidden @param zone - UI object @param screen_object - UI object (UI Home screen = waveforms + numerics) @return boolean true/false """ counter = 0 while counter <= 100: try: counter += 1 squish.findObject(zone) squish.snooze(0.5) if check_if_object_is_within_the_container(obj=zone, container=screen_object): return True else: raise RuntimeError except RuntimeError: ScreenObj = squish.waitForObject(screen_object) screenHeight = pyInt(ScreenObj.height) screenWidth = pyInt(ScreenObj.width) squish.mouseWheel(ScreenObj, screenWidth-1000, screenHeight-10, 0, -50, squish.Qt.NoModifier) raise LookupError("zone object is not in view to the user after " + \ "trying 100 times") def navigate_to_pretreatment_screen(mode): """ Method to navigate to sub mode under pre-treatment screen @param mode - (int) pre treatment state """ hd_simulator.cmd_set_hd_operation_mode_data(5,0) hd_simulator.cmd_send_pre_treatment_state_data(sub_mode=mode, water_sample_state=0,consumables_self_test_state=0, no_cartridge_self_test_state=0,installation_state=0, dry_self_test_state=0, prime_state=0,recirculate_state=0, patient_connection_state=0) def get_current_date_and_time(date_format='%Y/%b/%d - %H:%M:%S'): date = datetime.now() return str(date.strftime(date_format)) def is_float(num): """ This function checks the value is adaptable for float conversion. @param num - (string) input value for conversion. @return True/False- (bool) returns True if the value can type casted into float, else False """ try: if '.' in num: float(num) return True except ValueError: return False def is_intiger(num): """ This function checks the value is adaptable for integer conversion. @param num - (string) (string) input value for conversion. @return True/False- (bool) returns True if the value can type casted into int, else False """ try: if num.isdigit(): return True except ValueError: return False def get_extracted_file(): """ This function is the handler for getting file from log folder. This handler will go inside log folder and looks for newly added log based on current time. if it satisfied that condition, it will return the exact path of newly created log. @return latest_file - (string) returns latest file that append on log folder from sd-data """ try: list_of_files = glob.glob(LOG_LOCATION) latest_file = max(list_of_files, key=os.path.getctime) return latest_file except: test.fail("log file is not created during application interaction") return False # def get_message_from_log(file_name, message_text): # # """ # This method intended to extract the message from application log. # # The handler find a match between time displayed on log and occurrences time of event to # extract the details. For row[index], index represent column to be extracted. # # @param file_name - (string) path of the latest log file created. # @param message_text - (string) message text to be extracted. # @return message - (string) binary message for the event. # """ # message = "" # Message binaries appending with for easy identification # try: # with open(file_name, 'r') as csv_file: # try: # for row in reversed(list(csv.reader(csv_file))): # if row[0].isalpha(): # pass # else: # row_length = sum(1 for values in row) # if row_length >= 4: # if row[3]!= None and row[3] == message_text: # message_length = sum(1 for values in row) # for column in range((message_length-1), 3, -1): # message = row[column] + message # return message # else: # pass # except: # test.fail("application log data is corrupted") # except: # test.fail("application log is protected") def get_message_from_log(file_name, message_text): """ This method intended to extract the message from application log. For row[index], index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param message_text - (string) message text to be extracted. @return message - (string) binary message for the event. """ message = [] try: with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length >= 4: if row[3]!= None and row[3] == message_text: message_length = sum(1 for values in row) for column in range(4, (message_length),1): #for column in range((message_length-1), 3, -1): message.append(row[column]) for value in range(len(message)): float_status = is_float(message[value]) int_status = is_intiger(message[value]) if float_status is True: message[value] = float(message[value]) if int_status is True: message[value] = int(message[value]) return message else: pass except: test.fail("application log data is corrupted") except: test.fail("application log is protected") # def get_ack_request_details(file_name, message_text): # """ # This method intended to extract the ack request from application log. # # The handler find a match between time displayed on log and occurrences time of event # along with message id to extract the request acknowledgement. # For row[index], were index represent column to be extracted. # # @param file_name - (string) path of the latest log file created. # @return row[3] - (string) acknowledgement request status. # @return row[4] - (string) Negative requested acknowledgement value (Sq) . # """ # try: # message_id = " ID" # with open(file_name, 'r') as csv_file: # try: # for row in reversed(list(csv.reader(csv_file))): # if row[0].isalpha(): # pass # else: # row_length = sum(1 for values in row) # if row_length == 6 and row[3] != message_text: # if row[5].split(':')[0] == message_id: # extracted_message_id = pyInt(row[5].split(':')[1], 16) # formatted_message_id = format(extracted_message_id, '#0x') # return row[3], row[4], formatted_message_id.replace("00", "") # else: # pass # except: # test.fail("application log data is corrupted - unable to fetch data") # except: # test.fail("application log is protected") def get_ack_request_details(file_name, message_text): """ This method intended to extract acknowledgement request status, negative requested acknowledgement and message id from application log. For row[index], were index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param message_text - (string) message text to be extracted from log. @return row[3] - (string) acknowledgement request status. @return row[4] - (string) Negative requested acknowledgement value (Sq). @return message_id - (string) formatted message id from log. """ try: message_id = " ID" with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length == 6 and row[3] != message_text: if row[5].split(':')[0] == message_id: extracted_message_id = pyInt(row[5].split(':')[1], 16) formatted_message_id = format(extracted_message_id, '#0x') return row[3], row[4], formatted_message_id.replace("00", "") else: pass except: test.fail("application log data is corrupted - unable to fetch data") except: test.fail("application log is protected") # def get_bak_request_details(file_name, ack_bak_value): # """ # This method intended to extract the ack from application log. # # The handler find a match between time displayed on log and occurrences time of event # along with message id to extract the request acknowledgement. # For row[index], were index represent column to be extracted. # # @param file_name - (string) path of the latest log file created. # @param message_id - (string) message text to be extracted. # @param ack_bak_value - (string) Positive back acknowledgement value (Sq). # @return row[3] - (string) acknowledgement back status. # """ # try: # with open(file_name, 'r') as csv_file: # try: # for row in reversed(list(csv.reader(csv_file))): # if row[0].isalpha(): # pass # else: # row_length = sum(1 for values in row) # log_time = row[0].split(':') # if row_length >= 5: # if row[4] == ack_bak_value: # return row[3] # else: # pass # else: # pass # except: # test.fail("application log data is corrupted") # except: # test.fail("application log is protected") def get_bak_request_details(file_name, ack_bak_value): """ This method intended to extract the acknowledgement back status from application log. For row[index], were index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param ack_bak_value - (string) Positive back acknowledgement value (Sq). @return row[3] - (string) acknowledgement back status. """ try: with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length >= 5: if row[4] == ack_bak_value: return row[3] else: pass else: pass except: test.fail("application log data is corrupted") except: test.fail("application log is protected") # def get_current_log_details(message_ack = None, message_text = None): # """ # This function is capable to perform data analysis from application log folder. # # logs are automatically created in path :"/home/denali/Desktop/sd-card/log/*.log". # # In row[index], index represent column to be extracted. # @param message_ack - (string) 'Y' - if ack is satisfied in log / 'N' - if ack condition is not satisfied # @param message_text - (string) message text to be extracted from log. # @param message_id - (string) message id to be extracted from log. # @return content_record - (list) list contains extracted data (ack_req, ack_bak, message). # """ # content_record = [] # file_name = get_extracted_file() # if message_text != None: # message = get_message_from_log(file_name, message_text) # content_record.append(message) # if message_ack != None: # ack_req_status, ack_req_value, message_id = get_ack_request_details(file_name, message_text) # ack_bak_value = ack_req_value.replace(":-", ":") # getting negative requested ack from positive requested ack # content_record.append(ack_req_status) # content_record.append(message_id.lower()) # if message_ack != None and ack_bak_value != None: # ack_bak_status = get_bak_request_details(file_name, ack_bak_value) # content_record.append(ack_bak_status) # return content_record def get_current_log_details(message_ack = None, message_text = None): """ This function is capable to perform data analysis from application log folder. logs are automatically created in path :"/home/denali/Desktop/sd-card/log/*.log". In row[index], index represent column to be extracted. @param message_ack - (string) 'Y' - if ack is satisfied in log / 'N' - if ack condition is not satisfied @param message_text - (string) message text to be extracted from log. @return content_record - (list) list contains extracted data (ack_req, ack_bak, message, message_id). """ content_record = [] file_name = get_extracted_file() if message_text != None: message = get_message_from_log(file_name, message_text) content_record.append(message) if message_ack != None: ack_req_status, ack_req_value, message_id = get_ack_request_details(file_name, message_text) ack_bak_value = ack_req_value.replace(":-", ":") # getting negative requested ack from positive requested ack content_record.append(ack_req_status) content_record.append(message_id.lower()) if message_ack != None and ack_bak_value != None: ack_bak_status = get_bak_request_details(file_name, ack_bak_value) content_record.append(ack_bak_status) return content_record def set_arterial_ranges_min_val(art_low): """ Method to set the Arterial range maximum value to user expected value @param art_low - (int) user expected value """ test.startSection("Set Arterial range minimum value to {}".format(art_low)) arterial_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_min = pyInt(arterial_min.minimum) arterial_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_max = pyInt(arterial_max.maximum) low_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery)) low_handler_children = object.children(low_handler_parent) low_handler = low_handler_children[-2] width = pyInt(low_handler.width) - 8 height = pyInt(low_handler.height)- 10 if arterial_min == art_low: test.passes("Arterial range minimum is already set to {}".format(art_low)) elif arterial_min < art_low: while arterial_min != art_low: squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_min += 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_min == arterial_max - config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range minimum value cannot be moved beyond {}".format(arterial_min)) break else: continue elif arterial_min > art_low: while arterial_min != art_low: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_min -= 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_min == arterial_max - config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range minimum value cannot be moved beyond {}".format(arterial_min)) break else: continue # arterial blood pressure low limit should be lower than the high limit by atleast 30mmHg if arterial_min == arterial_max - config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(low_handler_parent.minValue, arterial_min, "Arterial range minimum value cannot be moved beyond {}".format(arterial_min)) else: test.compare(arterial_min, art_low, "Actual Arterial range minimum value: {} is equal to Expected value: {}".format(arterial_min, art_low)) test.endSection() def set_arterial_ranges_max_val(art_high): """ Method to set the Arterial range maximum value to user expected value @param art_high - (int) user expected value """ test.startSection("Set Arterial range maximum value to {}".format(art_high)) arterial_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_max = pyInt(arterial_max.maximum) arterial_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery) arterial_min = pyInt(arterial_min.minimum) high_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Artery)) high_handler_children = object.children(high_handler_parent) high_handler = high_handler_children[-1] width = pyInt(high_handler.width) - 20 height = pyInt(high_handler.height) - 25 if arterial_max == art_high: test.passes("Arterial range maximum is already set to {}".format(art_high)) elif arterial_max < art_high: while arterial_max != art_high: squish.mouseDrag(high_handler, -1, height, width, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_max += 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_max == arterial_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range maximum value cannot be moved beyond {}".format(arterial_max)) break else: continue elif arterial_max > art_high: while arterial_max != art_high: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) arterial_max -= 10 # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_max == arterial_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Arterial range maximum value cannot be moved beyond {}".format(arterial_max)) break else: continue # arterial blood pressure low limit should be lower than the high limit by at least 30mmHg if arterial_max == arterial_min + config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(high_handler_parent.maxValue, arterial_max, "Arterial range maximum value cannot be moved beyond {}".format(arterial_max)) else: test.compare(arterial_max, art_high, "Actual Arterial range maximum value: {} is equal to Expected value: {}".format(arterial_max, art_high)) test.endSection() def set_venous_ranges_max_val(ven_high): """ Method to set the Venous range maximum value to user expected value @param ven_high - (int) user expected value """ test.startSection("Set Venous range maximum value to {}".format(ven_high)) ven_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_max = pyInt(ven_max.maximum) ven_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_min = pyInt(ven_min.minimum) high_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous)) high_handler_children = object.children(high_handler_parent) high_handler = high_handler_children[-1] width = pyInt(high_handler.width) - 15 height = pyInt(high_handler.height) - 10 if ven_max == ven_high: test.passes("Venous range maximum is already set to {}".format(ven_high)) elif ven_max < ven_high: while ven_max != ven_high: squish.mouseDrag(high_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_max += 10 # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_max == ven_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range maximum value cannot be moved beyond {}".format(ven_max)) break else: continue elif ven_max > ven_high: while ven_max != ven_high: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_max -= 10 # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_max == ven_min + config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(high_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range maximum value cannot be moved beyond {}".format(ven_max)) break else: continue # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_max == ven_min + config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(high_handler_parent.maxValue, ven_max, "Venous range maximum value cannot be moved beyond {}".format(ven_max)) else: test.compare(ven_max, ven_high, "Actual Venous range maximum value: {} is equal to Expected value: {}".format(ven_max, ven_high)) test.endSection() #Methods for create custom treatment def set_venous_ranges_min_val(ven_low): """ Method to set the Venous range maximum value to user expected value @param ven_low - (int) user expected value """ test.startSection("set Venous range minimum value to {}".format(ven_low)) ven_min = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_min = pyInt(ven_min.minimum) ven_max = squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous) ven_max = pyInt(ven_max.maximum) low_handler_parent = object.parent(squish.waitForObjectExists(names.o_PreTreatmentCreate_rangeRect_RangeRect_Venous)) low_handler_children = object.children(low_handler_parent) low_handler = low_handler_children[-2] width = pyInt(low_handler.width) - 15 height = pyInt(low_handler.height) - 10 if ven_min == ven_low: test.passes("Venous range minimum is already set to {}".format(ven_low)) elif ven_min < ven_low: while ven_min != ven_low: squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_min += 10 if ven_min == ven_max - config.BUFFER_LOW_AND_HIGH_LIMITS: # venous blood pressure low limit should be lower than the high limit by at least 30mmHg squish.mouseDrag(low_handler, width, height, 1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range minimum value cannot be moved beyond {}".format(ven_min)) break else: continue elif ven_min > ven_low: while ven_min != ven_low: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) ven_min -= 10 # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_min == ven_max - config.BUFFER_LOW_AND_HIGH_LIMITS: squish.mouseDrag(low_handler, width, height, -1, 0, squish.Qt.NoModifier, squish.Qt.LeftButton) test.log("Venous range minimum value cannot be moved beyond {}".format(ven_min)) break else: continue # venous blood pressure low limit should be lower than the high limit by at least 30mmHg if ven_min == ven_max - config.BUFFER_LOW_AND_HIGH_LIMITS: test.compare(low_handler_parent.minValue, ven_min, "Venous range minimum value cannot be moved beyond {}".format(ven_min)) else: test.compare(ven_min, ven_low, "Actual Venous range minimum value: {} is equal to Expected value: {}".format(ven_min, ven_low)) test.endSection() def scroll_to_value_on_pop_up(value=None, container=None): """ scroll to the to the value if object is hidden @param value - (obj) value object @param container - (obj) Container of the value @return boolean true and false """ counter = 0 while counter <= 100: try: counter += 1 squish.findObject(value) squish.snooze(0.5) if check_if_object_is_within_the_container(obj=value, container=container): return True else: raise RuntimeError except RuntimeError: ScreenObj = squish.waitForObject(container) screenHeight = pyInt(ScreenObj.height) screenWidth = pyInt(ScreenObj.width) squish.mouseWheel(ScreenObj, screenWidth//2, screenHeight//2, 0, -50, squish.Qt.NoModifier) raise LookupError("value object is not in view to the user after " + \ "trying 100 times") def expected_heparin_value(val): names.o_heparin_value["text"] = val return names.o_heparin_value def msg(string): """ Added ### at the right side of the string to make sure that it is a message. @param string: (str) the string to add trailing ### to @return pad_str: (str) padded string """ padded_str = "###"+string return padded_str