Index: dialin/protocols/CAN.py =================================================================== diff -u -r03187a38c55eb5c2b51bbd1c8a4633ad42cea9c5 -r3bc7e000a3f98d87aa718d34f11305c82ab1f03c --- dialin/protocols/CAN.py (.../CAN.py) (revision 03187a38c55eb5c2b51bbd1c8a4633ad42cea9c5) +++ dialin/protocols/CAN.py (.../CAN.py) (revision 3bc7e000a3f98d87aa718d34f11305c82ab1f03c) @@ -7,8 +7,8 @@ # # @file CAN.py # -# @author (last) Micahel Garthwaite -# @date (last) 30-Jun-2023 +# @author (last) Behrouz NematiPour +# @date (last) 02-Apr-2022 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # @@ -454,6 +454,7 @@ def __init__(self, can_interface: str, logger: Logger, + log_can=False, passive_mode=True, console_out=False): """ @@ -463,10 +464,9 @@ @return: DialityCanMessenger object """ - self.message_queue_mutex = threading.Lock() - self.response_dictionary_mutex = threading.Lock() - self.transmitting_mutex = threading.Lock() + self.logger = logger + self.log_can = log_can self.message_queue = deque() self.callback_listener_complete_messages = None self.callback_listener_invalid_messages = None @@ -504,7 +504,6 @@ self.sync_response_dictionary = {} self.ui_received_function_ptr = None self.pending_requests = {} - self.transmit_interval_dictionary = {} def start(self): """ @@ -567,7 +566,6 @@ # Careful here, making this any shorter will start limiting CPU time for other threads sleep(0.01) else: - self.message_queue_mutex.acquire() message: can.Message = self.message_queue.popleft() if message.dlc == DenaliMessage.PACKET_LENGTH: @@ -577,11 +575,13 @@ if not DenaliMessage.PAYLOAD_LENGTH_INDEX < len(can_data): self.logger.error("Invalid Denali message received: {0}".format(message)) self.messages = None # Can't process this message, get the next one - self.message_queue_mutex.release() continue else: message_length = can_data[DenaliMessage.PAYLOAD_LENGTH_INDEX] + if self.log_can: + self.do_log_can(message, style="candump", channel=self.bus.channel, send=False) + # if we are building a long message, then proceed to push it to the channel dictionary if channel_id in self.long_msg_channel_id_set: self.messages = self.long_message_builders[channel_id].push(message) @@ -647,29 +647,31 @@ # If it is not, this is a publication message and we need to call it's register function else: - self.response_dictionary_mutex.acquire() - if DenaliCanMessenger.is_ui_received_channel(dialin_ch_id): # check if the channel is in ui channels if self.ui_received_function_ptr is not None: self.thread_pool_executor.submit( self.ui_received_function_ptr, - complete_dialin_message, message.timestamp + complete_dialin_message ) - if dialin_ch_id in self.sync_response_dictionary.keys() and \ - dialin_msg_id in self.sync_response_dictionary[channel_id].keys(): - for function_id in self.sync_response_dictionary[dialin_ch_id][dialin_msg_id]: + if dialin_ch_id in self.sync_response_dictionary.keys() : + # if message specifc handler is defined, call it + if dialin_msg_id in self.sync_response_dictionary[channel_id].keys() : self.thread_pool_executor.submit( - self.sync_response_dictionary[dialin_ch_id][dialin_msg_id][function_id], - complete_dialin_message, message.timestamp) - - self.response_dictionary_mutex.release() + self.sync_response_dictionary[dialin_ch_id][dialin_msg_id], + complete_dialin_message + ) + # else use a generic default handler, if one is defined (for None) + elif None in self.sync_response_dictionary[channel_id].keys(): + self.thread_pool_executor.submit( + self.sync_response_dictionary[dialin_ch_id][None], + complete_dialin_message + ) else: self.logger.critical("Invalid message: {}\n".format(self.messages)) # Done with this message, let's get the next one self.messages = None - self.message_queue_mutex.release() @staticmethod def is_ui_received_channel(channel_id: int) -> bool: @@ -698,20 +700,14 @@ @param message_id: Diality request ID in message @param function: function reference """ - # function_id is a UID for each callback per channel,msg pair. - function_id = id(function) - # if the channel_id exist, we update the dictionary for the channel_id - self.response_dictionary_mutex.acquire() + + # if the channel_id exist, we just update the dictionary for the channel_id if channel_id in self.sync_response_dictionary.keys(): - if message_id in self.sync_response_dictionary[channel_id].keys(): - self.sync_response_dictionary[channel_id][message_id].update({function_id: function}) - else: - self.sync_response_dictionary[channel_id].update( {message_id: {function_id: function}}) + self.sync_response_dictionary[channel_id].update({message_id: function}) - # otherwise, we need to create the dictionary for the channel_id, msg_id pair + # otherwise, we need to create the dictionary for the channel_id else: - self.sync_response_dictionary[channel_id] = {message_id: {function_id: function}} - self.response_dictionary_mutex.release() + self.sync_response_dictionary[channel_id] = {message_id: function} def register_received_all_ui_publication_function(self, function_ptr: Callable): """ @@ -758,8 +754,6 @@ number_of_packets = DenaliMessage.get_total_packets(padded_can_message_array) # We are sending one message at a time on CAN - self.transmitting_mutex.acquire() - for n in range(number_of_packets): packet = padded_can_message_array[n * DenaliMessage.PACKET_LENGTH: (n + 1) * DenaliMessage.PACKET_LENGTH] @@ -769,10 +763,10 @@ data=packet, is_extended_id=False) + if self.log_can: + self.do_log_can(packet, style="candump", channel=self.bus.channel, send=True) self.bus.send(packet, 0) # 0.1) - self.transmitting_mutex.release() - # After all messages have been sent, we clear a flag self.send_event.clear() @@ -875,4 +869,3 @@ message = "# " + DenaliCanMessenger.convert_message_to_string(complete_dialin_message) print(message) -