Source code for test_NonSwigMessaging

#
#  Copyright (c) 2023, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
#  Permission to use, copy, modify, and/or distribute this software for any
#  purpose with or without fee is hereby granted, provided that the above
#  copyright notice and this permission notice appear in all copies.
#
#  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
#  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
#  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
#  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
#  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
#  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
#  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

#
# Purpose:  Test if a C-wrapped input message can be logged with a recorder module and that a C++ module can access its data using non-swig modules
# Author: Scott Piggott
# Creation Date:  Aug. 17 2024
#


from Basilisk.architecture import bskLogging
from Basilisk.architecture import messaging
from Basilisk.architecture import sim_model
from Basilisk.moduleTemplates import cModuleTemplate
from Basilisk.moduleTemplates import cppModuleTemplate
from Basilisk.utilities import SimulationBaseClass
from Basilisk.utilities import macros
from Basilisk.utilities import unitTestSupport as uts


[docs] def test_RecordingInputMessages(): """ testing recording a C-wrapped input message with the recorder module """ bskLogging.setDefaultLogLevel(bskLogging.BSK_WARNING) testFailCount = 0 # zero unit test result counter testMessages = [] # create empty array to store test log messages # Create a sim module as an empty container scSim = SimulationBaseClass.SimBaseClass() # create the simulation process dynProcess = scSim.CreateNewProcess("dynamicsProcess") # create the dynamics task and specify the integration update time dynProcess.addTask(scSim.CreateNewTask("dynamicsTask", macros.sec2nano(1.))) # create modules mod1 = cModuleTemplate.cModuleTemplate() mod1.ModelTag = "cModule1" scSim.AddModelToTask("dynamicsTask", mod1) # Write input data inputData = messaging.CModuleTemplateMsgPayload() inputData.dataVector = [1, 2, 3] inputDataMsg = messaging.CModuleTemplateMsg().write(inputData) # Subscribe input message to stand alone message inputAddr = sim_model.getObjectAddress(inputDataMsg) mod1.dataInMsg.subscribeTo(inputAddr) # Create recorders tied to IO messages dataInRec = mod1.dataInMsg.recorder() scSim.AddModelToTask("dynamicsTask", dataInRec) dataOutRec = mod1.dataOutMsg.recorder() scSim.AddModelToTask("dynamicsTask", dataOutRec) attGuidMsg = messaging.CModuleTemplateMsg_C() attGuidMsgPayload = messaging.CModuleTemplateMsgPayload() attGuidMsg.write(attGuidMsgPayload) messaging.CModuleTemplateMsg_C_addAuthor(mod1.dataOutMsg, attGuidMsg) dataOut2Rec = attGuidMsg.recorder() scSim.AddModelToTask("dynamicsTask", dataOut2Rec) # create modules mod1CPP = cppModuleTemplate.CppModuleTemplate() mod1CPP.ModelTag = "cppModule1" scSim.AddModelToTask("dynamicsTask", mod1CPP) # Subscribe input message to C-module output message address cOutAddr = sim_model.getObjectAddress(mod1.dataOutMsg) mod1CPP.dataInMsg.subscribeToCAddr(cOutAddr) # Create recorders tied to IO messages dataInRecCPP = mod1CPP.dataInMsg.recorder() scSim.AddModelToTask("dynamicsTask", dataInRecCPP) dataOutRecCPP = mod1CPP.dataOutMsg.recorder() scSim.AddModelToTask("dynamicsTask", dataOutRecCPP) # initialize Simulation: scSim.InitializeSimulation() # configure a simulation stop time and execute the simulation run scSim.ConfigureStopTime(macros.sec2nano(1.0)) scSim.ExecuteSimulation() # reading the module output message show not change the earlier redirection # further, we are testing that the read() command copies the payload from # the stand alone msg to the module output module tempSet = mod1.dataOutMsg.read().dataVector scSim.ConfigureStopTime(macros.sec2nano(2.0)) scSim.ExecuteSimulation() # print(dataInRec.dataVector) # print(dataOutRec.dataVector) # print(dataOut2Rec.dataVector) testFailCount, testMessages = uts.compareArray([inputData.dataVector]*3 , dataInRec.dataVector , 0.01 , "recorded input message was not correct." , testFailCount , testMessages) testFailCount, testMessages = uts.compareArray([[0, 0, 0], [0, 0, 0], [3, 2, 3]] , dataOutRec.dataVector , 0.01 , "recorded module output message was not correct." , testFailCount , testMessages) testFailCount, testMessages = uts.compareArray([[2, 2, 3], [3, 2, 3], [4, 2, 3]] , dataOut2Rec.dataVector , 0.01 , "recorded redirected module output message was not correct." , testFailCount , testMessages) testFailCount, testMessages = uts.compareArray([[4., 2., 3.]] , [mod1.dataOutMsg.read().dataVector] , 0.01 , "read of module output message was not correct." , testFailCount , testMessages) testFailCount, testMessages = uts.compareArray([[4, 2, 3]] , [attGuidMsg.read().dataVector] , 0.01 , "read of module redirected output message was not correct." , testFailCount , testMessages) #Check that the input message subscribed with address reflects data right testFailCount, testMessages = uts.compareArray(dataOutRec.dataVector , dataInRecCPP.dataVector , 0.01 , "recorded input message from addr was not correct." , testFailCount , testMessages) dataRecExpectation = dataInRecCPP.dataVector dummyVal = 1 for i in range(dataRecExpectation.shape[0]): dataRecExpectation[i, 0] += dummyVal dummyVal += 1 #Check that the output message using input subscribed with address is right testFailCount, testMessages = uts.compareArray(dataRecExpectation , dataOutRecCPP.dataVector , 0.01 , "recorded output message from addr was not correct." , testFailCount , testMessages) if testFailCount: print(testMessages) # each test method requires a single assert method to be called # this check below just makes sure no sub-test failures were found assert testFailCount < 1, testMessages
if __name__ == "__main__": test_RecordingInputMessages()