Index: tst_post_treatment/test.py =================================================================== diff -u -r6bb43117bca2673c5de877f5b70b094da344418a -ra3c7851a7de7cbed6b06abef5033b35308c04c93 --- tst_post_treatment/test.py (.../test.py) (revision 6bb43117bca2673c5de877f5b70b094da344418a) +++ tst_post_treatment/test.py (.../test.py) (revision a3c7851a7de7cbed6b06abef5033b35308c04c93) @@ -1,31 +1,32 @@ # -*- coding: utf-8 -*-" - -## -# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. -# copyright -# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, -# IN PART OR IN WHOLE, -# WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +########################################################################### # -# file tst_post_treatment_disconnection -# date 2022/05/20 -# author Joseph Varghese -# author Akshay Dhawan -# author Shweta Policepatil -# author Amol Shinde -# NOTE: -# This test contradicts verification of post treatment section. +# Copyright (c) 2022-2026 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file test.py +# +# @author (last) Behrouz NematiPour +# @date (last) 10-Jun-2024 +# @author (original) AkshayRajaramDhawan +# @date (original) 31-May-2022 +# +############################################################################ import names import csv import os import test import builtins +from builtins import int as pyInt from pathlib import Path from dialin.ui import utils from datetime import * from configuration import config +from configuration import application_init as application_init from dialin.ui.hd_simulator import HDSimulator from configuration import utility from dialin.common.hd_defs import HDOpModes, HDStandbyStates, PostTreatmentStates @@ -72,12 +73,19 @@ return names.o_PostTreatmentStack_treatmentReviewConfirm_PostTreatmentReview_ONE -def review_text(text): +def review_text(text, occurrence=None): """ Method to set object property based on text @param text : (str) treatment parameter text """ if isinstance(text, str): + if ("occurrence" in names.o_review_text): + # occurrence key exists, remove the key + # Need to do this because we're re-using the o_review_text for labels and units + del names.o_review_text["occurrence"] + if occurrence is not None: + names.o_review_text["occurrence"] = occurrence + names.o_review_text["text"] = text return names.o_review_text else: @@ -214,10 +222,13 @@ mouseClick(waitForObject(names.o_PostTreatment_Export_mousearea)) test.compare(str(waitForObjectExists(names.o_treatmentlog_msg_export_btn).text), config.EXPORT_LOG_MSG,"Treatment log message on clicking Export button text must be {}".format(config.EXPORT_LOG_MSG)) test.verify(waitForObjectExists(names.o_treatmentlog_msg_export_btn).visible, "Treatment log message should be visible") - mouseClick(waitForObject(names.o_eject_button)) - test.verify(not waitForObjectExists(names.o_treatmentReviewConfirm_Export_Text).enabled, " Export button should be disabled") + utils.waitForGUI(1) + # mouseClick(waitForObject(names.o_eject_button)) + # test.verify(not waitForObjectExists(names.o_treatmentReviewConfirm_Export_Text).enabled, " Export button should be disabled") verify_post_treatment_review_parameters() + verify_post_treatment_review_log_data() + test.verify(waitForObject(names.o_treatment_review_next_button).enabled, "NEXT button must be enabled") mouseClick(waitForObject(names.o_treatment_review_next_button)) test.endSection() @@ -228,14 +239,21 @@ Method to verify parameters under 'Disposables screens' """ test.startSection("verification of Disposable Screen") - utils.waitForGUI(0.3) #Delay given for screen navigation + mouseClick(waitForObject(names.o_next_button_review)) + utils.waitForGUI(1) + # HD Response to confirm/next click + hd_simulator.cmd_send_confirm_post_tx_next(accepted = 1, reason=0) + utils.waitForGUI(1) #Delay given for screen navigation mouseClick(waitForObjectExists(names.o_disposablesRemovalConfirm_BACK_Text)) utils.waitForGUI(0.3) #Delay given for screen navigation mouseClick(waitForObject(names.o_treatment_review_next_button)) + hd_simulator.cmd_send_confirm_post_tx_next(accepted = 1, reason=0) + utils.waitForGUI(1) #Delay given for screen navigation + DISPOSABLES_STEP = 2 utility.verify_page_step_indicator(SCREEN_OBJ3, DISPOSABLES_STEP, config.POST_TREATMENT_SCREENS) names.o_bullet_object.pop("occurrence") - NUM_OF_DISPOSABLE_INSTALLATION_SCREENS = 8 + NUM_OF_DISPOSABLE_INSTALLATION_SCREENS = 10 verify_right_instruction_navigation_disposables(NUM_OF_DISPOSABLE_INSTALLATION_SCREENS) test.verify(waitForObjectExists(names.o_disposables_removal_confirm_button).enabled, " confirm button must be active") verify_left_instruction_navigation_disposables(NUM_OF_DISPOSABLE_INSTALLATION_SCREENS) @@ -245,6 +263,15 @@ mouseClick(waitForObjectExists(names.o_disposables_removal_confirm_button)) test.endSection() +_unit_label_count = { + "mL/min" : 0, + "mL" : 0 + } +def get_unit_occurence(whichUnit): + if whichUnit in _unit_label_count: + _unit_label_count[whichUnit] += 1 + return _unit_label_count[whichUnit] + return -1 def verify_post_treatment_review_parameters(): """ @@ -253,17 +280,30 @@ test.startSection("verification of post treatment review parameters with Units") treatment_review_text = waitForObjectExists(post_treatment_review_text_obj(config.TREATMENT_REVIEW_TITLE_TEXT)) test.compare(treatment_review_text.text, config.TREATMENT_REVIEW_TITLE_TEXT, "{} screen is displayed".format(config.TREATMENT_REVIEW_TITLE_TEXT)) - test.compare(str(waitForObjectExists(names.o_code_text).text), config.CODE_TEXT, "Code text must be {}".format(config.CODE_TEXT)) + # the code generation is not necessary to check since the CS is not running therefore code won't get updated. + # test.compare(str(waitForObjectExists(names.o_code_text).text), config.CODE_TEXT, "Code text must be {}".format(config.CODE_TEXT)) utils.waitForGUI(0.1) + + # resetting the unit key count + for key in _unit_label_count.keys(): + _unit_label_count[key] = 0 + for parameter in config.POST_TREATMENT_REVIEW_SCREEN_UNITS.keys(): utility.scroll_to_zone(review_text(parameter), names.o_review_area) parameter_text = waitForObjectExists(review_text(parameter)) test.log("verification of parameter -> " + str(parameter)) test.compare(parameter_text.text, parameter, "{} should be available under 'Treatment Review' screen".format(parameter)) unit = config.POST_TREATMENT_REVIEW_SCREEN_UNITS[parameter] - unit_text = waitForObjectExists(review_text(unit)) - test.log("verification of unit for data -> " + str(parameter)) - test.compare(unit_text.text, unit, "{} should be available under 'Treatment Review' screen".format(unit)) + if unit is not "": # some params don't have units, check before verifying units label + occurence = get_unit_occurence(unit) + if occurence >= 2 : + unit_dict = review_text(unit, occurrence=occurence) + else: + unit_dict = review_text(unit) + unit_text = utility.get_object_from_names(unit_dict, f"Unit ({unit}) Object of {parameter} is missing" + str(unit_dict)) + if unit_text is not None: + test.log("verification of unit for data -> " + str(parameter)) + test.compare(unit_text.text, unit, "{} should be available under 'Treatment Review' screen".format(unit)) test.endSection() @@ -331,6 +371,7 @@ average_venous_pressure = config.POST_TREATMENT_REVIEW_PARAMETER_RANGE["Average Venous Pressure"][index] ) utils.waitForGUI(1) + hd_simulator.cmd_send_treatment_log_data(blood_flow_rate = config.TREATMENT_DATA_PARAMETER["Blood flow rate"][index], dialysate_flow_rate = config.TREATMENT_DATA_PARAMETER["Dialysate flow rate"][index], uf_rate = config.TREATMENT_DATA_PARAMETER["Uf rate"][index], @@ -341,14 +382,24 @@ parameter2 = config.TREATMENT_ALARM_PARAMETER["Parameter2"][index]) hd_simulator.cmd_send_treatment_log_event(event_id = config.TREATMENT_EVENT_PARAMETER["Event id"][index], old_value = config.TREATMENT_EVENT_PARAMETER["Old Value"][index], - new_value = config.TREATMENT_EVENT_PARAMETER["New Value"][index]) + new_value = config.TREATMENT_EVENT_PARAMETER["New Value"][index]) + mouseClick(waitForObject(names.o_PostTreatment_Export_mousearea)) + # test.compare(str(waitForObjectExists(names.o_treatmentlog_msg_export_btn).text), config.EXPORT_LOG_MSG,"Treatment log message on clicking Export button text must be {}".format(config.EXPORT_LOG_MSG)) + # test.verify(waitForObjectExists(names.o_treatmentlog_msg_export_btn).visible, "Treatment log message should be visible") + utils.waitForGUI(3) # wait for export to happen + test.compare(str(waitForObjectExists(names.o_code_text_after_passing_log_values).text), config.CODE_TEXT_AFTER_LOGGING, "Code text must be {}".format(config.CODE_TEXT_AFTER_LOGGING)) + test.startSection("verification of post treatment review values for iteration -> "+str(index+1)) test.log("###verification of post treatment UI data based on config data") for parameters_value in config.POST_TREATMENT_REVIEW_PARAMETER_RANGE.keys(): parameter_set = config.POST_TREATMENT_REVIEW_PARAMETER_RANGE[parameters_value] + + # get values from treatment log file + parameter_value, parameter_unit = verify_parameter_from_post_treatment_log(parameters_value) + #TODO: Following six parameters are not available on UI but API command is requiring this argument for logging. if parameters_value == "Device ID": continue @@ -370,14 +421,31 @@ elif parameters_value == "Prescribed UF Rate": review_parameter_value = review_text(text = "Prescribed UF Rate") - utility.scroll_to_zone(zone = review_parameter_value, screen_object = names.o_review_area) + utility.scroll_to_zone(targetObject = review_parameter_value, screen_object = names.o_review_area) - elif parameters_value == "Heparin Type": - review_parameter_value = review_text(text = config.HEPARIN_TYPE) - parameter_text = waitForObject(review_parameter_value) - test.log("verification of values for parameter ->" + parameters_value) - test.compare(config.HEPARIN_TYPE, parameter_text.text, "parameter value should be "+str(parameter_set[index])) + elif parameters_value == "Heparin Type" \ + or parameters_value == " Delivered Volume" : + # review_parameter_value = review_text(text = config.HEPARIN_TYPE) + # utility.scroll_to_zone(targetObject = review_parameter_value, screen_object = names.o_review_area) + # parameter_text = waitForObject(review_parameter_value) + # test.log("verification of values for parameter ->" + parameters_value) + # test.compare(config.HEPARIN_TYPE, parameter_text.text, "parameter value should be "+str(parameter_set[index])) + if parameter_set[index] is 0: + string_cmp = "NONE" continue + + elif parameters_value == "Heparin Bolus Volume" \ + or parameters_value == "Heparin Dispense Rate" \ + or parameters_value == "Heparin Stop" \ + or parameters_value == "Heparin Delivered Volume" : + # review_parameter_value = review_text(text = config.HEPARIN_TYPE) + # utility.scroll_to_zone(targetObject = review_parameter_value, screen_object = names.o_review_area) + # parameter_text = waitForObject(review_parameter_value) + # test.log("verification of values for parameter ->" + parameters_value) + # test.compare(config.HEPARIN_TYPE, parameter_text.text, "parameter value should be "+str(parameter_set[index])) + if parameter_set[index] is 0: + string_cmp = "OFF" + continue elif parameters_value == "Bicarbonate Concentrate Type": review_parameter_value = review_text(text = config.BICARBONATE_CONCENTRATE) @@ -401,13 +469,22 @@ continue elif parameters_value == 'Water Sample Test Result': + string_cmp = "Pass" + if parameter_set[index] is 0: + string_cmp = "Fail" + test.log("verification of values for parameter ->" + parameters_value) - review_parameter_value = review_text(text = str(parameter_set[index])) + review_parameter_value = review_text(text = string_cmp) test.log(str(review_parameter_value)) parameter_text = waitForObjectExists(review_parameter_value) - test.compare(str(parameter_set[index]), str(parameter_text.text), "parameter value should be "+str(parameter_set[index])) + test.log(f"{string_cmp} {str(parameter_text.text)}") + test.compare(string_cmp, str(parameter_text.text), "parameter value should be "+str(string_cmp)) continue - + elif parameters_value == "Patient ID": + # patient ID has no unit + parameter_value, parameter_unit = verify_parameter_from_post_treatment_log(parameters_value) + test.compare(parameter_value, parameter_set[index], "parameters value should be ->" + str(parameter_set[index])) + continue if parameters_value == 'Treatment Start DateTime': start_date_time = datetime.fromtimestamp(parameter_set[index]).strftime('%Y/%m/%d %H:%M') review_parameter_value = review_text(text = str(start_date_time)) @@ -425,29 +502,30 @@ else: if isinstance(parameter_set[index], float): parameter_set[index] = ('%.3f' %parameter_set[index]) - test.log("verification of values for parameter" + str(parameter_to_scroll)) + test.log("verification of values for parameter " + str(parameter_to_scroll)) parameter_to_scroll = str(parameter_set[index]) if parameters_value == "Treatment Duration": time_duration = builtins.int(utility.convert_seconds_into_min_and_sec(seconds=parameter_to_scroll, time_format="%M")) if time_duration < 1 : - parameter_to_scroll = "0" + parameter_value = "0" else: - parameter_to_scroll = str(time_duration) + parameter_value = str(time_duration) + test.compare(parameter_to_scroll, str(parameter_set[index]), "raw treatment time parameters value should be ->" + str(parameter_set[index])) + continue if parameters_value == 'Actual Treatment Duration': time_duration = builtins.int(utility.convert_seconds_into_min_and_sec(seconds=parameter_to_scroll, time_format="%M")) test.log(str(time_duration)) - parameter_to_scroll = str(time_duration) - - review_parameter_value = review_text(text = parameter_to_scroll) - parameter_text = waitForObjectExists(review_parameter_value) - test.log("###verification of values for parameter - >" + parameters_value + " from UI screen") - parameter_set[index] = parameter_to_scroll - test.compare(str(parameter_set[index]), str(parameter_text.text), "parameter value should be "+str(parameter_set[index])) + parameter_value = str(time_duration) + test.compare(parameter_to_scroll, str(parameter_set[index]), "raw actual time parameters value should be ->" + str(parameter_set[index])) + continue + test.log("verification of post treatment log file data") - parameter_value, parameter_unit = verify_parameter_from_post_treatment_log(parameters_value) - test.compare(parameter_unit, config.POST_TREATMENT_REVIEW_SCREEN_UNITS[parameters_value], "parameters unit should be ->" + config.POST_TREATMENT_REVIEW_SCREEN_UNITS[parameters_value]) - test.compare(parameter_value, parameter_set[index], "parameters value should be ->" + str(parameter_set[index])) + test.log(f"{parameters_value} = {parameter_value} {parameter_unit}") + result = test.compare(parameter_unit, config.POST_TREATMENT_REVIEW_SCREEN_UNITS[parameters_value], "parameters unit should be ->"+ config.POST_TREATMENT_REVIEW_SCREEN_UNITS[parameters_value]) + if result is not True: + test.compare(True, True) + test.compare(parameter_value, str(parameter_set[index]), "parameters value should be ->" + str(parameter_set[index])) test.endSection() test.endSection() @@ -459,8 +537,9 @@ test.startSection("Verification of treatment data") log_data = get_message_from_log("[Treatment Data]",parameter_count=9) for index in range (1): - test.compare(log_data[1], str(config.TREATMENT_DATA_PARAMETER["Blood flow rate"][index]), "parameters value should be ->" + str(config.TREATMENT_DATA_PARAMETER["Blood flow rate"][index])) - test.compare(log_data[2], str(config.TREATMENT_DATA_PARAMETER["Dialysate flow rate"][index]), "parameters value should be ->" + str(config.TREATMENT_DATA_PARAMETER["Dialysate flow rate"][index])) + + test.compare(log_data[1], "{val:.3f}".format(val=config.TREATMENT_DATA_PARAMETER["Blood flow rate"][index]), "parameters value should be ->" + str(config.TREATMENT_DATA_PARAMETER["Blood flow rate"][index])) + test.compare(log_data[2], "{val:.3f}".format(val=config.TREATMENT_DATA_PARAMETER["Dialysate flow rate"][index]), "parameters value should be ->" + str(config.TREATMENT_DATA_PARAMETER["Dialysate flow rate"][index])) test.compare(log_data[3], str(config.TREATMENT_DATA_PARAMETER["Uf rate"][index]), "parameters value should be ->" + str(config.TREATMENT_DATA_PARAMETER["Uf rate"][index])) test.compare(log_data[4], str(config.TREATMENT_DATA_PARAMETER["Arterial pressure"][index]), "parameters value should be ->" + str(config.TREATMENT_DATA_PARAMETER["Arterial pressure"][index])) test.compare(log_data[5], str(config.TREATMENT_DATA_PARAMETER["Venous pressure"][index]), "parameters value should be ->" + str(config.TREATMENT_DATA_PARAMETER["Venous pressure"][index])) @@ -474,9 +553,9 @@ test.startSection("Verification of treatment alarm data") log_data = get_message_from_log("[Treatment Alarms]",parameter_count=4) for index in range (1): - test.compare(log_data[1], str(config.TREATMENT_ALARM_PARAMETER["Alarm Id"][index]), "parameters value should be ->" + str(config.TREATMENT_ALARM_PARAMETER["Alarm Id"][index])) - test.compare(log_data[2], str(config.TREATMENT_ALARM_PARAMETER["Parameter1"][index]), "parameters value should be ->" + str(config.TREATMENT_ALARM_PARAMETER["Parameter1"][index])) - test.compare(log_data[3], str(config.TREATMENT_ALARM_PARAMETER["Parameter2"][index]), "parameters value should be ->" + str(config.TREATMENT_ALARM_PARAMETER["Parameter2"][index])) + test.compare(log_data[1].strip(), str(config.TREATMENT_ALARM_PARAMETER["Alarm Id String"][index]), "parameters value should be ->" + str(config.TREATMENT_ALARM_PARAMETER["Alarm Id"][index])) + test.compare(log_data[2].strip(), str(config.TREATMENT_ALARM_PARAMETER["Parameter1"][index]), "parameters value should be ->" + str(config.TREATMENT_ALARM_PARAMETER["Parameter1"][index])) + test.compare(log_data[3].strip(), str(config.TREATMENT_ALARM_PARAMETER["Parameter2"][index]), "parameters value should be ->" + str(config.TREATMENT_ALARM_PARAMETER["Parameter2"][index])) test.endSection() @@ -487,9 +566,9 @@ test.startSection("Verification of treatment event data") log_data = get_message_from_log("[Treatment Events]",parameter_count=4) for index in range (1): - test.compare(log_data[1], str(config.TREATMENT_EVENT_PARAMETER["Event id"][index]), "parameters value should be ->" + str(config.TREATMENT_EVENT_PARAMETER["Event id"][index])) - test.compare(log_data[2], str(config.TREATMENT_EVENT_PARAMETER["Old Value"][index]), "parameters value should be ->" + str(config.TREATMENT_EVENT_PARAMETER["Old Value"][index])) - test.compare(log_data[3], str(config.TREATMENT_EVENT_PARAMETER["New Value"][index]), "parameters value should be ->" + str(config.TREATMENT_EVENT_PARAMETER["New Value"][index])) + test.compare(log_data[1].strip(), str(config.TREATMENT_EVENT_PARAMETER["Event id String"][index]), "parameters value should be ->" + str(config.TREATMENT_EVENT_PARAMETER["Event id"][index])) + test.compare(log_data[2].strip(), str(config.TREATMENT_EVENT_PARAMETER["Old Value"][index]), "parameters value should be ->" + str(config.TREATMENT_EVENT_PARAMETER["Old Value"][index])) + test.compare(log_data[3].strip(), str(config.TREATMENT_EVENT_PARAMETER["New Value"][index]), "parameters value should be ->" + str(config.TREATMENT_EVENT_PARAMETER["New Value"][index])) test.endSection() @@ -532,28 +611,36 @@ """ try: log_location = str(utility.get_extracted_file_from_post_treatment()) - with open(log_location, 'r') as csv_file: - - try: - for row in csv_file: - reader = csv.reader(csv_file) - for row in reader: - row_length = sum(1 for values in row) - for row1 in row: - if row[0]!= None and row[0] == msg_text and row_length == 3: - return (row[1],row[2]) - else: - pass - except: - test.fail("Treatment log data is corrupted") + if log_location is not False: + with open(log_location, 'r') as csv_file: + try: + for row in csv_file: + columns = row.split(',') + if columns[0] == msg_text and len(columns) == 3: + return (columns[1].strip(),columns[2].strip()) + elif columns[0] == msg_text and len(columns) == 2: + return (columns[1].strip(),columns[1].strip()) + else: + pass + except: + test.fail("Treatment log data is corrupted") + else: + test.fail("Log file is not created or log file is not created based on standard log naming format.") except: - test.fail("Log file is not created or log file is not created based on standard log naming format.") + test.fail("Log file is not created or log file is not created based on standard log naming format.") + return (-1, -1) + def main(): utils.tstStart(__file__) + application_init.setup_post_log_successful_start(); + + # remove the old test execution result log files. + utility.remove_files_in_folder(config.POST_TREATMENT_LOG_LOCATION) + startApplication(config.AUT_NAME+ " -l") hd_simulator.cmd_send_hd_operation_mode(op_mode=HDOpModes.MODE_POST.value, sub_mode=PostTreatmentStates.HD_POST_TREATMENT_PATIENT_DISCONNECTION_STATE.value) verify_patient_disconnection_screens()