###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file DenaliSerialMessenger.py
#
# @date 16-Oct-2019
# @author L. Baloa
#
# @brief This class allows sending and receiving of Denali Messages on
# the serial port.
#
############################################################################
import math
import threading
from enum import Enum
from time import sleep

try:
    import serial
except ImportError:
    # pyserial is only needed to open a port; keep the packet helpers in this
    # module importable without it. DialitySerialMessenger raises on creation.
    serial = None


class DSM_State(Enum):
    """ Parsing states used by the serial listener while framing a packet. """
    SEARCHING_FOR_START_BYTE = 0
    SEARCHING_FOR_PACKET_LENGTH = 1
    SEARCHING_FOR_CRC = 2


class DialitySerialMessenger:
    r"""
    Sends Denali packets on a serial port and matches responses by request ID.

    Packet layout as used by this file:
        byte 0      start byte (0xA5)
        bytes 1-2   request ID, big endian, unsigned
        byte 3      cargo length
        bytes 4..   cargo
        last byte   CRC = sum of all preceding packet bytes modulo 256
    """

    START_BYTE = b'\xA5'

    def __init__(self, serial_port='/dev/ttyUSB0', verify_crc=False):
        r"""
        DialitySerialMessenger constructor

        \param serial_port - string containing the serial port, e.g., '/dev/ttyUSB69'
        \param verify_crc - when True, received packets whose CRC byte does not
                            match the computed CRC are dropped. Defaults to
                            False, i.e. the CRC byte is not checked.
        \returns DialitySerialMessenger object
        """
        if serial is None:
            raise ImportError("pyserial is required to open a serial port")

        self.__serialConnection = serial.Serial(serial_port, baudrate=115200)
        self.__verifyCRC = verify_crc
        self.__dialitySerialByteBuffer = bytearray()
        self.__searchState = DSM_State.SEARCHING_FOR_START_BYTE
        self.__dialityPacket = bytearray()
        self.__dialityResponsePacket = bytearray()
        self.__dialityPacketLength = 0
        self.__sendRID = -1  # -1 means "no request is waiting for a response"
        self.__sendEvent = threading.Event()
        self.responseFunction = None
        self.__run = False
        # The thread object is created in start() so the listener can be
        # started again after stop(); a Thread can only be started once.
        self.__serialListenerThread = None

        if self.__serialConnection is None:
            print("Serial connection is not valid")

    def start(self):
        """
        after an object is created, you need to "start" serial communication.
        Calling start() while the listener is already running does nothing.
        """
        if self.__serialConnection is None:
            print("Cannot start serial listener.")
            return

        listener = self.__serialListenerThread
        if listener is not None and listener.is_alive():
            return

        self.__run = True
        self.__serialListenerThread = threading.Thread(target=self.__listener)
        self.__serialListenerThread.start()
        print("Serial", self.__serialConnection.port, "listener has started.")

    def stop(self):
        """
        stop serial listener thread and wait (up to one second) for it to exit
        """
        self.__run = False

        listener = self.__serialListenerThread
        if listener is not None and listener.is_alive() \
                and listener is not threading.current_thread():
            # The listener polls self.__run every loop, so this returns fast.
            listener.join(timeout=1.0)

        if self.__serialConnection is not None:
            print("Serial", self.__serialConnection.port, "listener has stopped.")

    @staticmethod
    def __getRequestID(message):
        """ returns the unsigned big-endian request ID stored in bytes 1-2 """
        return int.from_bytes(message[1:3], byteorder='big', signed=False)

    def __listener(self):
        """
        private method used as a thread. Polls the serial port, appends any
        waiting bytes to the receive buffer and parses complete packets.
        """
        while self.__run:
            num_of_bytes_in_serial_port = self.__serialConnection.in_waiting
            if num_of_bytes_in_serial_port == 0:
                sleep(0.001)
                continue

            self.__dialitySerialByteBuffer += \
                self.__serialConnection.read(num_of_bytes_in_serial_port)
            self.__processBuffer()

    def __processBuffer(self):
        """
        consumes every complete packet currently in the receive buffer.
        The search state is kept between calls, so a packet split over
        several reads is completed when the remaining bytes arrive.
        """
        buffer = self.__dialitySerialByteBuffer

        while True:
            # First we look for the start of the packet
            if self.__searchState == DSM_State.SEARCHING_FOR_START_BYTE:
                index = buffer.find(self.START_BYTE)
                if index < 0:
                    # Nothing usable in the buffer; discard it
                    buffer.clear()
                    return
                del buffer[:index]
                self.__searchState = DSM_State.SEARCHING_FOR_PACKET_LENGTH

            # Then we see if we have the information about its length
            if self.__searchState == DSM_State.SEARCHING_FOR_PACKET_LENGTH:
                if len(buffer) < 4:
                    return
                # start byte + 2 request ID bytes + length byte + CRC byte = 5
                self.__dialityPacketLength = buffer[3] + 5
                self.__searchState = DSM_State.SEARCHING_FOR_CRC

            # Finally we wait until the whole packet is in the buffer
            if len(buffer) < self.__dialityPacketLength:
                return

            packet = buffer[:self.__dialityPacketLength]
            del buffer[:self.__dialityPacketLength]
            self.__searchState = DSM_State.SEARCHING_FOR_START_BYTE
            self.__handlePacket(packet)

            if not buffer:
                return

    def __handlePacket(self, packet):
        """
        hands a complete packet to a waiting send() when its request ID
        matches; any other packet is discarded.
        """
        self.__dialityPacket = packet

        if self.__verifyCRC and packet[-1] != self.__denaliPacketCRC():
            # CRC is corrupted. Drop the packet
            self.__dialityPacket = bytearray()
            return

        if self.__getRequestID(packet) == self.__sendRID:
            self.__dialityResponsePacket = packet
            # Disarm before waking the sender, so a follow-up send() cannot
            # have its new request ID overwritten by this thread.
            self.__sendRID = -1
            self.__sendEvent.set()
        else:
            # We didn't get what we were expecting; let the command time out.
            # The stored response is left alone: a matching packet parsed just
            # before this one may not have been read by send() yet.
            self.__dialityPacket = bytearray()

    def __denaliPacketCRC(self):
        """
        utility method used to calculate incoming message CRC
        """
        return sum(self.__dialityPacket[0:self.__dialityPacketLength - 1]) % 256

    def registerResponseFunction(self, function):
        """
        stores a callable in self.responseFunction.
        NOTE(review): nothing in this file calls it yet.
        """
        self.responseFunction = function

    def send(self, message, time_out=1):
        r"""
        writes a packet to the serial port and waits for the response that
        carries the same request ID.

        \param message - complete packet, e.g. from DialityPacket.buildPacket
        \param time_out - seconds to wait for the matching response
        \returns the response packet, or an empty bytearray on time out
        """
        # Forget any earlier response so a time out cannot return stale data
        self.__dialityResponsePacket = bytearray()
        self.__sendEvent.clear()
        # Arm the listener last, once the event and the response are reset
        self.__sendRID = self.__getRequestID(message)

        self.__serialConnection.write(message)
        answered = self.__sendEvent.wait(time_out)
        self.__sendRID = -1

        if not answered:
            return bytearray()
        return self.__dialityResponsePacket


class DialityPacket:
    """ Helpers to build outgoing Diality packets. """

    @staticmethod
    def getCRC(message):
        r"""
        getCRC returns the CRC of a message: the sum of the 4 header bytes
        and the cargo bytes, modulo 256

        \param message - packet bytes; byte 3 must hold the cargo length
        \returns CRC as an integer in 0..255
        """
        message_cargo_length = message[3]
        return sum(message[0:4 + message_cargo_length]) % 256

    @staticmethod
    def buildPacket(request_id=0, cargo=b''):
        r"""
        buildPacket builds a Diality Packet

        \param request_id - integer in 0..65535 indicating the request ID
        \param cargo - bytes or bytearray with the cargo (at most 255 bytes).
                       It does not include length
        \returns the packet, zero padded to a multiple of 8 bytes, or an
                 empty bytearray when request_id or cargo is not valid
        """
        # Make sure an int in the 16-bit unsigned range was passed
        if not isinstance(request_id, int) or not 0 <= request_id <= (2**16 - 1):
            return bytearray()

        # The cargo length has to fit in a single byte
        cargo_length = len(cargo)
        if cargo_length > 255:
            return bytearray()

        message = DialitySerialMessenger.START_BYTE
        message += request_id.to_bytes(2, byteorder='big')
        message += cargo_length.to_bytes(1, byteorder='big')
        message += cargo
        message += DialityPacket.getCRC(message).to_bytes(1, byteorder='big')

        # message must be multiple of 8; patch it with zeros when it is not
        message_length = len(message)
        if message_length % 8 != 0:
            add_these_many_zeros = math.ceil(message_length / 8) * 8 - message_length
            message += bytearray(add_these_many_zeros)

        return message


if __name__ == "__main__":
    the_messenger = DialitySerialMessenger(serial_port="/dev/ttyUSB0")
    the_messenger.start()
    the_messenger.stop()

    the_test = DialityPacket.buildPacket(request_id=8000, cargo=b'123')
    print(the_test)

    the_test1 = DialityPacket.buildPacket(request_id=8000, cargo=b'12345')
    print(the_test1)