Index: dialin/common/hd_defs.py =================================================================== diff -u --- dialin/common/hd_defs.py (revision 0) +++ dialin/common/hd_defs.py (revision c61eff081973c9fb186c353e9ff0360549b57a78) @@ -0,0 +1,30 @@ +################################################################ +# +# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file alarm_defs.py +# +# @author (last) Peter Lucia +# @date (last) 30-Nov-2020 +# @author (original) Peter Lucia +# @date (original) 30-Nov-2020 +# +############################################################################ +from enum import unique +from ..utils.base import DialinEnum + +@unique +class HDDefs(DialinEnum): + MODE_FAUL = 0 # Fault mode + MODE_SERV = 1 # Service mode + MODE_INIT = 2 # Initialization & POST mode + MODE_STAN = 3 # Standby mode + MODE_TPAR = 4 # Treatment Parameters mode + MODE_PRET = 5 # Pre-Treatment mode + MODE_TREA = 6 # Treatment mode + MODE_POST = 7 # Post-Treatment mode + MODE_NLEG = 8 # Not legal - an illegale mode transition occurred + NUM_OF_MODES = 9 # Number of HD operation modes \ No newline at end of file Index: dialin/ui/hd_simulator.py =================================================================== diff -u -rf3ea3f9a7864df4f448b60a3192ca83c51c4e881 -rc61eff081973c9fb186c353e9ff0360549b57a78 --- dialin/ui/hd_simulator.py (.../hd_simulator.py) (revision f3ea3f9a7864df4f448b60a3192ca83c51c4e881) +++ dialin/ui/hd_simulator.py (.../hd_simulator.py) (revision c61eff081973c9fb186c353e9ff0360549b57a78) @@ -23,6 +23,7 @@ from . import utils, messageBuilder from .hd_simulator_alarms import HDProxyAlarms from ..common.msg_defs import RequestRejectReasons, MsgIds +from ..common.hd_defs import HDDefs from ..protocols.CAN import (DenaliMessage, DenaliCanMessenger, DenaliChannels) @@ -337,12 +338,16 @@ request = struct.unpack('i', bytearray( message['message'][START_POS:END_POS]))[0] + if request == 0: self.logger.debug("Selecting treatment parameters") + self.cmd_send_hd_operation_mode(HDDefs.MODE_PRET.value) elif request == 1: self.logger.debug("Canceling treatment") + self.cmd_send_hd_operation_mode(HDDefs.MODE_STAN.value) elif request == 2: self.logger.debug("Starting treatment") + self.cmd_send_hd_operation_mode(HDDefs.MODE_TREA.value) self.cmd_send_start_treatment_response(YES, 0) @@ -364,6 +369,24 @@ self.can_interface.send(message, 0) + def cmd_send_hd_operation_mode(self, mode): + """ + Broadcasts the current HD operation mode + @param: (int) mode + @return: None + """ + + if not isinstance(mode, int): + raise ValueError("Provided mode is not of type 'int'") + + payload = integer_to_bytearray(mode) + + message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_sync_broadcast_ch_id, + message_id=MsgIds.MSG_ID_HD_OP_MODE.value, + payload=payload) + + self.can_interface.send(message, 0) + def cmd_send_end_treatment_response(self): """ Sends an end treatment response @@ -377,6 +400,7 @@ payload=payload) self.can_interface.send(message, 0) + def _handler_ui_end_treatment(self, message): """ @@ -432,121 +456,164 @@ """ self.logger.info("Test Completed") - @staticmethod - def send_acknowledge_HD(): + def cmd_send_acknowledge_hd(self): """ the acknowledge from HD :return: none """ - subprocess.call(['cansend', 'can0', '020#A5.01.00.FF.FF.00.19.00']) - @staticmethod - def send_acknowledge_UI(): + payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] + payload = [int(each, 16) for each in payload] + + message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, + "message": payload} + + self.can_interface.send(message, 0) + + def cmd_send_acknowledge_ui(self): """ the acknowledge from UI :return: none """ - subprocess.call(['cansend', 'can0', '100#A5.01.00.FF.FF.00.19.00']) - @staticmethod - def send_CheckIn_DG(): + payload = ["A5", "01", "00", "FF", "FF", "00", "19", "00"] + payload = [int(each, 16) for each in payload] + + message = {"channel_id": DenaliChannels.ui_to_hd_ch_id, + "message": payload} + + self.can_interface.send(message, 0) + + def cmd_send_checkin_dg(self): """ check-in (keep alive) message from DG :return: none """ - subprocess.call(['cansend', 'can0', '010#A5.01.00.06.00.00.76.00']) - @staticmethod - def show_PowerOffDialog(): + payload = ["A5", "01", "00", "06", "00", "00", "76", "00"] + payload = [int(each, 16) for each in payload] + + message = {"channel_id": DenaliChannels.dg_to_hd_ch_id, + "message": payload} + + self.can_interface.send(message, 0) + + def cmd_show_poweroff_dialog(self): """ the message from HD to UI to show the power off dialog :return: none """ - subprocess.call(['cansend', 'can0', '020#A5.01.00.01.00.01.00.38']) - @staticmethod - def hide_PowerOffDialog(): + payload = ["A5", "01", "00", "01", "00", "01", "00", "38"] + payload = [int(each, 16) for each in payload] + + message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, + "message": payload} + + self.can_interface.send(message, 0) + + def cmd_hide_poweroff_dialog(self): """ the message from HD to UI to hide the power off dialog :return: none """ - subprocess.call(['cansend', 'can0', '020#A5.01.00.01.00.01.01.09']) - @staticmethod - def show_PowerOffNotificationDialog(): + payload = ["A5", "01", "00", "01", "00", "01", "01", "09"] + payload = [int(each, 16) for each in payload] + + message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, + "message": payload} + + self.can_interface.send(message, 0) + + def cmd_show_poweroff_notification_dialog(self): """ - the message from HD to UI to shew the shutting down notipication box + the message from HD to UI to show the shutting down notification box :return: none """ - subprocess.call(['cansend', 'can0', '040#A5.01.00.0E.00.00.24.00']) - @staticmethod - def show_PowerOffRejectionDialog(): + payload = ["A5", "01", "00", "0E", "00", "00", "24", "00"] + payload = [int(each, 16) for each in payload] + + message = {"channel_id": DenaliChannels.hd_sync_broadcast_ch_id, + "message": payload} + + self.can_interface.send(message, 0) + + def cmd_show_poweroff_rejection_dialog(self): """ the message from HD to UI to show the power off dialog :return: none """ - subprocess.call(['cansend', 'can0', '020#A5.01.00.01.00.01.02.5A']) + payload = ["A5", "01", "00", "01", "00", "01", "02", "5A"] + payload = [int(each, 16) for each in payload] + + message = {"channel_id": DenaliChannels.hd_to_ui_ch_id, + "message": payload} + + self.can_interface.send(message, 0) + @staticmethod - def waitForMessageToBeSent(): + def wait_for_message_to_be_sent(delay=0.050): """ - After each multi-frame message put a 100ms sleep, time.sleep(0.1) # - it seems it's needed otherwise the test will check a value which has not been received yet. # + After each multi-frame message put a 50ms sleep, time.sleep(0.1) + it seems it's needed otherwise the test will check a value which has not been received yet. + :@param delay: the number of seconds to wait :return: none """ - time.sleep(0.050) # 50ms + time.sleep(delay) @staticmethod - def buildHDDebugText(vText): + def build_dg_debug_text(vText): """ - the debug text message from HD builder method + the debug text message from DG builder method :param vText: (str) the debug text :return: none """ - len = 40 - txt = messageBuilder.textToByte(vText, len) # + 1 null term - msg = messageBuilder.buildMessage(GuiActionType.HDDebugText, 1 * (len + 1), False, txt) + message_length = 40 + txt = messageBuilder.textToByte(vText, message_length) # + 1 null term + msg = messageBuilder.buildMessage(GuiActionType.DGDebugText, 1 * (message_length + 1), False, txt) return messageBuilder.toFrames(msg) @staticmethod - def setHDDebugText(vText): + def build_hd_debug_text(vText): """ - the debug text message from HD setter/sender method + the debug text message from HD builder method :param vText: (str) the debug text :return: none """ - frames = HDSimulator.buildHDDebugText(vText) - frames = messageBuilder.toCandumpFormat(frames) - for frame in frames: - subprocess.call(['cansend', 'can0', '020#{}'.format(frame)]) - HDSimulator.waitForMessageToBeSent() + message_length = 40 + txt = messageBuilder.textToByte(vText, message_length) # + 1 null term + msg = messageBuilder.buildMessage(GuiActionType.HDDebugText, 1 * (message_length + 1), False, txt) + return messageBuilder.toFrames(msg) @staticmethod - def buildDGDebugText(vText): + def cmd_set_hd_debug_text(debug_text): """ - the debug text message from DG builder method - :param vText: (str) the debug text + the debug text message from HD setter/sender method + :param debug_text: (str) the debug text :return: none """ - len = 40 - txt = messageBuilder.textToByte(vText, len) # + 1 null term - msg = messageBuilder.buildMessage(GuiActionType.DGDebugText, 1 * (len + 1), False, txt) - return messageBuilder.toFrames(msg) + frames = HDSimulator.build_hd_debug_text(debug_text) + frames = messageBuilder.toCandumpFormat(frames) + for frame in frames: + subprocess.call(['cansend', 'can0', '020#{}'.format(frame)]) + HDSimulator.wait_for_message_to_be_sent() @staticmethod - def setDGDebugText(vText): + def cmd_set_dg_debug_text(debug_text): """ the debug text message from DG setter/sender method - :param vText: (str) the debug text + :param debug_text: (str) the debug text :return: none """ - frames = HDSimulator.buildDGDebugText(vText) + frames = HDSimulator.build_dg_debug_text(debug_text) frames = messageBuilder.toCandumpFormat(frames) for frame in frames: subprocess.call(['cansend', 'can0', '070#{}'.format(frame)]) - HDSimulator.waitForMessageToBeSent() + HDSimulator.wait_for_message_to_be_sent() def cmd_set_treatment_parameter_ranges(self, vMinTreatmentDuration, vMaxTreatmentDuration, vMinUFVolume, vMaxUFVolume, vMinDialysateFlowRate, vMaxDialysateFlowRate): @@ -877,7 +944,6 @@ self.can_interface.send(message, 0) - def cmd_set_pressure_occlusion_data(self, vArterialPressure, vVenousPressure, vBloodPumpOcclusion, vDialysateInletPumpOcclusion, vDialysateOutletPumpOcclusion): """ @@ -1066,7 +1132,6 @@ self.can_interface.send(message, 0) - def cmd_set_dg_load_cell_readings_data(self, vRs1Prim, vRs1Bkup, vRs2Prim, vRs2Bkup): """ The DG Load Cell Readings Data message setter/sender method @@ -1237,60 +1302,49 @@ self.can_interface.send(message, 0) - @staticmethod - def buildCanBUSFaultCount(vCount): + def cmd_set_canbus_fault_count(self, count): """ - the CANBus fault count message builder method - :param vCount: (int) Fault Count - :return: (str) built message frame(s) - """ - msg = messageBuilder.buildMessage(GuiActionType.CanBUSFaultCount, 4 * 1, False, - utils.toI32(vCount) - ) - return messageBuilder.toFrames(msg) - - @staticmethod - def setCanBUSFaultCount(vCount): - """ the CANBus fault count message setter/sender method - :param vCount: (int) Fault Count + :param count: (int) Fault Count :return: none """ - frames = HDSimulator.buildCanBUSFaultCount(vCount) - frames = messageBuilder.toCandumpFormat(frames) - for frame in frames: - subprocess.call(['cansend', 'can0', '020#{}'.format(frame)]) - HDSimulator.waitForMessageToBeSent() - @staticmethod - def buildUnknown(): - """ - the unknown message builder method - :return: (str) built message frame(s) - """ - msg = messageBuilder.buildMessage(GuiActionType.Unknown, 0, False) - return messageBuilder.toFrames(msg) + payload = integer_to_bytearray(count) - @staticmethod - def sendUnknown_HD(): + message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, + message_id=MsgIds.MSG_ID_CAN_ERROR_COUNT.value, + payload=payload) + + self.can_interface.send(message, 0) + HDSimulator.wait_for_message_to_be_sent() + + + def cmd_send_unknown_hd(self): """ the unknown message from HD setter/sender method :return: none """ - frames = HDSimulator.buildUnknown() - frames = messageBuilder.toCandumpFormat(frames) - for frame in frames: - subprocess.call(['cansend', 'can0', '001#{}'.format(frame)]) # send from HD - HDSimulator.waitForMessageToBeSent() - @staticmethod - def sendUnknown_DG(): + payload = integer_to_bytearray(0) + + message = DenaliMessage.build_message(channel_id=DenaliChannels.hd_to_ui_ch_id, + message_id=MsgIds.MSG_ID_UNUSED.value, + payload=payload) + + self.can_interface.send(message, 0) + + def cmd_send_unknown_dg(self): """ the unknown message from DG setter/sender method :return: none """ - frames = HDSimulator.buildUnknown() - frames = messageBuilder.toCandumpFormat(frames) - for frame in frames: - subprocess.call(['cansend', 'can0', '070#{}'.format(frame)]) # send from DG - HDSimulator.waitForMessageToBeSent() + + payload = integer_to_bytearray(0) + + message = DenaliMessage.build_message(channel_id=DenaliChannels.dg_to_ui_ch_id, + message_id=MsgIds.MSG_ID_UNUSED.value, + payload=payload) + + self.can_interface.send(message, 0) + + Index: tests/test_debug_text.py =================================================================== diff -u --- tests/test_debug_text.py (revision 0) +++ tests/test_debug_text.py (revision c61eff081973c9fb186c353e9ff0360549b57a78) @@ -0,0 +1,32 @@ +########################################################################### +# +# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file test_debug_text.py +# +# @author (last) Peter Lucia +# @date (last) 03-Dec-2020 +# @author (original) Peter Lucia +# @date (original) 03-Dec-2020 +# +############################################################################ +import sys +sys.path.append("..") + +from dialin.ui.hd_simulator import HDSimulator +from dialin.ui.messageBuilder import textToByte + +def test_debug_text(): + + hd_simulator = HDSimulator(log_level="CAN_ONLY") + + # print(textToByte("test", 40)) + # print(hd_simulator.buildHDDebugText("test")) + print(hd_simulator.cmd_set_hd_debug_text("test")) + # print(HDSimulator.setHDDebugText("test")) + +if __name__ == '__main__': + test_debug_text()