import sys
import numpy as np
from Basilisk.architecture import (sim_model, alg_contain)
import copy
import warnings
[docs]class CppWrapperClass(object):
"""
This is the main class handling instrospection of the C modules.
It generates the C++ wrapper classes (with callbacks to the C algorithms and
setters and getters for the C module variables) and it also generates the MicroPython integration patch.
"""
def __init__(self, TheSim, taskActivityDir, simTag, outputPath):
self.output_store_path = outputPath
self.wrapper_template = CppWrapClassTemplate()
self.module_mpy_file = open(self.output_store_path + "/module.hpp", 'w+')
self.name_replace = TheSim.NameReplace
self.SelfInit_dict = dict() # {modelTag: SelfInit alg address, moduleID}
self.Update_dict = dict() # dictionary D = {modelTag: Update alg address, moduleID}
self.Reset_dict = dict() # dictionary D = {modelTag: Reset alg address, moduleID}
self.fill_dictionaries(simTag, TheSim)
self.taskIdxDir = self.areTasksInSimTaskList(taskActivityDir, TheSim)
self.create_class_functions_wrapper(TheSim, taskActivityDir)
def fill_dictionaries(self, simTag, TheSim):
TheSimList = dir(eval(simTag))
i = 0
for elemName in TheSimList:
elem = eval(simTag + '.' + elemName)
if type(elem) == alg_contain.AlgContain:
self.SelfInit_dict[elem.ModelTag] = (int(elem.getSelfInitAddress()), i)
self.Update_dict[elem.ModelTag] = (int(elem.getUpdateAddress()), i)
hasResetAddress = int(elem.getResetAddress())
if (hasResetAddress):
self.Reset_dict[elem.ModelTag] = (hasResetAddress, i)
i += 1
[docs] def getTaskIndex(self, theSim, taskUse):
"""
This function returns the key name of the NameReplace dictionary according to the index of the task
and the index of the model inside the task
"""
j = 0
for taskPy in theSim.TaskList:
if taskUse.TaskName == taskPy.Name:
return j
j += 1
return -1
def areTasksInSimTaskList(self, taskActivityDir, TheSim):
print('TASKS BEING PARSED: ')
taskIdxDir = []
taskOrderedList = []
for procIdx in range(0, len(TheSim.TotalSim.processList)):
locProcList = []
for i_task in range(len(TheSim.TotalSim.processList[procIdx].processTasks)):
theTask = TheSim.TotalSim.processList[procIdx].processTasks[i_task]
taskFound = False
for ordIdx in range(len(locProcList)):
locTask = locProcList[ordIdx]
# locTask[0] = taskName
# locTask[1] = taskPriority
if theTask.taskPriority > locTask[1]:
locProcList.insert(ordIdx,
[theTask.TaskPtr.TaskName, theTask.taskPriority])
taskFound = True
break
if taskFound != True:
locProcList.append([theTask.TaskPtr.TaskName, theTask.taskPriority, theTask])
taskOrderedList.extend(locProcList)
for i_task in range(0, len(taskOrderedList)):
# taskOrderedList[i_task][0] = taskName
# taskOrderedList[i_task][1] = taskPriority
# taskOrderedList[i_task][2] = theTask
taskName = taskOrderedList[i_task][0]
if taskName in taskActivityDir.keys():
idxUse = self.getTaskIndex(TheSim, taskOrderedList[i_task][2].TaskPtr)
taskIdxDir.append(idxUse)
print( i_task, taskName)
return taskIdxDir
def getTaskModelKey(self, i_task, i_model):
key = 'self.TaskList[' + str(i_task) + '].TaskModels[' + str(i_model) + ']'
return key
[docs] def parseSwigVars(self, list):
"""
First Parsing Method: Gets rid of all the variables that come after a built-in.
The methods SelfInit, Update and Restart will always come first, and so will
the variables that are capitalized (this is based on how "dir()" command works in the Python interpreter).
Returns a reduced array.
"""
parsed_array = np.array([])
length = len(list)
i = 0
while i < length:
if list[i][0] != '_':
parsed_array = np.append(parsed_array, list[i])
i += 1
else:
break
return parsed_array
[docs] def evalParsedList(self, list, module):
"""
Second Parsing Method: Collects all the SwigPyObjects present in the list.
Only the methods ``SelfInit_(...)``, ``Update_(...)`` and ``Restart_(...)``
are wrapped by Swig in the ``.i`` files. Therefore they are the only ``SwigPyObjects``.
:return: a dictionary D = {method, address}
"""
addressDict = {}
for methodName in list:
methodObject = eval('sys.modules["' + module + '"].' + methodName)
if type(methodObject).__name__ == "SwigPyObject":
methodAddress = sim_model.getObjectAddress(methodObject)
addressDict[methodName] = int(methodAddress)
return addressDict
[docs] def checkMethodType(self, methodName):
"""
This function checks the method type of the input and returns the corresponding strings.
:return: methodName: name of the model data algorithm
"""
str_selfInit = 'SelfInit'
str_update = 'Update'
str_reset = 'Reset'
str_blank = '' # the methods SelfInit doesn't need the callTime parameter
str_callTime = ', callTime' # the methods Reset and Update need an extra parameter for callTine
if methodName[0:len(str_selfInit)] == str_selfInit:
return str_selfInit, str_blank
elif methodName[0:len(str_update)] == str_update:
return str_update, str_callTime
elif methodName[0:len(str_reset)] == str_reset:
return str_reset, str_callTime
else:
raise ValueError('Cannot recognize the method. Parse better.')
[docs] def findAddressMatch(self, addressVal, modelTagKey, dict):
"""
This function makes sure that each algorithm in a data model is matched with the proper algorithm in
the corresponding model wrap.
If there's a match, returns the ID of the model.
Otherwise, an ugly error is raised and the whole program quits.
:param addressVal: address of the model data algorithm.
:param modelTagKey: modelTag = key with which to search in ``dict``.
:param dict: wrap algorithms' address dictionary in which to look for a match. For example, use dict[modelTagKey][0] = model wrap algorithm address or dict[modelTagKey][1] = model ID
"""
try:
address = dict[modelTagKey][0]
if address == addressVal:
IDVal = dict[modelTagKey][1]
return IDVal
except:
raise ValueError(str(modelTagKey) + ' is not wrapping all the existing algorithms in '
'the corresponding data model. Fix it.')
def create_class_functions_wrapper(self, TheSim, taskActivityDir):
for i_task in self.taskIdxDir:
task = TheSim.TaskList[i_task]
i_model = 0
for model in task.TaskModels:
curr_model_algs_dict = dict()
key = self.getTaskModelKey(i_task, i_model)
modelTag = TheSim.NameReplace[key]
module = model.__module__
c_struct_name = str(type(model).__name__)
split_names = module.split('.')
h_file_name = split_names[-1]
h_folder_name = split_names[-2]
header_line = '#include "%s/%s.h"' % (h_folder_name, h_file_name)
hpp_line = '#include "%s/%s.hpp"' % (h_folder_name, h_file_name)
sysMod = sys.modules[module]
dirList = dir(sysMod)
parsed_dirList = self.parseSwigVars(dirList)
addressDict = self.evalParsedList(parsed_dirList, module)
for k, v in addressDict.items():
methodType, methodCallTime = self.checkMethodType(k)
c_call = k + "(&(this->config_data)" + methodCallTime + ", this->moduleID);"
split_arg = methodCallTime.split()
arg = ''
if split_arg:
arg = 'uint64_t %s' % split_arg[-1]
curr_model_algs_dict[methodType] = c_call, arg
self.wrapper_template.create_new_template(modelTag, header_line, c_struct_name, curr_model_algs_dict,
hpp_line)
self.autocode_model(model=model, prefix=modelTag)
self.wrapper_template.join_swigged_data()
output_file_name = self.output_store_path + "/%s.hpp" % h_file_name
self.wrapper_template.write_new_class(abs_path_file_name=output_file_name)
self.wrapper_template.generate_mpy_wrapper_code(self.module_mpy_file)
self.wrapper_template.reset()
i_model += 1
self.module_mpy_file.close()
def printList(self, input, prefix):
#print( "input = ", input)
#print( "len_input = ", len(input))
for i in range(len(input)):
prefixIn = prefix + '[' + str(i) + ']'
if type(input[i]).__name__ == 'list':
#print( "RECURSION")
self.printList(input[i], prefixIn)
else:
if(input[i] != 0):
list_write = prefixIn + ' = ' + str(input[i])+';\n'
#print( "str_write = ", list_write)
#print( "input_type = ", type(input[i]).__name__)
def classify_list(self, list, prefix):
len_list = len(list)
if type(list[0]).__name__ == 'float':
callback = "self.wrapper_template.swig_v%s_doubles(prefix)" % len_list
try:
eval(callback)
except:
warnings.warn("Couldn't evaluate the following callback: %s. Not swigging anything for you here." %
callback)
def autocode_model(self, model, prefix):
#print( "\nmodel = ", model)
dir_model = dir(model)
field_names = copy.copy(dir_model)
for k in range(0, len(dir_model)):
fieldName = dir_model[k]
fieldValue = getattr(model, fieldName)
fieldTypeName = type(fieldValue).__name__
fieldTypeFull = str(type(fieldValue))
if (sys.version_info < (3, 0)):
# this and __ and SwigPyObject and instancemethod
if fieldName[0:2] == '__' or fieldName[0:4] == 'this' or \
fieldTypeName == 'SwigPyObject' or fieldTypeName == 'instancemethod':
field_names.remove(fieldName)
continue
# class
elif fieldTypeFull[1:6] == 'class':
class_prefix = "%s.%s" % (prefix, fieldName)
#print( "class_prefix = %s. RECURSION NEEDED." % class_prefix)
# character array
elif fieldTypeName == 'str':
self.wrapper_template.swig_string(field_name=fieldName)
elif fieldTypeName == 'int': # and fieldValue is not 0:
if fieldValue == 0:
continue
else:
self.wrapper_template.swig_numeric_value(field_name=fieldName, field_type=fieldTypeName)
elif fieldTypeName == 'float': # and fieldValue is not 0.0:
if fieldValue == 0:
#print( "Value iz zero for fieldName = %s, of type = %s" % (fieldName, fieldTypeName))
continue
else:
self.wrapper_template.swig_numeric_value(field_name=fieldName, field_type='double')
# handle numeric lists
elif fieldTypeName == 'list':
if all(v == 0 for v in fieldValue):
# print( "Value iz zero for fieldName = %s, of type = %s" % (fieldName, fieldTypeName))
continue
else:
self.classify_list(list=fieldValue, prefix=fieldName)
else:
pass
#print( "field_name: %s\t field_value: %s\t field_type_name: %s\t field_type_full: %s\t" % (fieldName, fieldValue, fieldTypeName, fieldTypeFull))
else:
# this and __ and SwigPyObject and instancemethod
if fieldName[0:2] == '__' or fieldName[0:4] == 'this' or \
fieldTypeName == 'SwigPyObject' or fieldTypeName == 'instancemethod':
field_names.remove(fieldName)
continue
# class
elif 'Basilisk' in fieldTypeFull:
class_prefix = "%s.%s" % (prefix, fieldName)
# print( "class_prefix = %s. RECURSION NEEDED." % class_prefix)
# character array
elif fieldTypeName == 'str':
self.wrapper_template.swig_string(field_name=fieldName)
elif fieldTypeName == 'int': # and fieldValue is not 0:
if fieldValue == 0:
continue
else:
self.wrapper_template.swig_numeric_value(field_name=fieldName, field_type=fieldTypeName)
elif fieldTypeName == 'float': # and fieldValue is not 0.0:
if fieldValue == 0:
# print( "Value iz zero for fieldName = %s, of type = %s" % (fieldName, fieldTypeName))
continue
else:
self.wrapper_template.swig_numeric_value(field_name=fieldName, field_type='double')
# handle numeric lists
elif fieldTypeName == 'list':
if all(v == 0 for v in fieldValue):
# print( "Value iz zero for fieldName = %s, of type = %s" % (fieldName, fieldTypeName))
continue
else:
self.classify_list(list=fieldValue, prefix=fieldName)
# # non-array variable
else:
pass
# print( "field_name: %s\t field_value: %s\t field_type_name: %s\t field_type_full: %s\t" % (fieldName, fieldValue, fieldTypeName, fieldTypeFull))
#print( "field_names = ", field_names)
#print( "swigged vars = ", self.wrapper_template.get_swigged_keys())
[docs]class SwigNamer(object):
"""
Class to handler creation of setters and getters (i.e. SWIG functionality)
"""
def __init__(self, field_name):
self.config_str = "this->config_data.%s" % field_name
self.input_str = "new_%s" % field_name
self.set_call = "Set_%s" % field_name
self.output_str = "local_%s" % field_name
self.get_call = "Get_%s" % field_name
[docs]class MpyWrapCodeTemplate(object):
"""
Template class defining the glue-code to include in the MicroPython C++ Wrapper source
"""
def __init__(self, model_tag, algs_dict, class_name, hpp_line):
self.wrap_name = 'wrap_%s' % model_tag
self.class_name = class_name
self.str_mpy_wrap_header = ""
self.str_mpy_wrap_struct_names = ""
self.str_mpy_wrap_init = ""
self.str_mpy_wrap_props = ""
self.initialize_code(model_tag, algs_dict, hpp_line)
def initialize_code(self, model_tag, algs_dict, hpp_line):
# Create MPy Wrapper Class
self.str_mpy_wrap_header = hpp_line + '\n'
struct_name = '%s_FunctionNames' % model_tag
self.str_mpy_wrap_struct_names = "struct %s" % struct_name + \
"\n{\n" + \
"\tfunc_name_def(SelfInit)\n" + \
"\tfunc_name_def(Update)\n"
if 'Reset' in algs_dict:
self.str_mpy_wrap_struct_names += "\tfunc_name_def(Reset)\n"
self.str_mpy_wrap_struct_names += "};\n"
self.str_mpy_wrap_init = '\tupywrap::ClassWrapper < %s > %s("%s", mod);\n' \
% (self.class_name, self.wrap_name, self.class_name) + \
'\t%s.DefInit <> ();\n' % self.wrap_name + \
'\t%s.Def < %s::SelfInit > (& %s::SelfInit);\n' % (self.wrap_name, struct_name, self.class_name) + \
'\t%s.Def < %s::Update > (& %s::UpdateState);\n' % (self.wrap_name, struct_name, self.class_name)
if 'Reset' in algs_dict:
self.str_mpy_wrap_init += '\t%s.Def < %s::Reset > (& %s::Reset);\n' % (self.wrap_name, struct_name, self.class_name)
# print( "str_mpy_wrap_struct_names =\n", self.str_mpy_wrap_struct_names)
# print( "str_mpy_wrap_init =\n", self.str_mpy_wrap_init)
def add_properties_to_wrap(self, current_vars_swig_dict):
str_props = ''
# str_props = '%s.Property("%s", &%s::%s, &%s::%s);\n' % (self.wrap_name, "ModelTag", self.class_name, values[0], self.class_name, values[1])
for key, values in current_vars_swig_dict.items():
prop_line = '\t%s.Property("%s", &%s::%s, &%s::%s);\n' % (self.wrap_name, key, self.class_name, values[0],
self.class_name, values[1])
str_props += prop_line
self.str_mpy_wrap_props = str_props
return
def write_module_wrap_code(self, module_mpy_file):
the_string = self.str_mpy_wrap_header + \
self.str_mpy_wrap_struct_names + \
self.str_mpy_wrap_init + self.str_mpy_wrap_props + \
'\n\n'
module_mpy_file.write(the_string)
[docs]class CppWrapClassTemplate(object):
"""
Template class defining the C++ wrapper class
"""
def __init__(self):
self.str_class_algs = ""
self.str_class_swig = ""
self.str_class_config = ""
self.current_model = ""
self.current_vars_swig_dict = dict()
self.current_props_swig_dict = dict()
self.current_mpy_code_model = None
def reset(self):
# print( "CppWrapClassTemplate now resetting(...)")
self.str_class_algs = ""
self.str_class_swig = ""
self.str_class_config = ""
self.current_model = ""
self.current_vars_swig_dict = dict()
self.current_props_swig_dict = dict()
self.current_mpy_code_model = None
def get_swigged_keys(self):
return self.current_vars_swig_dict.keys()
def generate_mpy_wrapper_code(self, abs_path_file_name):
self.current_mpy_code_model.add_properties_to_wrap(self.current_props_swig_dict)
self.current_mpy_code_model.write_module_wrap_code(abs_path_file_name)
return
def create_new_template(self, model_tag, header_line, c_struct_name, algs_dict, hpp_line):
self.current_model = model_tag
# Create C++ class
compile_def_name = 'WRAP_%s_HPP' % model_tag
#print( "compile_def_name = ", compile_def_name)
str_compile_def = '#ifndef ' + compile_def_name + '\n' + \
'#define ' + compile_def_name + '\n\n'
str_includes = '#include <iostream>\n' + \
'#include "utilities/linearAlgebra.h"\n' + \
'#include "_GeneralModuleFiles/sys_model.h"\n' + \
header_line + '\n'
class_name = model_tag + "Class"
# class_name = c_struct_name + "Class"
str_class = 'class %s: public SysModel {\n' % class_name + \
'public: \n' + \
'\t%s(){ memset(&this->config_data, 0x0, sizeof(%s));}\n' % (class_name, c_struct_name) + \
'\t~%s(){return;}\n' % class_name
str_callbacks = '\tvoid SelfInit(%s){ %s }\n' % (algs_dict['SelfInit'][1], algs_dict['SelfInit'][0]) + \
'\tvoid UpdateState(%s){ %s }\n' % (algs_dict['Update'][1], algs_dict['Update'][0])
if 'Reset' in algs_dict:
str_callbacks += '\tvoid Reset(%s){ %s }\n\n' % (algs_dict['Reset'][1], algs_dict['Reset'][0])
self.str_class_algs = str_compile_def + str_includes + str_class + str_callbacks
str_c_data = 'private: \n' + \
'\t%s config_data;' % c_struct_name
str_end = '\n}; \n\n#endif'
self.str_class_config = str_c_data + str_end
self.swig_model_tag()
self.current_mpy_code_model = MpyWrapCodeTemplate(model_tag, algs_dict, class_name, hpp_line)
def join_swigged_data(self):
str_swig = ""
for key, values in self.current_vars_swig_dict.items():
str_swig += values[0] + values[1]
self.str_class_swig = str_swig
#print( "str_class_swig = ", self.str_class_swig))
def write_new_class(self, abs_path_file_name):
print( "NEW WRAP: output_file_name = ", abs_path_file_name)
the_string = self.str_class_algs + self.str_class_swig + self.str_class_config
#print( "\nthe_string=\n", the_string)
#return
# Write cpp class
file_mode = 'w+' # create file to be written if it doesn't exist
#alg_class = open(self.output_store_path + '/' + h_file_name + '.hpp', file_mode)
output_file = open(abs_path_file_name, file_mode)
output_file.write(the_string)
output_file.close()
def swig_model_tag(self):
field_name = "ModelTag"
swig = SwigNamer(field_name=field_name)
# Setter
setter_str = "\tvoid %s(std::string %s){\n" % (swig.set_call, swig.input_str) + \
"\t\tthis->%s = %s;\n" % (field_name, swig.input_str) + \
"\t}\n"
# Getter
getter_str = "\tstd::string %s() const{\n" % swig.get_call + \
"\t\treturn(this->%s);\n" % field_name + \
"\t}\n"
self.current_vars_swig_dict[field_name] = setter_str, getter_str
self.current_props_swig_dict[field_name] = swig.set_call, swig.get_call
def swig_string(self, field_name):
swig = SwigNamer(field_name)
# Setter
setter_str = "\tvoid %s(std::string %s){\n" % (swig.set_call, swig.input_str) + \
"\t\tmemset(%s, '\\0', sizeof(char) * MAX_STAT_MSG_LENGTH);\n" % swig.config_str + \
"\t\tstrncpy(%s, %s.c_str(), %s.length());\n" % (swig.config_str, swig.input_str, swig.input_str) + \
"\t}\n"
# Getter
getter_str = "\tstd::string %s() const{\n" % swig.get_call + \
"\t\tstd::string %s(%s);\n" % (swig.output_str, swig.config_str) + \
"\t\treturn(%s);\n" % swig.output_str + \
"\t}\n"
self.current_vars_swig_dict[field_name] = setter_str, getter_str
self.current_props_swig_dict[field_name] = swig.set_call, swig.get_call
# print( "\nswig_string(). field_name = ", field_name)
# print( "setter_str = \n", setter_str)
# print( "getter_str = \n", getter_str)
return
def swig_numeric_value(self, field_name, field_type):
swig = SwigNamer(field_name)
# Setter
setter_str = "\tvoid %s(%s %s) {\n" % (swig.set_call, field_type, swig.input_str) + \
"\t\t%s = %s;\n" % (swig.config_str, swig.input_str) + \
"\t}\n"
# Getter
getter_str = "\t%s %s() const {\n" % (field_type, swig.get_call) + \
"\t\treturn (%s);\n" % swig.config_str + \
"\t}\n"
self.current_vars_swig_dict[field_name] = setter_str, getter_str
self.current_props_swig_dict[field_name] = swig.set_call, swig.get_call
def swig_v3_doubles(self, field_name):
swig = SwigNamer(field_name)
# Setter
setter_str = "\tvoid %s(std::vector<double>%s) {\n" % (swig.set_call, swig.input_str) + \
"\t\tv3Copy(%s.data(), %s);\n" % (swig.input_str, swig.config_str) + \
"\t}\n"
# Getter
getter_str = "\tstd::vector<double> %s() const {\n" % swig.get_call + \
"\t\tstd::vector<double> %s(%s, %s + sizeof(%s) / sizeof(%s[0]) );\n" % \
(swig.output_str, swig.config_str, swig.config_str, swig.config_str, swig.config_str) + \
"\t\treturn (%s);\n" % swig.output_str + \
"\t}\n"
self.current_vars_swig_dict[field_name] = setter_str, getter_str
self.current_props_swig_dict[field_name] = swig.set_call, swig.get_call
# print( "\nswig_v3_doubles(). field_name = ", field_name)
# print( "setter_str = \n", setter_str)
# print( "getter_str = \n", getter_str)
return
def swig_v9_doubles(self, field_name):
swig = SwigNamer(field_name)
# Setter
setter_str = "\tvoid %s(std::vector<double>%s) {\n" % (swig.set_call, swig.input_str) + \
"\t\tm33Copy(RECAST3X3 %s.data(), RECAST3X3 %s);\n" % (swig.input_str, swig.config_str) + \
"\t}\n"
# Getter
getter_str = "\tstd::vector<double> %s() const {\n" % swig.get_call + \
"\t\tstd::vector<double> %s(%s, %s + sizeof(%s) / sizeof(%s[0]) );\n" % \
(swig.output_str, swig.config_str, swig.config_str, swig.config_str, swig.config_str) + \
"\t\treturn (%s);\n" % swig.output_str + \
"\t}\n"
self.current_vars_swig_dict[field_name] = setter_str, getter_str
self.current_props_swig_dict[field_name] = swig.set_call, swig.get_call
# print( "\nswig_v9_doubles(). field_name = ", field_name)
# print( "setter_str = \n", setter_str)
# print( "getter_str = \n", getter_str)
return