###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file DenaliSerialMessenger.py
#
# @date 16-Oct-2019
# @author L. Baloa
#
# @brief This class allows sending and receiving of Denali Messages on
# the serial port.
#
############################################################################
import threading
from enum import Enum
from time import sleep

import serial

# Byte value that marks the beginning of a packet in the serial stream.
START_BYTE = b'\xA5'


class DSM_State(Enum):
    """States of the packet-search state machine run by the listener."""

    SEARCHING_FOR_START_BYTE = 0
    SEARCHING_FOR_PACKET_LENGTH = 1
    SEARCHING_FOR_CRC = 2


class DialitySerialMessenger:
    """Send messages on a serial port and collect the matching response.

    A background thread reads the port, cuts the byte stream into packets
    and, when a packet carries the request ID of the last message passed
    to ``write``, stores it and sets the event returned by ``write``.

    Packet layout as used by this code: the packet starts with
    ``START_BYTE``, byte 1 is read as the request ID, byte 3 is a length
    value, the total packet size is that value plus 5, and the last byte
    of the packet is treated as the CRC.
    """

    def __init__(self, serial_port='/dev/ttyUSB0'):
        """Open ``serial_port`` at 115200 baud and prepare the listener.

        The listener thread is created here but only runs after ``start``.
        """
        self.__serialConnection = serial.Serial(serial_port, baudrate=115200)
        # Raw bytes received so far that have not been consumed as packets.
        self.__dialitySerialByteBuffer = bytearray()
        self.__searchState = DSM_State.SEARCHING_FOR_START_BYTE
        # Last accepted response to the most recent ``write``.
        self.__dialityPacket = bytearray()
        self.__dialityPacketLength = 0
        self.__sendRID = 0
        self.__sendEvent = threading.Event()
        # Stored by registerResponseFunction; not invoked anywhere in this file.
        self.responseFunction = None

        # NOTE(review): serial.Serial raises on failure rather than returning
        # None, so the else branch is presumably never taken -- confirm.
        if self.__serialConnection is not None:
            self.__serialListenerThread = threading.Thread(target=self.__listener)
        else:
            self.__serialListenerThread = None
            print("Serial connection is not valid")

    def start(self):
        """Start the background listener thread."""
        if self.__serialConnection is None:
            print("Cannot start serial listener.")
            return

        print("Serial listener has started.")
        self.__serialListenerThread.start()

    def __getRequestID(self, message) -> int:
        """Return the request ID stored at byte index 1 of ``message``."""
        # NOTE(review): the slice [1:2] covers a single byte, so byteorder has
        # no effect. If the ID is meant to span bytes 1-2 this should be
        # [1:3] -- confirm against the protocol definition.
        return int.from_bytes(message[1:2], byteorder='little', signed=False)

    def __listener(self):
        """Thread body: poll the port forever and feed the packet parser."""
        while True:
            pending = self.__serialConnection.in_waiting
            if pending == 0:
                # Nothing to read; yield briefly instead of spinning.
                sleep(0.001)
                continue

            chunk = self.__serialConnection.read(pending)
            self.__dialitySerialByteBuffer.extend(chunk)
            print("\n\nArriving Buffer: " + str(chunk))

            # Keep extracting packets while one was consumed and bytes remain.
            while self.__processNextPacket():
                pass

    def __processNextPacket(self) -> bool:
        """Advance the state machine by at most one complete packet.

        Returns True when a packet was consumed and the buffer still holds
        bytes (so the caller should try again); False when more data is
        needed or the buffer is exhausted.
        """
        buffer = self.__dialitySerialByteBuffer

        # First we look for the start of the packet
        if self.__searchState == DSM_State.SEARCHING_FOR_START_BYTE:
            print("\n\nProcessing buffer: " + str(buffer))
            print("Searching for start byte")
            index = buffer.find(START_BYTE)
            if index < 0:
                # No start byte anywhere: everything buffered is noise.
                buffer.clear()
                return False
            # Drop the bytes preceding the start byte, in place.
            del buffer[:index]
            self.__searchState = DSM_State.SEARCHING_FOR_PACKET_LENGTH

        # Then we see if we have the information about its length
        if self.__searchState == DSM_State.SEARCHING_FOR_PACKET_LENGTH:
            if len(buffer) < 4:
                return False
            print("Searching for packet length")
            self.__dialityPacketLength = buffer[3] + 5
            self.__searchState = DSM_State.SEARCHING_FOR_CRC

        # We verify that the whole packet is in the buffer
        length = self.__dialityPacketLength
        if len(buffer) < length:
            return False

        print("Searching for crc")
        # Work on a private copy so a response that was already delivered is
        # never overwritten or emptied by unrelated or corrupted packets.
        packet = buffer[:length]
        crc = packet[-1]
        print("Packet CRC: " + str(crc) + ", Calculated CRC:"
              + str(self.__denaliPacketCRC(packet)))

        # TODO(review): the comparison against the computed checksum is
        # disabled; a packet is currently accepted only when its CRC byte is 0.
        if crc == 0:  # self.__denaliPacketCRC(packet):
            if self.__getRequestID(packet) == self.__sendRID:
                # Publish the response BEFORE signalling the waiter.
                self.__dialityPacket = packet
                # Set the "sendEvent", so the "send" command can continue
                self.__sendEvent.set()
            else:
                # We didn't get what we were expecting.
                # We are going to let the command time out.
                packet = bytearray()
            print("Final Denali Message: " + str(packet))
        # else: CRC is corrupted. The packet is simply dropped.

        # Consume the packet and look for the next start byte.
        del buffer[:length]
        self.__searchState = DSM_State.SEARCHING_FOR_START_BYTE
        return len(buffer) > 0

    def __denaliPacketCRC(self, packet=None) -> int:
        """Return the sum of all bytes except the last one, modulo 256.

        ``packet`` defaults to the last stored response.
        """
        if packet is None:
            packet = self.__dialityPacket
        return sum(packet[:-1]) % 256

    def registerResponseFunction(self, function):
        """Store ``function`` in ``responseFunction``."""
        self.responseFunction = function

    def write(self, message):
        """Send ``message`` and return the event that signals its response.

        The returned ``threading.Event`` is set by the listener once an
        accepted packet with the same request ID as ``message`` arrives;
        the packet is then available through ``getMessage``.
        """
        # Check message ID
        self.__sendRID = self.__getRequestID(message)
        # Forget the previous response so a timeout cannot return stale data.
        self.__dialityPacket = bytearray()
        self.__sendEvent.clear()
        self.__serialConnection.write(message)
        return self.__sendEvent

    def getMessage(self):
        """Return the last accepted response (empty if none since ``write``)."""
        return self.__dialityPacket


if __name__ == "__main__":
    the_messenger = DialitySerialMessenger(serial_port="/dev/ttyUSB0")
    the_messenger.start()