Source code for methodsParser

''' '''
'''
 ISC License

 Copyright (c) 2016-2018, Autonomous Vehicle Systems Lab, University of Colorado at Boulder

 Permission to use, copy, modify, and/or distribute this software for any
 purpose with or without fee is hereby granted, provided that the above
 copyright notice and this permission notice appear in all copies.

 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

'''
import sys, os, inspect
import modulesParser as dataParser

filename = inspect.getframeinfo(inspect.currentframe()).filename
path = os.path.dirname(os.path.abspath(filename))

from Basilisk.architecture import alg_contain
from Basilisk.architecture import sim_model
import numpy as np


[docs]def parseSimAlgorithms(TheSim, taskActivityDir, outputCFileName, str_ConfigData, simTag='TheSim', localPath = os.path.dirname(os.path.abspath(filename))): """ This method finds the name of the four main algorithms (speaking generically: self-init, cross-init, update and reset) for every module that needs to be auto-coded. """ def areTasksInSimTaskList(taskActivityDir, TheSim): """ This method checks for matches between the task names instantiated in TheSim and the names defined in the taskActivityDir dictionary. """ taskIdxDir = [] taskOrderedList = [] procLength = len(TheSim.TotalSim.processList) for procIdx in range(procLength): locProcList = [] for i_task in range(len(TheSim.TotalSim.processList[procIdx].processTasks)): theTask = TheSim.TotalSim.processList[procIdx].processTasks[i_task] taskFound = False for ordIdx in range(len(locProcList)): locTask = locProcList[ordIdx] # locTask[0] = taskName # locTask[1] = taskPriority if theTask.taskPriority > locTask[1]: locProcList.insert(ordIdx, [theTask.TaskPtr.TaskName, theTask.taskPriority]) taskFound = True break if taskFound != True: locProcList.append([theTask.TaskPtr.TaskName, theTask.taskPriority, theTask]) taskOrderedList.extend(locProcList) for i_task in range(0, len(taskOrderedList)): # taskOrderedList[i_task][0] = taskName # taskOrderedList[i_task][1] = taskPriority # taskOrderedList[i_task][2] = theTask taskName = taskOrderedList[i_task][0] if taskName in taskActivityDir.keys(): idxUse = getTaskIndex(TheSim, taskOrderedList[i_task][2].TaskPtr) taskIdxDir.append(idxUse) print(i_task, taskName) return taskIdxDir def parseSwigVars(list): """ First Parsing Method. Function: get rid of all the variables that come after a built-in. The methods SelfInit, CrossInit, Update and Restart will always come first, and so will do the variables that are capitalized (this is based on how "dir()" command works in the Python interpreter). :return: parsed array. """ parsed_array = np.array([]) length = len(list) i = 0 while i < length: if list[i][0] != '_': parsed_array = np.append(parsed_array, list[i]) i += 1 else: break return parsed_array def evalParsedList(list, module): """ Second Parsing Method: Collects all the SwigPyObjects present in the list. Only the methods self-init, cross-init, update and reset methods are wrapped by SWIG in the .i files. Therefore they are the only SwigPyObjects. :return: dictionary addressDict = {method, address} """ addressDict = {} for methodName in list: methodObject = eval('sys.modules["' + module + '"].' + methodName) if type(methodObject).__name__ == "SwigPyObject": methodAddress = sim_model.getObjectAddress(methodObject) addressDict[methodName] = int(methodAddress) return addressDict def checkMethodType(methodName): """ This function checks the method type of the input and returns the corresponding strings. :param methodName: name of the model data algorithm :return: corresponding function strings """ str_selfInit = 'SelfInit' str_crossInit = 'CrossInit' str_update = 'Update' str_reset = 'Reset' str_blank = '' # the methods SelfInit and CrossInit don't need the callTime parameter str_callTime = ', callTime' # the methods Reset and Update need an extra parameter for callTine if methodName[0:len(str_selfInit)] == str_selfInit: return (str_selfInit, str_blank) elif methodName[0:len(str_crossInit)] == str_crossInit: return (str_crossInit, str_blank) elif methodName[0:len(str_update)] == str_update: return (str_update, str_callTime) elif methodName[0:len(str_reset)] == str_reset: return (str_reset, str_callTime) else: raise ValueError('Cannot recognize the method. Parse better.') def getTaskModelKey(i_task, i_model): """ This function returns the key name of the NameReplace dictionary according to the index of the task and the index of the model inside the task :param i_task: task index :param i_model: model index :return: key of NameReplace dictionary """ key = 'self.TaskList[' + str(i_task) + '].TaskModels[' + str(i_model) + ']' return key def getTaskIndex(theSim,taskUse): """ This function returns the task index corresponding to the task name """ j=0 for taskPy in theSim.TaskList: if taskUse.TaskName == taskPy.Name: return j j+=1 return -1 def findAddressMatch(addressVal, modelTagKey, dict): """ This function makes sure that each algorithm in a data model is matched with the proper algorithm in the corresponding model wrap. If there's a match, returns the ID of the model. Otherwise, an ugly error is raised and the whole program quits. :param addressVal: address of the model data algorithm :param modelTagKey: modelTag name :param dict: wrap-algorithms' address dictionary in which to look for a match dict[modelTagKey][0] = model wrap algorithm address dict[modelTagKey][1] = model ID :return: """ try: address = dict[modelTagKey][0] if address == addressVal: IDVal = dict[modelTagKey][1] return IDVal except: raise ValueError(str(modelTagKey) + ' is not wrapping all the existing algorithms in ' 'the corresponding data model. Fix it.') def writeTaskAlgs(algName, algList, theVoidList, theAlgList): """ This function progressively creates a list with all the void definitions that will go in the output header file and the algorithms names that will go in the output source file for SelfInit, CrossInit and Reset methods """ void = 'void ' + algName void_header = void + ';\n' void_source = void + '\n{\n' for alg in algList: void_source += '\t' + alg + ';\n' void_source += '}\n' theVoidList.append(void_header) theAlgList.append(void_source) def writeUpdateTaskActivityAlg(algName, globalAlgUpdate, theVoidList, theAlgList): """ This function progressively creates a list with all the void definitions that will go in the output header file and the algorithms names that will go in the output source file for Update methods. It adds an if-check to learn about the task activity flag. Only tasks that are active should be Updated. """ void = 'void ' + algName void_header = void + ';\n' void_source = void + '\n{\n' for updateElem in globalAlgUpdate: void_source += '\t' + 'if (data->' + updateElem[1][0] + '){' + '\n' void_source += '\t\t' + updateElem[0] + '(data, callTime);' + '\n' void_source += '\t' + '}' + '\n' void_source += '}' + '\n' theVoidList.append(void_header) theAlgList.append(void_source) def writeTaskActivityVars(globalAlgUpdate, varType): """ This function creates two lists for the activity flag variables: declaration(header) and initialization(source) """ theTaskActivity_declareList = [] theTaskActivity_initList = [] for updateElem in globalAlgUpdate: # updateElem[0] = algNameUpdate # updateElem[1] = [algNameTaskActivity, boolIsTaskActive] declare_str = varType + ' ' + updateElem[1][0] + ';' + '\n' init_str = 'data->' + updateElem[1][0] + ' = ' + updateElem[1][1] + ';' + '\n' theTaskActivity_declareList.append(declare_str) theTaskActivity_initList.append(init_str) return (theTaskActivity_declareList, theTaskActivity_initList) def findFilePath(file): """ This function looks for the path of the required header files """ ADCSPath = path + '/../fswAlgorithms/' for dirpath, subdirs, files in os.walk(ADCSPath): for x in files: if x == file: relDir = os.path.relpath(dirpath, path) filePath = os.path.join(relDir, file) return filePath def createModuleHeaderName(module, headersList): """ This function appends a module's header string to the global headers list (only if it's not there) """ #print(len(module)) moduleName = module[:len(module) // 2] + '.h' headerPath = findFilePath(moduleName) if(headerPath == None): return header = '#include "' + headerPath + '"\n' if not(header in headersList): headersList.append(header) def createConfigDataHeader(tag, configData, configlist): """ This function creates a list with the declaration of all config data structures """ string = configData + ' ' + tag + ';' + '\n' if not (string in configlist): configlist.append(string) # Model Wraps SelfInit_dict = {} # dictionary D = {modelTag: SelfInit alg address, moduleID} Update_dict = {} # dictionary D = {modelTag: Update alg address, moduleID} Reset_dict = {} # dictionary D = {modelTag: Reset alg address, moduleID} TheSimList = dir(eval(simTag)) i = 0 for elemName in TheSimList: elem = eval(simTag + '.' + elemName) if type(elem) == alg_contain.AlgContain: SelfInit_dict[elem.ModelTag] = (int(elem.getSelfInitAddress()), i) Update_dict[elem.ModelTag] = (int(elem.getUpdateAddress()), i) hasResetAddress = int(elem.getResetAddress()) if (hasResetAddress): Reset_dict[elem.ModelTag] = (hasResetAddress, i) i += 1 # Model Data NameReplaceList = TheSim.NameReplace allAlgSelfInit = [] # global list for all models' SelfInit algorithms globalAllAlgReset = [] # global list for all models' Reset algorithms globalAlgUpdate = [] # theConfigDataList = [] theAlgList = [] # global list for all source algorithms theVoidList = [] # global list for all header void function definitions. theHeadersList = [] # global list for all the module headers ConfigData = '(' + str_ConfigData + ' *data)' ConfigData_callTime = '(' + str_ConfigData + ' *data, uint64_t callTime)' taskIdxDir = areTasksInSimTaskList(taskActivityDir, TheSim) for i_task in taskIdxDir: task = TheSim.TaskList[i_task] isTaskActive = taskActivityDir[task.Name] allAlgUpdate = [] # local list for task models' Update algorithms allAlgReset = [] # local list for task model's Reset algorithms i_model = 0 for model in task.TaskModels: key = getTaskModelKey(i_task, i_model) modelTag = NameReplaceList[key] module = model.__module__ modelConfigDataName = str(type(model).__name__) createConfigDataHeader(modelTag, modelConfigDataName, theConfigDataList) createModuleHeaderName(module, theHeadersList) sysMod = sys.modules[module] dirList = dir(sysMod) parsed_dirList = parseSwigVars(dirList) addressDict = evalParsedList(parsed_dirList, module) for k, v in addressDict.items(): (methodType, methodCallTime) = checkMethodType(k) dictUse = eval(methodType + '_dict') modelID = findAddressMatch(v, modelTag, dictUse) theString = k +'(&(data->' + modelTag + ')' + methodCallTime + ', ' + str(modelID) + ')' theList = eval('allAlg' + methodType) if not(theString in theList): theList.append(theString) i_model += 1 algNameUpdate = task.Name + '_Update' algNameUpdateTaskActivity = task.Name + '_isActive' globalAlgUpdate.append([algNameUpdate, (algNameUpdateTaskActivity,isTaskActive)]) algNameReset = task.Name + '_Reset' writeTaskAlgs(algNameUpdate + ConfigData_callTime, allAlgUpdate, theVoidList, theAlgList) if (allAlgReset): # check if there are any reset methods in the task models writeTaskAlgs(algNameReset + ConfigData_callTime, allAlgReset, theVoidList, theAlgList) for reset in allAlgReset: if not(reset in globalAllAlgReset): globalAllAlgReset.append(reset) algNameAllSelfInit = str_ConfigData + '_AllAlg_SelfInit' algNameAllReset = str_ConfigData + '_AllAlg_Reset' taskNameUpdate = str_ConfigData + '_AllTasks_Update' algNameDataInit = str_ConfigData + '_DataInit' writeTaskAlgs(algNameAllSelfInit + ConfigData, allAlgSelfInit, theVoidList, theAlgList) writeTaskAlgs(algNameAllReset + ConfigData_callTime, globalAllAlgReset, theVoidList, theAlgList) writeUpdateTaskActivityAlg(taskNameUpdate + ConfigData_callTime, globalAlgUpdate, theVoidList, theAlgList) varType = 'uint32_t' (theTaskActivity_declareList, theTaskActivity_initList) = writeTaskActivityVars(globalAlgUpdate, varType) # Open/Create C source&header files and swig file filename = inspect.getframeinfo(inspect.currentframe()).filename fileMode = 'w+' # create file to be written if it doesn't exist alg_source = open(localPath + '/' + outputCFileName + '.c', fileMode) # source file alg_header = open(localPath + '/' + outputCFileName + '.h', fileMode) # header file alg_swig = open(localPath + '/' + outputCFileName + '.i', fileMode) # header file # Write source file alg_source.write('#include "' + outputCFileName + '.h"' + '\n\n') for alg in theAlgList: alg_source.write(alg) alg_source.write('\n') theDataVoid = 'void ' + algNameDataInit + '(' + str_ConfigData + ' *data)' alg_source.write(theDataVoid + '{\n') alg_source.write('\tmemset(data, 0x0, sizeof(' + str_ConfigData + '));\n') for init in theTaskActivity_initList: alg_source.write('\t') alg_source.write(init) dataParser.ParseConfigData(TheSim, taskIdxDir, 'data->', alg_source) alg_source.write('}') # Write header file # begin header file defName = '_FSW_AUTOCODE_' alg_header.write('#ifndef ' + defName + '\n' + '#define ' + defName + '\n\n') # define auto-code init data for header in theHeadersList: alg_header.write(header) alg_header.write('\n' + 'typedef struct{' + '\n') for configData in theConfigDataList: alg_header.write('\t') alg_header.write(configData) for declare in theTaskActivity_declareList: alg_header.write('\t') alg_header.write(declare) alg_header.write('}' + str_ConfigData +';' + '\n') # define auto-code algorithms alg_header.write('\n' + '#ifdef ' + '__cplusplus' + '\n' + 'extern "C" {' + '\n' + '#endif' + '\n') for void in theVoidList: alg_header.write('\t') alg_header.write(void) alg_header.write('\t' + theDataVoid + ';' + '\n') alg_header.write('#ifdef ' + '__cplusplus' + '\n' + '}' + '\n' + '#endif') alg_header.write('\n\n' + '#endif') def writeSwigAlgCode(algNames, swigFile): """ This function writes the swig file """ endL = ';\n' for name in algNames: constant_str = '%constant void ' + name + endL ignore_str = '%ignore ' + name + endL swigFile.write(constant_str + ignore_str) return # begin swig code alg_swig.write('%module ' + outputCFileName + '\n' + '%{' + '\n') alg_swig.write('\t' + '#include "' + outputCFileName + '.h"' + '\n' + '%}' + '\n\n') alg_swig.write('%include "swig_conly_data.i"' + '\n\n') # wrap C auto-code algorithms configData_paramsDefault = '(void*, unit64_t)' configData_paramsCallTime = '(void*, unit64_t, uint64_t)' algNames = [ algNameDataInit +configData_paramsDefault, algNameAllSelfInit + configData_paramsDefault, algNameAllReset + configData_paramsCallTime, taskNameUpdate + configData_paramsCallTime ] writeSwigAlgCode(algNames, alg_swig) # end swig code alg_swig.write('%include "'+ outputCFileName + '.h"' + '\n\n') alg_swig.write('%pythoncode %{' + '\n' + 'import sys' + '\n' + 'protectAllClasses(sys.modules[__name__])' + '\n' + '%}') # Close C source&header files and swig file alg_source.close() alg_header.close() alg_swig.close()