import copy import pprint import subprocess import sys import struct from dialin import HD, AbstractObserver sys.path.append("../../") from dialin.dg.dialysate_generator import DG from dialin.utils.nv_ops_utils import NVOpsUtils from time import sleep class Observer(AbstractObserver): def __init__(self, prop): self.received = False self.prop = prop def update(self, message): print("Message: {0}".format(message)) self.received = message.get(self.prop, False) def test_hd_calibration_record(): hd = HD(log_level="DEBUG") if hd.cmd_log_in_to_hd(resend=False): observer = Observer("hd_calibration_record") hd.calibration_record.attach(observer) hd.calibration_record.cmd_request_hd_calibration_record() while not observer.received: sleep(0.2) # store the old record after reading it from fw record_old_formatted = pprint.pformat(hd.calibration_record.hd_calibration_record, indent=4) with open("/home/fw/projects/dialin/tests/peter/hd_cal_record.log", 'w') as f: f.write(record_old_formatted) hd.calibration_record.hd_calibration_record["accelerometer_sensor"]["accelerometer"]["y_offset"][1] = 2.0 hd.calibration_record.hd_calibration_record["accelerometer_sensor"]["accelerometer"]["cal_time"][1] = \ NVOpsUtils.get_current_time_in_epoch() hd.calibration_record.hd_calibration_record["accelerometer_sensor"]["accelerometer"]["crc"][1] = \ NVOpsUtils.get_group_record_crc(hd.calibration_record.hd_calibration_record["accelerometer_sensor"]["accelerometer"]) hd.calibration_record.hd_calibration_record["heparin_force_sensor"]["heparin_force"]["fourth_order"][1] = 4.0 hd.calibration_record.hd_calibration_record["heparin_force_sensor"]["heparin_force"]["third_order"][1] = 4.0 hd.calibration_record.hd_calibration_record["heparin_force_sensor"]["heparin_force"]["second_order"][1] = 0.0 hd.calibration_record.hd_calibration_record["heparin_force_sensor"]["heparin_force"]["cal_time"][1] = \ NVOpsUtils.get_current_time_in_epoch() hd.calibration_record.hd_calibration_record["heparin_force_sensor"]["heparin_force"]["crc"][1] = \ NVOpsUtils.get_group_record_crc(hd.calibration_record.hd_calibration_record["heparin_force_sensor"]["heparin_force"]) print(hd.calibration_record.hd_calibration_record) hd.calibration_record.cmd_set_hd_calibration_record(hd.calibration_record.hd_calibration_record) def test_hd_reset_calibration_record(): hd = HD(log_level="DEBUG") if hd.cmd_log_in_to_hd(resend=False): hd.calibration_record.setup_base_calibration_record() print(hd.calibration_record.hd_calibration_record) reset_record = NVOpsUtils.reset_fw_record(hd.calibration_record.hd_calibration_record) print(reset_record) hd.calibration_record.cmd_set_hd_calibration_record(reset_record) def test_hd_service_record(): hd = HD(log_level="DEBUG") if hd.cmd_log_in_to_hd(resend=False): observer = Observer("hd_service_record") hd.service_record.attach(observer) hd.service_record.cmd_request_hd_service_record() while not observer.received: sleep(0.2) hd.service_record.cmd_set_hd_service_record(hd.service_record.hd_service_record) def test_hd_reset_record_record(): hd = HD(log_level="DEBUG") if hd.cmd_log_in_to_hd(resend=False): hd.calibration_record.cmd_reset_hd_calibration_record() print(hd.calibration_record.hd_calibration_record) hd.system_record.cmd_reset_hd_system_record() hd.service_record.cmd_reset_hd_service_record() def test_hd_system_record(): hd = HD(log_level="DEBUG") if hd.cmd_log_in_to_hd(resend=False): hd.ui.cmd_ui_request_hd_version() observer = Observer("hd_system_record") hd.system_record.attach(observer) hd.system_record.cmd_request_hd_system_record() while not observer.received: sleep(0.2) record_old_formatted = pprint.pformat(hd.system_record.hd_system_record, indent=4) with open("/home/fw/projects/dialin/tests/peter/hd_sys_record_old.log", 'w') as f: f.write(record_old_formatted) hd.system_record.hd_system_record['system_record']['top_level_pn'][1] = 'ASD-S123' hd.system_record.hd_system_record["system_record"]["top_level_sn"][1] = 'ASD-S1234' hd.system_record.hd_system_record["system_record"]["mfg_location"][1] = 9 hd.system_record.hd_system_record["system_record"]["crc"][1] = \ NVOpsUtils.get_group_record_crc(hd.system_record.hd_system_record["system_record"]) print(hd.system_record.hd_system_record) hd.system_record.cmd_set_hd_system_record(hd.system_record.hd_system_record) if __name__ == "__main__": #test_hd_reset_record_record() #test_hd_calibration_record() # test_hd_service_record() #test_hd_reset_system_record() test_hd_system_record() # test_crc() # test_hd_system_record()