###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file   DenaliSerialMessenger.py
#
# @date   16-Oct-2019
# @author L. Baloa
#
# @brief  This class allows sending and receiving of Denali Messages on
#         the serial port.
#
############################################################################
import math
import threading
from enum import Enum
from time import sleep

try:
    import serial
except ImportError:
    # pyserial is only needed to open a port; DialityPacket works without it.
    serial = None


class DSM_State(Enum):
    """The three states of the incoming-packet search."""
    SEARCHING_FOR_START_BYTE = 0
    SEARCHING_FOR_PACKET_LENGTH = 1
    SEARCHING_FOR_CRC = 2


class DialitySerialMessenger:
    """Send and receive Diality packets on a serial port.

    Packet layout as parsed by this class: one start byte, a 2-byte
    big-endian request ID, one cargo-length byte, the cargo, and one
    trailing CRC byte.
    """

    START_BYTE = b'\xA5'

    # Bytes needed before the cargo-length byte (index 3) can be read.
    _HEADER_LENGTH = 4
    # Non-cargo bytes in a packet: start + request ID (2) + length + CRC.
    _PACKET_OVERHEAD = 5

    def __init__(self, serial_port='/dev/ttyUSB0'):
        """
        DialitySerialMessenger constructor.

        :param serial_port: string containing the serial port,
                            e.g., '/dev/ttyUSB69'
        :raises ImportError: if pyserial is not installed
        """
        if serial is None:
            raise ImportError("pyserial is required to open a serial port")

        self.__serialConnection = serial.Serial(serial_port, baudrate=115200)
        self.__dialitySerialByteBuffer = bytearray()
        self.__searchState = DSM_State.SEARCHING_FOR_START_BYTE
        self.__dialityPacket = bytearray()
        self.__dialityResponsePacket = bytearray()
        self.__dialityPacketLength = 0
        self.__sendRID = -1
        self.__sendEvent = threading.Event()
        self.__run = False
        self.__async_response_dictionary = {}
        self.__sync_response_dictionary = {}

        if self.__serialConnection is not None:
            self.__serialListenerThread = threading.Thread(target=self.__listener)
        else:
            self.__serialListenerThread = None
            print("Serial connection is not valid")

    def start(self):
        """
        Start the serial listener thread.

        After an object is created, serial communication must be started
        with this method before any packet is received.
        """
        if self.__serialConnection is None:
            print("Cannot start serial listener.")
            return

        self.__run = True
        self.__serialListenerThread.start()
        print("Serial", self.__serialConnection.port, "listener has started.")

    def stop(self):
        """
        Stop the serial listener thread.

        Waits up to one second for the listener to leave its loop so the
        "has stopped" message is accurate. The wait is skipped when stop()
        is called from the listener thread itself (e.g. from a registered
        callback), since a thread cannot join itself.
        """
        self.__run = False

        listener = self.__serialListenerThread
        if (listener is not None and listener.is_alive()
                and listener is not threading.current_thread()):
            listener.join(timeout=1)

        if self.__serialConnection is None:
            return
        print("Serial", self.__serialConnection.port, "listener has stopped.")

    @staticmethod
    def __getRequestID(message):
        """Return the request ID stored big-endian in bytes 1-2 of message."""
        return int.from_bytes(message[1:3], byteorder='big', signed=False)

    def __listener(self):
        """
        Private method used as the listener thread body.

        Moves whatever is waiting on the serial port into the byte buffer
        and processes it; sleeps 1 ms when nothing is waiting.
        """
        while self.__run:
            pending = self.__serialConnection.in_waiting
            if pending != 0:
                self.__dialitySerialByteBuffer += self.__serialConnection.read(pending)
                self.__processBuffer()
            else:
                sleep(0.001)

    def __processBuffer(self):
        """
        Extract and dispatch every complete packet currently in the buffer.

        The search state is kept on the instance, so a packet that is only
        partially received is resumed on the next call.
        """
        buffer = self.__dialitySerialByteBuffer

        while True:
            # First we look for the start of the packet
            if self.__searchState == DSM_State.SEARCHING_FOR_START_BYTE:
                index = buffer.find(self.START_BYTE)
                if index < 0:
                    # No start byte anywhere: nothing here is usable.
                    buffer.clear()
                    return
                del buffer[:index]
                self.__searchState = DSM_State.SEARCHING_FOR_PACKET_LENGTH

            # Then we see if we have the information about its length
            if self.__searchState == DSM_State.SEARCHING_FOR_PACKET_LENGTH:
                if len(buffer) < self._HEADER_LENGTH:
                    return
                self.__dialityPacketLength = buffer[3] + self._PACKET_OVERHEAD
                self.__searchState = DSM_State.SEARCHING_FOR_CRC

            # Finally we wait until the whole packet is in the buffer
            if len(buffer) < self.__dialityPacketLength:
                return

            # TODO: we need to trash the packet if crc is not correct
            #       (compare its last byte with self.__denaliPacketCRC()).
            self.__dialityPacket = buffer[:self.__dialityPacketLength]
            self.__dispatchPacket(self.__dialityPacket)

            del buffer[:self.__dialityPacketLength]
            self.__searchState = DSM_State.SEARCHING_FOR_START_BYTE

            if not buffer:
                return

    def __dispatchPacket(self, packet):
        """
        Hand a complete packet to whoever is waiting for its request ID.

        Registered sync functions take precedence over async functions,
        which take precedence over a pending send().
        """
        request_id = self.__getRequestID(packet)

        if request_id in self.__sync_response_dictionary:
            self.__sync_response_dictionary[request_id](packet)
        elif request_id in self.__async_response_dictionary:
            self.__async_response_dictionary[request_id](packet)
        elif 0 <= self.__sendRID == request_id:
            # The message is the response to a pending send()
            self.__dialityResponsePacket = packet
            self.__sendEvent.set()  # lets send() continue
            self.__sendRID = -1

    def __denaliPacketCRC(self):
        """
        Utility method used to calculate the incoming message CRC:
        the sum of every byte before the CRC byte, modulo 256.
        """
        return sum(self.__dialityPacket[0:self.__dialityPacketLength - 1]) % 256

    def registerAsyncFunction(self, request_id, function):
        """
        Assign a function with a message parameter to an async request id,
        e.g., def thefunction(msg).

        :param request_id: integer
        :param function: function reference
        """
        self.__async_response_dictionary[request_id] = function

    def registerSyncFunction(self, request_id, function):
        """
        Assign a function with a message parameter to a sync request id,
        e.g., def thefunction(msg).

        :param request_id: integer
        :param function: function reference
        """
        self.__sync_response_dictionary[request_id] = function

    def send(self, message, time_out=1):
        """
        Write a packet to the serial port and wait for its response.

        The response is the first incoming packet carrying the same request
        ID as message, provided no sync/async function is registered for
        that ID. The listener must have been started with start().

        :param message: packet bytes, e.g. from DialityPacket.buildPacket
        :param time_out: seconds to wait for the response
        :returns: the response packet, or an empty bytearray on time-out
        """
        # Forget any earlier response so a time-out is not mistaken for one.
        self.__dialityResponsePacket = bytearray()
        self.__sendEvent.clear()
        self.__sendRID = self.__getRequestID(message)

        self.__serialConnection.write(message)
        self.__sendEvent.wait(time_out)

        self.__sendRID = -1
        return self.__dialityResponsePacket


class DialityPacket:
    """Helpers to build outgoing Diality packets."""

    @staticmethod
    def getCRC(message):
        """
        Return the CRC of a message: the sum of its header and cargo bytes,
        modulo 256.

        :param message: bytes holding at least the 4-byte header followed by
                        the cargo; byte 3 is the cargo length
        :returns: integer in the range 0-255
        """
        message_cargo_length = message[3]
        return sum(message[0:4 + message_cargo_length]) % 256

    @staticmethod
    def buildPacket(request_id=0, cargo=b''):
        """
        Build a Diality packet.

        :param request_id: integer request ID, 0 to 65535
        :param cargo: bytes or bytearray with the cargo, at most 255 bytes.
                      It does not include the length.
        :returns: the packet, zero-padded to a multiple of 8 bytes, or an
                  empty bytearray when request_id or the cargo length is
                  out of range
        """
        if not 0 <= request_id <= (2**16 - 1):
            return bytearray()

        # The cargo length must fit in a single byte
        cargo_length = len(cargo)
        if cargo_length > 255:
            return bytearray()

        message = DialitySerialMessenger.START_BYTE
        message += request_id.to_bytes(2, byteorder='big')
        message += cargo_length.to_bytes(1, byteorder='big')
        message += cargo
        message += DialityPacket.getCRC(message).to_bytes(1, byteorder='big')

        # The message must be a multiple of 8 bytes: patch it with zeros
        message_length = len(message)
        if message_length % 8 != 0:
            add_these_many_zeros = math.ceil(message_length / 8) * 8 - message_length
            message += bytearray(add_these_many_zeros)

        return message


if __name__ == "__main__":
    the_messenger = DialitySerialMessenger(serial_port="/dev/ttyUSB0")
    the_messenger.start()
    the_messenger.stop()

    the_test = DialityPacket.buildPacket(request_id=8000, cargo=b'123')
    print(the_test)

    the_test1 = DialityPacket.buildPacket(request_id=8000, cargo=b'12345')
    print(the_test1)