Index: dialin/hd/buttons.py =================================================================== diff -u -r8d1f61499650e23dac6f857e48daad42180db949 -rf3171e14b9cbfabe015e6aa093234d8dd8b55b45 --- dialin/hd/buttons.py (.../buttons.py) (revision 8d1f61499650e23dac6f857e48daad42180db949) +++ dialin/hd/buttons.py (.../buttons.py) (revision f3171e14b9cbfabe015e6aa093234d8dd8b55b45) @@ -20,18 +20,24 @@ from .constants import RESET, NO_RESET from ..utils.base import _AbstractSubSystem, _publish from logging import Logger +import struct class HDButtons(_AbstractSubSystem): """ Hemodialysis Device (HD) Dialin API sub-class for button related commands. """ + START_POS_POWEROFF = DenaliMessage.PAYLOAD_START_INDEX + END_POS_POWEROFF_STATUS = START_POS_POWEROFF + 2 + # buttons message IDs MSG_ID_HD_OFF_BUTTON_OVERRIDE = 0x8002 MSG_ID_HD_STOP_BUTTON_OVERRIDE = 0x8003 - MSG_ID_HD_POWEROFF_TIMEOUT = 0x000E + MSG_ID_HD_POWEROFF_OPEN = 0x01 + MSG_ID_HD_POWEROFF_TIMEOUT = 0x01 + MSG_ID_HD_POWEROFF_BROADCAST = 0x000E PRESSED = 1 RELEASED = 0 @@ -48,9 +54,10 @@ self.logger = logger self.poweroff_timeout_expired = False - self.can_interface.register_receiving_publication_function(DenaliChannels.hd_to_ui_ch_id, - self.MSG_ID_HD_POWEROFF_TIMEOUT, - self._handler_poweroff_timeout_occurred) + if self.can_interface is not None: + self.can_interface.register_receiving_publication_function(DenaliChannels.hd_to_ui_ch_id, + self.MSG_ID_HD_POWEROFF_TIMEOUT, + self._handler_poweroff_timeout_occurred) def get_power_timeout_expired(self): """ @@ -61,12 +68,32 @@ return self.poweroff_timeout_expired + def reset_poweroff_timeout_expired(self): + """ + Resets the dialin poweroff timeout flag to False + + @return: None + """ + + self.poweroff_timeout_expired = False + @_publish(["poweroff_timeout_expired"]) - def _handler_poweroff_timeout_occurred(self): - pass + def _handler_poweroff_timeout_occurred(self, message): + """ + Poweroff timeout occurred handler + @param message: The poweroff timeout occurred message string + @return: None + """ + if len(message["message"]) < 16: + self.logger.debug("Poweroff message id detected, but was the wrong length.") + return + mode = struct.unpack('h', bytearray( + message['message'][self.START_POS_POWEROFF:self.END_POS_POWEROFF_STATUS])) + if len(mode) > 0: + self.poweroff_timeout_expired = bool(mode[0]) def cmd_off_button_override(self, state, reset=NO_RESET): """ Index: dialin/ui/hd_simulator.py =================================================================== diff -u -ra08848ed6275cf2afa3bfb7f3c87709b69c224bc -rf3171e14b9cbfabe015e6aa093234d8dd8b55b45 --- dialin/ui/hd_simulator.py (.../hd_simulator.py) (revision a08848ed6275cf2afa3bfb7f3c87709b69c224bc) +++ dialin/ui/hd_simulator.py (.../hd_simulator.py) (revision f3171e14b9cbfabe015e6aa093234d8dd8b55b45) @@ -13,15 +13,17 @@ # @brief This class simulates the hd when interfacing with the UI # ############################################################################ -from dialin.protocols.CAN import (DenaliMessage, - DenaliCanMessenger, - DenaliChannels) -from dialin.utils.base import _AbstractSubSystem, _publish, _LogManager -from dialin.utils.conversions import integer_to_bytearray -from dialin.hd.alarms import HDAlarms +from ..protocols.CAN import (DenaliMessage, + DenaliCanMessenger, + DenaliChannels) +from ..utils.base import _AbstractSubSystem, _publish, _LogManager +from ..utils.conversions import integer_to_bytearray +from ..hd.alarms import HDAlarms +from ..hd.buttons import HDButtons import enum from typing import List + HIGH = 3 MED = 2 LOW = 1 @@ -94,7 +96,6 @@ ALARM_ID_DOES_NOT_EXIST = (HIGH, 99, 0, 0, 0) - class MsgDefs(enum.Enum): REQUEST_REJECT_REASON_NONE = 0 @@ -120,10 +121,6 @@ MSG_ID_HD_CREATE_TREATMENT_RESPONSE = 0x0036 - MSG_ID_HD_POWEROFF = 0x01 - MSG_ID_HD_POWEROFF_BROADCAST = 0x000E - MSG_ID_CLEAR_ALARMS = 0x04 - NUM_TREATMENT_PARAMETERS = 18 def __init__(self, can_interface="can0", log_level=None): @@ -175,29 +172,44 @@ payload = bytearray(0x01) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, - message_id=self.MSG_ID_HD_POWEROFF, + message_id=HDButtons.MSG_ID_HD_POWEROFF_OPEN, payload=payload) self.can_interface.send(message, 0) - def cmd_send_broadcast_poweroff(self): + def cmd_send_broadcast_poweroff_imminent(self): """ - Broadcast a power-off message event + Broadcast that the system will shut down @return: None """ payload = integer_to_bytearray(1) message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, - message_id=self.MSG_ID_HD_POWEROFF_BROADCAST, + message_id=HDButtons.MSG_ID_HD_POWEROFF_BROADCAST, payload=payload) self.can_interface.send(message, 0) + def cmd_send_poweroff_timeout(self): + """ + Sends a poweroff timeout + + @return: None + """ + + payload = integer_to_bytearray(1) + + message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, + message_id=HDButtons.MSG_ID_HD_POWEROFF_TIMEOUT, + payload=payload) + + self.can_interface.send(message, 0) + def cmd_activate_alarm(self, alarm: Alarms): """ - Send alarm + Send the specified alarm @return: None """ @@ -223,16 +235,14 @@ @return: None """ - state = integer_to_bytearray(0) - top = integer_to_bytearray(0) - escalates_in = integer_to_bytearray(0) - silence_expires = integer_to_bytearray(0) - flags = integer_to_bytearray(0) + state = integer_to_bytearray(Alarms.ALARM_ID_NO_ALARM[0]) + top = integer_to_bytearray(Alarms.ALARM_ID_NO_ALARM[1]) + escalates_in = integer_to_bytearray(Alarms.ALARM_ID_NO_ALARM[2]) + silence_expires = integer_to_bytearray(Alarms.ALARM_ID_NO_ALARM[3]) + flags = integer_to_bytearray(Alarms.ALARM_ID_NO_ALARM[4]) payload = state + top + escalates_in + silence_expires + flags - # payload = bytearray([0,0,0,0,0]) - message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_alarm_broadcast_ch_id, message_id=HDAlarms.MSG_ID_HD_ALARMS_PUBLISHED_STATUS, payload=payload) Index: tests/test_hd_simulator.py =================================================================== diff -u -ra08848ed6275cf2afa3bfb7f3c87709b69c224bc -rf3171e14b9cbfabe015e6aa093234d8dd8b55b45 --- tests/test_hd_simulator.py (.../test_hd_simulator.py) (revision a08848ed6275cf2afa3bfb7f3c87709b69c224bc) +++ tests/test_hd_simulator.py (.../test_hd_simulator.py) (revision f3171e14b9cbfabe015e6aa093234d8dd8b55b45) @@ -18,6 +18,8 @@ sys.path.append("..") from dialin.ui.hd_simulator import HDSimulator, MsgDefs, Alarms +from dialin.hd.hemodialysis_device import HD +from dialin.utils.base import AbstractObserver from time import sleep @@ -47,23 +49,45 @@ hd_simulator.cmd_send_treatment_parameter_validation_response(rejections) +class Observer(AbstractObserver): + def __init__(self): + self.timeout_expired = False + + def update(self, result): + print(result) + self.timeout_expired = result.get("poweroff_timeout_expired", False) + + def test_poweroff(): + hd = HD() + + observer = Observer() + hd.buttons.attach(observer) + hd_simulator = HDSimulator(log_level="DEBUG") hd_simulator.cmd_send_poweroff_button_pressed() - hd_simulator.cmd_send_broadcast_poweroff() + sleep(1) + hd_simulator.cmd_send_poweroff_timeout() + sleep(1) + hd_simulator.cmd_send_broadcast_poweroff_imminent() + while not observer.timeout_expired: + sleep(1) + def test_clear_alarms(): hd_simulator = HDSimulator() - hd_simulator.cmd_activate_alarm(Alarms.ALARM_ID_ARTERIAL_PRESSURE_HIGH) + hd_simulator.cmd_activate_alarm(Alarms.ALARM_ID_TREATMENT_STOPPED_BY_USER) sleep(1) hd_simulator.cmd_send_clear_alarms() if __name__ == '__main__': - # test_invalid_parameters() test_clear_alarms() + sleep(1) + test_poweroff() +