#!/usr/bin/python3 # -*- coding: utf-8 -*- from jinja2 import Environment, FileSystemLoader from MsgData import MsgData from pathlib import Path # \brief Class that usees the loaded MsgData and outputs converts to Protobuf class MsgCpp(MsgData): # \brief Initializer def __init__(self): super().__init__() # \brief Load the csv files and cache the data. # \note This will clear any previously cached csv data. # \param[in] filename Filename of the csv to load. # \param[in] clear If true, clear out any already loaded data before processing new data, # otherwise new data will be added to the existing data # \return none def load(self, filename, clear=True): super().load(filename, clear) for (msg_id_value, msg) in self.data.items(): msg['cpp_struct'] = self.__struct_data(msg) # \brief Write the loaded MsgData to a C++ header file # \param[in] device_name name of the device used to name the output message definitions header file # \param[in] output_dir directory where the output header file will be written # \param[in] namespace namespace to use in the header file, if namespace is blank then no namespace will be added # \param[in] proto flag to add protobuf utility function to generated header file def write_msg_defs_header(self, device_name, output_dir, namespace=None, proto=False): env = Environment(loader = FileSystemLoader(f'{Path(__file__).parent.absolute()}/templates')) template = env.get_template('MsgDefs_h.jinja') render = template.render({ 'msg_cpp': self, 'cpp_namespace': namespace, 'proto': proto, 'device_name': device_name }) header_path = Path(output_dir).joinpath(f"{device_name}MsgDefs.h") with open(header_path, mode='w', encoding='utf-8', newline='\n') as out_file: out_file.write(render) out_file.close() print(f"Wrote C++ message definitions header file {header_path}") # \brief Write the loaded MsgData to a source file # \param[in] device_name name of the device used to name the output message definitions source file # \param[in] output_dir directory where the output header file will be written # \param[in] namespace namespace to use in the header file, if namespace is blank then no namespace will be added # \param[in] proto flag to add protobuf utility function to generated header file def write_msg_defs_source(self, device_name, output_dir, namespace=None, proto=False): env = Environment(loader = FileSystemLoader(f'{Path(__file__).parent.absolute()}/templates')) template = env.get_template('MsgDefs_cpp.jinja') render = template.render({ 'msg_cpp': self, 'msg_defs_header': f"{device_name}MsgDefs.h", 'cpp_namespace': namespace, 'proto': proto, }) srcPath = Path(output_dir).joinpath(f"{device_name}MsgDefs.cpp") with open(srcPath, mode='w', encoding='utf-8', newline='\n') as out_file: out_file.write(render) out_file.close() print(f"Wrote C++ message definitions source file {srcPath}") def write_log_parser(self, basename, header_dir, source_dir, namespace, msg_defs_header): with open((header_dir.rstrip('/') + "/" if header_dir else "") + basename + '.h', mode='w', encoding='utf-8', newline='\n') as out_file: includes = set() includes.add(f"\"{msg_defs_header}\"") if len(includes): for key, value in self.__includeGroups(includes): out_file.write("\n".join(f"#include {include}" for include in value) + "\n\n") if namespace: out_file.write(f"namespace {namespace}\n{{\n\n") if namespace: out_file.write(f"\n}} // namespace {namespace}\n") out_file.close() print(f'Wrote message source file {(output_dir.rstrip("/") + "/" if output_dir else "") + basename}.cpp') def write_source(self, basename, output_dir, namespace): with open((output_dir.rstrip('/') + "/" if output_dir else "") + basename + '.cpp', mode='w', encoding='utf-8', newline='\n') as out_file: includes = set() includes.add(f"\"{Path(basename + '.h').name}\"") if len(includes): for key, value in self.__includeGroups(includes): out_file.write("\n".join(f"#include {include}" for include in value) + "\n\n") if namespace: out_file.write(f"namespace {namespace}\n{{\n\n") if namespace: out_file.write(f"\n}} // namespace {namespace}\n") out_file.close() print(f'Wrote message source file {(output_dir.rstrip("/") + "/" if output_dir else "") + basename}.cpp') def __includeGroups(self, includes): # sort the includes groups = { } # create groups for the includes based on their search order (i.e. , "include") for include in includes: if include[0] not in groups: groups[include[0]] = [ ] groups[include[0]].append(include) # sort the groups, system includes () first and local includes ("include") second groups = sorted(groups.items(), key=lambda x: x[0], reverse=True) # sort each group of includes based on alphabetic order of the filename for key, value in groups: value.sort() return groups def __struct_data(self, msg_def): def field_type_to_cpp_type(field_type): if field_type == 'BOOL': return "Types::U32" elif field_type == 'U08': return "Types::U08" elif field_type == 'U16': return "Types::U16" elif field_type == 'U32': return "Types::U32" elif field_type == 'S08': return "Types::S08" elif field_type == 'S16': return "Types::S16" elif field_type == 'S32': return "Types::S32" elif field_type == 'F32': return "Types::F32" elif field_type == 'union': return "ParamUnion" else: return None def field_type_size(field_type): if field_type == 'BOOL': return 4 elif field_type == 'U08': return 1 elif field_type == 'U16': return 2 elif field_type == 'U32': return 4 elif field_type == 'S08': return 1 elif field_type == 'S16': return 2 elif field_type == 'S32': return 4 elif field_type == 'F32': return 4 elif field_type == 'union': return 4 else: return 0 struct_info = { 'payload': [ ], 'payload_size': 0, } for field in msg_def['payload']: struct_info['payload_size'] += field_type_size(field['type']) struct_info['payload'].append({ 'name': field['name'], 'type': field['type'], 'cpp_type': field_type_to_cpp_type(field['type']) }) if struct_info['payload'][-1]['type'] is None: print(f"WARNING: unhandled type \"{field['type']}\" for field {field['name']} in message {msg_def['msg_id']} ({msg_def['msg_id_hex_string']})") return struct_info