###########################################################################
#
#    Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   HemodialysisDevice.py
#
#    @date   31-Mar-2020
#    @author P. Lucia
#
#    @brief  This class provides the basic interface to communicate with
#            the HD board.
#
############################################################################
from time import sleep

from DialIn.CoreCANProtocol import (DenaliCanMessenger)
from Alarms import HDAlarms
from Buttons import HDButtons
from UI import HDUI
from Watchdog import HDWatchdog
from RTC import HDRTC
from BloodFlow import HDBloodFlow
from DialysateInletFlow import HDDialysateInletFlow
from DialysateOutletFlow import HDDialysateOutletFlow
from Treatment import HDTreatment
from Basics import HDBasics
from PressureOcclusion import HDPressureOcclusion
from utils import integer_to_bytearray, RESET, NO_RESET
from DialIn.CoreCANProtocol import (DenaliMessage, DenaliChannels)


class HD:
    r"""
    Entry point for talking to the HD board over CAN.

    \details
    Owns a single DenaliCanMessenger and hands it to one command-group object
    per HD feature (basics, alarms, buttons, ui, rtc, watchdog, bloodflow,
    dialysate_inlet_flow, dialysate_outlet_flow, treatment,
    pressure_occlusion). The command groups are exposed as lower-case
    attributes of the same names.
    """

    def __init__(self, can_interface_name: str = "can0") -> None:
        r"""
        HD constructor using can bus

        \param can_interface_name: name of the can bus to use, e.g. "can0"
        \returns HD object provides test/service commands for the HD sub-system.

        \details For example:
            hd_object = HD(can_interface_name='can0') or
            hd_object = HD('can0')
        """
        # Create listener
        self.can_interface = DenaliCanMessenger(can_interface=can_interface_name)
        self.can_interface.start()

        # Create command groups; every group shares the one messenger above
        self.basics = HDBasics(self.can_interface)
        self.alarms = HDAlarms(self.can_interface)
        self.buttons = HDButtons(self.can_interface)
        self.ui = HDUI(self.can_interface)
        self.rtc = HDRTC(self.can_interface)
        self.watchdog = HDWatchdog(self.can_interface)
        self.bloodflow = HDBloodFlow(self.can_interface)
        self.dialysate_inlet_flow = HDDialysateInletFlow(self.can_interface)
        self.dialysate_outlet_flow = HDDialysateOutletFlow(self.can_interface)
        self.treatment = HDTreatment(self.can_interface)
        self.pressure_occlusion = HDPressureOcclusion(self.can_interface)

    def test_can_message(self, seq: int) -> None:
        r"""
        Sends a test can message

        \param seq: the starting integer of the sequence

        \details
        The payload is the five consecutive integers seq .. seq + 4, each
        converted with integer_to_bytearray and separated by a converted 0.
        The message is built for the dialin_to_hd channel with message ID 6.
        """
        zero = integer_to_bytearray(0)

        # Concatenate left to right: seq, 0, seq + 1, 0, ..., seq + 4
        payload = integer_to_bytearray(seq)
        for offset in range(1, 5):
            payload = payload + zero + integer_to_bytearray(seq + offset)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=6,
                                              payload=payload)

        # NOTE(review): the meaning of the second argument (0) is defined by
        # DenaliCanMessenger.send and is not visible in this file - confirm.
        self.can_interface.send(message, 0)


if __name__ == "__main__":
    # create an HD object called hd
    hd = HD()

    # NOTE(review): the original indentation of this script was lost. This
    # loop is reconstructed as containing only the two statements below; it
    # never exits, so with this layout everything after it is unreachable.
    # Confirm the intended structure.
    while True:
        sleep(1)
        print(hd.alarms.alarm_states)

    # wait 2 seconds and then login to HD as a tester
    sleep(2)
    # The command groups are lower-case attributes (see HD.__init__);
    # "hd.Basics" / "hd.RTC" would raise AttributeError.
    hd.basics.cmd_log_in_to_hd()

    # FIXME: Delete commented code:
    # hd.rtc.cmd_set_rtc_time_and_date(0, 2, 1, 5, 1, 2020)

    while True:
        print(hd.rtc.RTCEpoch)
        sleep(1)

    # FIXME: This should either be deleted or moved to a file:
    """"
    hd.bloodflow.cmd_blood_pump_measured_current_override(NO_RESET,149)

    totalVolumeInMl = 2400
    rxTimeInMins = 30
    flowRateMlmin = 100

    sleep(2)
    resp = hd.DialOut.set_uf_rx(totalVolumeInMl, rxTimeInMins, flowRateMlmin)
    print("Set TotalVolume(mL): {}, TotalTime(mins): {} and flowrate (ml/min): {}, resp: {}".format(totalVolumeInMl, rxTimeInMins, flowRateMlmin, resp))

    sleep(2)
    rxTimeInMins = 60
    resp = hd.DialOut.set_uf_rx(totalVolumeInMl, rxTimeInMins, flowRateMlmin)
    print("Set TotalVolume(mL): {}, TotalTime(mins): {} and flowrate (ml/min): {}, resp: {}".format(totalVolumeInMl, rxTimeInMins, flowRateMlmin, resp))

    sleep(5)
    hd.DialOut.set_uf_state(DialOutStates.RUN)
    state_run = hd.DialOut.DialOutBroadcast['state']
    sleep(2)
    print("After RUN: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(20)
    print("After 20 secs RUN: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(20)
    print("After 40 secs RUN: {}".format(hd.DialOut.DialOutBroadcast))

    hd.DialOut.set_uf_state(DialOutStates.PAUSE)
    sleep(2)
    print("After PAUSE: {}".format(hd.DialOut.DialOutBroadcast))
    state_pause = hd.DialOut.DialOutBroadcast['state']
    sleep(10)

    hd.DialOut.set_uf_state(DialOutStates.STOP)
    sleep(2)
    print("---> After STOP:{} <---".format(hd.DialOut.DialOutBroadcast))
    sleep(3)

    hd.DialOut.set_uf_state(DialOutStates.RUN)
    sleep(2)
    state_stop = hd.DialOut.DialOutBroadcast['state']
    print("After RUN again: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(3)

    hd.DialOut.set_uf_state(DialOutStates.STOP)
    sleep(2)
    state_stop = hd.DialOut.DialOutBroadcast['state']
    print("After STOP: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(5)

    hd.DialOut.plot_broadcast_signals()
    exit()

    tgtRate = 0
    hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 2000)
    while True:
        if hd.bloodflow.target_blood_flow_rate == 0:
            if tgtRate != 0:
                hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 2000)
                tgtRate = 0
        else:
            if tgtRate == 0:
                hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 200)
                tgtRate = hd.bloodflow.target_blood_flow_rate
        sleep(1)
        print(hd.bloodflow.measured_blood_flow_rate)
    # hd.bloodflow.cmd_blood_flow_broadcast_interval_override(RESET,0)
    """