###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file DenaliSerialMessenger.py
#
# @date 16-Oct-2019
# @author L. Baloa
#
# @brief This class allows sending and receiving of Denali Messages on
# the serial port.
#
############################################################################
import math
import threading
from time import sleep

try:
    import can
except ImportError:
    # python-can is only needed to open a real CAN interface; the packet
    # helpers below stay usable (and testable) without it.
    can = None


class LongDialityPacketBuilder:
    def __init__(self, can_message):
        """
        LongDialityPacketBuilder is a utility object that helps construct a
        Diality packet that is longer than 8 bytes. The builder is created
        with the first 8-byte message, which contains the length of the
        packet, and the remaining messages are pushed afterwards.

        e.g., for a 3 message packet:

            obj = LongDialityPacketBuilder(msg1)
            packet = obj.push(msg2)   # returns None
            packet = obj.push(msg3)   # returns msg1 + msg2 + msg3

        :param can_message: first 8-byte message of the Diality packet;
                            byte 3 holds the cargo length.
        """
        # Copy so that later pushes never extend the caller's own list.
        self._message = list(can_message)
        # 4 header bytes (start byte, 2 request ID bytes, length) + cargo,
        # split into 8-byte CAN messages.
        self._number_of_can_message_needed = math.ceil((can_message[3] + 4) / 8)
        self._number_of_can_message_up_to_now = 1

    def push(self, can_message):
        """
        push appends the can_message to the current packet.

        :param can_message: 8-byte message
        :return: None if the packet is not completed, otherwise returns the
                 complete packet (list of integers)
        """
        self._message += can_message
        self._number_of_can_message_up_to_now += 1
        if self._number_of_can_message_up_to_now < self._number_of_can_message_needed:
            return None
        complete_packet = self._message
        self._message = None
        return complete_packet


class DialityCanMessenger:
    START_BYTE = 0xA5

    def __init__(self, can_interface='can0'):
        """
        DialityCanMessenger constructor

        :param can_interface: string containing the can interface, e.g., 'can0'
        :raises ImportError: if the python-can package is not installed
        """
        if can is None:
            raise ImportError("python-can is required to open a CAN interface")

        self._can_connection = can.interface.Bus(can_interface, bustype='socketcan')
        # Request ID of the send() call waiting for a response, -1 if none.
        self._send_packet_request_id = -1
        self._send_event = threading.Event()
        # channel_id -> LongDialityPacketBuilder for packets being assembled.
        self._long_packet_builders = {}
        self._response_packet = None
        self._run = False
        # channel_id -> {request_id: function(packet)}
        self._sync_response_dictionary = {}

        if self._can_connection is not None:
            self._listener_thread = threading.Thread(target=self._listener)
        else:
            self._listener_thread = None
            print("Can connection is not valid")

    def start(self):
        """
        starts listening to the can interface.
        """
        if self._can_connection is None:
            print("Cannot start can listener.")
            return
        self._run = True
        self._listener_thread.start()
        print("Can listener has started.")

    def stop(self):
        """
        stop listening the can interface. The listener loop exits the next
        time it checks the run flag.
        """
        self._run = False
        print("Can listener has stopped.")

    def _assemble(self, channel_id, can_data):
        """
        Feeds one 8-byte CAN message into the packet assembly for its channel.

        :param channel_id: arbitration ID the message was received on
        :param can_data: list with the 8 data bytes of the message
        :return: a complete Diality packet (short or long), or None if more
                 messages are needed or the message does not belong to a packet
        """
        builder = self._long_packet_builders.get(channel_id)
        starts_packet = can_data[0] == self.START_BYTE

        if starts_packet and can_data[3] <= 4:
            # Short packet: header + cargo fit in a single CAN message.
            return can_data

        if starts_packet and builder is None:
            # First message of a long packet on this channel.
            self._long_packet_builders[channel_id] = LongDialityPacketBuilder(can_data)
            return None

        if builder is None:
            # Continuation message without a packet in progress: ignore it.
            return None

        # Continuation of the long packet in progress. This also covers cargo
        # bytes that happen to look like a long-packet start (first byte equal
        # to START_BYTE), which must not be dropped.
        packet = builder.push(can_data)
        if packet is not None:
            # The builder is exhausted; remove it so the next long packet on
            # this channel starts from a fresh one.
            del self._long_packet_builders[channel_id]
        return packet

    def _dispatch(self, channel_id, packet):
        """
        Delivers a complete packet to the pending send() call or to the sync
        function registered for its channel and request ID.

        :param channel_id: arbitration ID the packet was received on
        :param packet: complete Diality packet; its first byte is overwritten
                       with channel_id before delivery
        """
        request_id = DialityPacket.getRequestID(packet)
        # The start byte carries no information anymore; reuse the slot to
        # tell the receiver which channel the packet arrived on.
        packet[0] = channel_id

        # We first check if this is a response to a send request that is pending
        if request_id == self._send_packet_request_id:
            self._response_packet = packet
            self._send_event.set()
            self._send_packet_request_id = -1
            return

        # If it is not, this is a sync packet and we call its registered function
        callback = self._sync_response_dictionary.get(channel_id, {}).get(request_id)
        if callback is not None:
            callback(packet)

    def _listener(self):
        """
        listens for diality messages on the can interface passed during
        construction until the run flag is cleared by stop().
        """
        while self._run:
            message = self._can_connection.recv(0.0)
            if message is None or message.dlc != 8:
                # Nothing usable was received, sleep 1 msec and check again
                sleep(0.001)
                continue

            packet = self._assemble(message.arbitration_id, list(message.data))
            if packet is not None:
                self._dispatch(message.arbitration_id, packet)

    def registerSyncFunction(self, channel_id, request_id, function):
        """
        assign a function with packet parameter to a sync request id,
        e.g., def function(packet).

        :param channel_id: can channel number where messages are received
        :param request_id: Diality request ID in message
        :param function: function reference
        """
        self._sync_response_dictionary.setdefault(channel_id, {})[request_id] = function

    def send(self, channel_id, can_packet, time_out=1):
        """
        sends can_packet to channel_id and waits for the response.

        :param channel_id: CAN channel ID
        :param can_packet: list of integers with the Diality format; it is
                           not modified
        :param time_out: time it will wait for a response in seconds
        :return: Diality Packet, if it times out it returns None
        """
        padded_can_packet = DialityPacket.padPacketWithZeros(can_packet)
        padded_can_packet_length = padded_can_packet[3]

        # A packet can be longer than 8 bytes, so we need to split it
        # into 8 bytes messages.
        number_of_messages = math.ceil((padded_can_packet_length + 4) / 8)

        # Arm the response machinery BEFORE transmitting. Clearing these after
        # the messages are out would discard a response that arrives quickly
        # and turn it into a timeout.
        self._response_packet = None
        self._send_event.clear()
        self._send_packet_request_id = DialityPacket.getRequestID(padded_can_packet)

        # We are sending one message at a time on CAN
        for n in range(number_of_messages):
            message = padded_can_packet[n * 8:n * 8 + 8]
            msg = can.Message(arbitration_id=channel_id, data=message, is_extended_id=False)
            self._can_connection.send(msg)

        # At this point, we sleep until the system times out or flag is set
        self._send_event.wait(time_out)

        # We are ready to send again. Reset request ID appropriately
        self._send_packet_request_id = -1

        # This value is None or it has a message depending of the listener
        return self._response_packet


class DialityPacket:

    @staticmethod
    def buildPacket(request_id=0, cargo=None):
        """
        buildPacket builds a Diality Packet

        :param request_id: is an integer indicating request ID (0..65535)
        :param cargo: list (or bytes) with cargo. It does not include length.
                      Defaults to an empty cargo.
        :return: packet padded with zeros to a multiple of 8 bytes, or an
                 empty list if request_id is out of range or the cargo is
                 longer than 255 bytes
        """
        if cargo is None:
            cargo = []

        if not 0 <= request_id <= (2 ** 16 - 1):
            return []

        # The length field is a single byte
        cargo_length = len(cargo)
        if cargo_length > 255:
            return []

        packet = [DialityCanMessenger.START_BYTE]
        packet.extend(request_id.to_bytes(2, byteorder='big'))
        packet.append(cargo_length)
        packet.extend(cargo)

        return DialityPacket.padPacketWithZeros(packet)

    @staticmethod
    def padPacketWithZeros(packet):
        """
        returns a packet padded with zeros that guarantees that the packet is
        a multiple of 8 bytes. The packet passed in is left untouched.

        :param packet: packet that may or may not be multiple of 8 bytes
        :return: new list that is 8-byte multiple
        """
        padded_packet = list(packet)
        # -len % 8 is the distance to the next multiple of 8 (0 if aligned)
        padded_packet.extend([0] * (-len(padded_packet) % 8))
        return padded_packet

    @staticmethod
    def getRequestID(packet):
        """
        returns request ID from packet

        :param packet: complete Diality Packet
        :return: integer with request ID
        """
        return int.from_bytes(packet[1:3], byteorder='big')


def print_received_packet(packet, sync=False):
    """
    prints a packet delivered by DialityCanMessenger.

    The messenger stores the channel ID in the first byte of the packet; this
    function reads it and restores the start byte in place before printing.

    :param packet: packet as delivered by the messenger (modified in place)
    :param sync: True if the packet was delivered to a sync function
    """
    channel_id = packet[0]
    packet[0] = DialityCanMessenger.START_BYTE
    introduction = "Received: "

    if sync:
        introduction = "Sync " + introduction

    print(introduction, packet, " in channel: ", channel_id)


def print_to_screen(packet):
    """
    prints the result of DialityCanMessenger.send.

    :param packet: response packet, or None if the request timed out
    """
    if packet is None:
        print("Timeout!!!")
    else:
        print_received_packet(packet)


def function_for_sync(packet):
    """
    sync function used by the demo: prints the received sync packet.

    :param packet: packet as delivered by the messenger
    """
    print_received_packet(packet, sync=True)


def main():
    """
    demo: registers sync functions, then sends requests to the board and to
    the DG simulator and prints the responses.
    """
    the_messenger = DialityCanMessenger()

    channel_id = 0x100

    received_channel_id = 0x20
    received_request_id = 0x100
    the_messenger.registerSyncFunction(received_channel_id, received_request_id, function_for_sync)

    dg_simulator_received_channel_id = 0x01
    dg_simulator_sync_req_id = 0x05
    dg_simulator_req_id = 0x03
    the_messenger.registerSyncFunction(dg_simulator_received_channel_id, dg_simulator_sync_req_id,
                                       function_for_sync)

    the_messenger.start()

    packet = DialityPacket.buildPacket(0x100, [1])
    dg_packet = DialityPacket.buildPacket(dg_simulator_req_id, [])

    sleep(3.0)
    print("Sending to board: ", packet, " in channel: ", channel_id)
    response = the_messenger.send(channel_id, packet)
    print_to_screen(response)

    sleep(3.0)
    print("Sending to board: ", packet, " in channel: ", channel_id)
    response = the_messenger.send(channel_id, packet)
    print_to_screen(response)

    print("Sending to DG simulator: ", dg_packet, " in channel", dg_simulator_received_channel_id, end=" ---> ")
    response = the_messenger.send(dg_simulator_received_channel_id, dg_packet)
    print_to_screen(response)

    sleep(3.0)
    print("Sending to DG simulator: ", dg_packet, " in channel", dg_simulator_received_channel_id, end=" ---> ")
    response = the_messenger.send(dg_simulator_received_channel_id, dg_packet)
    print_to_screen(response)

    # NOTE(review): the_messenger.stop() is never called, so the listener
    # thread keeps running and printing sync packets until the process is
    # interrupted.


if __name__ == "__main__":
    main()