###########################################################################
#
# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file HemodialysisDevice.py
#
# @date 31-Mar-2020
# @author P. Lucia
#
# @brief This class provides the basic interface to communicate with
#        the HD board.
#
############################################################################
from time import sleep
from alarms import HDAlarms
from buttons import HDButtons
from ui import HDUI
from watchdog import HDWatchdog
from rtc import HDRTC
from blood_flow import HDBloodFlow
from dialysate_inlet_flow import HDDialysateInletFlow
from dialysate_outlet_flow import HDDialysateOutletFlow
from treatment import HDTreatment
from basics import HDBasics
from pressure_occlusion import HDPressureOcclusion
from utils.conversions import integer_to_bytearray
from constants import RESET, NO_RESET
from protocols.CAN import (DenaliMessage,
                           DenaliCanMessenger,
                           DenaliChannels)


class HD:
    r"""
    Entry point for the test/service commands of the HD sub-system.

    \details
    One CAN messenger is created and started in the constructor and then
    shared by every command group exposed as an attribute (basics, alarms,
    buttons, ui, rtc, watchdog, bloodflow, dialysate_inlet_flow,
    dialysate_outlet_flow, treatment, pressure_occlusion).
    """

    def __init__(self, can_interface_name="can0"):
        r"""
        HD constructor using can bus

        \param can_interface_name: name of the can bus, e.g. "can0"
        \returns HD object provides test/service commands for the HD sub-system.

        \details
        For example:
        hd_object = HD(can_interface_name='can0') or
        hd_object = HD('can0')
        """
        # Create listener
        self.can_interface = DenaliCanMessenger(can_interface=can_interface_name)
        self.can_interface.start()

        # Create command groups (all of them share the same messenger)
        self.basics = HDBasics(self.can_interface)
        self.alarms = HDAlarms(self.can_interface)
        self.buttons = HDButtons(self.can_interface)
        self.ui = HDUI(self.can_interface)
        self.rtc = HDRTC(self.can_interface)
        self.watchdog = HDWatchdog(self.can_interface)
        self.bloodflow = HDBloodFlow(self.can_interface)
        self.dialysate_inlet_flow = HDDialysateInletFlow(self.can_interface)
        self.dialysate_outlet_flow = HDDialysateOutletFlow(self.can_interface)
        self.treatment = HDTreatment(self.can_interface)
        self.pressure_occlusion = HDPressureOcclusion(self.can_interface)

    def test_can_message(self, seq):
        r"""
        Sends a test can message

        \param seq: the starting integer of the sequence

        \details
        The payload is the five consecutive integers seq .. seq + 4, each
        converted with integer_to_bytearray, with a converted 0 inserted
        between neighbouring values. It is sent as message id 6 on the
        dialin_to_hd channel.
        """
        zero = integer_to_bytearray(0)
        seq1 = integer_to_bytearray(seq)
        seq2 = integer_to_bytearray(seq + 1)
        seq3 = integer_to_bytearray(seq + 2)
        seq4 = integer_to_bytearray(seq + 3)
        seq5 = integer_to_bytearray(seq + 4)

        payload = seq1 + zero + seq2 + zero + seq3 + zero + seq4 + zero + seq5

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=6,
                                              payload=payload)

        self.can_interface.send(message, 0)


if __name__ == "__main__":
    # create an HD object called hd
    hd = HD()

    while True:
        sleep(1)
        print(hd.alarms.alarm_states)

    # NOTE(review): the loop above never exits, so nothing below this point
    # runs as written - confirm whether the alarm loop or the login / RTC
    # sequence is the intended demo and remove the other one.

    # wait 2 seconds and then login to HD as a tester
    sleep(2)
    # The command groups are lower-case attributes on HD (see __init__).
    hd.basics.cmd_log_in_to_hd()

    # FIXME: Delete commented code:
    # hd.rtc.cmd_set_rtc_time_and_date(0, 2, 1, 5, 1, 2020)
    while True:
        # NOTE(review): the RTCEpoch attribute of HDRTC is not visible from
        # this file - confirm the attribute name against rtc.py.
        print(hd.rtc.RTCEpoch)
        sleep(1)

    # FIXME: This should either be deleted or moved to a file:
    """
    hd.bloodflow.cmd_blood_pump_measured_current_override(NO_RESET,149)
    totalVolumeInMl = 2400
    rxTimeInMins = 30
    flowRateMlmin = 100
    sleep(2)
    resp = hd.DialOut.set_uf_rx(totalVolumeInMl, rxTimeInMins, flowRateMlmin)
    print("Set TotalVolume(mL): {}, TotalTime(mins): {} and flowrate (ml/min): {}, resp: {}".format(totalVolumeInMl, rxTimeInMins, flowRateMlmin, resp))
    sleep(2)
    rxTimeInMins = 60
    resp = hd.DialOut.set_uf_rx(totalVolumeInMl, rxTimeInMins, flowRateMlmin)
    print("Set TotalVolume(mL): {}, TotalTime(mins): {} and flowrate (ml/min): {}, resp: {}".format(totalVolumeInMl, rxTimeInMins, flowRateMlmin, resp))
    sleep(5)
    hd.DialOut.set_uf_state(DialOutStates.RUN)
    state_run = hd.DialOut.DialOutBroadcast['state']
    sleep(2)
    print("After RUN: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(20)
    print("After 20 secs RUN: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(20)
    print("After 40 secs RUN: {}".format(hd.DialOut.DialOutBroadcast))
    hd.DialOut.set_uf_state(DialOutStates.PAUSE)
    sleep(2)
    print("After PAUSE: {}".format(hd.DialOut.DialOutBroadcast))
    state_pause = hd.DialOut.DialOutBroadcast['state']
    sleep(10)
    hd.DialOut.set_uf_state(DialOutStates.STOP)
    sleep(2)
    print("---> After STOP:{} <---".format(hd.DialOut.DialOutBroadcast))
    sleep(3)
    hd.DialOut.set_uf_state(DialOutStates.RUN)
    sleep(2)
    state_stop = hd.DialOut.DialOutBroadcast['state']
    print("After RUN again: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(3)
    hd.DialOut.set_uf_state(DialOutStates.STOP)
    sleep(2)
    state_stop = hd.DialOut.DialOutBroadcast['state']
    print("After STOP: {}".format(hd.DialOut.DialOutBroadcast))
    sleep(5)
    hd.DialOut.plot_broadcast_signals()
    exit()

    tgtRate = 0
    hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 2000)
    while True:
        if hd.bloodflow.target_blood_flow_rate == 0:
            if tgtRate != 0:
                hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 2000)
                tgtRate = 0
        else:
            if tgtRate == 0:
                hd.bloodflow.cmd_blood_flow_broadcast_interval_override(NO_RESET, 200)
                tgtRate = hd.bloodflow.target_blood_flow_rate
        sleep(1)
        print(hd.bloodflow.measured_blood_flow_rate)
    # hd.bloodflow.cmd_blood_flow_broadcast_interval_override(RESET,0)
    """