Index: dialin/dg/valves.py =================================================================== diff -u -rd66e924d16e5b5f62570f57a483acd3f10c50dcc -rfb9052e39fb778d3d029eda4d6917a0a1fcb9cbd --- dialin/dg/valves.py (.../valves.py) (revision d66e924d16e5b5f62570f57a483acd3f10c50dcc) +++ dialin/dg/valves.py (.../valves.py) (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -123,7 +123,8 @@ self.valve_state_VPD.get("state", None) ] - def sort_by_id(self, observation): + @staticmethod + def sort_by_id(observation): """ Converts a published dictionary of valve state information to an ordered list of tuples. Index: dialin/protocols/CAN.py =================================================================== diff -u -r02e6bbf85f0699488317daa135d2530bc5b19507 -rfb9052e39fb778d3d029eda4d6917a0a1fcb9cbd --- dialin/protocols/CAN.py (.../CAN.py) (revision 02e6bbf85f0699488317daa135d2530bc5b19507) +++ dialin/protocols/CAN.py (.../CAN.py) (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -21,7 +21,7 @@ from time import sleep import sys import logging -import os +from ..utils.base import _Base class DenaliMessage: @@ -338,7 +338,7 @@ return None -class DenaliCanMessenger: +class DenaliCanMessenger(_Base): START_BYTE = DenaliMessage.START_BYTE DIALIN_MSG_RESP_TO = 0.1 # number of seconds to wait for a response to a sent command @@ -350,6 +350,8 @@ @return: DialityCanMessenger object """ + super().__init__(log_level=log_level) + self.bus = can.interfaces.socketcan.SocketcanBus(channel=can_interface) self.listener_buffer = can.BufferedReader() self.notifier = can.Notifier(self.bus, [self.listener_buffer]) @@ -363,30 +365,7 @@ self.response_channel_id = -1 self.run = False self.sync_response_dictionary = {} - self.logging_enabled = log_level is not None and log_level.upper() in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] - self.log_can_separate = log_level is not None and log_level.upper() in ["CAN_ONLY"] - self.log_can_file = "Dialin_CAN_Sent.log" - if os.path.exists(self.log_can_file): - os.remove(self.log_can_file) - if self.logging_enabled: - numeric_level = getattr(logging, log_level.upper(), logging.ERROR) - self.logger = logging.getLogger("Dialin") - self.logger.setLevel(numeric_level) - - fh = logging.FileHandler("Dialin.log") - fh.setLevel(numeric_level) - ch = logging.StreamHandler() - ch.setLevel(numeric_level) - formatter = logging.Formatter(fmt='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', - datefmt='%m-%d-%Y:%H:%M:%S') - fh.setFormatter(formatter) - ch.setFormatter(formatter) - - self.logger.addHandler(fh) - self.logger.addHandler(ch) - - if self.bus is not None: self.serial_listener_thread = threading.Thread(target=self.listener, daemon=True) @@ -563,7 +542,7 @@ is_extended_id=False) self.print_and_log(packet, log_level=logging.CRITICAL) - if self.log_can_separate: + if self.log_level == "CAN_ONLY": self.do_log_can(packet) self.bus.send(packet, 0) # 0.1) @@ -601,30 +580,3 @@ with open(self.log_can_file, 'a') as f: f.write("{0}\n".format(packet)) - - def print_and_log(self, message, log_level=logging.DEBUG): - """ - Prints a message if its severity is >= the current log level. - Also logs the message. - - @param message: The message to print and log - @param log_level: The logging level, indicates the severity of the message - - @return:: None - """ - if not self.logging_enabled: - return - - if self.logger.getEffectiveLevel() <= log_level: - print(message) - - if log_level == logging.DEBUG: - self.logger.debug(message) - elif log_level == logging.INFO: - self.logger.info(message) - elif log_level == logging.WARNING: - self.logger.warning(message) - elif log_level == logging.ERROR: - self.logger.error(message) - elif log_level == logging.CRITICAL: - self.logger.critical(message) Index: dialin/utils/base.py =================================================================== diff -u -rca7f6eff65d988c19ccce8cde5efd1ef537e2095 -rfb9052e39fb778d3d029eda4d6917a0a1fcb9cbd --- dialin/utils/base.py (.../base.py) (revision ca7f6eff65d988c19ccce8cde5efd1ef537e2095) +++ dialin/utils/base.py (.../base.py) (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -1,5 +1,7 @@ from abc import ABC, abstractmethod from datetime import datetime +import os +import logging class AbstractObserver(ABC): @@ -17,15 +19,153 @@ pass -# abstract base class requires all abstract methods are overridden by children classes -class _AbstractSubSystem(ABC): +class _Base(ABC): + def __init__(self, log_level=None, log_filepath="Dialin.log"): + """ + + @param log_level: (str) or (None) if not set, contains the logging level + @param log_filepath: (str) the log filepath + """ + + self.log_level = log_level + self.logging_enabled = self.log_level_enables_logging(log_level) + self.log_filepath = self.get_available_log_path(log_filepath) + self.logger = None + self.configure_logging(self.log_filepath) + + def log_level_enables_logging(self, log_level:str): + """ + Check if the log level string is a valid logging level + + @param log_level: (str) the logging level + @return: True if the log level is valid, False otherwise + """ + return log_level is not None and log_level.upper() in self.get_logging_levels() + + def get_logging_levels(self): + """ + Gets all possible logging levels + + @return: All possible logging levels + """ + return ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "CAN_ONLY"] + + def request_new_log_path(self, path:str): + """ + Clears the logger, gets a new log filepath + and configures the logger to use it. + + @param path: the requested log filepath + @return: The new log filepath + """ + self.clear_logger() + self.log_filepath = self.get_available_log_path(path) + self.configure_logging(self.log_filepath) + return self.log_filepath + + def clear_logger(self): + """ + If the logger has been created, clear its handlers + + @return: True if successful, False otherwise + """ + + # remove existing handlers + if self.logger is not None: + for handler in self.logger.handlers: + self.logger.removeHandler(handler) + return True + return False + + def configure_logging(self, log_path): + """ + Sets up the logger to use the provided log path + + @param log_path: Path to the log file + @return: True if success, False otherwise + """ + if self.logger is not None: + self.print_and_log("Logger already configured. Please clear the logger first.") + return False + + # configure the logging + if self.logging_enabled: + numeric_level = getattr(logging, self.log_level.upper(), logging.ERROR) + self.logger = logging.getLogger("Dialin") + self.logger.setLevel(numeric_level) + + fh = logging.FileHandler(log_path) + fh.setLevel(numeric_level) + ch = logging.StreamHandler() + ch.setLevel(numeric_level) + formatter = logging.Formatter(fmt='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', + datefmt='%m-%d-%Y:%H:%M:%S') + fh.setFormatter(formatter) + ch.setFormatter(formatter) + + self.logger.addHandler(fh) + self.logger.addHandler(ch) + return True + return False + + def print_and_log(self, message, log_level=logging.DEBUG): + """ + Prints a message if its severity is >= the current log level. + Also logs the message. + + @param message: The message to print and log + @param log_level: The logging level, indicates the severity of the message + + @return:: None + """ + if not self.logging_enabled: + return + + if self.logger.getEffectiveLevel() <= log_level: + print(message) + + if log_level == logging.DEBUG: + self.logger.debug(message) + elif log_level == logging.INFO: + self.logger.info(message) + elif log_level == logging.WARNING: + self.logger.warning(message) + elif log_level == logging.ERROR: + self.logger.error(message) + elif log_level == logging.CRITICAL: + self.logger.critical(message) + + + def get_available_log_path(self, filepath:str): + """ + Gets an available log path from filepath + appends integer to the end if file already exists. + + @param filepath: The full path to the file + @return: (str) The available log filepath + """ + + if not os.path.exists(filepath): + return filepath + + path, ext = os.path.splitext(filepath) + i = 0 + while os.path.exists("{0}{1}{2}".format(path, i, ext)): + i += 1 + return "{0}{1}{2}".format(path, i, ext) + + +class _AbstractSubSystem(_Base): + @abstractmethod - def __init__(self): + def __init__(self, log_level=None, log_filepath="Dialin.log"): """ Initialization function for the sub system + # The abstract base class requires all abstract methods are overridden by children classes """ + super().__init__(log_level=log_level, log_filepath=log_filepath) self._observers = [] self._datetime_fmt = "%m.%d.%Y_%I.%M.%S.%f" pass @@ -67,6 +207,7 @@ return None result["datetime"] = datetime.now() + result["subsystem"] = self.__class__.__name__ for key in keys: result[key] = getattr(self, key) Fisheye: Tag fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd refers to a dead (removed) revision in file `tests/can_xmit_test.py'. Fisheye: No comparison available. Pass `N' to diff? Index: tests/coverage/run_coverage.py =================================================================== diff -u --- tests/coverage/run_coverage.py (revision 0) +++ tests/coverage/run_coverage.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1 @@ \ No newline at end of file Fisheye: Tag fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd refers to a dead (removed) revision in file `tests/dg_test_script.py'. Fisheye: No comparison available. Pass `N' to diff? Fisheye: Tag fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd refers to a dead (removed) revision in file `tests/dg_valves_test.py'. Fisheye: No comparison available. Pass `N' to diff? Fisheye: Tag fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd refers to a dead (removed) revision in file `tests/dg_valves_test_observer.py'. Fisheye: No comparison available. Pass `N' to diff? Fisheye: Tag fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd refers to a dead (removed) revision in file `tests/hd_test_script.py'. Fisheye: No comparison available. Pass `N' to diff? Index: tests/test_can_xmit.py =================================================================== diff -u --- tests/test_can_xmit.py (revision 0) +++ tests/test_can_xmit.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1,39 @@ +########################################################################### +# +# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file HD_TestScript.py +# +# @date 19-Nov-2019 +# @author S. Nash +# +# @brief This is an example test script for the HD. +# +############################################################################ + +import sys +sys.path.append("..") +from dialin.hd.hemodialysis_device import HD +from time import sleep + +if __name__ == "__main__": + # create an HD object called hd + hd = HD() + + # log into HD + if not hd.cmd_log_in_to_hd(): + sys.exit() + + # reset sequence counter + x = 0 + # send large CAN test messages (6 frames each) every 100ms + while True: + sleep(0.1) + hd.pressure_occlusion.test_can_message(x) + x += 6 + + + Index: tests/test_dg.py =================================================================== diff -u --- tests/test_dg.py (revision 0) +++ tests/test_dg.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1,101 @@ +########################################################################### +# +# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file hd_tests_script.py +# +# @date 7-Apr-2019 +# @author P. Lucia +# +# @brief This is an example test script for the HD. +# +############################################################################ +import sys +sys.path.append("..") +from dialin.protocols.CAN import DenaliMessage, DenaliCanMessenger, DenaliChannels +from dialin.dg.dialysate_generator import DG +from time import sleep +import unittest + + +class Test(unittest.TestCase): + + # @unittest.skip("Skipping dg_start.") + def test_dg_start(self): + dg = DG() + sleep(2) + success = dg.fill("start") + self.assertTrue(success) + + # @unittest.skip("Skipping dg_start_stop.") + def test_dg_start_stop(self): + dg = DG() + sleep(2) + success = dg.fill("start") + self.assertTrue(success) + sleep(2) + success = dg.fill('stop') + self.assertTrue(success) + + @unittest.skip("Skipping test_dg_login") + def test_dg_login(self): + raise NotImplementedError + """ + # create an DG object called dg + dg = DG() + + # wait 2 seconds and then login to DG as a tester + sleep(2) + dg.cmd_log_in_to_dg() + """ + + + @unittest.skip("Skipping test_dg_3") + def test_dg_3(self): + raise NotImplementedError + + """ + + dialin_messenger = DenaliCanMessenger() + + # Building response message + response_msg = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_hd_ch_id, + message_id=1000, + payload=[1]) + + # Building Publication message + publication_msg = DenaliMessage.build_message(channel_id=DenaliChannels.dg_sync_broadcast_ch_id, + message_id=0x7100, + payload=[1, 2, 3, 4, 5]) + + print("") + print("o -> response to fill command") + print(". -> publication message") + print("") + + + def respond_to_command(message): + dialin_messenger.send(response_msg) + print("o", end='', flush=True) + + + # Register response command for the DG + dialin_messenger.register_receiving_publication_function(channel_id=DenaliChannels.ui_to_hd_ch_id, + message_id=1000, + function=respond_to_command) + + dialin_messenger.start() + + while True: + dialin_messenger.send(publication_msg) + print(".", end='', flush=True) + sleep(1) + + """ + + +if __name__ == '__main__': + unittest.main(verbosity=2) Index: tests/test_dg_valves.py =================================================================== diff -u --- tests/test_dg_valves.py (revision 0) +++ tests/test_dg_valves.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1,58 @@ +########################################################################### +# +# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file dg_valves_test.py +# +# @date 20-May-2020 +# @author P. Montazemi +# +# @brief This development test script exercises the DG valves states in \n +# single and batch modes +# +############################################################################ + +import sys +sys.path.append("..") +from dialin.dg.dialysate_generator import DG +from time import sleep + +if __name__ == "__main__": + # Create an instance of the DG Class called dg + dg = DG() + sleep(2) + + # Log in to DG as tester + if dg.cmd_log_in_to_dg() == 0: + print("DG login failed.") + exit(1) + sleep(1) + + # Create log file + with open("DG_valves_test.log", "w") as f: + # Collect DG valves states + while True: + sleep(1) + valvesStates = "DGValves.VBF," + str(dg.valves.valve_state_VRF.get("state", None)) + \ + ",DGValves.VRI," + str(dg.valves.valve_state_VRI.get("state", None)) + \ + ",DGValves.VRD," + str(dg.valves.valve_state_VRD.get("state", None)) + \ + ",DGValves.VRO," + str(dg.valves.valve_state_VRO.get("state", None)) + \ + ",DGValves.VPO," + str(dg.valves.valve_state_VPO.get("state", None)) + \ + ",DGValves.VBF," + str(dg.valves.valve_state_VBF.get("state", None)) + \ + ",DGValves.VRC," + str(dg.valves.valve_state_VRC.get("state", None)) + \ + ",DGValves.VDR," + str(dg.valves.valve_state_VDR.get("state", None)) + \ + ",DGValves.VPI," + str(dg.valves.valve_state_VPI.get("state", None)) + \ + ",DGValves.VSP," + str(dg.valves.valve_state_VSP.get("state", None)) + \ + ",DGValves.VR1," + str(dg.valves.valve_state_VR1.get("state", None)) + \ + ",DGValves.VR2," + str(dg.valves.valve_state_VR2.get("state", None)) + \ + ",DGValves.VPD," + str(dg.valves.valve_state_VPD.get("state", None)) + + # Log data + f.write(valvesStates) + f.write("\n") + + # Print to console + print("DG Valves States: "+valvesStates) Index: tests/test_dg_valves_observer.py =================================================================== diff -u --- tests/test_dg_valves_observer.py (revision 0) +++ tests/test_dg_valves_observer.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1,73 @@ +########################################################################### +# +# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file dg_valves_test1.py +# +# @date 01-June-2020 +# @author P. Lucia +# +# @brief This development test script shows how to use the AbstractObserver class +# +############################################################################ + +import sys +sys.path.append("..") +from dialin.dg.dialysate_generator import DG +from datetime import datetime, timedelta +from dialin.utils.base import AbstractObserver +import time + + +class Observer(AbstractObserver): + def __init__(self, max_observations=20): + self.ready = False + self.timedelta_in_ms = 0 + self.timestamps = [] + self.max_observations = max_observations + self.observations = 0 + self.run = True + self.deltas = [] + + def update(self, result): + self.observations += 1 + + if result.get('datetime', False): + self.timestamps.append(datetime.strptime(result["datetime"], "%m.%d.%Y_%I.%M.%S.%f")) + + max_publish_rate = 500 + tolerance = 0.1 + max_publish_rate_after_tolerance = max_publish_rate * (1+tolerance) + if len(self.timestamps) > 1: + delta = self.timestamps[-1] - self.timestamps[-2] + self.deltas.append(delta.microseconds*10**-3) + max_delta = timedelta(seconds = max_publish_rate_after_tolerance*10**(-3)) + print(delta) + assert(delta <= max_delta) + + if self.observations >= self.max_observations: + self.run = False + + +def test_monitor_dg_valve_state_rate_1(): + """ + Test if DG monitors valve states within every 500 ms + """ + dg = DG() + + valves_observer = Observer() + dg.valves.attach(valves_observer) + + while valves_observer.run: + time.sleep(0.010) + + # print("DG Valve State Updates (ms):") + # for ms in valves_observer.deltas: + # print(ms) + + +if __name__ == "__main__": + test_monitor_dg_valve_state_rate_1() Index: tests/test_hd.py =================================================================== diff -u --- tests/test_hd.py (revision 0) +++ tests/test_hd.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1,85 @@ +########################################################################### +# +# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file hd_tests_script.py +# +# @date 7-Apr-2019 +# @author P. Lucia +# +# @brief This is an example test script for the HD. +# +############################################################################ +import sys +sys.path.append("..") +from dialin.hd.constants import RESET, NO_RESET +from dialin.hd.hemodialysis_device import HD +from time import sleep +import unittest + + +class Test(unittest.TestCase): + + # @unittest.skip("Skipping test_hd_1.") + def test_hd_1(self): + hd = HD() + sleep(2) + + if hd.basics.cmd_log_in_to_hd() == 0: + exit(1) + + hd.bloodflow.cmd_blood_flow_broadcast_interval_override(RESET, 0) + + sleep(2) + print("Blood Flow Target = {}".format(hd.bloodflow.target_blood_flow_rate)) + print("Blood Pump Current = {}".format(hd.bloodflow.measured_blood_pump_mc_current)) + sleep(5) + print("Blood Pump Current = {}".format(hd.bloodflow.measured_blood_pump_mc_current)) + + hd.bloodflow.cmd_blood_pump_measured_current_override(NO_RESET, 140) + + sleep(1) + print("Blood Pump Current= {}".format(hd.bloodflow.measured_blood_pump_mc_current)) + sleep(5) + hd.bloodflow.cmd_blood_pump_measured_current_override(RESET, 0) + + i = 0 + while True: + sleep(0.5) + print("Measured Flow = {} mL/min".format(hd.bloodflow.measured_blood_flow_rate)) + if i > 0 and i % 60 == 0: + resp = input("Press 'Enter' to continue or 'q' to quit: ") + if resp.lower() == "q": + break + i += 1 + tgtRate = 0 + hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 2000) + + i = 0 + while True: + if hd.bloodflow.target_blood_flow_rate == 0: + if tgtRate != 0: + hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 2000) + tgtRate = 0 + else: + if tgtRate == 0: + hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 200) + tgtRate = hd.bloodflow.target_blood_flow_rate + if i > 0 and i % 60 == 0: + resp = input("Press 'Enter' to continue or 'q' to quit: ") + if resp.lower() == "q": + break + i += 1 + + # hd.bloodflow.cmd_blood_flow_broadcast_interval_override(RESET,0) + + # FIXME: Update passing criteria + self.assertTrue(True) + + +if __name__ == '__main__': + unittest.main(verbosity=2) + Index: tests/test_uf.py =================================================================== diff -u --- tests/test_uf.py (revision 0) +++ tests/test_uf.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1,102 @@ +########################################################################### +# +# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file HD_UFTest.py +# +# @date 28-Jan-2020 +# @author S. Nash +# +# @brief This script simulates 300 grams of load cell increase over 15 minutes (20 mL/min). +# +############################################################################ + +import sys +sys.path.append("..") +from dialin.hd.hemodialysis_device import HD +from dialin.dg.dialysate_generator import DG +from time import sleep + +if __name__ == "__main__": + # create an HD object called hd + hd = HD() + # create a DG object called dg + dg = DG() + sleep(2) + + # log in to HD and DG as tester +# if hd.cmd_log_in_to_hd() == 0: +# exit(1) +# if dg.cmd_log_in_to_dg() == 0: +# exit(1) +# sleep(1) + + # create log file + with open("UF_test.log", "w") as f: + + # collect UF related data from HD and DG + while True: + sleep(1) + modes = "HD.m, " + '{:2d}'.format(hd.hd_operation_mode) + \ + ", DG.m, " + '{:2d}'.format(dg.dg_operation_mode) + \ + ", DG.s, " + '{:2d}'.format(dg.dg_operation_sub_mode) + loadCells = ", A1, " + '{:8.1f}'.format(dg.load_cells.load_cell_A1) + \ + ", B1, " + '{:8.1f}'.format(dg.load_cells.load_cell_B1) + \ + ", A2, " + '{:8.1f}'.format(dg.load_cells.load_cell_A2) + \ + ", B2, " + '{:8.1f}'.format(dg.load_cells.load_cell_B2) + ultraFilt = ", RfUF, " + '{:9.2f}'.format(hd.dialysate_outlet_flow.reference_dialysate_outlet_uf_volume) + \ + ", MsUF, " + '{:9.2f}'.format(hd.dialysate_outlet_flow.measured_dialysate_outlet_uf_volume) + \ + ", Act.Res, " + '{:1d}'.format(dg.reservoirs.active_reservoir) + valves = ", Vlv, " + '{:4X}'.format(dg.valves.valve_states_all) + pumpSetPts = ", B.s, " + '{:4d}'.format(hd.bloodflow.target_blood_flow_rate) + \ + ", DI.s, " + '{:4d}'.format(hd.dialysate_inlet_flow.target_dialysate_inlet_flow_rate) + \ + ", RO.s, " + '{:4d}'.format(dg.ro_pump.target_pressure_psi) + \ + ", DR.s, " + '{:5d}'.format(dg.drain_pump.target_drain_pump_speed_RPM) + pumpMeasSpds = ", B.m, " + '{:7.1f}'.format(hd.bloodflow.measured_blood_pump_speed) + \ + ", B.r, " + '{:6.1f}'.format(hd.bloodflow.measured_blood_pump_rotor_speed) + \ + ", B.f, " + '{:7.1f}'.format(hd.bloodflow.measured_blood_flow_rate) + \ + ", DI.m, " + '{:7.1f}'.format(hd.dialysate_inlet_flow.measured_dialysate_inlet_pump_speed) + \ + ", DI.r, " + '{:7.1f}'.format(hd.dialysate_inlet_flow.measured_dialysate_inlet_pump_rotor_speed) + \ + ", DI.f, " + '{:7.1f}'.format(hd.dialysate_inlet_flow.measured_dialysate_inlet_flow_rate) + \ + ", DO.m, " + '{:7.1f}'.format(hd.dialysate_outlet_flow.measured_dialysate_outlet_pump_speed) + \ + ", DO.r, " + '{:6.1f}'.format(hd.dialysate_outlet_flow.measured_dialysate_outlet_pump_rotor_speed) + \ + ", RO.f, " + '{:7.1f}'.format(dg.ro_pump.measured_flow_rate_lpm) + pumpPWMs = ", B.w, "+'{:6.1f}'.format(hd.bloodflow.pwm_duty_cycle_pct) + \ + ", DI.w, " + '{:6.1f}'.format(hd.dialysate_inlet_flow.pwm_duty_cycle_pct) + \ + ", DO.w, " + '{:6.1f}'.format(hd.dialysate_outlet_flow.pwm_duty_cycle_pct) + \ + ", RO.w, " + '{:6.1f}'.format(dg.ro_pump.pwm_duty_cycle_pct) + \ + ", DR.w, " + '{:4d}'.format(dg.drain_pump.dac_value) + dgPres = ", ROi, " + '{:9.2f}'.format(dg.pressures.ro_pump_inlet_pressure) + \ + ", ROo, " + '{:9.2f}'.format(dg.pressures.ro_pump_outlet_pressure) + \ + ", DRi, " + '{:9.2f}'.format(dg.pressures.drain_pump_inlet_pressure) + \ + ", DRo, " + '{:9.2f}'.format(dg.pressures.drain_pump_outlet_pressure) + alarms = ", AL.s, " + hd.alarms.get_current_alarms_state() + \ + ", AL.t, " + '{:4d}'.format(hd.alarms.alarm_top) + + # log data + f.write(modes) + f.write(loadCells) + f.write(ultraFilt) + f.write(valves) + f.write(pumpSetPts) + f.write(pumpMeasSpds) + f.write(pumpPWMs) + f.write(dgPres) + f.write(alarms) + f.write("\n") + + # print to console + print(" Modes: "+modes) + print(" Load Cells: "+loadCells) + print("Ultrafiltration: "+ultraFilt) + print(" DG Valves: "+valves) + print("Pump Set Points: "+pumpSetPts) + print(" Pump Speeds: "+pumpMeasSpds) + print(" Pump PWMs/DACs: "+pumpPWMs) + print(" DG Pressures: "+dgPres) + print(" Alarms: "+alarms) + + Fisheye: Tag fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd refers to a dead (removed) revision in file `tests/uf_test.py'. Fisheye: No comparison available. Pass `N' to diff? Fisheye: Tag fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd refers to a dead (removed) revision in file `tests/unit_tests.py'. Fisheye: No comparison available. Pass `N' to diff? Index: tests/unit_tests/test_dg_valves.py =================================================================== diff -u --- tests/unit_tests/test_dg_valves.py (revision 0) +++ tests/unit_tests/test_dg_valves.py (revision fb9052e39fb778d3d029eda4d6917a0a1fcb9cbd) @@ -0,0 +1,60 @@ +########################################################################### +# +# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file test_imports.py +# +# @date 29-Apr-2020 +# @author P. Lucia +# +# @brief Tests imports of all available modules +# +############################################################################ +import unittest +import sys +sys.path.append("../..") +from datetime import datetime +from dialin.dg.dialysate_generator import DG + + +class DGValves(unittest.TestCase): + + # @unittest.skip("Skipping test_imports") + def test_dg_valves_conversion(self): + dg = DG() + observation = {'datetime': datetime(2020, 7, 13, 10, 43, 27, 433357), + 'valve_state_VBF': {'id': 5, 'state': True}, + 'valve_state_VDR': {'id': 7, 'state': True}, + 'valve_state_VPD': {'id': 12, 'state': True}, + 'valve_state_VPI': {'id': 8, 'state': True}, + 'valve_state_VPO': {'id': 4, 'state': True}, + 'valve_state_VR1': {'id': 10, 'state': True}, + 'valve_state_VR2': {'id': 11, 'state': True}, + 'valve_state_VRC': {'id': 6, 'state': True}, + 'valve_state_VRD': {'id': 2, 'state': True}, + 'valve_state_VRF': {'id': 0, 'state': True}, + 'valve_state_VRI': {'id': 1, 'state': True}, + 'valve_state_VRO': {'id': 3, 'state': True}, + 'valve_state_VSP': {'id': 9, 'state': True}, + 'valve_states_all': 8191} + + assert(dg.valves.sort_by_id(observation) == [('valve_state_VRF', 0, True), + ('valve_state_VRI', 1, True), + ('valve_state_VRD', 2, True), + ('valve_state_VRO', 3, True), + ('valve_state_VPO', 4, True), + ('valve_state_VBF', 5, True), + ('valve_state_VRC', 6, True), + ('valve_state_VDR', 7, True), + ('valve_state_VPI', 8, True), + ('valve_state_VSP', 9, True), + ('valve_state_VR1', 10, True), + ('valve_state_VR2', 11, True), + ('valve_state_VPD', 12, True)]) + + +if __name__ == '__main__': + unittest.main(verbosity=2)