Source code for test_partitionedStorageUnit


# ISC License
#
# Copyright (c) 2016, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.


import inspect
import os
import pytest
import numpy as np

filename = inspect.getframeinfo(inspect.currentframe()).filename
path = os.path.dirname(os.path.abspath(filename))
bskName = 'Basilisk'
splitPath = path.split(bskName)

# Import all of the modules that we are going to be called in this simulation
from Basilisk.utilities import SimulationBaseClass
from Basilisk.simulation import partitionedStorageUnit
from Basilisk.architecture import messaging
from Basilisk.utilities import macros


params_storage_limits = [(1200, 1200, 2400, 2400),
                     (600, 1200, 3600, 3600),
                     (600, 600, 10000, 6000)]

[docs]@pytest.mark.parametrize("baudRate_1, baudRate_2, storageCapacity, expectedStorage", params_storage_limits) def test_storage_limits(baudRate_1, baudRate_2, storageCapacity, expectedStorage): """ Tests: 1. Whether the partitionedStorageUnit can add multiple nodes (core base class functionality); 2. That the partitionedStorageUnit correctly evaluates how much stored data it should have given a pair of baud input messages. :return: """ unitTaskName = "unitTask" # arbitrary name (don't change) unitProcessName = "TestProcess" # arbitrary name (don't change) # Create a sim module as an empty container unitTestSim = SimulationBaseClass.SimBaseClass() # Create test thread testProcessRate = macros.sec2nano(0.1) # update process rate update time testProc = unitTestSim.CreateNewProcess(unitProcessName) testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate)) test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit() test_storage_unit.storageCapacity = storageCapacity # bit capacity. dataMsg1 = messaging.DataNodeUsageMsgPayload() dataMsg1.baudRate = baudRate_1 # baud dataMsg1.dataName = "node_1_msg" dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1) dataMsg2 = messaging.DataNodeUsageMsgPayload() dataMsg2.baudRate = baudRate_2 # baud dataMsg2.dataName = "node_2_msg" dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2) # Test the addNodeToStorage method: test_storage_unit.addDataNodeToModel(dat1Msg) test_storage_unit.addDataNodeToModel(dat2Msg) unitTestSim.AddModelToTask(unitTaskName, test_storage_unit) dataLog = test_storage_unit.storageUnitDataOutMsg.recorder() unitTestSim.AddModelToTask(unitTaskName, dataLog) unitTestSim.InitializeSimulation() unitTestSim.ConfigureStopTime(macros.sec2nano(5.0)) unitTestSim.ExecuteSimulation() storedDataLog = dataLog.storageLevel capacityLog = dataLog.storageCapacity netBaudLog = dataLog.currentNetBaud # Check 1 - is net baud rate correct? for ind in range(0,len(netBaudLog)): currentBaud = netBaudLog[ind] np.testing.assert_allclose(currentBaud, baudRate_1 + baudRate_2, atol=1e-1, err_msg=("FAILED: PartitionedStorageUnit did not correctly log baud rate.")) # Check 2 - is used storage space correct? np.testing.assert_allclose(storedDataLog[-1], expectedStorage, atol=1e-4, err_msg=("FAILED: PartitionedStorageUnit did not track integrated data.")) # Check 3 - is the amount of data more than zero and less than the capacity? for ind in range(0,len(storedDataLog)): assert storedDataLog[ind] <= capacityLog[ind] or np.isclose(storedDataLog[ind], capacityLog[ind]), ( "FAILED: PartitionedStorageUnit's stored data exceeded its capacity.") assert storedDataLog[ind] >= 0., ( "FAILED: PartitionedStorageUnit's stored data was negative.")
params_set_data = [(1200, 1200, ['test'], [1200], 2400, 2400), (600, 600, ['test'], [0], 10000, 6000), (600, 600, ['test'], [4000], 10000, 10000), (0, 0, ['test'], [1000], 2000, 1000), (0, 0, ['test'], [-1000], 2000, 0), (1000, 0, ['node_1_msg'], [-2000], 6000, 3000), (0, 0, ['test'], [3000], 2000, 0), (600, 0, ['node_1_msg'], [3000], 10000, 6000), (300, 600, ['node_1_msg'], [3000], 10000, 7500), (600, 600, ['node_1_msg', 'node_2_msg'], [1000, 1000], 10000, 8000), (600, 300, ['test', 'node_2_msg'], [1000, 1000], 10000, 6500)]
[docs]@pytest.mark.parametrize( "baudRate_1, baudRate_2, partitionName, add_data, storageCapacity, expectedStorage", params_set_data) def test_set_data_buffer(baudRate_1, baudRate_2, partitionName, add_data, storageCapacity, expectedStorage): """ Tests: 1. Whether the partitionedStorageUnit properly adds data in different partitions using the setDataBuffer method; 2. That the partitionedStorageUnit correctly evaluates how much stored data it should have given a pair of input messages and using setDataBuffer. :return: """ unitTaskName = "unitTask" # arbitrary name (don't change) unitProcessName = "TestProcess" # arbitrary name (don't change) # Create a sim module as an empty container unitTestSim = SimulationBaseClass.SimBaseClass() # Create test thread testProcessRate = macros.sec2nano(0.1) # update process rate update time testProc = unitTestSim.CreateNewProcess(unitProcessName) testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate)) test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit() test_storage_unit.storageCapacity = storageCapacity # bit capacity. dataMsg1 = messaging.DataNodeUsageMsgPayload() dataMsg1.baudRate = baudRate_1 # baud dataMsg1.dataName = "node_1_msg" dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1) dataMsg2 = messaging.DataNodeUsageMsgPayload() dataMsg2.baudRate = baudRate_2 # baud dataMsg2.dataName = "node_2_msg" dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2) # Test the addNodeToStorage method: test_storage_unit.addDataNodeToModel(dat1Msg) test_storage_unit.addDataNodeToModel(dat2Msg) unitTestSim.AddModelToTask(unitTaskName, test_storage_unit) dataLog = test_storage_unit.storageUnitDataOutMsg.recorder() unitTestSim.AddModelToTask(unitTaskName, dataLog) # Initialize the partition initData = [0 for i in range(0, len(partitionName))] test_storage_unit.setDataBuffer(partitionName, initData) unitTestSim.InitializeSimulation() sim_time = 5.0 unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time - 1.0)) unitTestSim.ExecuteSimulation() test_storage_unit.setDataBuffer(partitionName, add_data) unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time)) unitTestSim.ExecuteSimulation() storedDataLog = dataLog.storageLevel capacityLog = dataLog.storageCapacity netBaudLog = dataLog.currentNetBaud # Check 1 - is net baud rate correct? for ind in range(0,len(netBaudLog)): currentBaud = netBaudLog[ind] np.testing.assert_allclose(currentBaud, baudRate_1 + baudRate_2, atol=1e-4, err_msg=("FAILED: PartitionedStorageUnit did not correctly log baud rate.")) # Check 2 - is used storage space correct? np.testing.assert_allclose(storedDataLog[-1], expectedStorage, atol=1e-4, err_msg=("FAILED: PartitionedStorageUnit did not track integrated data.")) # Check 3 - is the amount of data more than zero and less than the capacity? for ind in range(0,len(storedDataLog)): assert storedDataLog[ind] <= capacityLog[ind] or np.isclose(storedDataLog[ind], capacityLog[ind]), ( "FAILED: PartitionedStorageUnit's stored data exceeded its capacity.") assert storedDataLog[ind] >= 0., ( "FAILED: PartitionedStorageUnit's stored data was negative.")
params_set_data = [(600, 600, ['test'], [0], 1E4), (0, 0, ['test'], [1000], 2000), (1000, 0, ['node_1_msg'], [-2000], 6000), (600, 0, ['node_1_msg'], [3000], 1e4), (600, 600, ['node_1_msg'], [3000], 10000), (600, 600, ['node_1_msg', 'node_2_msg'], [1e3, 1e3], 10000), (600, 300, ['test', 'node_1_msg', 'node_2_msg'], [1500, 1e3, 1000], 10000)]
[docs]@pytest.mark.parametrize( "baudRate_1, baudRate_2, partitionName, add_data, storageCapacity", params_set_data) def test_set_data_buffer_partition(baudRate_1, baudRate_2, partitionName, add_data, storageCapacity): """ Tests: 1. Whether the partitionedStorageUnit manages the data in already existing and new partitions correctly; :return: """ unitTaskName = "unitTask" # arbitrary name (don't change) unitProcessName = "TestProcess" # arbitrary name (don't change) # Create a sim module as an empty container unitTestSim = SimulationBaseClass.SimBaseClass() # Create test thread testProcessRate = macros.sec2nano(0.1) # update process rate update time testProc = unitTestSim.CreateNewProcess(unitProcessName) testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate)) test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit() test_storage_unit.storageCapacity = storageCapacity # bit capacity. dataMsg1 = messaging.DataNodeUsageMsgPayload() dataMsg1.baudRate = baudRate_1 # baud dataMsg1.dataName = "node_1_msg" dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1) dataMsg2 = messaging.DataNodeUsageMsgPayload() dataMsg2.baudRate = baudRate_2 # baud dataMsg2.dataName = "node_2_msg" dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2) # Test the addNodeToStorage method: test_storage_unit.addDataNodeToModel(dat1Msg) test_storage_unit.addDataNodeToModel(dat2Msg) unitTestSim.AddModelToTask(unitTaskName, test_storage_unit) dataLog = test_storage_unit.storageUnitDataOutMsg.recorder() unitTestSim.AddModelToTask(unitTaskName, dataLog) # Initialize the partition initData = [0 for i in range(0, len(partitionName))] test_storage_unit.setDataBuffer(partitionName, initData) unitTestSim.InitializeSimulation() sim_time = 5.0 unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time - 1.0)) unitTestSim.ExecuteSimulation() test_storage_unit.setDataBuffer(partitionName, add_data) unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time)) unitTestSim.ExecuteSimulation() dataNameVec = dataLog.storedDataName dataVec = dataLog.storedData # Check 1 - are the partition names in the data name vector? for partName in partitionName: assert partName in list(dataNameVec[-1]), ( "FAILED: PartitionedStorageUnit did not add the new partition.") partIndex = list(dataNameVec[-1]).index(partName) dataIndex = list(partitionName).index(partName) if partName == "test": baudRate = 0.0 elif partName == "node_1_msg": baudRate = baudRate_1 elif partName == "node_2_msg": baudRate = baudRate_2 # Check 2 - if partition exists, does it added data to the correct partition? np.testing.assert_allclose(dataVec[-1][partIndex], add_data[dataIndex] + baudRate*sim_time, err_msg = ( "FAILED: PartitionedStorageUnit did not use the correct partition."))
if __name__ == "__main__": baudRate_1 = 1200 baudRate_2 = 1200 storageCapacity = 2400 expectedStorage = 2400 test_storage_limits(baudRate_1, baudRate_2, storageCapacity, expectedStorage) add_data = [1200, 200] partitionName = ["test", "node_1_msg"] test_set_data_buffer(baudRate_1, baudRate_2, partitionName, add_data, storageCapacity, expectedStorage) storageCapacity = 20000 test_set_data_buffer_partition(baudRate_1, baudRate_2, partitionName, add_data, storageCapacity)