###########################################################################
#
# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file HemodialysisDevice.py
#
# @date 31-Mar-2020
# @author P. Lucia
#
# @brief This class provides the basic interface to communicate with
# the HD board.
#
############################################################################
from time import sleep

from DialIn.CoreCANProtocol import (DenaliCanMessenger)

from .Alarms import HDAlarms
from .Buttons import HDButtons
from .UI import HDUI
from .Watchdog import HDWatchdog
from .RTC import HDRTC
from .BloodFlow import HDBloodFlow
from .DialysateInletFlow import HDDialysateInletFlow
from .DialysateOutletFlow import HDDialysateOutletFlow
from .Treatment import HDTreatment
from .Basics import HDBasics
from .PressureOcclusion import HDPressureOcclusion


class HD:
    r"""
    Entry point for talking to the HD sub-system over CAN.

    \details
    An HD instance owns a single DenaliCanMessenger and hands that same
    messenger to one command-group object per imported HD* class (Basics,
    Alarms, Buttons, ...). The command groups are exposed as attributes.
    """

    # for reset param in override commands
    NO_RESET = 0
    RESET = 1

    def __init__(self, can_interface_name: str = "can0") -> None:
        r"""
        HD constructor using can bus

        \param can_interface_name: can bus name, e.g. "can0"
        \returns HD object provides test/service commands for the HD sub-system.

        \details
        For example:
        hd_object = HD(can_interface_name='can0') or
        hd_object = HD('can0')
        """
        # NOTE: the docstrings are raw strings on purpose. In a normal string
        # "\r" (from "\returns") is a carriage return and "\p" / "\d" are
        # invalid escape sequences.

        # Create listener
        self.can_interface = DenaliCanMessenger(can_interface=can_interface_name)
        self.can_interface.start()

        # Create command groups; every group shares the one messenger above.
        self.Basics = HDBasics(self.can_interface)
        self.Alarms = HDAlarms(self.can_interface)
        self.Buttons = HDButtons(self.can_interface)
        self.UI = HDUI(self.can_interface)
        self.RTC = HDRTC(self.can_interface)
        self.Watchdog = HDWatchdog(self.can_interface)
        self.BloodFlow = HDBloodFlow(self.can_interface)
        self.DialysateInletFlow = HDDialysateInletFlow(self.can_interface)
        self.DialysateOutletFlow = HDDialysateOutletFlow(self.can_interface)
        self.Treatment = HDTreatment(self.can_interface)
        # Attribute name (with underscore) is kept as-is: callers may rely on it.
        self.Pressure_Occlusion = HDPressureOcclusion(self.can_interface)


if __name__ == "__main__":
    # create an HD object called hd
    hd = HD()

    # NOTE(review): this loop never exits, so everything after it in this
    # script is currently unreachable. Presumably a temporary debugging aid;
    # confirm which of the two loops is meant to run before removing either.
    while True:
        sleep(1)
        print(hd.Alarms.alarmStates)

    # wait 2 seconds and then login to HD as a tester
    sleep(2)
    hd.Basics.cmd_log_in_to_hd()

    # FIXME: Delete commented code:
    # hd.RTC.CmdSetRTCTimeAndDate(0, 2, 1, 5, 1, 2020)

    while True:
        print(hd.RTC.RTCEpoch)
        sleep(1)

    # FIXME: This should either be deleted or moved to a file:
    # NOTE(review): the literal below is a discarded string used to park old
    # code. It refers to hd.DialOut and DialOutStates, neither of which is
    # defined or imported in this file.
    """"
    hd.BloodFlow.CmdBloodPumpMeasuredCurrentOverride(hd.NO_RESET,149)
    totalVolumeInMl = 2400
    rxTimeInMins = 30
    flowRateMlmin = 100
    sleep(2)
    resp = hd.DialOut.setUFRx(totalVolumeInMl, rxTimeInMins, flowRateMlmin)
    print("Set TotalVolume(mL): {}, TotalTime(mins): {} and flowrate (ml/min): {}, resp: {}".format(totalVolumeInMl, rxTimeInMins, flowRateMlmin, resp))
    sleep(2)
    rxTimeInMins = 60
    resp = hd.DialOut.setUFRx(totalVolumeInMl, rxTimeInMins, flowRateMlmin)
    print("Set TotalVolume(mL): {}, TotalTime(mins): {} and flowrate (ml/min): {}, resp: {}".format(totalVolumeInMl, rxTimeInMins, flowRateMlmin, resp))
    sleep(5)
    hd.DialOut.setUFState(DialOutStates.RUN)
    state_run = hd.DialOut.DialOutBroadcast['state']
    sleep(2)
    print("After RUN: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(20)
    print("After 20 secs RUN: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(20)
    print("After 40 secs RUN: {}".format(hd.DialOut.DialOutBroadcast))
    hd.DialOut.setUFState(DialOutStates.PAUSE)
    sleep(2)
    print("After PAUSE: {}".format(hd.DialOut.DialOutBroadcast))
    state_pause = hd.DialOut.DialOutBroadcast['state']
    sleep(10)
    hd.DialOut.setUFState(DialOutStates.STOP)
    sleep(2)
    print("---> After STOP:{} <---".format(hd.DialOut.DialOutBroadcast))
    sleep(3)
    hd.DialOut.setUFState(DialOutStates.RUN)
    sleep(2)
    state_stop = hd.DialOut.DialOutBroadcast['state']
    print("After RUN again: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(3)
    hd.DialOut.setUFState(DialOutStates.STOP)
    sleep(2)
    state_stop = hd.DialOut.DialOutBroadcast['state']
    print("After STOP: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(5)
    hd.DialOut.plotBroadCastSignals()
    exit()

    tgtRate = 0
    hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000)
    while True:
        if hd.BloodFlow.TargetBloodFlowRate == 0:
            if tgtRate != 0:
                hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000)
                tgtRate = 0
        else:
            if tgtRate == 0:
                hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 200)
                tgtRate = hd.BloodFlow.TargetBloodFlowRate
        sleep(1)
        print(hd.BloodFlow.MeasuredBloodFlowRate)
    # hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.RESET,0)
    """