Index: dialin/protocols/CAN.py =================================================================== diff -u -rd3a22e97fad0b715b1e9fba138236334ae7dbb6d -r5b4a63897b88aed356e53641a518a9038801ae3e --- dialin/protocols/CAN.py (.../CAN.py) (revision d3a22e97fad0b715b1e9fba138236334ae7dbb6d) +++ dialin/protocols/CAN.py (.../CAN.py) (revision 5b4a63897b88aed356e53641a518a9038801ae3e) @@ -7,8 +7,8 @@ # # @file CAN.py # -# @author (last) Behrouz NematiPour -# @date (last) 02-Apr-2022 +# @author (last) Micahel Garthwaite +# @date (last) 30-Jun-2023 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # @@ -29,7 +29,7 @@ import struct from .. import common from ..common import MsgIds -from ..utils import SingletonMeta +from ..utils import SingletonMeta, IntervalTimer from concurrent.futures import ThreadPoolExecutor @@ -454,7 +454,6 @@ def __init__(self, can_interface: str, logger: Logger, - log_can=False, passive_mode=True, console_out=False): """ @@ -464,9 +463,10 @@ @return: DialityCanMessenger object """ - + self.message_queue_mutex = threading.Lock() + self.response_dictionary_mutex = threading.Lock() + self.transmitting_mutex = threading.Lock() self.logger = logger - self.log_can = log_can self.message_queue = deque() self.callback_listener_complete_messages = None self.callback_listener_invalid_messages = None @@ -504,6 +504,7 @@ self.sync_response_dictionary = {} self.ui_received_function_ptr = None self.pending_requests = {} + self.transmit_interval_dictionary = {} def start(self): """ @@ -566,6 +567,7 @@ # Careful here, making this any shorter will start limiting CPU time for other threads sleep(0.01) else: + self.message_queue_mutex.acquire() message: can.Message = self.message_queue.popleft() if message.dlc == DenaliMessage.PACKET_LENGTH: @@ -575,13 +577,11 @@ if not DenaliMessage.PAYLOAD_LENGTH_INDEX < len(can_data): self.logger.error("Invalid Denali message received: {0}".format(message)) self.messages = None # Can't process this message, get the next one + self.message_queue_mutex.release() continue else: message_length = can_data[DenaliMessage.PAYLOAD_LENGTH_INDEX] - if self.log_can: - self.do_log_can(message, style="candump", channel=self.bus.channel, send=False) - # if we are building a long message, then proceed to push it to the channel dictionary if channel_id in self.long_msg_channel_id_set: self.messages = self.long_message_builders[channel_id].push(message) @@ -647,31 +647,29 @@ # If it is not, this is a publication message and we need to call it's register function else: + self.response_dictionary_mutex.acquire() + if DenaliCanMessenger.is_ui_received_channel(dialin_ch_id): # check if the channel is in ui channels if self.ui_received_function_ptr is not None: self.thread_pool_executor.submit( self.ui_received_function_ptr, - complete_dialin_message + complete_dialin_message, message.timestamp ) - if dialin_ch_id in self.sync_response_dictionary.keys() : - # if message specifc handler is defined, call it - if dialin_msg_id in self.sync_response_dictionary[channel_id].keys() : + if dialin_ch_id in self.sync_response_dictionary.keys() and \ + dialin_msg_id in self.sync_response_dictionary[channel_id].keys(): + for function_id in self.sync_response_dictionary[dialin_ch_id][dialin_msg_id]: self.thread_pool_executor.submit( - self.sync_response_dictionary[dialin_ch_id][dialin_msg_id], - complete_dialin_message - ) - # else use a generic default handler, if one is defined (for None) - elif None in self.sync_response_dictionary[channel_id].keys(): - self.thread_pool_executor.submit( - self.sync_response_dictionary[dialin_ch_id][None], - complete_dialin_message - ) + self.sync_response_dictionary[dialin_ch_id][dialin_msg_id][function_id], + complete_dialin_message, message.timestamp) + + self.response_dictionary_mutex.release() else: self.logger.critical("Invalid message: {}\n".format(self.messages)) # Done with this message, let's get the next one self.messages = None + self.message_queue_mutex.release() @staticmethod def is_ui_received_channel(channel_id: int) -> bool: @@ -700,14 +698,20 @@ @param message_id: Diality request ID in message @param function: function reference """ - - # if the channel_id exist, we just update the dictionary for the channel_id + # function_id is a UID for each callback per channel,msg pair. + function_id = id(function) + # if the channel_id exist, we update the dictionary for the channel_id + self.response_dictionary_mutex.acquire() if channel_id in self.sync_response_dictionary.keys(): - self.sync_response_dictionary[channel_id].update({message_id: function}) + if message_id in self.sync_response_dictionary[channel_id].keys(): + self.sync_response_dictionary[channel_id][message_id].update({function_id: function}) + else: + self.sync_response_dictionary[channel_id].update( {message_id: {function_id: function}}) - # otherwise, we need to create the dictionary for the channel_id + # otherwise, we need to create the dictionary for the channel_id, msg_id pair else: - self.sync_response_dictionary[channel_id] = {message_id: function} + self.sync_response_dictionary[channel_id] = {message_id: {function_id: function}} + self.response_dictionary_mutex.release() def register_received_all_ui_publication_function(self, function_ptr: Callable): """ @@ -754,6 +758,8 @@ number_of_packets = DenaliMessage.get_total_packets(padded_can_message_array) # We are sending one message at a time on CAN + self.transmitting_mutex.acquire() + for n in range(number_of_packets): packet = padded_can_message_array[n * DenaliMessage.PACKET_LENGTH: (n + 1) * DenaliMessage.PACKET_LENGTH] @@ -763,10 +769,10 @@ data=packet, is_extended_id=False) - if self.log_can: - self.do_log_can(packet, style="candump", channel=self.bus.channel, send=True) self.bus.send(packet, 0) # 0.1) + self.transmitting_mutex.release() + # After all messages have been sent, we clear a flag self.send_event.clear() @@ -869,3 +875,19 @@ message = "# " + DenaliCanMessenger.convert_message_to_string(complete_dialin_message) print(message) + + def register_transmitting_interval_message(self, interval: float, function) ->None: + """ + registers a callback function with a specified time interval to a dictionary + @return: None + """ + function_id = id(function) + + if function_id in self.transmit_interval_dictionary.keys(): + self.logger.error("ERROR: Attempting to assign more than one timed interval per given method.") + self.transmit_interval_dictionary[function_id].stop() + self.transmit_interval_dictionary[function_id].start() + else: + self.transmit_interval_dictionary[function_id] = IntervalTimer(interval, function) + + return function_id