###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file DenaliSerialMessenger.py
#
# @date 16-Oct-2019
# @author L. Baloa
#
# @brief This class allows sending and receiving of Denali Messages on
# the serial port.
#
############################################################################
import threading
import math
from time import sleep

try:
    import can
except ImportError:
    # python-can is only needed by DenaliCanMessenger. The message helpers
    # below are pure Python, so the module stays importable without it and
    # the failure is reported when a messenger is constructed instead.
    can = None

# Public API of the module. The test_* helpers at the bottom are demo
# utilities for the __main__ script and are intentionally not exported.
__all__ = [
    "DenaliMessage",
    "DenaliChannels",
    "LongDenaliMessageBuilder",
    "DenaliCanMessenger",
]


class DenaliMessage:
    """
    Static helpers to build and inspect Denali messages.

    A built message is a dictionary with two keys: 'channel_id' (integer) and
    'message' (list of ints). The 'message' list is laid out as:
    start byte, 2-byte message id (little endian), 1-byte payload length,
    payload bytes, then trailing zeros up to a multiple of PACKET_LENGTH.
    """

    BYTE_ORDER = 'little'
    START_BYTE = 0xA5
    START_INDEX = 0
    MSG_ID_INDEX = 1
    PAYLOAD_LENGTH_INDEX = 3
    PAYLOAD_START_INDEX = 4
    PAYLOAD_LENGTH_FIRST_PACKET = 4
    HEADER_LENGTH = 4
    PACKET_LENGTH = 8

    @staticmethod
    def buildBasicMessage(channel_id=0, message=None):
        """
        build a message using channel_id and dialin message

        :param channel_id: integer with channel id
        :param message: array of int with message
        :return: dictionary with channel_id and message keys
        """
        if message is None:
            message = []

        return {'channel_id': channel_id, 'message': message}

    @staticmethod
    def buildMessage(channel_id=0, message_id=0, payload=None):
        """
        buildMessage builds a Diality message

        :param channel_id: is an integer with channel ID
        :param message_id: is an integer indicating request ID (0..65535)
        :param payload: list with payload (at most 255 items). It does not include length
        :return: dictionary with channel_id and 8-byte padded message, or an
                 empty list when message_id or the payload length is out of range
        """
        if payload is None:
            payload = []

        # The message id must fit in an unsigned 16-bit integer
        if not 0 <= message_id <= (2 ** 16 - 1):
            return []

        # The payload length is transmitted in a single byte
        payload_length = len(payload)
        if payload_length > 255:
            return []

        message = [DenaliMessage.START_BYTE]
        message += list(message_id.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER))
        message += [payload_length]
        message += payload

        message = DenaliMessage.__padMessageWithZeros(message)

        return DenaliMessage.buildBasicMessage(channel_id=channel_id, message=message)

    @staticmethod
    def __padMessageWithZeros(message):
        """
        returns a packet padded with zeros that guarantees that the packet is a multiple of 8 bytes.
        The list is extended in place and also returned.

        :param message: packet that may or may not be multiple of 8 bytes
        :return: packet that is 8-byte multiple
        """
        # (-n % k) is the distance from n up to the next multiple of k (0 if already aligned)
        message += [0] * (-len(message) % DenaliMessage.PACKET_LENGTH)

        return message

    @staticmethod
    def getChannelID(message):
        """
        returns channel ID from message

        :param message: dictionary with channel_id and message keys
        :return: integer with channel id
        """
        return message['channel_id']

    @staticmethod
    def getMessageID(message):
        """
        returns request ID from message

        :param message: dictionary with channel_id and message keys
        :return: integer with request ID
        """
        msg_id_array = message['message'][DenaliMessage.MSG_ID_INDEX:
                                          DenaliMessage.PAYLOAD_LENGTH_INDEX]

        return int.from_bytes(msg_id_array, byteorder=DenaliMessage.BYTE_ORDER)

    @staticmethod
    def getPayloadLength(message):
        """
        returns payload length from message

        :param message: dictionary with channel_id and message keys
        :return: a unsigned payload length
        """
        return message['message'][DenaliMessage.PAYLOAD_LENGTH_INDEX]

    @staticmethod
    def getPayload(message):
        """
        returns payload array from message

        :param message: dictionary with channel_id and message keys
        :return: a payload array if exist, None when the payload length is zero
        """
        payload_length = DenaliMessage.getPayloadLength(message)

        if payload_length == 0:
            return None

        # Stop at the declared length so the zero padding added by
        # buildMessage is not returned as payload data.
        payload_end = DenaliMessage.PAYLOAD_START_INDEX + payload_length

        return message['message'][DenaliMessage.PAYLOAD_START_INDEX:payload_end]

    @staticmethod
    def getTotalPackets(message, is_array=True):
        """
        returns the number of packets needed to transmit Denali Message

        :param message: dictionary with channel_id and message keys or raw message
        :param is_array: True if message is an array and not a dictionary
        :return: number of packets
        """
        the_message = message if is_array else message['message']

        total_length = the_message[DenaliMessage.PAYLOAD_LENGTH_INDEX] + DenaliMessage.HEADER_LENGTH

        return math.ceil(total_length / DenaliMessage.PACKET_LENGTH)


class DenaliChannels:
    # These are all the channels available
    hd_alarm_broadcast_ch_id = 0x001
    dg_alarm_broadcast_ch_id = 0x002
    ui_alarm_broadcast_ch_id = 0x004
    hd_to_dg_ch_id = 0x008
    dg_to_hd_ch_id = 0x010
    hd_to_ui_ch_id = 0x020
    hd_sync_broadcast_ch_id = 0x040
    dg_sync_broadcast_ch_id = 0x080
    ui_to_hd_ch_id = 0x100
    ui_sync_broadcast_ch_id = 0x200
    dialin_to_hd_ch_id = 0x400
    hd_to_dialin_ch_id = 0x401
    dialin_to_dg_ch_id = 0x402
    dg_to_dialin_ch_id = 0x403
    dialin_to_ui_ch_id = 0x404
    ui_to_dialin_ch_id = 0x405


class LongDenaliMessageBuilder:
    def __init__(self, can_message):
        """
        LongDenaliMessageBuilder is a utility object that helps construct a diality message
        that is longer than 8 bytes. Basic principle is to construct an object with the
        first 8 byte message which contains the length of the message, and then push the
        remaining messages. e.g., let's imagine a 3 message packet.

            obj = LongDenaliMessageBuilder(msg1)
            message = obj.push(msg2), returns None, message is not complete
            message = obj.push(msg3), return the packet which is the concatenation of msg1, msg2 and msg3

        :param can_message: an 8 byte message needed to build a diality message.
        :returns object
        """
        # Copy, so later pushes do not extend the caller's list in place
        self.__message = list(can_message)
        self.__number_of_can_packets_needed = DenaliMessage.getTotalPackets(can_message)
        self.__number_of_can_packets_up_to_now = 1

    def push(self, can_message, first_packet=False):
        """
        push appends the can_message to the current packet.

        :param can_message: 8-byte message
        :param first_packet: True if it is the first packet received
        :return: None if the packet is not completed, otherwise returns the complete packet
        """
        if first_packet:
            # Restart the builder with a new first packet
            self.__message = list(can_message)
            self.__number_of_can_packets_needed = DenaliMessage.getTotalPackets(can_message)
            self.__number_of_can_packets_up_to_now = 1
        else:
            self.__message += can_message
            self.__number_of_can_packets_up_to_now += 1

        if self.__number_of_can_packets_up_to_now != self.__number_of_can_packets_needed:
            return None

        return_message = self.__message
        self.__message = None

        return return_message


class DenaliCanMessenger:
    START_BYTE = DenaliMessage.START_BYTE

    def __init__(self, can_interface='can0'):
        """
        DenaliCanMessenger constructor

        :param can_interface - string containing the can interface, e.g., 'can0"
        :returns DenaliCanMessenger object
        :raises ImportError: when the python-can package is not installed
        """
        if can is None:
            raise ImportError("python-can is required to use DenaliCanMessenger")

        self.__bus = can.interfaces.socketcan.SocketcanBus(channel=can_interface)
        self.__listener_buffer = can.BufferedReader()
        self.__notifier = can.Notifier(self.__bus, [self.__listener_buffer])
        self.__sendPacketRequestID = -1
        self.__sendEvent = threading.Event()
        self.__longMessageBuilders = {}
        self.__longMsgChannelIDSet = set()
        self.__dialinMessage = None
        self.__dialinCommandResponseMessage = None
        self.__dialinResponseChannelID = -1
        self.__run = False
        self.__sync_response_dictionary = {}

        if self.__bus is not None:
            self.__serialListenerThread = threading.Thread(target=self.__listener, daemon=True)
        else:
            self.__serialListenerThread = None
            print("Can connection is not valid")

    def start(self):
        """
        starts listening to the can interface.
        """
        if self.__bus is None:
            print("Cannot start can listener.")
            return

        self.__run = True
        self.__serialListenerThread.start()
        print("Can listener has started.")

    def stop(self):
        """
        stop listening the can interface
        """
        # The listener loop checks this flag on every iteration
        self.__run = False
        print("\nCan listener has stopped.")

    def __listener(self):
        """
        listens for diality message on the can interface passed during construction.

        Runs on the listener thread until stop() clears the run flag. Complete
        messages either answer a pending send() (matched on message id) or are
        passed to the function registered for their channel id and message id.
        """
        while self.__run:
            message = self.__listener_buffer.get_message(0.0)

            if message is None or message.dlc != DenaliMessage.PACKET_LENGTH:
                # Nothing usable was received, let's sleep 1 msec and check again
                sleep(0.001)
                continue

            # We have received a legit can message of 8 bytes
            can_data = list(message.data)
            channel_id = message.arbitration_id
            message_length = can_data[DenaliMessage.PAYLOAD_LENGTH_INDEX]
            is_start_packet = can_data[DenaliMessage.START_INDEX] == self.START_BYTE

            if channel_id in self.__longMsgChannelIDSet:
                # A long message is being built on this channel, keep pushing packets
                self.__dialinMessage = self.__longMessageBuilders[channel_id].push(can_data)

            elif is_start_packet and message_length <= DenaliMessage.PAYLOAD_LENGTH_FIRST_PACKET:
                # Short message: header and payload fit in this single packet
                self.__dialinMessage = can_data

            elif is_start_packet:
                # Long message start: remember the channel until the message is complete
                self.__longMsgChannelIDSet.add(channel_id)

                if channel_id not in self.__longMessageBuilders:
                    # if we don't have a builder. Create it!
                    self.__longMessageBuilders[channel_id] = LongDenaliMessageBuilder(can_data)
                    self.__dialinMessage = None
                else:
                    # if we do have a builder, restart it with this first packet
                    self.__dialinMessage = self.__longMessageBuilders[channel_id].push(can_data,
                                                                                        first_packet=True)

            if self.__dialinMessage is None:
                continue

            # At this point we have a complete (long or short) Diality message
            completeDialinMessage = DenaliMessage.buildBasicMessage(channel_id=channel_id,
                                                                    message=self.__dialinMessage)
            dialin_msg_id = DenaliMessage.getMessageID(completeDialinMessage)
            dialin_ch_id = DenaliMessage.getChannelID(completeDialinMessage)

            # The message is complete, so the channel is no longer building a long message
            self.__longMsgChannelIDSet.discard(dialin_ch_id)

            if dialin_msg_id == self.__sendPacketRequestID:
                # This is the response to a send request that is pending.
                # Reset the request id before waking the sender, so a new
                # send() issued right after the wake-up is not clobbered.
                self.__dialinCommandResponseMessage = completeDialinMessage
                self.__sendPacketRequestID = -1
                self.__sendEvent.set()

            elif dialin_ch_id in self.__sync_response_dictionary and \
                    dialin_msg_id in self.__sync_response_dictionary[dialin_ch_id]:
                # This is a publication message, call its registered function
                self.__sync_response_dictionary[dialin_ch_id][dialin_msg_id](completeDialinMessage)

            # Done with this message, let's get the next one
            self.__dialinMessage = None

    def registerReceivingPublicationFunction(self, channel_id, message_id, function):
        """
        assign a function with message parameter to a sync request id, e.g.,
        def function(message). A later registration for the same channel_id
        and message_id replaces the previous function.

        :param channel_id: can channel number where messages are received
        :param message_id: Diality request ID in message
        :param function: function reference
        """
        # Create the per-channel dictionary on first use, then add or replace the entry
        self.__sync_response_dictionary.setdefault(channel_id, {})[message_id] = function

    def send(self, built_message, time_out=1):
        """
        sends built_message on its channel and waits for a message with the same message id.

        :param built_message: message built using DenaliMessage class
        :param time_out: time it will wait for a response in seconds
        :returns: Diality message (dictionary), if it times out it returns None
        """
        channel_id = DenaliMessage.getChannelID(built_message)
        padded_can_message_array = built_message['message']

        # Reset the response state BEFORE transmitting. Doing it after the
        # packets are on the bus could discard a response that the listener
        # thread had already delivered, turning it into a false timeout.
        self.__dialinCommandResponseMessage = None
        self.__sendEvent.clear()
        self.__sendPacketRequestID = DenaliMessage.getMessageID(built_message)

        # A message can be longer than 8 bytes, so we need to split it
        # into 8 bytes packets.
        number_of_packets = DenaliMessage.getTotalPackets(padded_can_message_array)

        # We are sending one packet at a time on CAN
        for n in range(number_of_packets):
            packet_data = padded_can_message_array[n * DenaliMessage.PACKET_LENGTH:
                                                   (n + 1) * DenaliMessage.PACKET_LENGTH]
            packet = can.Message(arbitration_id=channel_id,
                                 data=packet_data,
                                 is_extended_id=False)
            self.__bus.send(packet)

        # At this point, we sleep until the system times out or flag is set
        self.__sendEvent.wait(time_out)

        # We are ready to send again. Reset request ID appropriately
        self.__sendPacketRequestID = -1

        # This value is None or it has a message depending of the listener
        return self.__dialinCommandResponseMessage


def test_print_received_packet(message, sync=False):
    """
    prints a received message (dictionary with channel_id and message keys).

    :param message: dictionary with channel_id and message keys
    :param sync: True when the message was delivered to a registered function
    """
    # Received messages are dictionaries, not raw arrays
    channel_id = message['channel_id']

    introduction = "Received: "

    if sync:
        introduction = "Sync " + introduction

    print(introduction, message['message'], " in channel: ", channel_id)


def test_print_to_screen(message):
    """
    prints the result of DenaliCanMessenger.send: the message, or a timeout notice for None.
    """
    if message is None:
        print("Timeout!!!")
    else:
        test_print_received_packet(message)


def test_function_for_sync(message):
    """
    publication function used by the demo: prints the received message.
    """
    test_print_received_packet(message, sync=True)


def test_print_sending_dg_board(message):
    """
    prints a message that is about to be sent to the board.
    """
    print("Sending to board: ", message['message'], " in channel: ", message['channel_id'])


def test_print_sending_dg_sim(message):
    """
    prints a message that is about to be sent to the DG simulator (no newline).
    """
    print("Sending to DG simulator: ", message['message'], " in channel", message['channel_id'], end=" ---> ")


if __name__ == "__main__":
    test_messenger = DenaliCanMessenger()
    test_channel_id = DenaliChannels.ui_to_hd_ch_id

    test_received_channel_id = DenaliChannels.hd_to_ui_ch_id
    test_received_message_id = 0x100
    test_messenger.registerReceivingPublicationFunction(test_received_channel_id,
                                                        test_received_message_id,
                                                        test_function_for_sync)

    test_dg_simulator_received_channel_id = DenaliChannels.dialin_to_dg_ch_id
    test_dg_simulator_sync_msg_id = 0x05
    test_dg_simulator_msg_id = 0x03
    test_messenger.registerReceivingPublicationFunction(test_dg_simulator_received_channel_id,
                                                        test_dg_simulator_sync_msg_id,
                                                        test_function_for_sync)

    test_messenger.start()

    test_msg = DenaliMessage.buildMessage(channel_id=1000,
                                          message_id=0x01,
                                          payload=[1])

    test_dg_msg = DenaliMessage.buildMessage(channel_id=test_dg_simulator_received_channel_id,
                                             message_id=test_dg_simulator_msg_id,
                                             payload=[])

    # Send each message twice, 3 seconds apart, printing the response or a timeout
    test_sequence = [(test_print_sending_dg_board, test_msg),
                     (test_print_sending_dg_board, test_msg),
                     (test_print_sending_dg_sim, test_dg_msg),
                     (test_print_sending_dg_sim, test_dg_msg)]

    for test_print_sending, test_outgoing_msg in test_sequence:
        sleep(3.0)
        test_print_sending(test_outgoing_msg)
        test_response = test_messenger.send(test_outgoing_msg)
        test_print_to_screen(test_response)