########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file CAN.py # # @author (last) Micahel Garthwaite # @date (last) 30-Jun-2023 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ import threading import time from collections import deque import asyncio from typing import Callable import can from can.interfaces import socketcan import math from time import sleep from datetime import datetime import sys from logging import Logger import struct from .. import common from ..common import MsgIds from ..utils import SingletonMeta, IntervalTimer from concurrent.futures import ThreadPoolExecutor import os class DenaliMessage: BYTE_ORDER = 'little' START_BYTE = 0xA5 START_INDEX = 0 MSG_SEQ_INDEX = 1 MSG_ID_INDEX = 3 PAYLOAD_LENGTH_INDEX = 5 PAYLOAD_START_INDEX = 6 PAYLOAD_LENGTH_FIRST_PACKET = 1 HEADER_LENGTH = 6 PACKET_LENGTH = 8 CRC_LENGTH = 1 MAX_MSG_ID_NUMBER = 65535 MAX_NUMBER_OF_PAYLOAD_BYTES = 254 CRC_LIST = [ 0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46, 67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109, 134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168, 197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235, 61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19, 126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80, 187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149, 248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214, 122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84, 57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23, 252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210, 191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145, 71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105, 4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42, 193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239, 130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172 ] _seq_num = 1 @staticmethod def build_basic_message(channel_id=0, message=None): """ Builds a basic message dictionary containing the channel id and message @param channel_id: (int) indicates the channel @param message: (list) integers forming the message @return:: dictionary with channel_id and message keys """ if message is None: message = [] return {'channel_id': channel_id, 'message': message} @classmethod def build_message(cls, channel_id=0, message_id=0, payload=None, seq=None): """ Builds a Denali message @param channel_id: (int) indicates the channel @param message_id: (int) indicating the request type @param payload: (list) contains the payload @param seq: (int) Overrides current sequence number if set @return:: dictionary with channel_id and 8-byte padded message """ if payload is None: payload = [] message_list = [DenaliCanMessenger.START_BYTE] if 0 <= message_id <= DenaliMessage.MAX_MSG_ID_NUMBER: if seq is None: # Wrap sequence number if it hits a max int16 if cls._seq_num >= 32767: cls._seq_num = 1 seq = cls._seq_num if message_id not in common.msg_defs.ACK_NOT_REQUIRED: seq *= -1 message_seq_in_bytes = seq.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER, signed=True) message_list += [message_seq_in_bytes[0]] message_list += [message_seq_in_bytes[1]] # Add message ID as unsigned 16-bit # message_id_in_bytes = message_id.to_bytes(2, byteorder=DenaliMessage.BYTE_ORDER) message_list += [message_id_in_bytes[0]] message_list += [message_id_in_bytes[1]] cls._seq_num += 1 else: return [] # Check payload length payload_length = len(payload) # if payload is larger than 255 return nothing if payload_length <= DenaliMessage.MAX_NUMBER_OF_PAYLOAD_BYTES: # payload has to be a list message_list += [payload_length] else: return [] message_list += payload # Because CRC does not include first byte, then we pass a list with out it message_list += [DenaliMessage.crc8(message_list[1:])] message_list = DenaliMessage.pad_message_with_zeros(message_list) return DenaliMessage.build_basic_message(channel_id=channel_id, message=message_list) @staticmethod def crc8(message_list): """ Returns the calculated crc from a message list @param message_list: is a list of integer numbers containing the message @return:: integer containing a unsigned byte """ crc = 0 for byte in message_list: unsigned_byte = byte ^ crc crc = DenaliMessage.CRC_LIST[unsigned_byte] return crc @staticmethod def pad_message_with_zeros(message): """ Returns a packet padded with zeros that guarantees that the packet is a multiple of 8 bytes. @param message: packet that may or may not be multiple of 8 bytes @return:: packet that is 8-byte multiple """ message_length = len(message) # message must be multiple of 8 if message_length % DenaliMessage.PACKET_LENGTH != 0: # We need to pad the message with trailing zeros add_these_many_zeros = math.ceil(message_length / DenaliMessage.PACKET_LENGTH) * \ DenaliMessage.PACKET_LENGTH - message_length message += [0] * add_these_many_zeros return message @staticmethod def get_crc(message): """ Gets the CRC in message @param message: Dialin complete message with CRC @return:: CRC in message """ crc_index = DenaliMessage.PAYLOAD_START_INDEX + DenaliMessage.get_payload_length(message) return message['message'][crc_index] @staticmethod def verify_crc(message): """ Verifies message CRC equals calculated message CRC @return:: TRUE if CRC matches, FALSE otherwise """ if message is None: return False else: message_list = message['message'] message_length = DenaliMessage.PAYLOAD_START_INDEX + DenaliMessage.get_payload_length(message) calculated_crc = DenaliMessage.crc8(message_list[1:message_length]) actual_crc = DenaliMessage.get_crc(message) return calculated_crc == actual_crc @staticmethod def get_channel_id(message: dict) -> int: """ Returns request ID from message @param message: dictionary with channel_id and message keys @return:: integer with channel id """ return message['channel_id'] @staticmethod def get_sequence_number(message: dict) -> int: """ Returns sequence number from the message @param message: dictionary containing the message @return:: (int) the sequence number """ seq = message['message'][DenaliMessage.MSG_SEQ_INDEX:DenaliMessage.MSG_ID_INDEX] return int.from_bytes(seq, byteorder=DenaliMessage.BYTE_ORDER, signed=True) @staticmethod def create_ack_message(message: dict, passive_mode: bool = True): """ Negates the sequence number and replaces the original message's sequence number with the negated sequence number to create the ACK message. @param message: (dict) a complete leahi_dialin message @param passive_mode: (dict) true if in passive mode, false otherwise @return: (dict) ACK message for the input message """ message = message.copy() seq = struct.unpack('h', bytearray( message['message'][DenaliMessage.MSG_SEQ_INDEX:DenaliMessage.MSG_ID_INDEX]))[0] # send back empty payload since this is an ACK payload = bytearray() channel_id_rx = DenaliMessage.get_channel_id(message) if passive_mode: channel_rx_tx_pairs = { DenaliChannels.td_alarm_broadcast_ch_id: None, DenaliChannels.dd_alarm_broadcast_ch_id: None, DenaliChannels.fp_alarm_broadcast_ch_id: None, DenaliChannels.ui_alarm_broadcast_ch_id: None, DenaliChannels.td_to_dd_ch_id: None, DenaliChannels.dd_to_td_ch_id: None, DenaliChannels.td_to_ui_ch_id: None, DenaliChannels.td_sync_broadcast_ch_id: None, DenaliChannels.dd_sync_broadcast_ch_id: None, DenaliChannels.fp_sync_broadcast_ch_id: None, DenaliChannels.ui_to_td_ch_id: None, DenaliChannels.ui_sync_broadcast_ch_id: None, DenaliChannels.td_to_dialin_ch_id: DenaliChannels.dialin_to_td_ch_id, DenaliChannels.dd_to_dialin_ch_id: DenaliChannels.dialin_to_dd_ch_id, DenaliChannels.fp_to_dialin_ch_id: DenaliChannels.dialin_to_fp_ch_id, DenaliChannels.ui_to_dialin_ch_id: DenaliChannels.dialin_to_ui_ch_id } else: channel_rx_tx_pairs = { DenaliChannels.td_alarm_broadcast_ch_id: None, DenaliChannels.dd_alarm_broadcast_ch_id: None, DenaliChannels.fp_alarm_broadcast_ch_id: None, DenaliChannels.ui_alarm_broadcast_ch_id: None, DenaliChannels.td_to_dd_ch_id: DenaliChannels.dialin_to_td_ch_id, DenaliChannels.dd_to_td_ch_id: DenaliChannels.dialin_to_dd_ch_id, DenaliChannels.td_to_ui_ch_id: DenaliChannels.dialin_to_td_ch_id, DenaliChannels.td_sync_broadcast_ch_id: DenaliChannels.dialin_to_td_ch_id, DenaliChannels.dd_sync_broadcast_ch_id: DenaliChannels.dialin_to_dd_ch_id, DenaliChannels.fp_sync_broadcast_ch_id: DenaliChannels.dialin_to_fp_ch_id, DenaliChannels.ui_to_td_ch_id: DenaliChannels.td_to_ui_ch_id, DenaliChannels.ui_sync_broadcast_ch_id: DenaliChannels.dialin_to_ui_ch_id, DenaliChannels.td_to_dialin_ch_id: DenaliChannels.dialin_to_td_ch_id, DenaliChannels.dd_to_dialin_ch_id: DenaliChannels.dialin_to_dd_ch_id, DenaliChannels.ui_to_dialin_ch_id: DenaliChannels.dialin_to_ui_ch_id } channel_id_tx = channel_rx_tx_pairs.get(channel_id_rx, None) if channel_id_tx is None: return None message = DenaliMessage.build_message( channel_id=channel_id_tx, message_id=common.msg_defs.MsgIds.MSG_ID_ACK_MESSAGE_THAT_REQUIRES_ACK.value, payload=payload, seq=-seq) return message @staticmethod def get_message_id(message): """ Returns request ID from packet @param message: complete Diality Packet @return:: integer with request ID """ msg_id_array = message['message'][DenaliMessage.MSG_ID_INDEX: DenaliMessage.PAYLOAD_LENGTH_INDEX] return int.from_bytes(msg_id_array, DenaliMessage.BYTE_ORDER) @staticmethod def get_message_id_xstr(message): """ Returns request ID from packet in hex string @param message: complete Diality Packet @return:: integer with request ID """ msg_id = "" for index in range(DenaliMessage.MSG_ID_INDEX, DenaliMessage.PAYLOAD_LENGTH_INDEX): msg_id += "{0:02X}" .format(message['message'][index]) return msg_id @staticmethod def get_payload_length(message): """ Returns payload length from message @param message: dictionary with channel_id and message keys @return:: a unsigned payload length """ return message['message'][DenaliMessage.PAYLOAD_LENGTH_INDEX] @staticmethod def get_payload(message): """ Returns payload array from message @param message: dictionary with channel_id and message keys @return:: a payload array if exist """ payload_length = DenaliMessage.get_payload_length(message) if payload_length == 0: return None else: return message['message'][DenaliMessage.PAYLOAD_START_INDEX:] @staticmethod def get_total_packets(message, is_array=True): """ Returns the number of packets needed to transmit Denali Message @param message: dictionary with channel_id and message keys or raw message @param is_array: True if message is an array and not a dictionary @return:: number of packets """ the_message = message if is_array else message['message'] return math.ceil((the_message[DenaliMessage.PAYLOAD_LENGTH_INDEX] + DenaliMessage.HEADER_LENGTH + DenaliMessage.CRC_LENGTH) / DenaliMessage.PACKET_LENGTH) class DenaliChannels: """ Convenience class listing all the possible CAN channels used in the Denali system. Updates made to the "Message List.xlsx" document found in the Diality Software Team SharePoint, specifically in the CAN Channels sheet, should also be applied here. """ td_alarm_broadcast_ch_id = 0x001 dd_alarm_broadcast_ch_id = 0x002 fp_alarm_broadcast_ch_id = 0x004 ui_alarm_broadcast_ch_id = 0x008 td_to_dd_ch_id = 0x010 dd_to_td_ch_id = 0x011 dd_to_fp_ch_id = 0x020 fp_to_dd_ch_id = 0x021 td_to_ui_ch_id = 0x040 ui_to_td_ch_id = 0x041 td_sync_broadcast_ch_id = 0x100 dd_sync_broadcast_ch_id = 0x101 # Also acts as the dd_to_ui_ch_id fp_sync_broadcast_ch_id = 0x102 ui_sync_broadcast_ch_id = 0x103 # Also acts as the ui_to_dd_ch_id dialin_to_td_ch_id = 0x400 td_to_dialin_ch_id = 0x401 dialin_to_dd_ch_id = 0x402 dd_to_dialin_ch_id = 0x403 dialin_to_fp_ch_id = 0x404 fp_to_dialin_ch_id = 0x405 dialin_to_ui_ch_id = 0x406 ui_to_dialin_ch_id = 0x407 class LongDenaliMessageBuilder: def __init__(self, message: can.Message): """ LongDialityMessageBuilder is a utility object that helps construct a Denali message that is longer than 8 bytes. It is only called when we don't yet have a long message builder for the current channel. Basic principle is to construct an object with the first 8 byte message which contains the length of the message, and the later push the remaining messages. e.g., let's imagine a 3 message packet. obj = LongDialityMessageBuilder(msg1) message = obj.push(msg2), returns None, message is not complete message = obj.push(msg3), return the packet which is the concatenation of msg1, msg2 and msg3 @param message: a CAN message """ self.message_data = [b for b in message.data] self.number_of_can_packets_needed = DenaliMessage.get_total_packets(self.message_data) self.number_of_can_packets_up_to_now = 1 def push(self, message: can.Message, first_packet=False): """ push appends the CAN message to the current list of messages @param message: 8-byte message @param first_packet: True if it is the first packet received @return:: None if the packet is not completed, otherwise returns the complete packet """ message_data = [b for b in message.data] if first_packet: self.message_data = message_data self.number_of_can_packets_needed = DenaliMessage.get_total_packets(message_data) self.number_of_can_packets_up_to_now = 1 else: self.message_data += message_data self.number_of_can_packets_up_to_now += 1 if self.number_of_can_packets_up_to_now == self.number_of_can_packets_needed: return_message = self.message_data self.message_data = None return return_message else: return None class DenaliCanMessenger(metaclass=SingletonMeta): START_BYTE = DenaliMessage.START_BYTE DIALIN_MSG_RESP_TO = 0.5 # number of seconds to wait for a response to a send command def __init__(self, can_interface: str, logger: Logger, passive_mode=True, console_out=False): """ DenaliCanMessenger constructor @param can_interface - string containing the can interface, e.g., 'can0" @return: DialityCanMessenger object """ self.message_queue_mutex = threading.Lock() self.response_dictionary_mutex = threading.Lock() self.transmitting_mutex = threading.Lock() self.logger = logger self.message_queue = deque() self.callback_listener_complete_messages = None self.callback_listener_invalid_messages = None self.thread_pool_executor = ThreadPoolExecutor(max_workers=1) # TODO for debugging purposes. Change to configurable setting. #if os.path.exists('Listener_can_dump.log'): os.remove('Listener_can_dump.log') #self.temp_logger = open('Listener_can_dump.log', 'w') #if os.path.exists('Send_can_dump.log'): os.remove('Send_can_dump.log') #self.temp_send_logger = open('Send_can_dump.log', 'w') #if os.path.exists('dialin_processed_msg.log'): os.remove('dialin_processed_msg.log') #self.temp_dialin_processed_logger = open('dialin_processed_msg.log', 'w') # TODO for debugging purposes # try to setup can bus and exit if the can bus has not ben setup to use. try: self.bus = socketcan.SocketcanBus(channel=can_interface) self.loop = asyncio.get_event_loop() if self.bus is not None: self.thread_canbus = threading.Thread(target=self.listener, daemon=True) self.thread_message_queue = threading.Thread(target=self.handle_messages, daemon=True) else: self.thread_canbus = None s = "Can connection is not valid" self.logger.debug(s) sys.exit(s) self.listener_buffer = can.AsyncBufferedReader() self.notifier = can.Notifier(bus=self.bus, listeners=[self.listener_buffer], loop=self.loop) except Exception as e: s = str(e) self.logger.error(s) print(s) sys.exit(19) self.passive_mode = passive_mode self.console_out = console_out self.send_event = threading.Event() self.long_message_builders = {} self.long_msg_channel_id_set = set() self.messages = None self.command_response_message = None self.response_channel_id = -1 self.run = False self.sync_response_dictionary = {} self.ui_received_function_ptr = None self.pending_requests = {} self.transmit_interval_dictionary = {} def start(self): """ starts listening to the can interface. """ if self.bus is None: self.logger.error("Cannot start can listener.") return else: self.run = True if self.thread_message_queue is not None and self.thread_canbus is not None: if not self.thread_canbus.is_alive(): self.thread_canbus.start() self.logger.info("Canbus thread has started.") if not self.thread_message_queue.is_alive(): self.thread_message_queue.start() self.logger.info("Message queue thread has started.") else: self.logger.error("Cannot start listener...") def stop(self): """ Stop listening the can interface """ self.run = False self.logger.debug("\nCan listener has stopped.") def listener(self): """ Listens for diality message on the can interface passed during construction. """ async def _listener(): while True: message = await self.listener_buffer.get_message() if message is not None: data = str(time.time()) + ',' + str(message) + '\r' # TODO for debugging purposes. Change to configurable setting. #self.temp_logger.write(data) self.message_queue.append(message) else: # no new packets in receive buffer # Careful here, making this any shorter will start limiting CPU time for other threads sleep(0.01) if not self.loop.is_running(): self.loop.run_until_complete(_listener()) else: self.loop.create_task(_listener()) def handle_messages(self): """ Handles messages added to the leahi_dialin canbus message queue @return: None """ while True: if not self.message_queue: # Careful here, making this any shorter will start limiting CPU time for other threads sleep(0.01) else: self.message_queue_mutex.acquire() message: can.Message = self.message_queue.popleft() if message.dlc == DenaliMessage.PACKET_LENGTH: # We have received a legit CAN message of 8 bytes can_data = [b for b in message.data] channel_id = message.arbitration_id if not DenaliMessage.PAYLOAD_LENGTH_INDEX < len(can_data): self.logger.error("Invalid Denali message received: {0}".format(message)) self.messages = None # Can't process this message, get the next one self.message_queue_mutex.release() continue else: message_length = can_data[DenaliMessage.PAYLOAD_LENGTH_INDEX] # if we are building a long message, then proceed to push it to the channel dictionary if channel_id in self.long_msg_channel_id_set: self.messages = self.long_message_builders[channel_id].push(message) elif can_data[0] == DenaliMessage.START_BYTE and \ message_length <= DenaliMessage.PAYLOAD_LENGTH_FIRST_PACKET: # This is a short packet # This is the first time that we are building a message self.messages = can_data # deliver the packet elif can_data[0] == self.START_BYTE and \ message_length > DenaliMessage.PAYLOAD_LENGTH_FIRST_PACKET: # Long packet start # We are starting to build a long message, include it in the lonMsgChannelIDSet self.long_msg_channel_id_set.add(channel_id) # if we don't have a long Denali message builder yet, create it if channel_id not in self.long_message_builders.keys(): self.long_message_builders[channel_id] = LongDenaliMessageBuilder(message) self.messages = None else: # if we do have a builder. This is the first time self.long_message_builders[channel_id].push(message, first_packet=True) self.messages = None # Do we have a complete (long or short) Denali Message? if self.messages is not None: message_valid = True # assume true for now, set to false if CRC check fails below complete_dialin_message = DenaliMessage.build_basic_message(channel_id=channel_id, message=self.messages) dialin_msg_id = DenaliMessage.get_message_id(complete_dialin_message) dialin_ch_id = DenaliMessage.get_channel_id(complete_dialin_message) # TODO for debugging purposes. Change to configurable setting. #self.temp_dialin_processed_logger.write(str(complete_dialin_message)) if dialin_ch_id in self.long_msg_channel_id_set: # We need to remove channel ID from the long message set self.long_msg_channel_id_set.remove(dialin_ch_id) # Need to verify CRC at this point if not DenaliMessage.verify_crc(complete_dialin_message): # if verify is False, let's drop (ignore) this message message_valid = False dialin_ch_id = None dialin_msg_id = None self.logger.critical( "Incorrect CRC, received message: {}, crc: {}, calculated crc: {}\n".format( self.messages, DenaliMessage.get_crc(complete_dialin_message), DenaliMessage.crc8(self.messages))) if message_valid: if self.console_out: self.do_console_out(complete_dialin_message) # Send an ack if required if DenaliMessage.get_sequence_number(complete_dialin_message) < 0: # ACK required. Send back the received message with the sequence sign bit flipped msg = DenaliMessage.create_ack_message(message=complete_dialin_message, passive_mode=False) if msg is not None: self.send(msg, 0, is_ack=True) # We first check if this is a response to a send request that is pending if self.pending_requests and dialin_msg_id in self.pending_requests: self.pending_requests[dialin_msg_id] = complete_dialin_message self.send_event.set() # If it is not, this is a publication message and we need to call it's register function else: self.response_dictionary_mutex.acquire() if DenaliCanMessenger.is_ui_received_channel(dialin_ch_id): # check if the channel is in ui channels if self.ui_received_function_ptr is not None: self.thread_pool_executor.submit( self.ui_received_function_ptr, complete_dialin_message, message.timestamp ) if dialin_ch_id in self.sync_response_dictionary.keys() and \ dialin_msg_id in self.sync_response_dictionary[channel_id].keys(): for function_id in self.sync_response_dictionary[dialin_ch_id][dialin_msg_id]: self.thread_pool_executor.submit( self.sync_response_dictionary[dialin_ch_id][dialin_msg_id][function_id], complete_dialin_message, message.timestamp) self.response_dictionary_mutex.release() else: self.logger.critical("Invalid message: {}\n".format(self.messages)) # Done with this message, let's get the next one self.messages = None self.message_queue_mutex.release() @staticmethod def is_ui_received_channel(channel_id: int) -> bool: """ checks if the channel id, channel_id is ui channel. @param channel_id: the channel id to check @return: true, if the channel is the ui channel. """ if channel_id in { DenaliChannels.ui_to_td_ch_id, DenaliChannels.ui_to_dialin_ch_id, DenaliChannels.ui_sync_broadcast_ch_id, DenaliChannels.ui_alarm_broadcast_ch_id }: # check if the channel is in ui channels return True else: return False def register_receiving_publication_function(self, channel_id, message_id, function): """ Assign a function with packet parameter to an sync request id, e.g., def function(packet). @param channel_id: can channel number where messages are received @param message_id: Diality request ID in message @param function: function reference """ # function_id is a UID for each callback per channel,msg pair. function_id = id(function) # if the channel_id exist, we update the dictionary for the channel_id self.response_dictionary_mutex.acquire() if channel_id in self.sync_response_dictionary.keys(): if message_id in self.sync_response_dictionary[channel_id].keys(): self.sync_response_dictionary[channel_id][message_id].update({function_id: function}) else: self.sync_response_dictionary[channel_id].update( {message_id: {function_id: function}}) # otherwise, we need to create the dictionary for the channel_id, msg_id pair else: self.sync_response_dictionary[channel_id] = {message_id: {function_id: function}} self.response_dictionary_mutex.release() def register_received_all_ui_publication_function(self, function_ptr: Callable): """ Assign a function with packet parameter to an sync request id, e.g., def function(packet). @param function_ptr: function reference """ self.ui_received_function_ptr = function_ptr def send(self, built_message: dict, time_out: float = DIALIN_MSG_RESP_TO, resend: bool = False, is_ack: bool = False): """ Sends a Denali message @param built_message: (dict) message built using DialinMessage class @param time_out: (float) time it will wait for a response in seconds @param resend: (bool) Allow resending the message when no response is received. Disabled by default @param is_ack: (bool) If we're sending an ACK, False by default @return: (dict) The Denali packet. If a timeout occurs returns None """ data = str(time.time()) + ',' + str(built_message) + '\r' # TODO for debugging purposes. Change to configurable setting. #self.temp_send_logger.write(data) msg_sent = False msg_id = -1 # keep trying to send message until we get a response while not msg_sent: channel_id = DenaliMessage.get_channel_id(built_message) padded_can_message_array = built_message['message'] msg_id = DenaliMessage.get_message_id(built_message) if not is_ack and msg_id not in self.pending_requests: self.pending_requests[msg_id] = None # A message can be longer than 8 bytes, so we need to split it # into 8 bytes packets. number_of_packets = DenaliMessage.get_total_packets(padded_can_message_array) # We are sending one message at a time on CAN self.transmitting_mutex.acquire() for n in range(number_of_packets): packet = padded_can_message_array[n * DenaliMessage.PACKET_LENGTH: (n + 1) * DenaliMessage.PACKET_LENGTH] # Sending one packet at a time packet = can.Message(arbitration_id=channel_id, data=packet, is_extended_id=False) self.bus.send(packet, 0) # 0.1) self.transmitting_mutex.release() # After all messages have been sent, we clear a flag self.send_event.clear() # Block until the timeout completes or until the threading event's flag is set in self.listener, # indicating a response has been received self.send_event.wait(time_out) # if we're sending an ack, nothing left to do if is_ack: return None # only resend the message if resend is enabled and we haven't received a response yet if resend and self.pending_requests.get(msg_id, None) is None: msg_sent = False self.logger.debug("No response. Re-sending message.") else: msg_sent = True response = self.pending_requests.get(msg_id, None) if response is not None: del self.pending_requests[msg_id] return response @staticmethod def _format_message_candump_style(message: can.Message, channel: str, send: bool = True) -> str: """ Formats a packet @param message: (can.Message) The packet to log @param channel: (str) The channel send or received on @param send: (bool) Whether we're sending or receiving this packet @return: The styled message """ tmp = str(message) data = tmp[-41:-18].upper() if send: data = tmp[-23:].upper() return " {0} {1} [{2}] {3}\n".format(channel, str(hex(message.arbitration_id)[2:]).zfill(3), message.dlc, data) def do_log_can(self, packet: can.Message, style="candump", channel="can0", send=True): """ Logs all packets sent or received by leahi_dialin in candump, or non-candump style format @param packet: (can.Message) The packet to log @param style: (str) The style to log in (candump, non-candump) @param channel: (str) The channel send or received on @param send: (bool) Whether we're sending or receiving this packet @return: None """ filename = "Dialin_CAN_Send.log" if not send: filename = "Dialin_CAN_Receive.log" if style == "candump": with open(filename, 'a') as f: styled_message = self._format_message_candump_style(message=packet, channel=channel, send=send) f.write(styled_message) else: with open(filename, 'a') as f: f.write("{0}\n".format(packet)) @staticmethod def convert_message_to_string(complete_dialin_message: dict) -> str: """ Converts the DenaliMessage to hex string (data len is not hex) @param complete_dialin_message: the complete can message in dictionary @return: """ channel = "{0:03X}" .format(DenaliMessage.get_channel_id(complete_dialin_message)) msg_id = DenaliMessage.get_message_id_xstr(complete_dialin_message) data_len = DenaliMessage.get_payload_length(complete_dialin_message) length = "{0:02X}" .format(data_len) data = "" pram_len = 0 if data_len != 0: pram_len = int(data_len / 4) for i in range(pram_len): data += "{0:02X}".format(complete_dialin_message['message'][DenaliMessage.PAYLOAD_START_INDEX + i ]) data += "{0:02X}".format(complete_dialin_message['message'][DenaliMessage.PAYLOAD_START_INDEX + i + 1]) data += "{0:02X}".format(complete_dialin_message['message'][DenaliMessage.PAYLOAD_START_INDEX + i + 2]) data += "{0:02X}".format(complete_dialin_message['message'][DenaliMessage.PAYLOAD_START_INDEX + i + 3]) data += " " message = "{} {} {} {}".format(channel, msg_id, data_len, data) return message @staticmethod def do_console_out(complete_dialin_message: dict) -> None: """ prints out the message in hex format similar to the candump @return: None """ exception_msg_id = { MsgIds.MSG_ID_UI_CHECK_IN.value, MsgIds.MSG_ID_ACK_MESSAGE_THAT_REQUIRES_ACK } msg_id = DenaliMessage.get_message_id(complete_dialin_message) if msg_id in exception_msg_id: return message = "# " + DenaliCanMessenger.convert_message_to_string(complete_dialin_message) print(message) def register_transmitting_interval_message(self, interval: float, function) ->None: """ registers a callback function with a specified time interval to a dictionary @return: None """ function_id = id(function) if function_id in self.transmit_interval_dictionary.keys(): self.logger.error("ERROR: Attempting to assign more than one timed interval per given method.") self.transmit_interval_dictionary[function_id].stop() self.transmit_interval_dictionary[function_id].start() else: self.transmit_interval_dictionary[function_id] = IntervalTimer(interval, function) return function_id