###########################################################################
#
#    Copyright (c) 2021-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file test_hd_records.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      18-Jun-2022
#    @author (original)  Dara Navaei
#    @date   (original)  20-Jul-2021
#
############################################################################
import copy
import pprint
import subprocess
import sys
import struct
from dialin import HD, AbstractObserver
sys.path.append("../../")
from dialin.dg.dialysate_generator import DG
from dialin.utils.nv_ops_utils import NVOpsUtils
from time import sleep


class Observer(AbstractObserver):
    """Observer that latches the value published under one message key."""

    def __init__(self, prop):
        """
        @param prop: key looked up in every message passed to update()
        """
        self.received = False
        self.prop = prop

    def update(self, message):
        """
        Print the incoming message and store message[prop] in self.received
        (False when the key is absent).

        @param message: mapping delivered by the subject this observer is attached to
        """
        print(f"Message: {message}")
        self.received = message.get(self.prop, False)


def _block_until_received(observer):
    """
    Poll observer.received every 0.2 s until it becomes truthy.

    NOTE: there is no timeout; this never returns if the flag is never set.

    @param observer: Observer instance to poll
    """
    while not observer.received:
        sleep(0.2)


def test_hd_calibration_record():
    """
    Request the HD calibration record, overwrite a few accelerometer and
    heparin-force entries (refreshing their calibration time and CRC), print
    the result and send it back to the HD.

    Nothing is done when the login call returns a falsy value.
    """
    hd = HD(log_level="DEBUG")
    if not hd.cmd_log_in_to_hd(resend=False):
        return

    cal = hd.calibration_record
    observer = Observer("hd_calibration_record")
    cal.attach(observer)
    cal.cmd_request_hd_calibration_record()
    _block_until_received(observer)

    # print(hd.calibration_record.hd_calibration_record)

    # Disabled: store the old record after reading it from fw
    # record_old_formatted = pprint.pformat(hd.calibration_record.hd_calibration_record, indent=4)
    # with open("/home/fw/projects/dialin/tests/peter/hd_cal_record.log", 'w') as f:
    #     f.write(record_old_formatted)

    record = cal.hd_calibration_record

    # Every field entry is written at index 1.
    # NOTE(review): presumably index 0 holds the field's type/format - confirm.
    accel = record["accelerometer_sensor"]["accelerometer"]
    accel["y_offset"][1] = 2.0
    accel["cal_time"][1] = NVOpsUtils.get_current_time_in_epoch()
    # The CRC is computed last so it is taken over the already-updated group.
    accel["crc"][1] = NVOpsUtils.get_group_record_crc(accel)

    heparin = record["heparin_force_sensor"]["heparin_force"]
    for coefficient, value in (("fourth_order", 4.0),
                               ("third_order", 4.0),
                               ("second_order", 0.0)):
        heparin[coefficient][1] = value
    heparin["cal_time"][1] = NVOpsUtils.get_current_time_in_epoch()
    heparin["crc"][1] = NVOpsUtils.get_group_record_crc(heparin)

    print(record)
    cal.cmd_set_hd_calibration_record(record)


def test_hd_reset_calibration_record():
    """
    Set up the base calibration record, pass it through
    NVOpsUtils.reset_fw_record() and send the returned record to the HD,
    printing the record before and after.

    Nothing is done when the login call returns a falsy value.
    """
    hd = HD(log_level="DEBUG")
    if not hd.cmd_log_in_to_hd(resend=False):
        return

    cal = hd.calibration_record
    cal.setup_base_calibration_record()
    print(cal.hd_calibration_record)

    reset_record = NVOpsUtils.reset_fw_record(cal.hd_calibration_record)
    print(reset_record)
    cal.cmd_set_hd_calibration_record(reset_record)


def test_hd_service_record():
    """
    Request the HD service record, wait for it to arrive and send the same
    record straight back to the HD.

    Nothing is done when the login call returns a falsy value.
    """
    hd = HD(log_level="DEBUG")
    if not hd.cmd_log_in_to_hd(resend=False):
        return

    service = hd.service_record
    observer = Observer("hd_service_record")
    service.attach(observer)
    service.cmd_request_hd_service_record()
    _block_until_received(observer)

    service.cmd_set_hd_service_record(service.hd_service_record)


def test_hd_reset_record_record():
    """
    Issue the calibration-record reset command, pause 0.2 s, then issue the
    service-record reset command.

    Nothing is done when the login call returns a falsy value.
    """
    hd = HD(log_level="DEBUG")
    if not hd.cmd_log_in_to_hd(resend=False):
        return

    hd.calibration_record.cmd_reset_hd_calibration_record()
    sleep(0.2)
    # print(hd.calibration_record.hd_calibration_record)

    # hd.system_record.cmd_reset_hd_system_record()

    hd.service_record.cmd_reset_hd_service_record()
    # print(hd.service_record.hd_service_record)


def test_hd_system_record():
    """
    Request the HD system record, dump the received record to a log file,
    overwrite the part number, serial number and manufacturing location,
    refresh the CRC, print the record and send it back to the HD.

    Nothing is done when the login call returns a falsy value.
    """
    hd = HD(log_level="DEBUG")
    if not hd.cmd_log_in_to_hd(resend=False):
        return

    hd.ui.cmd_ui_request_hd_version()

    system = hd.system_record
    observer = Observer("hd_system_record")
    system.attach(observer)
    system.cmd_request_hd_system_record()
    _block_until_received(observer)

    # Keep a pretty-printed copy of the record as received, before editing it.
    # NOTE: the destination is a hard-coded absolute path.
    record_old_formatted = pprint.pformat(system.hd_system_record, indent=4)
    with open("/home/fw/projects/dialin/tests/peter/hd_sys_record_old.log", 'w') as f:
        f.write(record_old_formatted)

    fields = system.hd_system_record["system_record"]
    fields["top_level_pn"][1] = 'ASD-S123'
    fields["top_level_sn"][1] = 'HD_Device_00000'
    fields["mfg_location"][1] = 9
    # The CRC is computed last so it is taken over the already-updated group.
    fields["crc"][1] = NVOpsUtils.get_group_record_crc(fields)

    print(system.hd_system_record)
    system.cmd_set_hd_system_record(system.hd_system_record)


def test_hd_sw_config_record():
    """
    Issue the get-software-configuration-record command to the HD.

    Nothing is done when the login call returns a falsy value. The other
    software-configuration experiments are kept below as disabled code.
    """
    hd = HD(log_level="DEBUG")
    if not hd.cmd_log_in_to_hd():
        return

    # print(dg.cmd_ui_request_dg_version())
    # dg.sw_configs.cmd_reset_dg_sw_config_record()
    # hd.sw_configs.cmd_reset_hd_sw_config_record()
    # sleep(2)
    hd.sw_configs.cmd_get_hd_sw_config_record()
    # hd.sw_configs.cmd_update_hd_sw_config_record('/home/fw/projects/HD_NV_Records/2022-02-08-HD-SW-CONFIGS-Record.xlsx')
    # dg.sw_configs.cmd_set_dg_sw_config_record('/home/fw/DG_NV_Records/2022-01-22-SW-CONFIGS-Record.xlsx')

    # Disabled:
    # observer = Observer("dg_sw_config_record")
    # dg.sw_configs.attach(observer)
    # while not observer.received:
    #     sleep(0.2)

    # dg.sw_configs.dg_sw_config_record['sw_configs'][DGSWConfigs.SW_CONFIG_DISABLE_HEATERS_MONITOR.name][1] = 1
    # print(dg.sw_configs.dg_sw_config_record)
    # dg.sw_configs.cmd_set_dg_sw_config_record(dg.sw_configs.dg_sw_config_record)


if __name__ == "__main__":
    # Uncomment exactly the scenario to run against the connected device.
    # test_hd_reset_record_record()

    # test_hd_sw_config_record()

    # test_hd_calibration_record()

    # test_hd_service_record()

    # test_hd_reset_system_record()

    # test_hd_system_record()

    # test_crc()

    test_hd_system_record()