import multiprocessing as mp
import os
import pickle
import numpy as np
import pandas as pd
[docs]
class DataWriter(mp.Process):
""" Class to be launched as separate process to pull data from queue and write out to .csv dataFrames
Args:
q: queue object from multiprocessing.Manager.queue
Returns:
Nil
"""
def __init__(self, q):
super(DataWriter, self).__init__()
self._queue = q
self._endToken = None
self._varCast = None
self._logDir = ""
self._dataFiles = set()
[docs]
def run(self):
""" The process run loop. Gets data from a queue and writes it out to per message csv files
Args:
Nil
Returns:
Nil
"""
while self._endToken is None:
data, mcSimIndex, self._endToken = self._queue.get()
print("Starting to log: " + str(mcSimIndex))
if self._endToken:
continue
print("Logging Dataframes from run " + str(mcSimIndex))
for dictName, dictData in data.items(): # Loops through Messages, Variables, Custom dictionaries in the retention policy
for itemName, itemData in dictData.items(): # Loop through all items and their data
if itemName == "OrbitalElements.Omega": # Protects from OS that aren't case sensitive.
itemName = "OrbitalElements.Omega_Capital"
filePath = self._logDir + itemName + ".data"
self._dataFiles.add(filePath)
# Is the data a vector, scalar, or non-existant?
try:
variLen = itemData[:,1:].shape[1]
except:
variLen = 0
# Generate the MultiLabel
outerLabel = [mcSimIndex]
innerLabel = []
for i in range(variLen):
innerLabel.append(i)
if variLen == 0:
innerLabel.append(0) # May not be necessary, might be able to leave blank and get a None
labels = pd.MultiIndex.from_product([outerLabel, innerLabel], names=["runNum", "varIdx"])
# Generate the individual run's dataframe
if variLen >= 2:
df = pd.DataFrame(itemData[:, 1:].tolist(), index=itemData[:,0], columns=labels)
elif variLen == 1:
df = pd.DataFrame(itemData[:, 1].tolist(), index=itemData[:,0], columns=labels)
else:
df = pd.DataFrame([np.nan], columns=labels)
for i in range(0, variLen):
try: # if the data is numeric reduce it to float32 rather than float64 to reduce storage footprint
# Note: You might think you can simplify these three lines into a single:
# df.iloc[:,i] = df.iloc[:,i].apply(pandas.to_numeric, downcast="float")
# but you'd be wrong.
varComp = df.iloc[:,i]
if self._varCast != None:
varComp = pd.to_numeric(varComp, downcast='float')
df.iloc[:,i] = varComp
except:
pass
# If the .data file doesn't exist save the dataframe to create the file
# and skip the remainder of the loop
if not os.path.exists(filePath):
pickle.dump([df], open(filePath, "wb"))
continue
# If the .data file does exists, append the message's pickle.
with open(filePath, "a+b") as pkl:
pickle.dump([df], pkl)
print("Finished logging dataframes from run" + str(mcSimIndex))
# Sort by the MultiIndex (first by run number then by variable component)
print("Starting to concatenate dataframes")
for filePath in self._dataFiles:
# We create a new index so that we populate any missing run data (in the case that a run breaks) with NaNs.
allData = []
with open(filePath, 'rb') as pkl:
try:
while True:
allData.extend(pickle.load(pkl))
except EOFError:
pass
allData = pd.concat(allData, axis=1)
newMultInd = pd.MultiIndex.from_product([list(range(allData.columns.min()[0], allData.columns.max()[0]+1)),
list(range(allData.columns.min()[1], allData.columns.max()[1]+1))],
names=["runNum", "varIdx"])
#allData = allData.sort_index(axis=1, level=[0,1]) #TODO: When we dont lose MCs anymore, we should just use this call
allData = allData.reindex(columns=newMultInd)
allData.index.name = 'time[ns]'
allData.to_pickle(filePath)
print("Finished concatenating dataframes")
def setLogDir(self, logDir):
self._logDir = logDir
def setVarCast(self, varCast):
self._varCast = varCast