###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file DenaliSerialMessenger.py
#
# @date 16-Oct-2019
# @author L. Baloa
#
# @brief This class allows sending and receiving of Denali Messages on
# the serial port.
#
############################################################################
import argparse
import logging
import math
import sys
import threading
import time
from time import sleep

try:
    import can
except ImportError:  # python-can is only required by DenaliCanMessenger
    can = None


class DenaliMessage:
    """Static helpers to build and parse Denali messages.

    A built message is a dictionary {'channel_id': int, 'message': [int, ...]}
    where 'message' is laid out as:
    start byte, 2-byte sequence number, 2-byte message id, 1-byte payload
    length, payload, 1-byte CRC, zero padding up to a multiple of 8 bytes.
    """
    BYTE_ORDER = 'little'
    START_BYTE = 0xA5
    START_INDEX = 0
    MSG_SEQ_INDEX = 1
    MSG_ID_INDEX = 3
    PAYLOAD_LENGTH_INDEX = 5
    PAYLOAD_START_INDEX = 6
    PAYLOAD_LENGTH_FIRST_PACKET = 1
    HEADER_LENGTH = 6
    PACKET_LENGTH = 8
    CRC_LENGTH = 1
    MAX_MSG_ID_NUMBER = 65535
    MAX_NUMBER_OF_PAYLOAD_BYTES = 254

    # Lookup table used by crc8(): CRC_LIST[byte ^ running_crc] is the next crc
    CRC_LIST = [
        0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46,
        67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109,
        134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168,
        197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235,
        61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19,
        126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80,
        187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149,
        248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214,
        122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84,
        57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23,
        252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210,
        191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145,
        71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105,
        4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42,
        193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239,
        130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172
    ]

    @staticmethod
    def build_basic_message(channel_id=0, message=None):
        r"""
        Builds a basic message dictionary containing the channel id and message

        \param channel_id: integer with channel id
        \param message: an array of ints forming the message
        \return: dictionary with channel_id and message keys
        """
        if message is None:
            message = []

        return {'channel_id': channel_id, 'message': message}

    @staticmethod
    def build_message(channel_id=0, message_id=0, payload=None):
        r"""
        Builds a complete Denali message (header, payload, CRC and padding)

        \param channel_id: is an integer with channel ID
        \param message_id: is an integer indicating request ID (0..65535)
        \param payload: list with payload. It does not include length
        \return: dictionary with channel_id and 8-byte padded message, or an
                 empty list if message_id is out of range or the payload is
                 longer than MAX_NUMBER_OF_PAYLOAD_BYTES
        """
        if payload is None:
            payload = []

        if not 0 <= message_id <= DenaliMessage.MAX_MSG_ID_NUMBER:
            return []

        payload_length = len(payload)
        if payload_length > DenaliMessage.MAX_NUMBER_OF_PAYLOAD_BYTES:
            return []

        message_list = [DenaliMessage.START_BYTE]

        # Add a zero seq # for dialin messages for now
        seq_no = 0
        message_list += list(seq_no.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER))

        # Add message ID as unsigned 16-bit #
        message_list += list(message_id.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER))

        message_list += [payload_length]
        message_list += payload

        # Because CRC does not include first byte, then we pass a list without it
        message_list += [DenaliMessage.crc8(message_list[1:])]

        message_list = DenaliMessage.pad_message_with_zeros(message_list)

        return DenaliMessage.build_basic_message(channel_id=channel_id, message=message_list)

    @staticmethod
    def crc8(message_list):
        r"""
        Returns the calculated crc from a message list

        \param message_list: is a list of integer numbers (0..255) containing the message
        \return: integer containing a unsigned byte
        """
        crc = 0
        for byte in message_list:
            crc = DenaliMessage.CRC_LIST[byte ^ crc]

        return crc

    @staticmethod
    def pad_message_with_zeros(message):
        r"""
        Pads a packet with trailing zeros so that its length is a multiple of
        8 bytes. The list passed in is extended in place and also returned.

        \param message: packet that may or may not be multiple of 8 bytes
        \return: packet that is 8-byte multiple
        """
        # -len % PACKET_LENGTH is the number of bytes missing to the next multiple
        message += [0] * (-len(message) % DenaliMessage.PACKET_LENGTH)

        return message

    @staticmethod
    def get_crc(message):
        r"""
        Gets the CRC in message

        \param message: Dialin complete message with CRC
        \return: CRC in message
        """
        crc_index = DenaliMessage.PAYLOAD_START_INDEX + DenaliMessage.get_payload_length(message)

        return message['message'][crc_index]

    @staticmethod
    def verify_crc(message):
        r"""
        Verifies message CRC equals calculated message CRC

        \param message: dictionary with channel_id and message keys, or None
        \return: True if CRC matches, False otherwise (also False for None)
        """
        if message is None:
            return False

        message_list = message['message']
        message_length = DenaliMessage.PAYLOAD_START_INDEX + DenaliMessage.get_payload_length(message)
        # The CRC covers everything after the start byte up to the end of the payload
        calculated_crc = DenaliMessage.crc8(message_list[1:message_length])
        actual_crc = DenaliMessage.get_crc(message)

        return calculated_crc == actual_crc

    @staticmethod
    def get_channel_id(message):
        r"""
        Returns channel ID from message

        \param message: dictionary with channel_id and message keys
        \return: integer with channel id
        """
        return message['channel_id']

    @staticmethod
    def get_message_id(message):
        r"""
        Returns request ID from packet

        \param message: complete Diality Packet
        \return: integer with request ID
        """
        msg_id_array = message['message'][DenaliMessage.MSG_ID_INDEX:
                                          DenaliMessage.PAYLOAD_LENGTH_INDEX]

        return int.from_bytes(msg_id_array, byteorder=DenaliMessage.BYTE_ORDER)

    @staticmethod
    def get_payload_length(message):
        r"""
        Returns payload length from message

        \param message: dictionary with channel_id and message keys
        \return: a unsigned payload length
        """
        return message['message'][DenaliMessage.PAYLOAD_LENGTH_INDEX]

    @staticmethod
    def get_payload(message):
        r"""
        Returns payload array from message

        Note that the returned list is everything after the header, so it
        also carries the trailing CRC byte and any zero padding.

        \param message: dictionary with channel_id and message keys
        \return: a payload array if exist, None if the payload length is zero
        """
        if DenaliMessage.get_payload_length(message) == 0:
            return None

        return message['message'][DenaliMessage.PAYLOAD_START_INDEX:]

    @staticmethod
    def get_total_packets(message, is_array=True):
        r"""
        Returns the number of packets needed to transmit Denali Message

        \param message: dictionary with channel_id and message keys or raw message
        \param is_array: True if message is an array and not a dictionary
        \return: number of packets
        """
        the_message = message if is_array else message['message']

        total_bytes = (the_message[DenaliMessage.PAYLOAD_LENGTH_INDEX] +
                       DenaliMessage.HEADER_LENGTH +
                       DenaliMessage.CRC_LENGTH)

        return math.ceil(total_bytes / DenaliMessage.PACKET_LENGTH)


class DenaliChannels:
    # All the available channels
    hd_alarm_broadcast_ch_id = 0x001
    dg_alarm_broadcast_ch_id = 0x002
    ui_alarm_broadcast_ch_id = 0x004
    hd_to_dg_ch_id = 0x008
    dg_to_hd_ch_id = 0x010
    hd_to_ui_ch_id = 0x020
    hd_sync_broadcast_ch_id = 0x040
    dg_sync_broadcast_ch_id = 0x080
    ui_to_hd_ch_id = 0x100
    ui_sync_broadcast_ch_id = 0x200
    dialin_to_hd_ch_id = 0x400
    hd_to_dialin_ch_id = 0x401
    dialin_to_dg_ch_id = 0x402
    dg_to_dialin_ch_id = 0x403
    dialin_to_ui_ch_id = 0x404
    ui_to_dialin_ch_id = 0x405


class LongDenaliMessageBuilder:
    def __init__(self, can_message):
        r"""
        LongDenaliMessageBuilder is a utility object that helps construct a diality message
        that is longer than 8 bytes. Basic principle is to construct an object with the
        first 8 byte message which contains the length of the message, and then push the
        remaining messages. e.g., let's imagine a 3 message packet.

        obj = LongDenaliMessageBuilder(msg1)
        message = obj.push(msg2), returns None, message is not complete
        message = obj.push(msg3), return the packet which is the concatenation of msg1, msg2 and msg3

        \param can_message: an 8 byte message needed to build a diality message.
        \returns object
        """
        # Copy so that later pushes never modify the caller's list
        self.message = list(can_message)
        self.number_of_can_packets_needed = DenaliMessage.get_total_packets(can_message)
        self.number_of_can_packets_up_to_now = 1

    def push(self, can_message, first_packet=False):
        r"""
        push appends the can_message to the current packet.

        \param can_message: 8-byte message
        \param first_packet: True if it is the first packet received
        \return: None if the packet is not completed, otherwise returns the complete packet
        """
        if first_packet:
            # Restart the builder with this packet as the head of a new message
            self.message = list(can_message)
            self.number_of_can_packets_needed = DenaliMessage.get_total_packets(can_message)
            self.number_of_can_packets_up_to_now = 1
        else:
            self.message += can_message
            self.number_of_can_packets_up_to_now += 1

        if self.number_of_can_packets_up_to_now == self.number_of_can_packets_needed:
            return_message = self.message
            self.message = None
            return return_message

        return None


class DenaliCanMessenger:
    START_BYTE = DenaliMessage.START_BYTE
    DIALIN_MSG_RESP_TO = 0.1  # number of seconds to wait for a response to a sent command

    def __init__(self, can_interface='can0', log_level="debug"):
        r"""
        DenaliCanMessenger constructor

        \param can_interface - string containing the can interface, e.g., 'can0'
        \param log_level - name of the logging level (debug, info, warning, error, critical)
        \returns DenaliCanMessenger object
        \raises ImportError if the python-can package is not installed
        """
        if can is None:
            raise ImportError("python-can is required to use DenaliCanMessenger")

        self.bus = can.interfaces.socketcan.SocketcanBus(channel=can_interface)
        self.listener_buffer = can.BufferedReader()
        self.notifier = can.Notifier(self.bus, [self.listener_buffer])
        self.send_packet_request_id = -1
        self.send_event = threading.Event()
        self.long_message_builders = {}
        self.long_msg_channel_id_set = set()
        self.messages = None
        self.last_sent_message = None
        self.command_response_message = None
        self.response_channel_id = -1
        self.run = False
        self.sync_response_dictionary = {}

        numeric_level = getattr(logging, log_level.upper(), None)
        logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
                            filename="DialIn.log",
                            datefmt='%m-%d-%Y:%H:%M:%S',
                            level=numeric_level)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
        # send() logs every outgoing packet through this logger
        self.logger = logging.getLogger(__name__)

        if self.bus is not None:
            self.serial_listener_thread = threading.Thread(target=self.listener, daemon=True)
        else:
            self.serial_listener_thread = None
            print_and_log("Can connection is not valid")

    def start(self):
        """
        starts listening to the can interface.
        """
        if self.bus is None:
            print_and_log("Cannot start can listener.")
            return

        self.run = True
        if self.serial_listener_thread is not None:
            self.serial_listener_thread.start()
            print_and_log("Can listener has started.")
        else:
            print_and_log("Cannot start listener...")

    def stop(self):
        """
        Stop listening the can interface
        """
        self.run = False
        print_and_log("\nCan listener has stopped.")

    def listener(self):
        """
        Listens for diality message on the can interface passed during construction.
        Runs until self.run is set to False (see stop()).
        """
        while self.run:
            message = self.listener_buffer.get_message(0.0)

            if message is None:
                # no new packets in receive buffer
                # We have received nothing, let's sleep 10 msec and let's check again
                sleep(0.01)
                continue

            print_and_log("Message = {}".format(message))

            if message.dlc != DenaliMessage.PACKET_LENGTH:
                # Only full 8-byte can frames carry Denali data
                continue

            # We have received a legit can message of 8 bytes
            can_data = list(message.data)
            channel_id = message.arbitration_id

            print_and_log(str(time.time()) + " " + str(channel_id) + " " + str(can_data), log_level="info")

            self._process_can_frame(channel_id, can_data)

    def _process_can_frame(self, channel_id, can_data):
        """Feeds one 8-byte can frame into message assembly and dispatches a completed message."""
        message_length = can_data[DenaliMessage.PAYLOAD_LENGTH_INDEX]
        starts_message = can_data[0] == DenaliMessage.START_BYTE

        if channel_id in self.long_msg_channel_id_set:
            # if we are building a long message, then proceed to push it to the channel dictionary
            self.messages = self.long_message_builders[channel_id].push(can_data)

        elif starts_message and message_length <= DenaliMessage.PAYLOAD_LENGTH_FIRST_PACKET:
            # This is a short packet: it is complete as received
            self.messages = can_data

        elif starts_message:
            # Long packet start
            # We are starting to build a long message, include it in the long message channel set
            self.long_msg_channel_id_set.add(channel_id)

            if channel_id not in self.long_message_builders:
                # if we don't have a builder. Create it!
                self.long_message_builders[channel_id] = LongDenaliMessageBuilder(can_data)
            else:
                # if we do have a builder, restart it with this first packet
                self.long_message_builders[channel_id].push(can_data, first_packet=True)
            self.messages = None

        # Do we have a complete (long or short) Denali Message?
        if self.messages is not None:
            self._dispatch_complete_message(channel_id, self.messages)
            # Done with this message, let's get the next one
            self.messages = None

    def _dispatch_complete_message(self, channel_id, raw_message):
        """Checks the CRC of a complete message and hands it to the pending send() or a registered function."""
        complete_dialin_message = DenaliMessage.build_basic_message(channel_id=channel_id,
                                                                    message=raw_message)

        # The message is complete: the channel is no longer building a long message
        self.long_msg_channel_id_set.discard(channel_id)

        # Need to verify CRC at this point
        if not DenaliMessage.verify_crc(complete_dialin_message):
            # if verify is False, let's drop (ignore) this message
            crc_end = DenaliMessage.PAYLOAD_START_INDEX + \
                DenaliMessage.get_payload_length(complete_dialin_message)
            sys.stderr.write(
                "Incorrect CRC, received message: {}, crc: {}, calculated crc: {}\n".format(
                    raw_message,
                    DenaliMessage.get_crc(complete_dialin_message),
                    DenaliMessage.crc8(raw_message[1:crc_end])))
            return

        dialin_msg_id = DenaliMessage.get_message_id(complete_dialin_message)

        # We first check if this is a response to a send request that is pending
        if dialin_msg_id == self.send_packet_request_id:
            self.command_response_message = complete_dialin_message
            self.send_event.set()
            self.send_packet_request_id = -1
            return

        # If it is not, this is a publication message and we need to call its registered function
        publication_function = self.sync_response_dictionary.get(channel_id, {}).get(dialin_msg_id)
        if publication_function is not None:
            publication_function(complete_dialin_message)

    def register_receiving_publication_function(self, channel_id, message_id, function):
        r"""
        assign a function with packet parameter to an sync request id, e.g.,
        def function(packet).

        \param channel_id: can channel number where messages are received
        \param message_id: Diality request ID in message
        \param function: function reference
        """
        # Creates the dictionary for the channel_id the first time it is seen
        self.sync_response_dictionary.setdefault(channel_id, {})[message_id] = function

    def send(self, built_message, time_out=DIALIN_MSG_RESP_TO):
        r"""
        sends built_message on its channel and waits for a response with the
        same message id. While no response arrives within time_out the
        message is sent again; with time_out == 0 it is sent once only.

        \param built_message: message built using DenaliMessage class
        \param time_out: time it will wait for a response in seconds
        \returns: the response message, or None when time_out is 0 and no
                  response has been received
        """
        channel_id = DenaliMessage.get_channel_id(built_message)
        padded_can_message_array = built_message['message']

        # A message can be longer than 8 bytes, so we need to split it
        # into 8 bytes packets.
        number_of_packets = DenaliMessage.get_total_packets(padded_can_message_array)

        msg_sent = False

        # keep trying to send message until we get a response
        while not msg_sent:
            self.last_sent_message = built_message

            # Reset the response state BEFORE transmitting, otherwise a response
            # that arrives right after the last packet could be wiped out
            self.command_response_message = None
            self.send_event.clear()
            self.send_packet_request_id = DenaliMessage.get_message_id(built_message)

            # We are sending one packet at a time on CAN
            for n in range(number_of_packets):
                data = padded_can_message_array[n * DenaliMessage.PACKET_LENGTH:
                                                (n + 1) * DenaliMessage.PACKET_LENGTH]

                packet = can.Message(arbitration_id=channel_id, data=data, is_extended_id=False)

                self.logger.debug(packet)

                self.bus.send(packet, 0)

            # At this point, we sleep until the system times out or flag is set
            self.send_event.wait(time_out)

            if self.command_response_message is not None:
                msg_sent = True
            elif time_out == 0:
                msg_sent = True
            else:
                print_and_log("No response. Re-sending message.")

        # We are ready to send again. Reset request ID appropriately
        self.send_packet_request_id = -1

        # This value is None or it has a message depending of the listener
        return self.command_response_message


def print_and_log(message, log_level="debug"):
    r"""
    Prints and logs a message

    \param message: The message to print and log
    \param log_level: The logging level, indicates the severity of the message.
                      An unknown level only prints the message.
    \returns: None
    """
    print(message)

    log_function = {"debug": logging.debug,
                    "info": logging.info,
                    "warning": logging.warning,
                    "error": logging.error,
                    "critical": logging.critical}.get(log_level)

    if log_function is not None:
        log_function(message)


def tst_print_received_packet(message, sync=False):
    """Prints a received message dictionary (channel_id and message keys)."""
    introduction = "Received: "

    if sync:
        introduction = "Sync " + introduction

    print_and_log("{}{} in channel: {}".format(introduction,
                                               message['message'],
                                               DenaliMessage.get_channel_id(message)))


def tst_print_to_screen(message):
    """Prints the response returned by send(), or a timeout notice for None."""
    if message is None:
        print_and_log("Timeout!!!")
    else:
        tst_print_received_packet(message)


def tst_function_for_sync(message):
    """Publication function used by the test: prints the received message."""
    tst_print_received_packet(message, sync=True)


def tst_print_sending_dg_board(message):
    """Prints a message that is about to be sent to the board."""
    print_and_log("Sending to board: {} in channel: {}".format(message['message'],
                                                               message['channel_id']))


def tst_print_sending_dg_sim(message):
    """Prints a message that is about to be sent to the DG simulator."""
    print_and_log("Sending to DG simulator: {} in channel {} ---> ".format(message['message'],
                                                                         message['channel_id']))


def tst_dg(log_level):
    """Manual test: registers publication functions and sends test messages."""
    test_messenger = DenaliCanMessenger(log_level=log_level)

    test_received_channel_id = DenaliChannels.hd_to_ui_ch_id
    test_received_message_id = 0x100
    test_messenger.register_receiving_publication_function(test_received_channel_id,
                                                           test_received_message_id,
                                                           tst_function_for_sync)

    test_dg_simulator_received_channel_id = DenaliChannels.dialin_to_dg_ch_id
    test_dg_simulator_sync_msg_id = 0x05
    test_dg_simulator_msg_id = 0x03

    test_messenger.register_receiving_publication_function(test_dg_simulator_received_channel_id,
                                                           test_dg_simulator_sync_msg_id,
                                                           tst_function_for_sync)

    test_messenger.start()

    test_msg = DenaliMessage.build_message(channel_id=1000,
                                           message_id=0x01,
                                           payload=[1])

    test_dg_msg = DenaliMessage.build_message(channel_id=test_dg_simulator_received_channel_id,
                                              message_id=test_dg_simulator_msg_id,
                                              payload=[])

    sleep(3.0)
    tst_print_sending_dg_board(test_msg)
    test_response = test_messenger.send(test_msg)
    tst_print_to_screen(test_response)

    sleep(3.0)
    tst_print_sending_dg_board(test_msg)
    test_response = test_messenger.send(test_msg)
    tst_print_to_screen(test_response)

    sleep(3.0)
    tst_print_sending_dg_sim(test_dg_msg)
    test_response = test_messenger.send(test_dg_msg)
    tst_print_to_screen(test_response)

    sleep(3.0)
    tst_print_sending_dg_sim(test_dg_msg)
    test_response = test_messenger.send(test_dg_msg)
    tst_print_to_screen(test_response)


def tst_can():
    """Manual test: creates a messenger on the default interface and starts listening."""
    test_messenger = DenaliCanMessenger()
    test_messenger.start()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Dial-In Core Can Protocol")
    parser.add_argument("--test-dg", action="store_true")
    parser.add_argument("--log-level", default="debug")
    args = parser.parse_args()

    if args.test_dg:
        tst_dg(args.log_level)

    if len(sys.argv) < 2:
        parser.print_help()