Source code for test_okeefeEKF

#
#  ISC License
#
#  Copyright (c) 2016, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
#  Permission to use, copy, modify, and/or distribute this software for any
#  purpose with or without fee is hereby granted, provided that the above
#  copyright notice and this permission notice appear in all copies.
#
#  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
#  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
#  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
#  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
#  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
#  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
#  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import inspect
import os
import sys

import numpy as np
import pytest

filename = inspect.getframeinfo(inspect.currentframe()).filename
path = os.path.dirname(os.path.abspath(filename))
splitPath = path.split('FswAlgorithms')
sys.path.append(splitPath[0] + '/modules')
sys.path.append(splitPath[0] + '/PythonModules')

import SunLineOEKF_test_utilities as FilterPlots
from Basilisk.fswAlgorithms import okeefeEKF
from Basilisk.utilities import SimulationBaseClass
from Basilisk.utilities import macros
from Basilisk.utilities import unitTestSupport  # general support file with common unit test functions
from Basilisk.architecture import messaging


def setupFilterData(filterObject):
    filterObject.sensorUseThresh = 0.
    filterObject.state = [1.0, 1.0, 1.0]
    filterObject.omega = [0.1, 0.2, 0.1]
    filterObject.x = [1.0, 0.0, 1.0]
    filterObject.covar = [0.4, 0.0, 0.0,
                          0.0, 0.4, 0.0,
                          0.0, 0.0, 0.4]

    filterObject.qProcVal = 0.1**2
    filterObject.qObsVal = 0.001
    filterObject.eKFSwitch = 5. #If low (0-5), the CKF kicks in easily, if high (>10) it's mostly only EKF

[docs]def test_all_functions_oekf(show_plots): """Module Unit Test""" [testResults, testMessage] = sunline_individual_test() assert testResults < 1, testMessage [testResults, testMessage] = StatePropStatic() assert testResults < 1, testMessage [testResults, testMessage] = StatePropVariable(show_plots) assert testResults < 1, testMessage
# uncomment this line is this test is to be skipped in the global unit test run, adjust message as needed # @pytest.mark.skipif(conditionstring) # uncomment this line if this test has an expected failure, adjust message as needed # @pytest.mark.xfail(True) # The following 'parametrize' function decorator provides the parameters and expected results for each # of the multiple test runs for this test. @pytest.mark.parametrize("SimHalfLength, AddMeasNoise , testVector1 , testVector2, stateGuess", [ (200, True ,[-0.7, 0.7, 0.0] ,[0.8, 0.9, 0.0], [0.7, 0.7, 0.0]), (2000, True ,[-0.7, 0.7, 0.0] ,[0.8, 0.9, 0.0], [0.7, 0.7, 0.0]), (200, False ,[-0.7, 0.7, 0.0] ,[0.8, 0.9, 0.0], [0.7, 0.7, 0.0]), (200, False ,[0., 0.4, -0.4] ,[0., 0.7, 0.2], [0.3, 0.0, 0.6]), (200, True ,[0., 0.4, -0.4] ,[0.4, 0.5, 0.], [0.7, 0.7, 0.0]), (200, True ,[-0.7, 0.7, 0.0] ,[0.8, 0.9, 0.0], [0.7, 0.7, 0.0]) ]) # uncomment this line is this test is to be skipped in the global unit test run, adjust message as needed # @pytest.mark.skipif(conditionstring) # uncomment this line if this test has an expected failure, adjust message as needed # @pytest.mark.xfail() # need to update how the RW states are defined # provide a unique test method name, starting with test_ def test_all_sunline_oekf(show_plots, SimHalfLength, AddMeasNoise, testVector1, testVector2, stateGuess): [testResults, testMessage] = StateUpdateSunLine(show_plots, SimHalfLength, AddMeasNoise, testVector1, testVector2, stateGuess) assert testResults < 1, testMessage def sunline_individual_test(): # The __tracebackhide__ setting influences pytest showing of tracebacks: # the mrp_steering_tracking() function will not be shown unless the # --fulltrace command line option is specified. __tracebackhide__ = True testFailCount = 0 # zero unit test result counter testMessages = [] # create empty list to store test log messages NUMSTATES = 3 ################################################################################### ## Testing dynamics matrix computation ################################################################################### inputOmega = [0.1, 0.2, 0.1] dt =0.5 expDynMat = - np.array([[0., -inputOmega[2], inputOmega[1]], [inputOmega[2], 0., -inputOmega[0]], [ -inputOmega[1], inputOmega[0], 0.]]) dynMat = okeefeEKF.new_doubleArray(3*3) for i in range(9): okeefeEKF.doubleArray_setitem(dynMat, i, 0.0) okeefeEKF.sunlineDynMatrixOkeefe(inputOmega, dt, dynMat) DynOut = [] for i in range(NUMSTATES*NUMSTATES): DynOut.append(okeefeEKF.doubleArray_getitem(dynMat, i)) DynOut = np.array(DynOut).reshape(3, 3) errorNorm = np.linalg.norm(expDynMat - DynOut) if(errorNorm > 1.0E-12): print(errorNorm) testFailCount += 1 testMessages.append("Dynamics Matrix generation Failure \n") ################################################################################### ## Testing omega computation ################################################################################### inputStates = [2,1,0.75] inputPrevStates = [1,0.1,0.5] norm1 = np.linalg.norm(np.array(inputStates)) norm2 = np.linalg.norm(np.array(inputPrevStates)) dt =0.5 expOmega = 1./dt*np.cross(np.array(inputStates),np.array(inputPrevStates))/(norm1*norm2)*np.arccos(np.dot(np.array(inputStates),np.array(inputPrevStates))/(norm1*norm2)) omega = okeefeEKF.new_doubleArray(NUMSTATES) for i in range(3): okeefeEKF.doubleArray_setitem(omega, i, 0.0) okeefeEKF.sunlineRateCompute(inputStates, dt, inputPrevStates, omega) omegaOut = [] for i in range(NUMSTATES): omegaOut.append(okeefeEKF.doubleArray_getitem(omega, i)) omegaOut = np.array(omegaOut) errorNorm = np.linalg.norm(expOmega - omegaOut) if(errorNorm > 1.0E-12): print(errorNorm) testFailCount += 1 testMessages.append("Dynamics Matrix generation Failure \n") ################################################################################### ## STM and State Test ################################################################################### inputStates = [2,1,0.75] inputOmega = [0.1, 0.2, 0.1] dt =0.5 stateTransition = okeefeEKF.new_doubleArray(NUMSTATES*NUMSTATES) states = okeefeEKF.new_doubleArray(NUMSTATES) prev_states = okeefeEKF.new_doubleArray(NUMSTATES) for i in range(NUMSTATES): okeefeEKF.doubleArray_setitem(states, i, inputStates[i]) for j in range(NUMSTATES): if i==j: okeefeEKF.doubleArray_setitem(stateTransition, NUMSTATES*i+j, 1.0) else: okeefeEKF.doubleArray_setitem(stateTransition, NUMSTATES*i+j, 0.0) okeefeEKF.sunlineStateSTMProp(expDynMat.flatten().tolist(), dt, inputOmega, states, prev_states, stateTransition) PropStateOut = [] PropSTMOut = [] for i in range(NUMSTATES): PropStateOut.append(okeefeEKF.doubleArray_getitem(states, i)) for i in range(NUMSTATES*NUMSTATES): PropSTMOut.append(okeefeEKF.doubleArray_getitem(stateTransition, i)) STMout = np.array(PropSTMOut).reshape([NUMSTATES,NUMSTATES]) StatesOut = np.array(PropStateOut) expectedSTM = dt*np.dot(expDynMat, np.eye(NUMSTATES)) + np.eye(NUMSTATES) expectedStates = np.zeros(NUMSTATES) inputStatesArray = np.array(inputStates) ## Equations when removing the unobservable states from d_dot expectedStates[0:3] = np.array(inputStatesArray - dt*(np.cross(np.array(inputOmega), np.array(inputStatesArray)))) errorNormSTM = np.linalg.norm(expectedSTM - STMout) errorNormStates = np.linalg.norm(expectedStates - StatesOut) if(errorNormSTM > 1.0E-12): print(errorNormSTM) testFailCount += 1 testMessages.append("STM Propagation Failure \n") if(errorNormStates > 1.0E-12): print(errorNormStates) testFailCount += 1 testMessages.append("State Propagation Failure \n") ################################################################################### # ## Test the H and yMeas matrix generation as well as the observation count # ################################################################################### numCSS = 4 cssCos = [np.cos(np.deg2rad(10.)), np.cos(np.deg2rad(25.)), np.cos(np.deg2rad(5.)), np.cos(np.deg2rad(90.))] sensorTresh = np.cos(np.deg2rad(50.)) cssNormals = [1.,0.,0.,0.,1.,0., 0.,0.,1., 1./np.sqrt(2), 1./np.sqrt(2),0.] cssBias = [1.0 for i in range(numCSS)] measMat = okeefeEKF.new_doubleArray(8*NUMSTATES) obs = okeefeEKF.new_doubleArray(8) yMeas = okeefeEKF.new_doubleArray(8) numObs = okeefeEKF.new_intArray(1) for i in range(8*NUMSTATES): okeefeEKF.doubleArray_setitem(measMat, i, 0.) for i in range(8): okeefeEKF.doubleArray_setitem(obs, i, 0.0) okeefeEKF.doubleArray_setitem(yMeas, i, 0.0) okeefeEKF.sunlineHMatrixYMeas(inputStates, numCSS, cssCos, sensorTresh, cssNormals, cssBias, obs, yMeas, numObs, measMat) obsOut = [] yMeasOut = [] numObsOut = [] HOut = [] for i in range(8*NUMSTATES): HOut.append(okeefeEKF.doubleArray_getitem(measMat, i)) for i in range(8): yMeasOut.append(okeefeEKF.doubleArray_getitem(yMeas, i)) obsOut.append(okeefeEKF.doubleArray_getitem(obs, i)) numObsOut.append(okeefeEKF.intArray_getitem(numObs, 0)) #Fill in expected values for test expectedH = np.zeros([8,NUMSTATES]) expectedY = np.zeros(8) for j in range(3): expectedH[j,0:3] = np.eye(3)[j,:] expectedY[j] =np.array(cssCos[j]) - np.dot(np.array(inputStates)[0:3], np.array(cssNormals)[j*3:(j+1)*3]) expectedObs = np.array([np.cos(np.deg2rad(10.)), np.cos(np.deg2rad(25.)), np.cos(np.deg2rad(5.)),0.,0.,0.,0.,0.]) expectedNumObs = 3 HOut = np.array(HOut).reshape([8, NUMSTATES]) errorNorm = np.zeros(4) errorNorm[0] = np.linalg.norm(HOut - expectedH) errorNorm[1] = np.linalg.norm(yMeasOut - expectedY) errorNorm[2] = np.linalg.norm(obsOut - expectedObs) errorNorm[3] = np.linalg.norm(numObsOut[0] - expectedNumObs) for i in range(4): if(errorNorm[i] > 1.0E-12): testFailCount += 1 testMessages.append("H and yMeas update failure \n") # ################################################################################### # ## Test the Kalman Gain # ################################################################################### numObs = 3 h = [1., 0., 0., 0., 1., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.] covar = [1., 0., 1., 0., 1., 0., 1., 0., 1. ] noise= 0.01 Kalman = okeefeEKF.new_doubleArray(NUMSTATES * 8) for i in range(8 * NUMSTATES): okeefeEKF.doubleArray_setitem(Kalman, i, 0.) okeefeEKF.sunlineKalmanGainOkeefe(covar, h, noise, numObs, Kalman) KalmanOut = [] for i in range(8 * NUMSTATES): KalmanOut.append(okeefeEKF.doubleArray_getitem(Kalman, i)) # Fill in expected values for test Hmat = np.array(h).reshape([8,NUMSTATES]) Pk = np.array(covar).reshape([NUMSTATES,NUMSTATES]) R = noise*np.eye(3) expectedK = np.dot(np.dot(Pk, Hmat[0:numObs,:].T), np.linalg.inv(np.dot(np.dot(Hmat[0:numObs,:], Pk), Hmat[0:numObs,:].T) + R[0:numObs,0:numObs])) KalmanOut = np.array(KalmanOut)[0:NUMSTATES*numObs].reshape([NUMSTATES, 3]) errorNorm = np.linalg.norm(KalmanOut[:,0:numObs] - expectedK) if (errorNorm > 1.0E-12): print(errorNorm) testFailCount += 1 testMessages.append("Kalman Gain update failure \n") # ################################################################################### # ## Test the EKF update # ################################################################################### KGain = [1.,2.,3., 0., 1., 2., 1., 0., 1.] for i in range(NUMSTATES*8-NUMSTATES*3): KGain.append(0.) inputStates = [2,1,0.75] xbar = [0.1, 0.2, 0.01] numObs = 3 h = [1., 0., 0., 0., 1., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.] covar = [1., 0., 1., 0., 1., 0., 1., 0., 1., ] noise = 0.01 inputY = np.zeros(3) for j in range(3): inputY[j] = np.array(cssCos[j]) - np.dot(np.array(inputStates)[0:3], np.array(cssNormals)[j * 3:(j + 1) * 3]) inputY = inputY.tolist() stateError = okeefeEKF.new_doubleArray(NUMSTATES) covarMat = okeefeEKF.new_doubleArray(NUMSTATES*NUMSTATES) inputs = okeefeEKF.new_doubleArray(NUMSTATES) for i in range(NUMSTATES): okeefeEKF.doubleArray_setitem(stateError, i, 0.) okeefeEKF.doubleArray_setitem(inputs, i, inputStates[i]) for j in range(NUMSTATES): okeefeEKF.doubleArray_setitem(covarMat,i+j,0.) okeefeEKF.okeefeEKFUpdate(KGain, covar, noise, numObs, inputY, h, inputs, stateError, covarMat) stateOut = [] covarOut = [] errorOut = [] for i in range(NUMSTATES): stateOut.append(okeefeEKF.doubleArray_getitem(inputs, i)) errorOut.append(okeefeEKF.doubleArray_getitem(stateError, i)) for j in range(NUMSTATES*NUMSTATES): covarOut.append(okeefeEKF.doubleArray_getitem(covarMat, j)) # Fill in expected values for test KK = np.array(KGain)[0:NUMSTATES*3].reshape([NUMSTATES,3]) expectedStates = np.array(inputStates) + np.dot(KK, np.array(inputY)) H = np.array(h).reshape([8,NUMSTATES])[0:3,:] Pk = np.array(covar).reshape([NUMSTATES, NUMSTATES]) R = noise * np.eye(3) expectedP = np.dot(np.dot(np.eye(NUMSTATES) - np.dot(KK, H), Pk), np.transpose(np.eye(NUMSTATES) - np.dot(KK, H))) + np.dot(KK, np.dot(R,KK.T)) errorNorm = np.zeros(2) errorNorm[0] = np.linalg.norm(np.array(stateOut) - expectedStates) errorNorm[1] = np.linalg.norm(expectedP - np.array(covarOut).reshape([NUMSTATES,NUMSTATES])) for i in range(2): if(errorNorm[i] > 1.0E-12): testFailCount += 1 testMessages.append("EKF update failure \n") # # ################################################################################### # ## Test the CKF update # ################################################################################### KGain = [1., 2., 3.] for i in range(NUMSTATES * 8 - NUMSTATES * 3): KGain.append(0.) inputStates = [2, 1, 0.75] xbar = [0.1, 0.2, 0.01] numObs = 3 h = [1., 0., 0., 0., 1., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.] covar = [1., 0., 1., 0., 1., 0., 1., 0., 1.] noise =0.01 inputY = np.zeros(3) for j in range(3): inputY[j] = np.array(cssCos[j]) - np.dot(np.array(inputStates)[0:3], np.array(cssNormals)[j * 3:(j + 1) * 3]) inputY = inputY.tolist() stateError = okeefeEKF.new_doubleArray(NUMSTATES) covarMat = okeefeEKF.new_doubleArray(NUMSTATES * NUMSTATES) for i in range(NUMSTATES): okeefeEKF.doubleArray_setitem(stateError, i, xbar[i]) for j in range(NUMSTATES): okeefeEKF.doubleArray_setitem(covarMat, i + j, 0.) okeefeEKF.sunlineCKFUpdateOkeefe(xbar, KGain, covar, noise, numObs, inputY, h, stateError, covarMat) covarOut = [] errorOut = [] for i in range(NUMSTATES): errorOut.append(okeefeEKF.doubleArray_getitem(stateError, i)) for j in range(NUMSTATES*NUMSTATES): covarOut.append(okeefeEKF.doubleArray_getitem(covarMat, j)) # Fill in expected values for test KK = np.array(KGain)[0:NUMSTATES * 3].reshape([NUMSTATES, 3]) H = np.array(h).reshape([8, NUMSTATES])[0:3, :] expectedStateError = np.array(xbar) + np.dot(KK, (np.array(inputY) - np.dot(H, np.array(xbar)))) Pk = np.array(covar).reshape([NUMSTATES, NUMSTATES]) expectedP = np.dot(np.dot(np.eye(NUMSTATES) - np.dot(KK, H), Pk), np.transpose(np.eye(NUMSTATES) - np.dot(KK, H))) + np.dot(KK, np.dot( R, KK.T)) errorNorm = np.zeros(2) errorNorm[0] = np.linalg.norm(np.array(errorOut) - expectedStateError) errorNorm[1] = np.linalg.norm(expectedP - np.array(covarOut).reshape([NUMSTATES, NUMSTATES])) for i in range(2): if (errorNorm[i] > 1.0E-12): testFailCount += 1 testMessages.append("CKF update failure \n") # print out success message if no error were found if testFailCount == 0: print("PASSED: " + " EKF individual tests") # return fail count and join into a single string all messages in the list # testMessage return [testFailCount, ''.join(testMessages)] #################################################################################### # Test for the time and update with static states (zero d_dot) #################################################################################### def StatePropStatic(): # The __tracebackhide__ setting influences pytest showing of tracebacks: # the mrp_steering_tracking() function will not be shown unless the # --fulltrace command line option is specified. __tracebackhide__ = True NUMSTATES=3 testFailCount = 0 # zero unit test result counter testMessages = [] # create empty list to store test log messages unitTaskName = "unitTask" # arbitrary name (don't change) unitProcessName = "TestProcess" # arbitrary name (don't change) # Create a sim module as an empty container unitTestSim = SimulationBaseClass.SimBaseClass() # Create test thread testProcessRate = macros.sec2nano(0.5) # update process rate update time testProc = unitTestSim.CreateNewProcess(unitProcessName) testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate)) # Construct algorithm and associated C++ container module = okeefeEKF.okeefeEKF() module.ModelTag = "okeefeEKF" # Add test module to runtime call list unitTestSim.AddModelToTask(unitTaskName, module) setupFilterData(module) module.omega = [0.,0.,0.] kfLog = module.logger(["covar", "state"], testProcessRate*10) unitTestSim.AddModelToTask(unitTaskName, kfLog) # connect messages cssDataInMsg = messaging.CSSArraySensorMsg() cssConfigInMsg = messaging.CSSConfigMsg() module.cssDataInMsg.subscribeTo(cssDataInMsg) module.cssConfigInMsg.subscribeTo(cssConfigInMsg) unitTestSim.InitializeSimulation() unitTestSim.ConfigureStopTime(macros.sec2nano(8000.0)) unitTestSim.ExecuteSimulation() stateLog = unitTestSupport.addTimeColumn(kfLog.times(), kfLog.state) for i in range(NUMSTATES): if (abs(stateLog[-1, i + 1] - stateLog[0, i + 1]) > 1.0E-10): print(abs(stateLog[-1, i + 1] - stateLog[0, i + 1])) testFailCount += 1 testMessages.append("Static state propagation failure \n") # print out success message if no error were found if testFailCount == 0: print("PASSED: " + "EKF static state propagation") # return fail count and join into a single string all messages in the list # testMessage return [testFailCount, ''.join(testMessages)] #################################################################################### # Test for the time and update with changing states non-zero omega #################################################################################### def StatePropVariable(show_plots): # The __tracebackhide__ setting influences pytest showing of tracebacks: # the mrp_steering_tracking() function will not be shown unless the # --fulltrace command line option is specified. __tracebackhide__ = True NUMSTATES=3 testFailCount = 0 # zero unit test result counter testMessages = [] # create empty list to store test log messages unitTaskName = "unitTask" # arbitrary name (don't change) unitProcessName = "TestProcess" # arbitrary name (don't change) # Create a sim module as an empty container unitTestSim = SimulationBaseClass.SimBaseClass() # Create test thread testProcessRate = macros.sec2nano(0.5) # update process rate update time testProc = unitTestSim.CreateNewProcess(unitProcessName) testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate)) # Construct algorithm and associated C++ container module = okeefeEKF.okeefeEKF() module.ModelTag = "okeefeEKF" # Add test module to runtime call list unitTestSim.AddModelToTask(unitTaskName, module) setupFilterData(module) InitialState = module.state Initialx = module.x InitialCovar = module.covar InitOmega = module.omega module.state = InitialState kfLog = module.logger(["covar", "state", "stateTransition", "x", "omega"], testProcessRate) unitTestSim.AddModelToTask(unitTaskName, kfLog) # connect messages cssDataInMsg = messaging.CSSArraySensorMsg() cssConfigInMsg = messaging.CSSConfigMsg() module.cssDataInMsg.subscribeTo(cssDataInMsg) module.cssConfigInMsg.subscribeTo(cssConfigInMsg) unitTestSim.InitializeSimulation() unitTestSim.ConfigureStopTime(macros.sec2nano(1000.0)) unitTestSim.ExecuteSimulation() covarLog = unitTestSupport.addTimeColumn(kfLog.times(), kfLog.covar) stateLog = unitTestSupport.addTimeColumn(kfLog.times(), kfLog.state) stateErrorLog = unitTestSupport.addTimeColumn(kfLog.times(), kfLog.x) stmLog = unitTestSupport.addTimeColumn(kfLog.times(), kfLog.stateTransition) omegaLog = unitTestSupport.addTimeColumn(kfLog.times(), kfLog.omega) dt = 0.5 expectedOmega = np.zeros([2001, (NUMSTATES + 1)]) expectedStateArray = np.zeros([2001,(NUMSTATES+1)]) expectedPrevArray = np.zeros([2001,(NUMSTATES+1)]) expectedStateArray[0,1:(NUMSTATES+1)] = np.array(InitialState) expectedOmega[0,1:(NUMSTATES+1)] = np.array(InitOmega) expectedXBar = np.zeros([2001,NUMSTATES+1]) expectedXBar[0,1:(NUMSTATES+1)] = np.array(Initialx) expectedSTM = np.zeros([2001,NUMSTATES,NUMSTATES]) expectedSTM[0,:,:] = np.eye(NUMSTATES) expectedCovar = np.zeros([2001,NUMSTATES*NUMSTATES+1]) expectedCovar[0,1:(NUMSTATES*NUMSTATES+1)] = np.array(InitialCovar) expDynMat = np.zeros([2001,NUMSTATES,NUMSTATES]) Gamma = dt ** 2. / 2. * np.eye(3) ProcNoiseCovar = np.dot(Gamma, np.dot(module.qProcVal*np.eye(3),Gamma.T)) for i in range(1,2001): expectedStateArray[i,0] = dt*i*1E9 expectedPrevArray[i,0] = dt*i*1E9 expectedOmega[i,0] = dt*i*1E9 expectedCovar[i,0] = dt*i*1E9 expectedXBar[i,0] = dt*i*1E9 #Simulate sunline Dyn Mat expDynMat[i-1, :, :] = - np.array([[0., -expectedOmega[i-1, 3], expectedOmega[i-1,2]], [expectedOmega[i-1,3], 0., -expectedOmega[i-1,1]], [ -expectedOmega[i-1,2], expectedOmega[i-1,1], 0.]]) #Simulate STM State prop expectedStateArray[i,1:(NUMSTATES+1)] = np.array(expectedStateArray[i-1,1:(NUMSTATES+1)] - dt*(np.cross(np.array(expectedOmega[i-1,1:(NUMSTATES+1)]), np.array(expectedStateArray[i-1,1:(NUMSTATES+1)])))) expectedPrevArray[i, 1:(NUMSTATES + 1)] = expectedStateArray[i-1,1:(NUMSTATES+1)] expectedSTM[i,:,:] = dt * np.dot(expDynMat[i-1,:,:], np.eye(NUMSTATES)) + np.eye(NUMSTATES) # Simulate Rate compute normdk = np.linalg.norm(expectedStateArray[i, 1:(NUMSTATES + 1)]) nomrdkmin1 = np.linalg.norm(expectedPrevArray[i, 1:(NUMSTATES + 1)]) arg = np.dot(expectedStateArray[i, 1:(NUMSTATES + 1)], expectedPrevArray[i , 1:(NUMSTATES + 1)]) / (normdk * nomrdkmin1) if arg>1: expectedOmega[i, 1:(NUMSTATES + 1)] = 1./dt*np.cross(expectedStateArray[i, 1:(NUMSTATES + 1)], expectedPrevArray[i, 1:(NUMSTATES + 1)]) / (normdk * nomrdkmin1) * np.arccos(1) elif arg<-1: expectedOmega[i, 1:(NUMSTATES + 1)] = 1./dt*np.cross(expectedStateArray[i, 1:(NUMSTATES + 1)], expectedPrevArray[i, 1:(NUMSTATES + 1)]) / ( normdk * nomrdkmin1) * np.arccos(-1) else: expectedOmega[i, 1:(NUMSTATES + 1)] = 1./dt*np.cross(expectedStateArray[i, 1:(NUMSTATES + 1)],expectedPrevArray[i, 1:(NUMSTATES + 1)]) / (normdk * nomrdkmin1) * np.arccos(arg) expectedXBar[i, 1:(NUMSTATES+1)] = np.dot(expectedSTM[i, :, :], expectedXBar[i - 1, 1:(NUMSTATES+1)]) expectedCovar[i,1:(NUMSTATES*NUMSTATES+1)] = (np.dot(expectedSTM[i,:,:], np.dot(np.reshape(expectedCovar[i-1,1:(NUMSTATES*NUMSTATES+1)],[NUMSTATES,NUMSTATES]), np.transpose(expectedSTM[i,:,:])))+ ProcNoiseCovar).flatten() FilterPlots.StatesVsExpected(stateLog, expectedStateArray, show_plots) FilterPlots.StatesPlotCompare(stateErrorLog, expectedXBar, covarLog, expectedCovar, show_plots) FilterPlots.OmegaVsExpected(expectedOmega, omegaLog, show_plots) for j in range(1,2001): for i in range(NUMSTATES): if (abs(stateLog[j, i + 1] - expectedStateArray[j, i + 1]) > 1.0E-10): testFailCount += 1 testMessages.append("General state propagation failure: State Prop \n") if (abs(stateErrorLog[j, i + 1] - expectedXBar[j, i + 1]) > 1.0E-10): testFailCount += 1 testMessages.append("General state propagation failure: State Error Prop \n") for i in range(NUMSTATES*NUMSTATES): if (abs(covarLog[j, i + 1] - expectedCovar[j, i + 1]) > 1.0E-8): print(abs(covarLog[j, i + 1] - expectedCovar[j, i + 1])) abs(covarLog[j, i + 1] - expectedCovar[j, i + 1]) testFailCount += 1 # testMessages.append("General state propagation failure: Covariance Prop \n") if (abs(stmLog[j, i + 1] - expectedSTM[j,:].flatten()[i]) > 1.0E-10): testFailCount += 1 testMessages.append("General state propagation failure: STM Prop \n") # print out success message if no error were found if testFailCount == 0: print("PASSED: " + "EKF general state propagation") else: print(testMessages) # return fail count and join into a single string all messages in the list # testMessage return [testFailCount, ''.join(testMessages)] #################################################################################### # Test for the full filter with time and measurement update #################################################################################### def StateUpdateSunLine(show_plots, SimHalfLength, AddMeasNoise, testVector1, testVector2, stateGuess): # The __tracebackhide__ setting influences pytest showing of tracebacks: # the mrp_steering_tracking() function will not be shown unless the # --fulltrace command line option is specified. __tracebackhide__ = True NUMSTATES=3 testFailCount = 0 # zero unit test result counter testMessages = [] # create empty list to store test log messages unitTaskName = "unitTask" # arbitrary name (don't change) unitProcessName = "TestProcess" # arbitrary name (don't change) # Create a sim module as an empty container unitTestSim = SimulationBaseClass.SimBaseClass() # Create test thread testProcessRate = macros.sec2nano(0.5) # update process rate update time testProc = unitTestSim.CreateNewProcess(unitProcessName) testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate)) # Construct algorithm and associated C++ container module = okeefeEKF.okeefeEKF() module.ModelTag = "okeefeEKF" # Add test module to runtime call list unitTestSim.AddModelToTask(unitTaskName, module) setupFilterData(module) module.omega = [0.,0.,0.] # Set up some test parameters cssConstelation = messaging.CSSConfigMsgPayload() CSSOrientationList = [ [0.70710678118654746, -0.5, 0.5], [0.70710678118654746, -0.5, -0.5], [0.70710678118654746, 0.5, -0.5], [0.70710678118654746, 0.5, 0.5], [-0.70710678118654746, 0, 0.70710678118654757], [-0.70710678118654746, 0.70710678118654757, 0.0], [-0.70710678118654746, 0, -0.70710678118654757], [-0.70710678118654746, -0.70710678118654757, 0.0], ] CSSBias = [1.0 for i in range(len(CSSOrientationList))] totalCSSList = [] i=0 for CSSHat in CSSOrientationList: newCSS = messaging.CSSUnitConfigMsgPayload() newCSS.CBias = CSSBias[i] newCSS.nHat_B = CSSHat totalCSSList.append(newCSS) i = i + 1 cssConstelation.nCSS = len(CSSOrientationList) cssConstelation.cssVals = totalCSSList inputData = messaging.CSSArraySensorMsgPayload() cssConstInMsg = messaging.CSSConfigMsg().write(cssConstelation) cssDataInMsg = messaging.CSSArraySensorMsg() # connect messages module.cssDataInMsg.subscribeTo(cssDataInMsg) module.cssConfigInMsg.subscribeTo(cssConstInMsg) dt =0.5 stateTarget1 = testVector1 module.state = stateGuess module.x = (np.array(stateTarget1) - np.array(stateGuess)).tolist() kfLog = module.logger("x", testProcessRate) unitTestSim.AddModelToTask(unitTaskName, kfLog) dataLog = module.filtDataOutMsg.recorder() unitTestSim.AddModelToTask(unitTaskName, dataLog) unitTestSim.InitializeSimulation() for i in range(SimHalfLength): if i > 20: dotList = [] for element in CSSOrientationList: if AddMeasNoise: dotProd = np.dot(np.array(element), np.array(testVector1)[0:3]) + np.random.normal(0., module.qObsVal) else: dotProd = np.dot(np.array(element), np.array(testVector1)[0:3]) dotList.append(dotProd) inputData.CosValue = dotList cssDataInMsg.write(inputData, unitTestSim.TotalSim.CurrentNanos) unitTestSim.ConfigureStopTime(macros.sec2nano((i + 1) * 0.5)) unitTestSim.ExecuteSimulation() stateLog = dataLog.state covarLog = dataLog.covar if not AddMeasNoise: for i in range(NUMSTATES): if (abs(covarLog[-1, i * NUMSTATES + i] - covarLog[0, i * NUMSTATES + i] / 100.) > 1E-1): testFailCount += 1 testMessages.append("Covariance update failure") if (abs(stateLog[-1, i] - stateTarget1[i]) > 1.0E-10): testFailCount += 1 testMessages.append("State update failure") else: for i in range(NUMSTATES): if (abs(covarLog[-1, i * NUMSTATES + i] - covarLog[0, i * NUMSTATES + i] / 100.) > 1E-1): testFailCount += 1 testMessages.append("Covariance update failure with noise") if (abs(stateLog[-1, i] - stateTarget1[i]) > 1.0E-2): testFailCount += 1 testMessages.append("State update failure with noise") stateTarget2 = testVector2 inputData = messaging.CSSArraySensorMsgPayload() for i in range(SimHalfLength): if i > 20: dotList = [] for element in CSSOrientationList: if AddMeasNoise: dotProd = np.dot(np.array(element), np.array(testVector2)[0:3]) + np.random.normal(0., module.qObsVal) else: dotProd = np.dot(np.array(element), np.array(testVector2)[0:3]) dotList.append(dotProd) inputData.CosValue = dotList cssDataInMsg.write(inputData, unitTestSim.TotalSim.CurrentNanos) unitTestSim.ConfigureStopTime(macros.sec2nano((i + SimHalfLength+1) * 0.5)) unitTestSim.ExecuteSimulation() stateErrorLog = unitTestSupport.addTimeColumn(kfLog.times(), kfLog.x) stateLog = dataLog.state postFitLog = dataLog.postFitRes covarLog = dataLog.covar if not AddMeasNoise: for i in range(NUMSTATES): if (abs(covarLog[-1, i * NUMSTATES + i] - covarLog[0, i * NUMSTATES + i] / 100.) > 1E-1): testFailCount += 1 testMessages.append("Covariance update failure") if (abs(stateLog[-1, i] - stateTarget2[i]) > 1.0E-10): testFailCount += 1 testMessages.append("State update failure") else: for i in range(NUMSTATES): if (abs(covarLog[-1, i * NUMSTATES + i] - covarLog[0, i * NUMSTATES + i] / 100.) > 1E-1): testFailCount += 1 testMessages.append("Covariance update failure") if (abs(stateLog[-1, i] - stateTarget2[i]) > 1.0E-2): testFailCount += 1 testMessages.append("State update failure") target1 = np.array(testVector1) target2 = np.array(testVector2) FilterPlots.StatesPlot(dataLog.times(), stateErrorLog, covarLog, show_plots) FilterPlots.StatesVsTargets(dataLog.times(), target1, target2, stateLog, show_plots) FilterPlots.PostFitResiduals(dataLog.times(), postFitLog, module.qObsVal, show_plots) # print out success message if no error were found if testFailCount == 0: print("PASSED: " + "EKF full test") else: print((testMessages)) # return fail count and join into a single string all messages in the list # testMessage return [testFailCount, ''.join(testMessages)] if __name__ == "__main__": StateUpdateSunLine(True, 200, True ,[-0.7, 0.7, 0.0] ,[0.8, 0.9, 0.0], [0.7, 0.7, 0.0]) # sunline_individual_test() # StatePropStatic() # StatePropVariable(True)