# -*- coding: utf-8 -*-
##
# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
# copyright
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM,
# IN PART OR IN WHOLE,
# WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# file denaliMessages.py
# date 2020/04/08
# author Behrouz NematiPour
#
"""Helpers that build application messages and put them on the CAN bus.

Every sender in this module shells out to the ``cansend`` command on the
``can0`` interface.  The ``build*`` functions only assemble frames (through
``messageBuilder``); the matching ``set*`` / ``send*`` functions transmit them.
"""
import time
import subprocess
from enum import IntEnum

import utils
import messageBuilder


class GuiActionType(IntEnum):
    """Numeric message identifiers passed to ``messageBuilder.buildMessage``."""
    Unknown = 0
    PowerOff = 1
    KeepAlive = 7
    BloodFlow = 5
    DialysateInletFlow = 8
    DialysateOutletFlow = 11
    TreatmentTime = 13
    PowerOffBroadcast = 14
    AlarmStatus = 2
    AlarmTriggered = 3
    AlarmCleared = 4
    PressureOcclusion = 9

    AdjustBloodDialysateReq = 23  # 23
    AdjustBloodDialysateRsp = 24  # 24
    TreatmentRanges = 26  # 26

    String = 65279
    Acknow = 65535


def _cansend(vCanFrame):
    """Transmit one already formatted ``<id>#<data>`` frame with ``cansend`` on ``can0``."""
    # Argument list (no shell) so the frame text is never interpreted by a shell.
    subprocess.call(['cansend', 'can0', vCanFrame])


def send_acknowledge_HD():
    """Send the fixed frame ``020#A5.01.00.FF.FF.00.44.00``."""
    _cansend('020#A5.01.00.FF.FF.00.44.00')


def send_acknowledge_UI():
    """Send the fixed frame ``100#A5.01.00.FF.FF.00.44.00``."""
    _cansend('100#A5.01.00.FF.FF.00.44.00')


def show_PowerOffDialog():
    """Send the fixed frame ``020#A5.01.00.01.00.01.00.38``."""
    _cansend('020#A5.01.00.01.00.01.00.38')


def hide_PowerOffDialog():
    """Send the fixed frame ``020#A5.01.00.01.00.01.01.09``."""
    _cansend('020#A5.01.00.01.00.01.01.09')


def show_PowerOffNotificationDialog():
    """Send the fixed frame ``040#A5.01.00.0E.00.00.24.00``."""
    _cansend('040#A5.01.00.0E.00.00.24.00')


def show_PowerOffRejectionDialog():
    """Send the fixed frame ``020#A5.01.00.01.00.01.02.5A``."""
    _cansend('020#A5.01.00.01.00.01.02.5A')


##################################################################################################
# After each multi-frame message put a 100ms sleep, time.sleep(0.1)                              #
# it seems it's needed otherwise the test will check a value which has not been received yet.    #
##################################################################################################
def waitForMessageToBeSent():
    """Block for 100 ms so the receiver has time to process the message just sent."""
    time.sleep(0.1)


def _sendFrames(vCanId, vFrames):
    """Send every frame of one message under the CAN id ``vCanId``, then wait.

    :param vCanId: CAN identifier text placed before the ``#`` (e.g. ``'040'``).
    :param vFrames: frames as returned by ``messageBuilder.toFrames``.
    """
    for frame in messageBuilder.toCandumpFormat(vFrames):
        _cansend('{}#{}'.format(vCanId, frame))
    # One pause per message (not per frame), as the banner comment above
    # waitForMessageToBeSent describes.
    waitForMessageToBeSent()


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildSetTreatmentParamRanges(vMinTreatmentDuration, vMaxTreatmentDuration,
                                 vMinUFVolume, vMaxUFVolume,
                                 vMinDialysateFlowRate, vMaxDialysateFlowRate):
    """Build the frames of a ``TreatmentRanges`` message.

    All six values are encoded with ``utils.toI32``; the length argument is
    ``4 * 6``.  The third ``buildMessage`` argument is ``True`` here --
    NOTE(review): presumably an "acknowledge required" flag; confirm against
    ``messageBuilder.buildMessage``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.TreatmentRanges,
        4 * 6,
        True,
        utils.toI32(vMinTreatmentDuration),
        utils.toI32(vMaxTreatmentDuration),
        utils.toI32(vMinUFVolume),
        utils.toI32(vMaxUFVolume),
        utils.toI32(vMinDialysateFlowRate),
        utils.toI32(vMaxDialysateFlowRate)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def setTreatmentParamRanges(vMinTreatmentDuration, vMaxTreatmentDuration,
                            vMinUFVolume, vMaxUFVolume,
                            vMinDialysateFlowRate, vMaxDialysateFlowRate):
    """Build and send a ``TreatmentRanges`` message under CAN id ``020``."""
    # This message needs to be acknowledged by seq# 01.00 : just call send_acknowledge_UI after this
    frames = buildSetTreatmentParamRanges(vMinTreatmentDuration, vMaxTreatmentDuration,
                                          vMinUFVolume, vMaxUFVolume,
                                          vMinDialysateFlowRate, vMaxDialysateFlowRate)
    _sendFrames('020', frames)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildSetTreatmentBloodFlowRate(vFlowSetPt, vMeasFlow = 0, vRotSpd = 0, vMotSpd = 0,
                                   vMCSpd = 0, vMCCurr = 0, vPWM = 0):
    """Build the frames of a ``BloodFlow`` message.

    ``vFlowSetPt`` is encoded with ``utils.toI32``; the remaining six values
    with ``utils.toF32``.  The length argument is ``4 * 7``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.BloodFlow,
        4 * 7,
        False,
        utils.toI32(vFlowSetPt),
        utils.toF32(vMeasFlow),
        utils.toF32(vRotSpd),
        utils.toF32(vMotSpd),
        utils.toF32(vMCSpd),
        utils.toF32(vMCCurr),
        utils.toF32(vPWM)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def setTreatmentBloodFlowRate(vFlowSetPt, vMeasFlow = 0, vRotSpd = 0, vMotSpd = 0,
                              vMCSpd = 0, vMCCurr = 0, vPWM = 0):
    """Build and send a ``BloodFlow`` message under CAN id ``040``.

    Every argument is forwarded to ``buildSetTreatmentBloodFlowRate``.
    Previously only ``vFlowSetPt`` was forwarded, so the optional values were
    silently replaced by zeros; callers that pass only ``vFlowSetPt`` see no
    change.
    """
    frames = buildSetTreatmentBloodFlowRate(vFlowSetPt, vMeasFlow, vRotSpd, vMotSpd,
                                            vMCSpd, vMCCurr, vPWM)
    _sendFrames('040', frames)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildSetTreatmentDialysateFlowRate(vFlowSetPt, vMeasFlow = 0, vRotSpd = 0, vMotSpd = 0,
                                       vMCSpd = 0, vMCCurr = 0, vPWM = 0):
    """Build the frames of a ``DialysateInletFlow`` message.

    ``vFlowSetPt`` is encoded with ``utils.toI32``; the remaining six values
    with ``utils.toF32``.  The length argument is ``4 * 7``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.DialysateInletFlow,
        4 * 7,
        False,
        utils.toI32(vFlowSetPt),
        utils.toF32(vMeasFlow),
        utils.toF32(vRotSpd),
        utils.toF32(vMotSpd),
        utils.toF32(vMCSpd),
        utils.toF32(vMCCurr),
        utils.toF32(vPWM)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def setTreatmentDialysateFlowRate(vFlowSetPt, vMeasFlow = 0, vRotSpd = 0, vMotSpd = 0,
                                  vMCSpd = 0, vMCCurr = 0, vPWM = 0):
    """Build and send a ``DialysateInletFlow`` message under CAN id ``040``.

    Every argument is forwarded to ``buildSetTreatmentDialysateFlowRate``.
    Previously only ``vFlowSetPt`` was forwarded, so the optional values were
    silently replaced by zeros; callers that pass only ``vFlowSetPt`` see no
    change.
    """
    frames = buildSetTreatmentDialysateFlowRate(vFlowSetPt, vMeasFlow, vRotSpd, vMotSpd,
                                                vMCSpd, vMCCurr, vPWM)
    _sendFrames('040', frames)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildTreatmentAdjustBloodDialysateResponse(vAccepted, vReason, vBloodRate, vDialysate):
    """Build the frames of an ``AdjustBloodDialysateRsp`` message.

    All four values are encoded with ``utils.toI32``; the length argument is
    ``4 * 4`` and the third ``buildMessage`` argument is ``True``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.AdjustBloodDialysateRsp,
        4 * 4,
        True,
        utils.toI32(vAccepted),
        utils.toI32(vReason),
        utils.toI32(vBloodRate),
        utils.toI32(vDialysate)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def sendTreatmentAdjustBloodDialysateResponse(vAccepted, vReason, vBloodRate, vDialysate):
    """Build and send an ``AdjustBloodDialysateRsp`` message under CAN id ``020``."""
    frames = buildTreatmentAdjustBloodDialysateResponse(vAccepted, vReason, vBloodRate, vDialysate)
    _sendFrames('020', frames)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildTreatmentTime(vSecsTotal, vSecsElap, vSecsRem):
    """Build the frames of a ``TreatmentTime`` message.

    The three values are encoded with ``utils.toI32``; the length argument is
    ``3 * 4``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.TreatmentTime,
        3 * 4,
        False,
        utils.toI32(vSecsTotal),
        utils.toI32(vSecsElap),
        utils.toI32(vSecsRem)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def setTreatmentTime(vSecsTotal, vSecsElap, vSecsRem):
    """Build and send a ``TreatmentTime`` message under CAN id ``040``."""
    frames = buildTreatmentTime(vSecsTotal, vSecsElap, vSecsRem)
    _sendFrames('040', frames)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildSetTreatmentUltrafiltration(vRefUFVol, vMeasUFVol = 0, vRotSpd = 0, vMotSpd = 0,
                                     vMCSpd = 0, vMCCurr = 0, vPWM = 0):
    """Build the frames of a ``DialysateOutletFlow`` message.

    All seven values are encoded with ``utils.toF32``; the length argument is
    ``4 * 7``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.DialysateOutletFlow,
        4 * 7,
        False,
        utils.toF32(vRefUFVol),
        utils.toF32(vMeasUFVol),
        utils.toF32(vRotSpd),
        utils.toF32(vMotSpd),
        utils.toF32(vMCSpd),
        utils.toF32(vMCCurr),
        utils.toF32(vPWM)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def setTreatmentUltrafiltration(vRefUFVol, vMeasUFVol = 0, vRotSpd = 0, vMotSpd = 0,
                                vMCSpd = 0, vMCCurr = 0, vPWM = 0):
    """Build and send a ``DialysateOutletFlow`` message under CAN id ``040``."""
    frames = buildSetTreatmentUltrafiltration(vRefUFVol, vMeasUFVol, vRotSpd, vMotSpd,
                                              vMCSpd, vMCCurr, vPWM)
    _sendFrames('040', frames)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildSetTreatmentPressureOcclusion(vArtPres, vVenPres, vBPOccl, vDIPOccl, vDOPOccl):
    """Build the frames of a ``PressureOcclusion`` message.

    All five values are encoded with ``utils.toF32``; the length argument is
    ``4 * 5``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.PressureOcclusion,
        4 * 5,
        False,
        utils.toF32(vArtPres),
        utils.toF32(vVenPres),
        utils.toF32(vBPOccl),
        utils.toF32(vDIPOccl),
        utils.toF32(vDOPOccl)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def setTreatmentPressureOcclusion(vArtPres = 0, vVenPres = 0, vBPOccl = 0, vDIPOccl = 0, vDOPOccl = 0):
    """Build and send a ``PressureOcclusion`` message under CAN id ``040``."""
    frames = buildSetTreatmentPressureOcclusion(vArtPres, vVenPres, vBPOccl, vDIPOccl, vDOPOccl)
    _sendFrames('040', frames)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def buildAlarm(vState, vTop, vEsclatesIn, vSilentExpires, vFlags):
    """Build the frames of an ``AlarmStatus`` message.

    The first four values are encoded with ``utils.toI32`` and ``vFlags`` with
    ``utils.toI16``; the length argument is ``4 * 4 + 2``.

    :return: whatever ``messageBuilder.toFrames`` returns for the message.
    """
    msg = messageBuilder.buildMessage(
        GuiActionType.AlarmStatus,
        4 * 4 + 2,
        False,
        utils.toI32(vState),
        utils.toI32(vTop),
        utils.toI32(vEsclatesIn),
        utils.toI32(vSilentExpires),
        utils.toI16(vFlags)
    )
    return messageBuilder.toFrames(msg)


## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def sendAlarm(vState, vTop, vEsclatesIn, vSilentExpires, vFlags):
    """Build and send an ``AlarmStatus`` message under CAN id ``001``."""
    frames = buildAlarm(vState, vTop, vEsclatesIn, vSilentExpires, vFlags)
    _sendFrames('001', frames)