Index: shared/scripts/configuration/utility.py =================================================================== diff -u -r0cc92d3b75bfb96dc4ecafd760a9ce15e455033b -rd9160e9049fec8daa9ff8eabd5dc03ea839793a7 --- shared/scripts/configuration/utility.py (.../utility.py) (revision 0cc92d3b75bfb96dc4ecafd760a9ce15e455033b) +++ shared/scripts/configuration/utility.py (.../utility.py) (revision d9160e9049fec8daa9ff8eabd5dc03ea839793a7) @@ -14,12 +14,22 @@ import sys +import csv +import os import test +import glob import squish -from configuration import config +from dialin.ui.hd_simulator import HDSimulator from builtins import int as pyInt +from datetime import datetime +from dialin.common.msg_ids import MsgIds - +hd_simulator = HDSimulator() + +LOG_LOCATION = "/home/denali/Desktop/sd-card/log/*.log" +INSTRUCTION_CONF_LOCATION = "/home/denali/Projects/application/resources/settings/Instructions/Instructions.conf" + + def start_application(app_name): """ Function to start application and verify application status [running] @@ -108,3 +118,334 @@ raise LookupError("zone object is not in view to the user after " + \ "trying 100 times") + + +def navigate_to_pretreatment_screen(mode): + """ + Method to navigate to sub mode under pre-treatment screen + @param mode - (int) pre treatment state + """ + hd_simulator.cmd_set_hd_operation_mode_data(5,0) + + hd_simulator.cmd_send_pre_treatment_state_data(sub_mode=mode, water_sample_state=0,consumables_self_test_state=0, + no_cartridge_self_test_state=0,installation_state=0, dry_self_test_state=0, + prime_state=0,recirculate_state=0, patient_connection_state=0) + + + +def get_current_date_and_time(date_format='%Y/%b/%d - %H:%M:%S'): + + date = datetime.now() + return str(date.strftime(date_format)) + + +def is_float(num): + """ + This function checks the value is adaptable for float conversion. + + @param num - (string) input value for conversion. + @return True/False- (bool) returns True if the value can type casted into float, else False + """ + try: + if '.' in num: + float(num) + return True + except ValueError: + return False + + +def is_intiger(num): + """ + This function checks the value is adaptable for integer conversion. + + @param num - (string) (string) input value for conversion. + @return True/False- (bool) returns True if the value can type casted into int, else False + """ + try: + if num.isdigit(): + return True + except ValueError: + return False + + +def get_extracted_file(): + """ + This function is the handler for getting file from log folder. + + This handler will go inside log folder and looks for newly added log based on current time. + if it satisfied that condition, it will return the exact path of newly created log. + + @return latest_file - (string) returns latest file that append on log folder from sd-data + """ + try: + list_of_files = glob.glob(LOG_LOCATION) + latest_file = max(list_of_files, key=os.path.getctime) + return latest_file + except: + test.fail("log file is not created during application interaction") + return False + + +# def get_message_from_log(file_name, message_text): +# +# """ +# This method intended to extract the message from application log. +# +# The handler find a match between time displayed on log and occurrences time of event to +# extract the details. For row[index], index represent column to be extracted. +# +# @param file_name - (string) path of the latest log file created. +# @param message_text - (string) message text to be extracted. +# @return message - (string) binary message for the event. +# """ +# message = "" # Message binaries appending with for easy identification +# try: +# with open(file_name, 'r') as csv_file: +# try: +# for row in reversed(list(csv.reader(csv_file))): +# if row[0].isalpha(): +# pass +# else: +# row_length = sum(1 for values in row) +# if row_length >= 4: +# if row[3]!= None and row[3] == message_text: +# message_length = sum(1 for values in row) +# for column in range((message_length-1), 3, -1): +# message = row[column] + message +# return message +# else: +# pass +# except: +# test.fail("application log data is corrupted") +# except: +# test.fail("application log is protected") + + +def get_message_from_log(file_name, message_text): + + """ + This method intended to extract the message from application log. + For row[index], index represent column to be extracted. + + @param file_name - (string) path of the latest log file created. + @param message_text - (string) message text to be extracted. + @return message - (string) binary message for the event. + """ + message = [] + try: + with open(file_name, 'r') as csv_file: + try: + for row in reversed(list(csv.reader(csv_file))): + if row[0].isalpha(): + pass + else: + row_length = sum(1 for values in row) + if row_length >= 4: + if row[3]!= None and row[3] == message_text: + message_length = sum(1 for values in row) + for column in range(4, (message_length),1): #for column in range((message_length-1), 3, -1): + message.append(row[column]) + for value in range(len(message)): + float_status = is_float(message[value]) + int_status = is_intiger(message[value]) + if float_status is True: + message[value] = float(message[value]) + if int_status is True: + message[value] = int(message[value]) + return message + else: + pass + except: + test.fail("application log data is corrupted") + except: + test.fail("application log is protected") + + +# def get_ack_request_details(file_name, message_text): +# """ +# This method intended to extract the ack request from application log. +# +# The handler find a match between time displayed on log and occurrences time of event +# along with message id to extract the request acknowledgement. +# For row[index], were index represent column to be extracted. +# +# @param file_name - (string) path of the latest log file created. +# @return row[3] - (string) acknowledgement request status. +# @return row[4] - (string) Negative requested acknowledgement value (Sq) . +# """ +# try: +# message_id = " ID" +# with open(file_name, 'r') as csv_file: +# try: +# for row in reversed(list(csv.reader(csv_file))): +# if row[0].isalpha(): +# pass +# else: +# row_length = sum(1 for values in row) +# if row_length == 6 and row[3] != message_text: +# if row[5].split(':')[0] == message_id: +# extracted_message_id = pyInt(row[5].split(':')[1], 16) +# formatted_message_id = format(extracted_message_id, '#0x') +# return row[3], row[4], formatted_message_id.replace("00", "") +# else: +# pass +# except: +# test.fail("application log data is corrupted - unable to fetch data") +# except: +# test.fail("application log is protected") + + +def get_ack_request_details(file_name, message_text): + """ + This method intended to extract acknowledgement request status, negative requested acknowledgement + and message id from application log. + + For row[index], were index represent column to be extracted. + + @param file_name - (string) path of the latest log file created. + @param message_text - (string) message text to be extracted from log. + @return row[3] - (string) acknowledgement request status. + @return row[4] - (string) Negative requested acknowledgement value (Sq). + @return message_id - (string) formatted message id from log. + """ + try: + message_id = " ID" + with open(file_name, 'r') as csv_file: + try: + for row in reversed(list(csv.reader(csv_file))): + if row[0].isalpha(): + pass + else: + row_length = sum(1 for values in row) + if row_length == 6 and row[3] != message_text: + if row[5].split(':')[0] == message_id: + extracted_message_id = pyInt(row[5].split(':')[1], 16) + formatted_message_id = format(extracted_message_id, '#0x') + return row[3], row[4], formatted_message_id.replace("00", "") + else: + pass + except: + test.fail("application log data is corrupted - unable to fetch data") + except: + test.fail("application log is protected") + + +# def get_bak_request_details(file_name, ack_bak_value): +# """ +# This method intended to extract the ack from application log. +# +# The handler find a match between time displayed on log and occurrences time of event +# along with message id to extract the request acknowledgement. +# For row[index], were index represent column to be extracted. +# +# @param file_name - (string) path of the latest log file created. +# @param message_id - (string) message text to be extracted. +# @param ack_bak_value - (string) Positive back acknowledgement value (Sq). +# @return row[3] - (string) acknowledgement back status. +# """ +# try: +# with open(file_name, 'r') as csv_file: +# try: +# for row in reversed(list(csv.reader(csv_file))): +# if row[0].isalpha(): +# pass +# else: +# row_length = sum(1 for values in row) +# log_time = row[0].split(':') +# if row_length >= 5: +# if row[4] == ack_bak_value: +# return row[3] +# else: +# pass +# else: +# pass +# except: +# test.fail("application log data is corrupted") +# except: +# test.fail("application log is protected") + + +def get_bak_request_details(file_name, ack_bak_value): + """ + This method intended to extract the acknowledgement back status from application log. + + For row[index], were index represent column to be extracted. + + @param file_name - (string) path of the latest log file created. + @param ack_bak_value - (string) Positive back acknowledgement value (Sq). + @return row[3] - (string) acknowledgement back status. + """ + try: + with open(file_name, 'r') as csv_file: + try: + for row in reversed(list(csv.reader(csv_file))): + if row[0].isalpha(): + pass + else: + row_length = sum(1 for values in row) + if row_length >= 5: + if row[4] == ack_bak_value: + return row[3] + else: + pass + else: + pass + except: + test.fail("application log data is corrupted") + except: + test.fail("application log is protected") + + +# def get_current_log_details(message_ack = None, message_text = None): +# """ +# This function is capable to perform data analysis from application log folder. +# +# logs are automatically created in path :"/home/denali/Desktop/sd-card/log/*.log". +# +# In row[index], index represent column to be extracted. +# @param message_ack - (string) 'Y' - if ack is satisfied in log / 'N' - if ack condition is not satisfied +# @param message_text - (string) message text to be extracted from log. +# @param message_id - (string) message id to be extracted from log. +# @return content_record - (list) list contains extracted data (ack_req, ack_bak, message). +# """ +# content_record = [] +# file_name = get_extracted_file() +# if message_text != None: +# message = get_message_from_log(file_name, message_text) +# content_record.append(message) +# if message_ack != None: +# ack_req_status, ack_req_value, message_id = get_ack_request_details(file_name, message_text) +# ack_bak_value = ack_req_value.replace(":-", ":") # getting negative requested ack from positive requested ack +# content_record.append(ack_req_status) +# content_record.append(message_id.lower()) +# if message_ack != None and ack_bak_value != None: +# ack_bak_status = get_bak_request_details(file_name, ack_bak_value) +# content_record.append(ack_bak_status) +# return content_record + + +def get_current_log_details(message_ack = None, message_text = None): + """ + This function is capable to perform data analysis from application log folder. + + logs are automatically created in path :"/home/denali/Desktop/sd-card/log/*.log". + + In row[index], index represent column to be extracted. + @param message_ack - (string) 'Y' - if ack is satisfied in log / 'N' - if ack condition is not satisfied + @param message_text - (string) message text to be extracted from log. + @return content_record - (list) list contains extracted data (ack_req, ack_bak, message, message_id). + """ + content_record = [] + file_name = get_extracted_file() + if message_text != None: + message = get_message_from_log(file_name, message_text) + content_record.append(message) + if message_ack != None: + ack_req_status, ack_req_value, message_id = get_ack_request_details(file_name, message_text) + ack_bak_value = ack_req_value.replace(":-", ":") # getting negative requested ack from positive requested ack + content_record.append(ack_req_status) + content_record.append(message_id.lower()) + if message_ack != None and ack_bak_value != None: + ack_bak_status = get_bak_request_details(file_name, ack_bak_value) + content_record.append(ack_bak_status) + return content_record