########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file CoreCANProtocol.py # # @date 31-Mar-2020 # @author P. Lucia # # @brief Classes in this file facilitate sending and receiving of Denali Messages over CAN # ############################################################################ import threading import can import math import time from time import sleep import sys import argparse from argparse import RawTextHelpFormatter import logging class DenaliMessage: BYTE_ORDER = 'little' START_BYTE = 0xA5 START_INDEX = 0 MSG_SEQ_INDEX = 1 MSG_ID_INDEX = 3 PAYLOAD_LENGTH_INDEX = 5 PAYLOAD_START_INDEX = 6 PAYLOAD_LENGTH_FIRST_PACKET = 1 HEADER_LENGTH = 6 PACKET_LENGTH = 8 CRC_LENGTH = 1 MAX_MSG_ID_NUMBER = 65535 MAX_NUMBER_OF_PAYLOAD_BYTES = 254 CRC_LIST = [ 0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46, 67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109, 134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168, 197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235, 61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19, 126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80, 187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149, 248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214, 122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84, 57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23, 252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210, 191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145, 71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105, 4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42, 193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239, 130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172 ] @staticmethod def build_basic_message(channel_id=0, message=None): """ Builds a basic message dictionary containing the channel id and message \param channel_id: integer with channel id \param message: an array of ints forming the message \return: dictionary with channel_id and message keys """ if message is None: message = [] return {'channel_id': channel_id, 'message': message} @staticmethod def build_message(channel_id=0, message_id=0, payload=None): """ buildPacket builds a Diality Packet \param channel_id: is an integer with channel ID \param message_id: is an integer indicating request ID \param payload: list with payload. It does not include length \return dictionary with channel_id and 8-byte padded message """ if payload is None: payload = [] message_list = [DenaliCanMessenger.START_BYTE] if 0 <= message_id <= DenaliMessage.MAX_MSG_ID_NUMBER: # Add a zero seq # for dialin messages for now seq_no = 0 message_seq_in_bytes = seq_no.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER) message_list += [message_seq_in_bytes[0]] message_list += [message_seq_in_bytes[1]] # Add message ID as unsigned 16-bit # message_id_in_bytes = message_id.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER) message_list += [message_id_in_bytes[0]] message_list += [message_id_in_bytes[1]] else: return [] # Check payload length payload_length = len(payload) # if payload is larger than 255 return nothing if payload_length <= DenaliMessage.MAX_NUMBER_OF_PAYLOAD_BYTES: # payload has to be a list message_list += [payload_length] else: return [] message_list += payload # Because CRC does not include first byte, then we pass a list with out it message_list += [DenaliMessage.crc8(message_list[1:])] message_list = DenaliMessage.pad_message_with_zeros(message_list) return DenaliMessage.build_basic_message(channel_id=channel_id, message=message_list) @staticmethod def crc8(message_list): """ Returns the calculated crc from a message list \param message_list: is a list of integer numbers containing the message \return: integer containing a unsigned byte """ crc = 0 for byte in message_list: unsigned_byte = byte ^ crc crc = DenaliMessage.CRC_LIST[unsigned_byte] return crc @staticmethod def pad_message_with_zeros(message): """ Returns a packet padded with zeros that guarantees that the packet is a multiple of 8 bytes. \param message: packet that may or may not be multiple of 8 bytes \return: packet that is 8-byte multiple """ message_length = len(message) # message must be multiple of 8 if message_length % DenaliMessage.PACKET_LENGTH != 0: # We need to pad the message with trailing zeros add_these_many_zeros = math.ceil(message_length / DenaliMessage.PACKET_LENGTH) * \ DenaliMessage.PACKET_LENGTH - message_length message += [0] * add_these_many_zeros return message @staticmethod def get_crc(message): """ Gets the CRC in message \param message: Dialin complete message with CRC \return: CRC in message """ crc_index = DenaliMessage.PAYLOAD_START_INDEX + DenaliMessage.get_payload_length(message) return message['message'][crc_index] @staticmethod def verify_crc(message): """ Verifies message CRC equals calculated message CRC \return: TRUE if CRC matches, FALSE otherwise """ if message is None: return False else: message_list = message['message'] message_length = DenaliMessage.PAYLOAD_START_INDEX + DenaliMessage.get_payload_length(message) calculated_crc = DenaliMessage.crc8(message_list[1:message_length]) actual_crc = DenaliMessage.get_crc(message) return calculated_crc == actual_crc @staticmethod def get_channel_id(message): """ Returns request ID from message \param message: dictionary with channel_id and message keys \return: integer with channel id """ return message['channel_id'] @staticmethod def get_message_id(message): """ Returns request ID from packet \param message: complete Diality Packet \return: integer with request ID """ msg_id_array = message['message'][DenaliMessage.MSG_ID_INDEX: DenaliMessage.PAYLOAD_LENGTH_INDEX] return int.from_bytes(msg_id_array, byteorder=DenaliMessage.BYTE_ORDER) @staticmethod def get_payload_length(message): """ Returns payload length from message \param message: dictionary with channel_id and message keys \return: a unsigned payload length """ return message['message'][DenaliMessage.PAYLOAD_LENGTH_INDEX] @staticmethod def get_payload(message): """ Returns payload array from message \param message: dictionary with channel_id and message keys \return: a payload array if exist """ payload_length = DenaliMessage.get_payload_length(message) if payload_length == 0: return None else: return message['message'][DenaliMessage.PAYLOAD_START_INDEX:] @staticmethod def get_total_packets(message, is_array=True): """ Returns the number of packets needed to transmit Denali Message \param message: dictionary with channel_id and message keys or raw message \param is_array: True if message is an array and not a dictionary \return: number of packets """ the_message = message if is_array else message['message'] return math.ceil((the_message[DenaliMessage.PAYLOAD_LENGTH_INDEX] + DenaliMessage.HEADER_LENGTH + DenaliMessage.CRC_LENGTH) / DenaliMessage.PACKET_LENGTH) class DenaliChannels: """ Convenience class listing all the possible CAN channels used in the denali system. Updates made to the "Message List.xlsx" document found in the Diality Software Team SharePoint, specifically in the CAN Channels sheet, should also be applied here. """ hd_alarm_broadcast_ch_id = 0x001 dg_alarm_broadcast_ch_id = 0x002 ui_alarm_broadcast_ch_id = 0x004 hd_to_dg_ch_id = 0x008 dg_to_hd_ch_id = 0x010 hd_to_ui_ch_id = 0x020 hd_sync_broadcast_ch_id = 0x040 dg_to_ui_ch_id = 0x070 dg_sync_broadcast_ch_id = 0x080 ui_to_hd_ch_id = 0x100 ui_sync_broadcast_ch_id = 0x200 dialin_to_hd_ch_id = 0x400 hd_to_dialin_ch_id = 0x401 dialin_to_dg_ch_id = 0x402 dg_to_dialin_ch_id = 0x403 dialin_to_ui_ch_id = 0x404 ui_to_dialin_ch_id = 0x405 class LongDenaliMessageBuilder: def __init__(self, can_message): """ LongDialityMessageBuilder is a utility object that helps construct a diality message that is longer than 8 bytes. Basic principle is to construct an object with the first 8 byte message which contains the length of the message, and the later push the remaining messages. e.g., let's imagine a 3 message packet. obj = LongDialityMessageBuilder(msg1) message = obj.push(msg2), returns None, message is not complete message = obj.push(msg3), return the packet which is the concatenation of msg1, msg2 and msg3 \param can_message: an 8 byte message needed to build a diality message. \returns object """ self.message = can_message self.number_of_can_packets_needed = DenaliMessage.get_total_packets(can_message) self.number_of_can_packets_up_to_now = 1 def push(self, can_message, first_packet=False): """ push appends the can_message to the current packet. \param can_message: 8-byte message \param first_packet: True if it is the first packet received \return: None if the packet is not completed, otherwise returns the complete packet """ if first_packet: self.message = can_message self.number_of_can_packets_needed = DenaliMessage.get_total_packets(can_message) self.number_of_can_packets_up_to_now = 1 else: self.message += can_message self.number_of_can_packets_up_to_now += 1 if self.number_of_can_packets_up_to_now == self.number_of_can_packets_needed: return_message = self.message self.message = None return return_message else: return None class DenaliCanMessenger: START_BYTE = DenaliMessage.START_BYTE DIALIN_MSG_RESP_TO = 0.1 # number of seconds to wait for a response to a sent command def __init__(self, can_interface='can0', log_level="error"): """ DenaliCanMessenger constructor \param can_interface - string containing the can interface, e.g., 'can0" \returns DialityCanMessenger object """ self.bus = can.interfaces.socketcan.SocketcanBus(channel=can_interface) self.listener_buffer = can.BufferedReader() self.notifier = can.Notifier(self.bus, [self.listener_buffer]) self.send_packet_request_id = -1 self.send_event = threading.Event() self.long_message_builders = {} self.long_msg_channel_id_set = set() self.messages = None self.last_sent_message = None self.command_response_message = None self.response_channel_id = -1 self.run = False self.sync_response_dictionary = {} numeric_level = getattr(logging, log_level.upper(), logging.ERROR) logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', filename="DialIn.log", datefmt='%m-%d-%Y:%H:%M:%S', level=numeric_level) # DEBUG, INFO, WARNING, ERROR, CRITICAL if self.bus is not None: self.serial_listener_thread = threading.Thread(target=self.listener, daemon=True) else: self.serial_listener_thread = None print_and_log("Can connection is not valid") def start(self): """ starts listening to the can interface. """ if self.bus is None: print_and_log("Cannot start can listener.") return else: self.run = True if self.serial_listener_thread is not None: self.serial_listener_thread.start() print_and_log("Can listener has started.") else: print_and_log("Cannot start listener...") def stop(self): """ Stop listening the can interface """ self.run = False print_and_log("\nCan listener has stopped.") def listener(self): """ Listens for diality message on the can interface passed during construction. """ while self.run: message = self.listener_buffer.get_message(0.0) print_and_log("Message = {}".format(message)) if message is not None: if message.dlc == DenaliMessage.PACKET_LENGTH: # We have received a legit can message of 8 bytes can_data = [b for b in message.data] channel_id = message.arbitration_id message_length = can_data[DenaliMessage.PAYLOAD_LENGTH_INDEX] print_and_log(str(time.time()) + " " + str(channel_id) + " " + str(can_data), log_level=logging.INFO) # if we are building a long message, then proceed to push it to the channel dictionary if channel_id in self.long_msg_channel_id_set: self.messages = self.long_message_builders[channel_id].push(can_data) elif can_data[0] == DenaliMessage.START_BYTE and \ message_length <= DenaliMessage.PAYLOAD_LENGTH_FIRST_PACKET: # This is a short packet # This is the first time that we are building a message self.messages = can_data # deliver the packet elif can_data[0] == self.START_BYTE and \ message_length > DenaliMessage.PAYLOAD_LENGTH_FIRST_PACKET: # Long packet start # We are starting to build a long message, include it in the lonMsgChannelIDSet self.long_msg_channel_id_set.add(channel_id) if channel_id not in self.long_message_builders.keys(): # if we don't have a builder. Create it! self.long_message_builders[channel_id] = LongDenaliMessageBuilder(can_data) self.messages = None else: # if we do have a builder. This is the first time # self.messages = self.long_message_builders[channel_id].push(can_data, first_packet=True) self.long_message_builders[channel_id].push(can_data, first_packet=True) self.messages = None # Do we have a complete (long or short) Denali Message? if self.messages is not None: message_valid = True # assume true for now, set to false if CRC check fails below complete_dialin_message = DenaliMessage.build_basic_message(channel_id=channel_id, message=self.messages) dialin_msg_id = DenaliMessage.get_message_id(complete_dialin_message) dialin_ch_id = DenaliMessage.get_channel_id(complete_dialin_message) if dialin_ch_id in self.long_msg_channel_id_set: # We need to remove channel ID from the long message set self.long_msg_channel_id_set.remove(dialin_ch_id) # Need to verify CRC at this point if DenaliMessage.verify_crc(complete_dialin_message) is False: # if verify is False, let's drop (ignore) this message message_valid = False dialin_ch_id = None dialin_msg_id = None sys.stderr.write( "Incorrect CRC, received message: {}, crc: {}, calculated crc: {}\n".format( self.messages, DenaliMessage.get_crc(complete_dialin_message), DenaliMessage.crc8(self.messages))) if message_valid == True: # We first check if this is a response to a send request that is pending if dialin_msg_id == self.send_packet_request_id: self.command_response_message = complete_dialin_message self.send_event.set() self.send_packet_request_id = -1 # If it is not, this is a publication message and we need to call it's register function elif dialin_ch_id in self.sync_response_dictionary.keys() and \ dialin_msg_id in self.sync_response_dictionary[channel_id].keys(): self.sync_response_dictionary[dialin_ch_id][dialin_msg_id](complete_dialin_message) # Done with this message, let's get the next one self.messages = None else: # no new packets in receive buffer # We have received nothing, let's sleep 1 msec and let's check again sleep(0.01) def register_receiving_publication_function(self, channel_id, message_id, function): """ assign a function with packet parameter to an sync request id, e.g., def function(packet). \param channel_id: can channel number where messages are received \param message_id: Diality request ID in message \param function: function reference """ # if the channel_id exist, we just update the dictionary for the channel_id if channel_id in self.sync_response_dictionary.keys(): self.sync_response_dictionary[channel_id].update({message_id: function}) # otherwise, we need to create the dictionary for the channel_id else: self.sync_response_dictionary[channel_id] = {message_id: function} def send(self, built_message, time_out=DIALIN_MSG_RESP_TO): """ sends can_packet to channel_id. \param built_message: message built using DialinMessage class \param time_out: time it will wait for a response in seconds \returns: Diality Packet, it it times out it returns None """ msg_sent = False # keep trying to send message until we get a response while msg_sent is not True: self.last_sent_message = built_message channel_id = DenaliMessage.get_channel_id(built_message) padded_can_message_array = built_message['message'] self.send_packet_request_id = DenaliMessage.get_message_id(built_message) # A message can be longer than 8 bytes, so we need to split it # into 8 bytes packets. number_of_packets = DenaliMessage.get_total_packets(padded_can_message_array) # We are sending one message at a time on CAN for n in range(number_of_packets): packet = padded_can_message_array[n * DenaliMessage.PACKET_LENGTH: (n + 1) * DenaliMessage.PACKET_LENGTH] # Sending one packet at a time packet = can.Message(arbitration_id=channel_id, data=packet, is_extended_id=False) print_and_log(packet) self.bus.send(packet, 0) # 0.1) # Sending self.command_response_message = None # After all message has been sent, we clear a flag self.send_event.clear() # At this point, we sleep until the system times out or flag is set self.send_event.wait(time_out) if self.command_response_message is not None: msg_sent = True elif time_out == 0: msg_sent = True else: # msg_sent = False print_and_log("No response. Re-sending message.") # We are ready to send again. Reset request ID appropriately self.send_packet_request_id = -1 # This value is None or it has a message depending of the listener return self.command_response_message def print_and_log(message, log_level=logging.DEBUG): """ Prints a message if its severity is >= the current log level. Also logs the message. \param message: The message to print and log \param log_level: The logging level, indicates the severity of the message \returns: None """ if logging.getLogger().getEffectiveLevel() <= log_level: print(message) if log_level == "debug": logging.debug(message) elif log_level == "info": logging.info(message) elif log_level == "warning": logging.warning(message) elif log_level == "error": logging.error(message) elif log_level == "critical": logging.critical(message) def test_print_received_packet(self, message, sync=False): channel_id = message[0] message[0] = DenaliMessage.START_BYTE introduction = "Received: " if sync: introduction = "Sync " + introduction print_and_log("{0} {1} in channel: {3}".format(introduction, message, channel_id)) def test_print_to_screen(message): if message is None: print_and_log("Timeout!!!") else: test_print_received_packet(message) def test_function_for_sync(message): test_print_received_packet(message, sync=True) def test_print_sending_dg_board(message): print_and_log("Sending to board: {0} on channel: {1}".format(message['message'], message['channel_id'])) def test_print_sending_dg_sim(message): print_and_log("Sending to DG simulator: {0} on channel {1}".format(message["message"], message["channel_id"])) def test_dg(log_level): test_messenger = DenaliCanMessenger(log_level=log_level) test_received_channel_id = DenaliChannels.hd_to_ui_ch_id test_received_message_id = 0x100 test_messenger.register_receiving_publication_function(test_received_channel_id, test_received_message_id, test_function_for_sync) test_dg_simulator_received_channel_id = DenaliChannels.dialin_to_dg_ch_id test_dg_simulator_sync_msg_id = 0x05 test_dg_simulator_msg_id = 0x03 test_messenger.register_receiving_publication_function(test_dg_simulator_received_channel_id, test_dg_simulator_sync_msg_id, test_function_for_sync) test_messenger.start() test_msg = DenaliMessage.build_message(channel_id=1000, message_id=0x01, payload=[1]) test_dg_msg = DenaliMessage.build_message(channel_id=test_dg_simulator_received_channel_id, message_id=test_dg_simulator_msg_id, payload=[]) sleep(3.0) test_print_sending_dg_board(test_msg) test_response = test_messenger.send(test_msg) test_print_to_screen(test_response) sleep(3.0) test_print_sending_dg_board(test_msg) test_response = test_messenger.send(test_msg) test_print_to_screen(test_response) sleep(3.0) test_print_sending_dg_sim(test_dg_msg) test_response = test_messenger.send(test_dg_msg) test_print_to_screen(test_response) sleep(3.0) test_print_sending_dg_sim(test_dg_msg) test_response = test_messenger.send(test_dg_msg) test_print_to_screen(test_response) def test_can(self): # TODO: Parse received input and setup as a sender test_messenger = DenaliCanMessenger() test_messenger.start() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Dial-In Core Can Protocol \n" "\tExample: \n" "\tpython3 CoreCanProtocol.py --test-dg --log-level=\"debug\"", formatter_class=RawTextHelpFormatter) parser.add_argument("--test-dg", action="store_true") parser.add_argument("--log-level", default="error") args = parser.parse_args() if args.test_dg: test_dg(args.log_level) if len(sys.argv) < 2: parser.print_help()