########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file utils.py # # @author (last) Joseph varghese # @date (last) 15-Jan-2022 # ############################################################################ import csv import test import os import shutil from dialin.ui import utils from configuration import config from datetime import timezone from datetime import datetime def get_current_date_and_time_in_utc(date_format='%Y/%m/%d - %H:%M:%S'): """ Method to get current date and time. @input date_format (str) - format of date to be retrieved. @return (str) - date in specified format. """ date = datetime.now(timezone.utc) return str(date.strftime(date_format)) def append_cloudsync_credentials_file(): """ This function is used for creating .pem files in the cloudsync folder. pem files is created on '/home/denali/Desktop/cloudsync/credentials' """ try: os.makedirs(config.CLOUD_CREDENTIALS_LOCATION, exist_ok = True) test.log("Directory created successfully") for file_handler in range(len(config.PEM_FILES)): path = os.path.join(config.CLOUD_CREDENTIALS_LOCATION, config.PEM_FILES[file_handler]) with open(path, 'w') as file_reader: pass except OSError: test.log("Directory can not be created") def get_current_date_and_time(date_format='%Y/%b/%d - %H:%M:%S'): date = datetime.now() return str(date.strftime(date_format)) def get_cloud_sync_input_file(): """ This function is the handler for getting file from log folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/cloudsync/ {current_date}_inp.log' @return latest_file - (string) returns latest input log file path from sd-data """ try: current_date = get_current_date_and_time_in_utc(date_format = "%Y_%m_%d") latest_file = config.INP_BUF_FILE_LOCATION+current_date+'_inp.buf' return latest_file except: return False def get_cloud_sync_output_file(): """ This function is the handler for getting file from log folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/cloudsync/ {current_date}_out.log' @return latest_file - (string) returns latest output log file path from sd-data """ try: current_date = get_current_date_and_time_in_utc(date_format = "%Y_%m_%d") latest_file = config.INP_BUF_FILE_LOCATION+current_date+'_out.buf' return latest_file except: return False def retrive_log_data(readline_count = 1): """ This function is the handler for getting file from log folder. Application log data is automatically appended on '/home/denali/Desktop/sd-card/cloudsync/ {current_date}_out.log' @input readline_count (int) - number of line to be read from cloud-sync log. @return cloudsync_data - (list) returns extracted log data. """ cloudsync_data = [] count = 0 file_name = get_cloud_sync_input_file() try: with open(file_name,mode = 'r') as filereader: contents = csv.reader(filereader) try: for reader in reversed(list(contents)): if readline_count == count: return cloudsync_data cloudsync_data.append(reader) count = count + 1 except: test.fail("application log data is corrupted") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_epoch_value_consistancy(current_epoch_value, expected_epoch_value): """ This function is verify consistancy of epoch value. @input current_epoch_value (float) - current epoch time @input expected_epoch_value (str) - epoch time from cloud-sync log. @return (bool) - True - if expected value is in range. else, return False """ expected_epoch_value = int(expected_epoch_value) maximum_epoch_value = int(current_epoch_value + 25) minimum_epoch_value = int(current_epoch_value - 25) if expected_epoch_value > minimum_epoch_value and expected_epoch_value < maximum_epoch_value : return True return False def is_intiger(num): """ This function checks the value is adaptable for integer conversion. @param num - (string) (string) input value for conversion. @return True/False- (bool) returns True if the value can type casted into int, else False """ try: if num.isdigit(): return True except ValueError: return False def get_extracted_file(): """ This function is the handler for getting file from log folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/log/ {current_date}_denaliSquish.log' @return latest_file - (string) returns latest file that append on log folder from sd-data """ try: current_date = get_current_date_and_time(date_format = "%Y_%m_%d") latest_file = config.LOG_LOCATION + current_date + '_denaliSquish.log' return latest_file except: return False def get_extracted_error_file(): """ This function is the handler for getting error file from service folder. Application log file is automatically created on '/home/denali/Desktop/sd-card/service/ {current_date}_denaliSquish.err ' @return latest_file - (string) returns latest file that append on log folder from service """ try: current_date = get_current_date_and_time(date_format = "%Y_%m_%d") latest_file = config.ERROR_FILE_LOCATION + current_date + '_denaliSquish.err' return latest_file except: return False def get_message_from_log(file_name, message_text): """ This method intended to extract the message from application log. For row[index], index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param message_text - (string) message text to be extracted. @return message - (list) API arguments displayed in log. """ message = [] count = 0 try: with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length >= 4: if row[3]!= None and row[3] == message_text: if count == 30: test.fail("handler unable to find message text from log file.") message_length = sum(1 for values in row) for column in range(4, message_length, 1): message.append(row[column]) count +=1 for value in range(len(message)): float_status = is_float(message[value]) int_status = is_intiger(message[value]) if float_status is True: message[value] = float(message[value]) if int_status is True: message[value] = int(message[value]) return message else: pass except: test.fail("application log data is corrupted") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_ack_request_details(file_name, message_text): """ This method intended to extract acknowledgement request status, negative requested acknowledgement and message id from application log. For row[index], were index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param message_text - (string) message text to be extracted from log. @return row[3] - (string) acknowledgement request status. @return row[4] - (string) Negative requested acknowledgement value (Sq). @return message_id - (string) formatted message id from log. """ try: message_id = " ID" with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length == 6 and row[3] != message_text: if row[5].split(':')[0] == message_id: extracted_message_id = pyInt(row[5].split(':')[1], 16) formatted_message_id = format(extracted_message_id, '#0x') # MSG_ID_HD_DEBUG_EVENT (0xFFF1) and MSG_ID_DG_DEBUG_EVENT (0xFFF2) hex values are reversed in log. string_format_id = str(formatted_message_id) first_two_char = string_format_id[2:4] last_two_char = string_format_id[4:6] if last_two_char != '00' and len(string_format_id) != 5: return row[3], row[4], ('0x'+last_two_char+first_two_char) else: return row[3], row[4], string_format_id.replace("00", "") else: pass except: test.fail("application log data is corrupted - unable to fetch data") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_bak_request_details(file_name, ack_bak_value): """ This method intended to extract the acknowledgement back status from application log. For row[index], were index represent column to be extracted. @param file_name - (string) path of the latest log file created. @param ack_bak_value - (string) Positive back acknowledgement value (Sq). @return row[3] - (string) acknowledgement back status. """ try: with open(file_name, 'r') as csv_file: try: for row in reversed(list(csv.reader(csv_file))): if row[0].isalpha(): pass else: row_length = sum(1 for values in row) if row_length >= 5: if row[4] == ack_bak_value: return row[3] else: pass else: pass except: test.fail("application log data is corrupted") except: test.fail("Log file is not created or log file is not created based on standard log naming format.") def get_current_log_details(message_ack = False, message_text = None): """ This function is capable to perform data analysis from application log folder. logs are automatically created in path :"/home/denali/Desktop/sd-card/log/*.log". In row[index], index represent column to be extracted. @param message_ack - (bool) 'True' - if ack is satisfied in log / 'False' - if ack condition is not satisfied @param message_text - (string) message text to be extracted from log. @return content_record - (list) list contains extracted data (ack_req, ack_bak, message, message_id). """ content_record = [] file_name = get_extracted_file() if message_text != None: message = get_message_from_log(file_name, message_text) content_record.append(message) if message_ack != False: ack_req_status, ack_req_value, message_id = get_ack_request_details(file_name, message_text) ack_bak_value = ack_req_value.replace(":-", ":") # getting negative requested ack from positive requested ack content_record.append(ack_req_status) content_record.append(message_id.lower()) if message_ack != False and ack_bak_value != None: ack_bak_status = get_bak_request_details(file_name, ack_bak_value) content_record.append(ack_bak_status) return content_record def rename_file(path,filename): """ This method Rename the original folder name to dummy name. """ folder_path = os.listdir(path) for files in folder_path: if files == filename+"_1": if os.path.exists(path+filename+"_1"): shutil.rmtree(path+filename+"_1") test.log(str(filename+" name changed to "+files)) if files == filename: if os.path.exists(path+filename): utils.waitForGUI(1) os.rename(path+filename,path+filename+"_1") test.log(str(files+" name changed to "+filename+"_1")) def rename_old_name(path,filename): """ This method Rename the dummy name to original folder name . """ folder_path = os.listdir(path) for files in folder_path: if files == filename: if os.path.exists(path+filename): shutil.rmtree(path+filename) test.log(str(files+"_1 name changed to "+filename)) utils.waitForGUI(5) if files != filename: if (files == filename+"_1"): os.rename(path+files,path+filename) test.log(str(files+" name changed to "+filename))