Index: dialin/common/msg_defs.py =================================================================== diff -u -rf053467ac7cfb9fe349e394342d3a9253a377403 -rbe687b87ec7dbbb312fc9dc67fd422119e43e64c --- dialin/common/msg_defs.py (.../msg_defs.py) (revision f053467ac7cfb9fe349e394342d3a9253a377403) +++ dialin/common/msg_defs.py (.../msg_defs.py) (revision be687b87ec7dbbb312fc9dc67fd422119e43e64c) @@ -58,6 +58,7 @@ REQUEST_REJECT_REASON_SALINE_BOLUS_NOT_IN_PROGRESS = 22 NUM_OF_REQUEST_REJECT_REASONS = 23 + class MsgFieldPositions: # Generic response msg field byte positions (where 32-bit data fields are used) START_POS_FIELD_1 = 6 # Hardcoded for now to avoid cyclic import issue. See protocols.CAN.DenaliMessage class Index: dialin/common/ui_defs.py =================================================================== diff -u -rf053467ac7cfb9fe349e394342d3a9253a377403 -rbe687b87ec7dbbb312fc9dc67fd422119e43e64c --- dialin/common/ui_defs.py (.../ui_defs.py) (revision f053467ac7cfb9fe349e394342d3a9253a377403) +++ dialin/common/ui_defs.py (.../ui_defs.py) (revision be687b87ec7dbbb312fc9dc67fd422119e43e64c) @@ -94,15 +94,6 @@ UF_OFF_STATE = 3 # Completed/off state of the ultrafiltration state machine UF_COMPLETED_STATE = 4 # Completed state of ultrafiltration state machine - # Sub Mode - TREATMENT_START_STATE = 0 - TREATMENT_BLOOD_PRIME_STATE = 1 - TREATMENT_DIALYSIS_STATE = 2 - TREATMENT_STOP_STATE = 3 - TREATMENT_RINSEBACK_STATE = 4 - TREATMENT_RECIRC_STATE = 5 - TREATMENT_END_STATE = 6 - # Heparin states HEPARIN_STATE_OFF = 0 # No heparin delivery is in progress HEPARIN_STATE_PAUSED = 1 # Heparin delivery paused Index: dialin/dg/calibration_record.py =================================================================== diff -u -rdd42e4d9cfe821b0a755ccc86cc1a4a2a3dd2f37 -rbe687b87ec7dbbb312fc9dc67fd422119e43e64c --- dialin/dg/calibration_record.py (.../calibration_record.py) (revision dd42e4d9cfe821b0a755ccc86cc1a4a2a3dd2f37) +++ dialin/dg/calibration_record.py (.../calibration_record.py) (revision be687b87ec7dbbb312fc9dc67fd422119e43e64c) @@ -52,7 +52,7 @@ self._raw_cal_record = [] self._write_fw_data_to_excel = True self._is_getting_cal_in_progress = False - self._utilities = NVOpsUtils() + self._utilities = NVOpsUtils(logger=self.logger) # DG calibration_record main record self.dg_calibration_record = OrderedDict() @@ -84,7 +84,7 @@ """ # If getting the calibration is in progress, do not start another process - if self._is_getting_cal_in_progress is not True: + if not self._is_getting_cal_in_progress: self._is_getting_cal_in_progress = True # Clear the list for the next call self._raw_cal_record.clear() @@ -95,7 +95,7 @@ """ Handles getting DG calibration_record record from firmware. - @return: None + @return: 1 if successful, zero otherwise """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_GET_CALIBRATION_RECORD.value) @@ -149,7 +149,8 @@ # Done with getting all the calibration data. self._is_getting_cal_in_progress = False # If all the messages have been received, call another function to process the raw data - self._utilities.process_received_record_from_fw(self.dg_calibration_record, self._raw_cal_record, + self._utilities.process_received_record_from_fw(self.dg_calibration_record, + self._raw_cal_record, write_to_excel=self._write_fw_data_to_excel) def set_dg_calibration_record(self): @@ -171,7 +172,7 @@ time.sleep(self._DIALIN_RECORD_UPDATE_DELAY_S) # Write the excel record - self._utilities.write_excel_record_to_dialin_record(self.dg_calibration_record) + self._utilities.write_excel_record_to_calibration_record(self.dg_calibration_record) record_packets = self._utilities.prepare_record_to_send_to_fw(self.dg_calibration_record) @@ -207,18 +208,18 @@ # Call the other functions to get the dictionaries of each hardware group. All the dictionaries are # ordered dictionaries to maintain the order in which they are inserted. The results are a tuple, the first # element is the dictionary that was built and the second element is the byte size of the dictionary. - functions = [self._prepare_pressure_sensors_cal_record(), self._prepare_flow_sensors_cal_record(), + records_with_sizes = [self._prepare_pressure_sensors_cal_record(), self._prepare_flow_sensors_cal_record(), self._prepare_load_cells_record(), self._prepare_temperature_sensors_record(), self._prepare_conductivity_sensors_record(), self._prepare_pumps_record(), self._prepare_volume_record(), self._prepare_acid_concentrates_record(), self._prepare_bicarb_concentrates_record(), self._prepare_filters_record(), self._prepare_fans_record(), self._prepare_accelerometer_sensor_record()] - for function in functions: + for record, byte_size in records_with_sizes: # Update the groups bytes size so far to be use to padding later - groups_byte_size += function[1] + groups_byte_size += byte_size # Update the calibration record - self.dg_calibration_record.update(function[0]) + self.dg_calibration_record.update(record) # Build the CRC of the main calibration_record record record_crc = OrderedDict({'crc': [' 1) + column_letter = get_column_letter(column) + cell_name = column_letter + str(row) - @param workbook_obj: Excel workbook object - @param project: Name of the sheet (i.e HD-DEN-4308) - @param row: Current row number - @param column: Current column number - @param data: Data to be written at the cell - @param name: Font type (default Calibri) - @param font: Font size (default 11) - @param bold: Bold or un-bold (default un-bold) - @param color: Color of the cell (default no color) - @param merge: Merge cells. Cells must be provided with A1:A4 format (default none) - @param horizontal: Horizontal alignment (default left) - @param freeze: Freeze top row (default false) - @param max_col_len: maximum length of a column (default none, means it is not restricted) + # Set the active worksheet to the provided worksheet + active_sheet = workbook_obj[project] + # Set the provided data into the specified row and column and set the bold, color and horizontal alignment + active_sheet.cell(row=row, column=column).value = data + active_sheet.cell(row=row, column=column).font = Font(size=font, bold=bold, name=name) + # Wrapping text is not needed unless the length of the data is more than maximum column + # length + active_sheet[cell_name].alignment = Alignment(vertical='center', horizontal=horizontal, wrap_text=False) + # Get the width of the current column + column_width = active_sheet.column_dimensions[column_letter].width + # When the column width is on the default, openpyxl reports None. If the width is reported as None, + # it will be set to 0 for math comparison + column_width = 0 if column_width is None else column_width + # If the length is not provided, use the default maximum length + max_len = COLUMN_MAX_WIDTH if max_col_len is None else max_col_len + # Remove all the end of the line artifacts + length_of_data = len(str(data).rstrip()) - @return: None - """ - row_height = 0 - # Get the number of an alphabetic column (i.e A -> 1) - column_letter = get_column_letter(column) - cell_name = column_letter + str(row) + # If the length of data was greater than the maximum length, calculate the number of + # rows is needed with the + # default height + if length_of_data > max_len: + # Since the length is greater than maximum, enable wrap text + active_sheet[cell_name].alignment = Alignment(vertical='center', horizontal=horizontal, wrap_text=True) + # Calculate what the row height should be when the cell is extended + row_height = math.ceil(length_of_data / max_len) * ROW_DEFAULT_HEIGHT + active_sheet.column_dimensions[column_letter].width = max_len + # If the length of the data provided is already less than the length of cell, + # do nothing + elif column_width < length_of_data: + active_sheet.column_dimensions[column_letter].width = length_of_data + COLUMN_WIDTH_TOLERANCE + row_height = ROW_DEFAULT_HEIGHT - # Set the active worksheet to the provided worksheet - active_sheet = workbook_obj[project] - # Set the provided data into the specified row and column and set the bold, color and horizontal alignment - active_sheet.cell(row=row, column=column).value = data - active_sheet.cell(row=row, column=column).font = Font(size=font, bold=bold, name=name) - # Wrapping text is not needed unless the length of the data is more than maximum column - # length - active_sheet[cell_name].alignment = Alignment(vertical='center', horizontal=horizontal, wrap_text=False) - # Get the width of the current column - column_width = active_sheet.column_dimensions[column_letter].width - # When the column width is on the default, openpyxl reports None. If the width is reported as None, - # it will be set to 0 for math comparison - column_width = 0 if column_width is None else column_width - # If the length is not provided, use the default maximum length - max_len = self.COLUMN_MAX_WIDTH if max_col_len is None else max_col_len - # Remove all the end of the line artifacts - length_of_data = len(str(data).rstrip()) + # If the current row height is not defined or the row height is less than the calculated new height, set it + if active_sheet.row_dimensions[row].height is None or active_sheet.row_dimensions[row].height < row_height: + active_sheet.row_dimensions[row].height = row_height - # If the length of data was greater than the maximum length, calculate the number of - # rows is needed with the - # default height - if length_of_data > max_len: - # Since the length is greater than maximum, enable wrap text - active_sheet[cell_name].alignment = Alignment(vertical='center', horizontal=horizontal, wrap_text=True) - # Calculate what the row height should be when the cell is extended - row_height = math.ceil(length_of_data / max_len) * self.ROW_DEFAULT_HEIGHT - active_sheet.column_dimensions[column_letter].width = max_len - # If the length of the data provided is already less than the length of cell, - # do nothing - elif column_width < length_of_data: - active_sheet.column_dimensions[column_letter].width = length_of_data + self.COLUMN_WIDTH_TOLERANCE - row_height = self.ROW_DEFAULT_HEIGHT + # If color has been defined, set the color of the cell + if color is not None: + active_sheet[cell_name].fill = color - # If the current row height is not defined or the row height is less than the calculated new height, set it - if active_sheet.row_dimensions[row].height is None or active_sheet.row_dimensions[row].height < row_height: - active_sheet.row_dimensions[row].height = row_height + # If merge has been requested + if merge is not None: + # The format of merge for this function is A1:C1 + active_sheet.merge_cells(str(merge)) - # If color has been defined, set the color of the cell - if color is not None: - active_sheet[cell_name].fill = color + if freeze: + # To freeze row 1, make the cell is not row 1, that's why A2 was chosen + active_sheet.freeze_panes = 'A2' - # If merge has been requested - if merge is not None: - # The format of merge for this function is A1:C1 - active_sheet.merge_cells(str(merge)) - if freeze: - # To freeze row 1, make the cell is not row 1, that's why A2 was chosen - active_sheet.freeze_panes = 'A2' +def load_excel_report(path): + """ + This function returns an object of a currently existing excel workbook - @staticmethod - def load_excel_report(path): - """ - This function returns an object of a currently existing excel workbook + @return loaded excel workbook object + """ + return load_workbook(path) - @return loaded excel workbook object - """ - return load_workbook(path) - @staticmethod - def get_cell_value(workbook_obj, project, row, col): - """ - This function returns the value of a written excel cell +def get_cell_value(workbook_obj, project, row, col): + """ + This function returns the value of a written excel cell - @return excel workbook object - """ - # Set the active worksheet to the provided worksheet - active_sheet = workbook_obj[project] - # Get the cell object - cell_obj = active_sheet.cell(row=row, column=col) - # Convert it to the actual value - return cell_obj.value + @return excel workbook object + """ + # Set the active worksheet to the provided worksheet + active_sheet = workbook_obj[project] + # Get the cell object + cell_obj = active_sheet.cell(row=row, column=col) + # Convert it to the actual value + return cell_obj.value - @staticmethod - def merge_cells(workbook_obj, project, start_row, start_col, end_row, end_col): - """ - This function merges the specified cells. - @param workbook_obj: Excel workbook object - @param project: Name of the sheet (i.e HD-DEN-4308) - @param start_row: Row number at the beginning of merge - @param start_col: Column number at the beginning of merge - @param end_row: Row number at the end of merge - @param end_col: Column number at the end of merge +def merge_cells(workbook_obj, project, start_row, start_col, end_row, end_col): + """ + This function merges the specified cells. - @return: None - """ - # Go to the define sheet - active_sheet = workbook_obj[project] - # Convert the cell numbers to text and number (i.e row=1 and col=1 -> A1) for the start and end cells - merge_start_cell = get_column_letter(start_col) + str(start_row) - merge_end_cell = get_column_letter(end_col) + str(end_row) - # The format of merge for this function is A1:C1 - active_sheet.merge_cells(str(merge_start_cell) + ':' + str(merge_end_cell)) + @param workbook_obj: Excel workbook object + @param project: Name of the sheet (i.e HD-DEN-4308) + @param start_row: Row number at the beginning of merge + @param start_col: Column number at the beginning of merge + @param end_row: Row number at the end of merge + @param end_col: Column number at the end of merge - @staticmethod - def save_report(excel_workbook, save_dir, record_name): - """ - This function overrides the save function in the Base class. The function saves the excel file. + @return: None + """ + # Go to the define sheet + active_sheet = workbook_obj[project] + # Convert the cell numbers to text and number (i.e row=1 and col=1 -> A1) for the start and end cells + merge_start_cell = get_column_letter(start_col) + str(start_row) + merge_end_cell = get_column_letter(end_col) + str(end_row) + # The format of merge for this function is A1:C1 + active_sheet.merge_cells(str(merge_start_cell) + ':' + str(merge_end_cell)) - @returns none - """ - # Get the current date - current_date = str(datetime.datetime.now().date()) - # Create the save path by using the path, date and current code review excel count out of total number of them - path = os.path.join(save_dir, current_date + '-' + str(record_name).upper() + '-Record.xlsx') - excel_workbook.save(filename=path) \ No newline at end of file + +def save_report(excel_workbook, save_dir, record_name): + """ + This function overrides the save function in the Base class. The function saves the excel file. + + @returns none + """ + # Get the current date + current_date = str(datetime.datetime.now().date()) + # Create the save path by using the path, date and current code review excel count out of total number of them + path = os.path.join(save_dir, current_date + '-' + str(record_name).upper() + '-Record.xlsx') + excel_workbook.save(filename=path) \ No newline at end of file Fisheye: Tag be687b87ec7dbbb312fc9dc67fd422119e43e64c refers to a dead (removed) revision in file `dialin/utils/fw.py'. Fisheye: No comparison available. Pass `N' to diff? Index: dialin/utils/nv_ops_utils.py =================================================================== diff -u -r1c507eb2cac4a66d8d0fd567b8eed90374c1d809 -rbe687b87ec7dbbb312fc9dc67fd422119e43e64c --- dialin/utils/nv_ops_utils.py (.../nv_ops_utils.py) (revision 1c507eb2cac4a66d8d0fd567b8eed90374c1d809) +++ dialin/utils/nv_ops_utils.py (.../nv_ops_utils.py) (revision be687b87ec7dbbb312fc9dc67fd422119e43e64c) @@ -1,18 +1,16 @@ - import struct -import datetime import time -import math -import os -import shutil -from .excel_ops import ExcelOps +from logging import Logger +from typing import List +from .excel_ops import * + + class NVOpsUtils: """ - Dialysate Generator (DG) Dialin API sub-class for _utilities commands. The commands are used to process - the calibration_record records, service records, system records, and the scheduled runs records. The records are - prepared to be sent to firmware to be received from firmware. + Processes the calibration_record records, service records, system records, and the scheduled runs records. + The records are prepared to be sent to firmware or to be received from firmware. """ CRC_16_TABLE = ( 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, @@ -67,16 +65,21 @@ _PAYLOAD_TOTAL_MSG_INDEX = 1 _PAYLOAD_TOTAL_BYTES_INDEX = 2 - def __init__(self): + def __init__(self, logger: Logger): + """ + Constructor for the NVOptsUtils class + @param logger: (Logger) the logger + """ + + self.logger = logger self._workspace_dir = '' self._excel_workbook = '' self._record_name = '' self._firmware_stack = '' self._is_writing_to_excel_done = False self._is_read_done = False - self.excel_ops = ExcelOps() # This list contains different data packet lists # i.e. [[message 1, payload], [message 2, payload], ...] self._record_packets_to_send_to_fw = [] @@ -119,138 +122,155 @@ if is_report_found: # Load the excel workbook - self._excel_workbook = self.excel_ops.load_excel_report(path) + self._excel_workbook = load_excel_report(path) else: # Get an excel workbook object - self._excel_workbook = self.excel_ops.get_an_excel_workbook() + self._excel_workbook = get_an_excel_workbook() # Setup worksheet and create the current tab - self.excel_ops.setup_excel_worksheet(self._excel_workbook, self._record_name) + setup_excel_worksheet(self._excel_workbook, self._record_name) - def write_fw_record_to_excel(self, dialin_record): + def write_fw_record_to_excel(self, calibration_record: dict): + """ + Writes a calibration record to excel + @param calibration_record: (dict) the record to write to excel + @return: None + """ - row = 1 - for group in dialin_record.keys(): + try: - start_row = row - start_col = 1 - col = 1 - if isinstance(dialin_record[group], dict): + row = 1 + for group in calibration_record.keys(): - self.excel_ops.write_to_excel(self._excel_workbook, self._record_name, row, col, group, bold=True, - max_col_len=len(group)) + start_row = row + start_col = 1 + col = 1 + if isinstance(calibration_record[group], dict): - for hardware in dialin_record[group].keys(): - list_of_keys = list(dialin_record[group].keys()) + write_to_excel(self._excel_workbook, self._record_name, row, col, group, bold=True, + max_col_len=len(group)) - col = 2 - self.excel_ops.write_to_excel(self._excel_workbook, self._record_name, row, col, hardware) - col += 1 + for hardware in calibration_record[group].keys(): + list_of_keys = list(calibration_record[group].keys()) - if isinstance(dialin_record[group][hardware], dict): - for spec in dialin_record[group][hardware]: + col = 2 + write_to_excel(self._excel_workbook, self._record_name, row, col, hardware) + col += 1 - spec_value = dialin_record[group][hardware][spec][1] + if isinstance(calibration_record[group][hardware], dict): + for spec in calibration_record[group][hardware]: - self.excel_ops.write_to_excel(self._excel_workbook, self._record_name, row, col, spec) - col += 1 - self.excel_ops.write_to_excel(self._excel_workbook, self._record_name, row, col, spec_value) - col += 1 - else: - spec_value = dialin_record[group][hardware][1] - self.excel_ops.write_to_excel(self._excel_workbook, self._record_name, row, col, spec_value) + spec_value = calibration_record[group][hardware][spec][1] - if list_of_keys.index(hardware) == len(list_of_keys) - 1: - self.excel_ops.merge_cells(self._excel_workbook, self._record_name, start_row, start_col, - row, start_col) - row += 1 + write_to_excel(self._excel_workbook, self._record_name, row, col, spec) + col += 1 + write_to_excel(self._excel_workbook, self._record_name, row, col, spec_value) + col += 1 + else: + spec_value = calibration_record[group][hardware][1] + write_to_excel(self._excel_workbook, self._record_name, row, col, spec_value) - row += 1 + if list_of_keys.index(hardware) == len(list_of_keys) - 1: + merge_cells(self._excel_workbook, self._record_name, start_row, start_col, + row, start_col) + row += 1 - self.excel_ops.save_report(self._excel_workbook, self._workspace_dir, self._firmware_stack) - # Signal reading is done - self._is_writing_to_excel_done = True + row += 1 - def write_excel_record_to_dialin_record(self, dialin_record): + save_report(self._excel_workbook, self._workspace_dir, self._firmware_stack) + # Signal reading is done + self._is_writing_to_excel_done = True + except Exception as e: + self.logger.error("Failed to write calibration record to excel: {0}".format(e)) + + def write_excel_record_to_calibration_record(self, calibration_record: dict): """ - Publicly accessible function to write excel record to Dialin record + Publicly accessible function to write excel record to a calibration record - @param dialin_record: Dialin record dictionary(i.e calibration record) - + @param calibration_record: (dict) the calibration record to write @return True if reading to firmware records to excel is done otherwise False """ - temp_buffer = [] - is_value_different = False - row = 1 - for group in dialin_record.keys(): + try: + temp_buffer = [] + is_value_different = False + row = 1 + for group in calibration_record.keys(): - col = 1 - cell_value = self.excel_ops.get_cell_value(self._excel_workbook, self._record_name, row, col) + col = 1 + cell_value = get_cell_value(self._excel_workbook, self._record_name, row, col) - if cell_value == group: + if cell_value == group: - if isinstance(dialin_record[group], dict): - for hardware in dialin_record[group].keys(): - if isinstance(dialin_record[group][hardware], dict): - col = 4 - for spec in dialin_record[group][hardware]: - cell_value = self.excel_ops.get_cell_value(self._excel_workbook, self._record_name, row, - col) - # Check if the cell value is not none. If it is none, it cannot be converted to a number - # like float or integer - is_cell_value_valid = True if cell_value is not None else False - if 'crc' not in spec and 'time' not in spec and is_cell_value_valid: - temp_buffer.append(struct.pack(dialin_record[group][hardware][spec][0], cell_value)) - if dialin_record[group][hardware][spec][1] != cell_value: - is_value_different = True - dialin_record[group][hardware][spec][1] = cell_value + if isinstance(calibration_record[group], dict): + for hardware in calibration_record[group].keys(): + if isinstance(calibration_record[group][hardware], dict): + col = 4 + for spec in calibration_record[group][hardware]: + cell_value = get_cell_value(self._excel_workbook, self._record_name, row, + col) + # Check if the cell value is not none. If it is none, it cannot be converted to a number + # like float or integer + is_cell_value_valid = True if cell_value is not None else False + if 'crc' not in spec and 'time' not in spec and is_cell_value_valid: + temp_buffer.append(struct.pack(calibration_record[group][hardware][spec][0], cell_value)) + if calibration_record[group][hardware][spec][1] != cell_value: + is_value_different = True + calibration_record[group][hardware][spec][1] = cell_value - if is_value_different and 'time' in spec: - epoch_time = self.get_current_time_in_epoch() - dialin_record[group][hardware][spec][1] = epoch_time - temp_buffer.append(struct.pack(dialin_record[group][hardware][spec][0], epoch_time)) - date_time = self.get_current_date_time(epoch_time) - self.excel_ops.write_to_excel(self._excel_workbook, self._record_name, row, col, - date_time) - elif 'time' in spec and isinstance(cell_value, str) and is_cell_value_valid: - epoch_time = self.get_date_time_in_epoch(cell_value) - dialin_record[group][hardware][spec][1] = epoch_time + if is_value_different and 'time' in spec: + epoch_time = self.get_current_time_in_epoch() + calibration_record[group][hardware][spec][1] = epoch_time + temp_buffer.append(struct.pack(calibration_record[group][hardware][spec][0], epoch_time)) + date_time = self.get_current_date_time(epoch_time) + write_to_excel(self._excel_workbook, self._record_name, row, col, + date_time) + elif 'time' in spec and isinstance(cell_value, str) and is_cell_value_valid: + epoch_time = self.get_date_time_in_epoch(cell_value) + calibration_record[group][hardware][spec][1] = epoch_time - if is_value_different and 'crc' in spec: - # Convert the data in the temp buffer to bytes and calculate its crc. - data = b''.join(temp_buffer) - crc_value = self.crc_16(data) - # Clear the temp buffer for the next round of data - is_value_different = False - dialin_record[group][hardware][spec][1] = crc_value - self.excel_ops.write_to_excel(self._excel_workbook, self._record_name, row, col, - crc_value) - col += 2 - else: - col = 3 - for spec in dialin_record[group][hardware]: - cell_value = self.excel_ops.get_cell_value(self._excel_workbook, self._record_name, row, - col) - # Check if the cell value is not none. If it is none, it cannot be converted to a number - # like float or integer - is_cell_value_valid = True if cell_value is not None else False - if is_cell_value_valid: - dialin_record[group][hardware][1] = cell_value - temp_buffer.clear() - row += 1 - row += 1 + if is_value_different and 'crc' in spec: + # Convert the data in the temp buffer to bytes and calculate its crc. + data = b''.join(temp_buffer) + crc_value = self.crc_16(data) + # Clear the temp buffer for the next round of data + is_value_different = False + calibration_record[group][hardware][spec][1] = crc_value + write_to_excel(self._excel_workbook, self._record_name, row, col, + crc_value) + col += 2 + else: + col = 3 + for spec in calibration_record[group][hardware]: + cell_value = get_cell_value(self._excel_workbook, self._record_name, row, + col) + # Check if the cell value is not none. If it is none, it cannot be converted to a number + # like float or integer + is_cell_value_valid = True if cell_value is not None else False + if is_cell_value_valid: + calibration_record[group][hardware][1] = cell_value + temp_buffer.clear() + row += 1 + row += 1 - self.excel_ops.save_report(self._excel_workbook, self._workspace_dir, self._firmware_stack) + save_report(self._excel_workbook, self._workspace_dir, self._firmware_stack) + except Exception as e: + self.logger.error("Failed to load the excel record as a calibration record: {0}".format(e)) + def get_writing_to_excel_status(self): """ Publicly accessible function to get the reading status @return True if reading to firmware records to excel is done otherwise False """ + return self._is_writing_to_excel_done def get_reading_record_status(self): + """ + Gets the reading record status + @return: True if done, false otherwise + """ return self._is_read_done @@ -273,125 +293,130 @@ if not os.path.isdir(self._workspace_dir): # Create the directory and go to it os.mkdir(self._workspace_dir) + # TODO: Fix this os.chdir(self._workspace_dir) @staticmethod def get_current_time_in_epoch(): """ Returns the current date and time in epoch in integer format. This is a static method. - @return: data and time in epoch in integer format + @return: (int) data and time in epoch """ return int(datetime.datetime.now().timestamp()) @staticmethod - def get_current_date_time(epoch): + def get_current_date_time(epoch: int): """ Returns the current date and time from an epoch time + @param: (int) the epoch @return: data and time in string """ return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch)) @staticmethod - def get_date_time_in_epoch(data_time): + def get_date_time_in_epoch(data_time: str): """ Returns the date time in epoch - @return: data and time in epoch + @return: (str) data and time in epoch """ time_struct = time.strptime(data_time, '%Y-%m-%d %H:%M:%S') return int(time.mktime(time_struct)) - def process_received_record_from_fw(self, dialin_record, fw_raw_records_bytes, write_to_excel=True): + def process_received_record_from_fw(self, cal_record: dict, fw_raw_records_bytes: List[int], write_to_excel: bool = True): """ Handles processing the received record from firmware - @param dialin_record: dictionary that is being updated from firmware - @param fw_raw_records_bytes: list of the firmware record from firmware in bytes - @param write_to_excel: flag to whether write the latest data to the excel document (default is True) + @param cal_record: (dict) calibration record from firmware + @param fw_raw_records_bytes: (List[int]) list of the firmware record from firmware in bytes + @param write_to_excel: (bool) flag to whether write the latest data to the excel document (default is True) @return: none """ - raw_payload_temp_start_index = 0 - # Convert the concatenated raw data into a byte array since the struct library requires byte arrays. - fw_raw_records_bytes = bytearray(fw_raw_records_bytes) + try: + raw_payload_temp_start_index = 0 + # Convert the concatenated raw data into a byte array since the struct library requires byte arrays. + fw_raw_records_bytes = bytearray(fw_raw_records_bytes) - # Loop through the keys for the main calibration_record dictionary - # DG_Calibration : {pressure_sensors : { ppi : { gain: [' self._MIN_PAYLOAD_BYTES_SPACE: - current_payload_length += data_type_bytes - temp_buffer[self._PAYLOAD_TOTAL_MSG_INDEX] = struct.pack(' self._MIN_PAYLOAD_BYTES_SPACE: + current_payload_length += data_type_bytes + temp_buffer[self._PAYLOAD_TOTAL_MSG_INDEX] = struct.pack('