Index: dialin/protocols/CAN.py =================================================================== diff -u -r8c39fe1f9affe360ee6a97c5e6243e58a5c27509 -r10ed41d4e0580b54aa859c9ddde83183ff4f51e4 --- dialin/protocols/CAN.py (.../CAN.py) (revision 8c39fe1f9affe360ee6a97c5e6243e58a5c27509) +++ dialin/protocols/CAN.py (.../CAN.py) (revision 10ed41d4e0580b54aa859c9ddde83183ff4f51e4) @@ -20,13 +20,10 @@ import time from time import sleep import sys -import argparse -from argparse import RawTextHelpFormatter import logging class DenaliMessage: - BYTE_ORDER = 'little' START_BYTE = 0xA5 START_INDEX = 0 @@ -346,7 +343,7 @@ START_BYTE = DenaliMessage.START_BYTE DIALIN_MSG_RESP_TO = 0.1 # number of seconds to wait for a response to a sent command - def __init__(self, can_interface='can0', log_level="error"): + def __init__(self, can_interface='can0', log_level=None): """ DenaliCanMessenger constructor @@ -358,33 +355,31 @@ self.bus = can.interfaces.socketcan.SocketcanBus(channel=can_interface) self.listener_buffer = can.BufferedReader() self.notifier = can.Notifier(self.bus, [self.listener_buffer]) - self.send_packet_request_id = -1 self.send_event = threading.Event() - self.long_message_builders = {} self.long_msg_channel_id_set = set() - self.messages = None self.last_sent_message = None self.command_response_message = None self.response_channel_id = -1 - self.run = False - self.sync_response_dictionary = {} - numeric_level = getattr(logging, log_level.upper(), logging.ERROR) - logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', - filename="dialin.log", - datefmt='%m-%d-%Y:%H:%M:%S', - level=numeric_level) # DEBUG, INFO, WARNING, ERROR, CRITICAL + self.logging_enabled = log_level is not None and log_level.upper() in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + if self.logging_enabled: + numeric_level = getattr(logging, log_level.upper(), logging.ERROR) + logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', + filename="dialin.log", + datefmt='%m-%d-%Y:%H:%M:%S', + level=numeric_level) # DEBUG, INFO, WARNING, ERROR, CRITICAL + if self.bus is not None: self.serial_listener_thread = threading.Thread(target=self.listener, daemon=True) else: self.serial_listener_thread = None - print_and_log("Can connection is not valid") + self.print_and_log("Can connection is not valid") def start(self): """ @@ -393,23 +388,23 @@ """ if self.bus is None: - print_and_log("Cannot start can listener.") + self.print_and_log("Cannot start can listener.") return else: self.run = True if self.serial_listener_thread is not None: self.serial_listener_thread.start() - print_and_log("Can listener has started.") + self.print_and_log("Can listener has started.") else: - print_and_log("Cannot start listener...") + self.print_and_log("Cannot start listener...") def stop(self): """ Stop listening the can interface """ self.run = False - print_and_log("\nCan listener has stopped.") + self.print_and_log("\nCan listener has stopped.") def listener(self): """ @@ -419,7 +414,7 @@ while self.run: message = self.listener_buffer.get_message(0.0) - print_and_log("Message = {}".format(message)) + self.print_and_log("Message = {}".format(message)) if message is not None: @@ -429,8 +424,8 @@ channel_id = message.arbitration_id message_length = can_data[DenaliMessage.PAYLOAD_LENGTH_INDEX] - print_and_log(str(time.time()) + " " + str(channel_id) + " " + str(can_data), - log_level=logging.INFO) + self.print_and_log(str(time.time()) + " " + str(channel_id) + " " + str(can_data), + log_level=logging.INFO) # if we are building a long message, then proceed to push it to the channel dictionary if channel_id in self.long_msg_channel_id_set: @@ -554,7 +549,7 @@ data=packet, is_extended_id=False) - print_and_log(packet) + self.print_and_log(packet) self.bus.send(packet, 0) # 0.1) # Sending @@ -572,133 +567,37 @@ msg_sent = True else: # msg_sent = False - print_and_log("No response. Re-sending message.") + self.print_and_log("No response. Re-sending message.") # We are ready to send again. Reset request ID appropriately self.send_packet_request_id = -1 # This value is None or it has a message depending of the listener return self.command_response_message + def print_and_log(self, message, log_level=logging.DEBUG): + """ + Prints a message if its severity is >= the current log level. + Also logs the message. -def print_and_log(message, log_level=logging.DEBUG): - """ - Prints a message if its severity is >= the current log level. - Also logs the message. + \param message: The message to print and log + \param log_level: The logging level, indicates the severity of the message - \param message: The message to print and log - \param log_level: The logging level, indicates the severity of the message + \returns: None + """ + if not self.logging_enabled: + return - \returns: None - """ - if logging.getLogger().getEffectiveLevel() <= log_level: - print(message) + if logging.getLogger().getEffectiveLevel() <= log_level: + print(message) - if log_level == "debug": - logging.debug(message) - elif log_level == "info": - logging.info(message) - elif log_level == "warning": - logging.warning(message) - elif log_level == "error": - logging.error(message) - elif log_level == "critical": - logging.critical(message) - -def test_print_received_packet(self, message, sync=False): - channel_id = message[0] - message[0] = DenaliMessage.START_BYTE - introduction = "Received: " - - if sync: - introduction = "Sync " + introduction - - print_and_log("{0} {1} in channel: {3}".format(introduction, message, channel_id)) - - -def test_print_to_screen(message): - if message is None: - print_and_log("Timeout!!!") - else: - test_print_received_packet(message) - - -def test_function_for_sync(message): - test_print_received_packet(message, sync=True) - - -def test_print_sending_dg_board(message): - print_and_log("Sending to board: {0} on channel: {1}".format(message['message'], message['channel_id'])) - - -def test_print_sending_dg_sim(message): - print_and_log("Sending to DG simulator: {0} on channel {1}".format(message["message"], message["channel_id"])) - - -def test_dg(log_level): - test_messenger = DenaliCanMessenger(log_level=log_level) - - test_received_channel_id = DenaliChannels.hd_to_ui_ch_id - test_received_message_id = 0x100 - - test_messenger.register_receiving_publication_function(test_received_channel_id, test_received_message_id, - test_function_for_sync) - - test_dg_simulator_received_channel_id = DenaliChannels.dialin_to_dg_ch_id - test_dg_simulator_sync_msg_id = 0x05 - test_dg_simulator_msg_id = 0x03 - - test_messenger.register_receiving_publication_function(test_dg_simulator_received_channel_id, - test_dg_simulator_sync_msg_id, - test_function_for_sync) - - test_messenger.start() - - test_msg = DenaliMessage.build_message(channel_id=1000, - message_id=0x01, - payload=[1]) - test_dg_msg = DenaliMessage.build_message(channel_id=test_dg_simulator_received_channel_id, - message_id=test_dg_simulator_msg_id, - payload=[]) - - sleep(3.0) - test_print_sending_dg_board(test_msg) - test_response = test_messenger.send(test_msg) - test_print_to_screen(test_response) - - sleep(3.0) - test_print_sending_dg_board(test_msg) - test_response = test_messenger.send(test_msg) - test_print_to_screen(test_response) - - sleep(3.0) - test_print_sending_dg_sim(test_dg_msg) - test_response = test_messenger.send(test_dg_msg) - test_print_to_screen(test_response) - - sleep(3.0) - test_print_sending_dg_sim(test_dg_msg) - test_response = test_messenger.send(test_dg_msg) - test_print_to_screen(test_response) - - -def test_can(self): - # TODO: Parse received input and setup as a sender - test_messenger = DenaliCanMessenger() - test_messenger.start() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Dial-In Core Can Protocol \n" - "\tExample: \n" - "\tpython3 CoreCanProtocol.py --test-dg --log-level=\"debug\"", - formatter_class=RawTextHelpFormatter) - parser.add_argument("--test-dg", action="store_true") - parser.add_argument("--log-level", default="error") - args = parser.parse_args() - - if args.test_dg: - test_dg(args.log_level) - - if len(sys.argv) < 2: - parser.print_help() + if log_level == "debug": + logging.debug(message) + elif log_level == "info": + logging.info(message) + elif log_level == "warning": + logging.warning(message) + elif log_level == "error": + logging.error(message) + elif log_level == "critical": + logging.critical(message) Index: tests/test_can_protocol.py =================================================================== diff -u --- tests/test_can_protocol.py (revision 0) +++ tests/test_can_protocol.py (revision 10ed41d4e0580b54aa859c9ddde83183ff4f51e4) @@ -0,0 +1,84 @@ +import sys +sys.path.append("..") +import argparse +from argparse import RawTextHelpFormatter +from dialin.protocols.CAN import DenaliMessage, DenaliCanMessenger + + +class CANTests: + def __init__(self): + self.messenger = DenaliCanMessenger(logging_enabled=True) + + def test_dg(self, log_level): + raise NotImplementedError + + + """ + self.messenger = DenaliCanMessenger(log_level=log_level) + + test_received_channel_id = DenaliChannels.hd_to_ui_ch_id + test_received_message_id = 0x100 + + self.messenger.register_receiving_publication_function(test_received_channel_id, test_received_message_id, + test_function_for_sync) + + test_dg_simulator_received_channel_id = DenaliChannels.dialin_to_dg_ch_id + test_dg_simulator_sync_msg_id = 0x05 + test_dg_simulator_msg_id = 0x03 + + self.messenger.register_receiving_publication_function(test_dg_simulator_received_channel_id, + test_dg_simulator_sync_msg_id, + test_function_for_sync) + + self.messenger.start() + + test_msg = DenaliMessage.build_message(channel_id=1000, + message_id=0x01, + payload=[1]) + test_dg_msg = DenaliMessage.build_message(channel_id=test_dg_simulator_received_channel_id, + message_id=test_dg_simulator_msg_id, + payload=[]) + + sleep(3.0) + test_print_sending_dg_board(test_msg) + test_response = self.messenger.send(test_msg) + test_print_to_screen(test_response) + + sleep(3.0) + self.test_print_sending_dg_board(test_msg) + test_response = self.messenger.send(test_msg) + self.test_print_to_screen(test_response) + + sleep(3.0) + self.test_print_sending_dg_sim(test_dg_msg) + test_response = self.messenger.send(test_dg_msg) + self.test_print_to_screen(test_response) + + sleep(3.0) + self.test_print_sending_dg_sim(test_dg_msg) + test_response = self.messenger.send(test_dg_msg) + test_print_to_screen(test_response) + """ + + + def test_can_receive(self): + # TODO: Parse received input and setup as a sender + self.messenger = DenaliCanMessenger() + self.messenger.start() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Dial-In Core Can Protocol \n" + "\tExample: \n" + "\tpython3 CoreCanProtocol.py --test-dg --log-level=\"debug\"", + formatter_class=RawTextHelpFormatter) + parser.add_argument("--test-dg", action="store_true") + parser.add_argument("--log-level", default="error") + args = parser.parse_args() + + tests = CANTests() + if args.test_dg: + tests.test_dg(args.log_level) + + if len(sys.argv) < 2: + parser.print_help()