Source code for DataWriter

import multiprocessing as mp
import pandas as pd
import numpy as np
import pickle
import os

class DataWriter(mp.Process):
    """
    Class to be launched as a separate process that pulls per-run data from a
    queue and writes it out as pickled pandas DataFrames, one ``.data`` file
    per logged item.

    Each queue entry is unpacked as ``(data, mcSimIndex, endToken)``; ``data``
    is a dictionary of dictionaries mapping an item name to that item's data.

    Args:
        q: queue object from multiprocessing.Manager.queue

    Returns:
        Nil
    """

    def __init__(self, q):
        super(DataWriter, self).__init__()
        self._queue = q
        self._endToken = None   # loop sentinel: run() stops once this is not None
        self._varCast = None    # when not None, numeric columns are downcast
        self._logDir = ""       # prefix prepended verbatim to every file name
        self._dataFiles = set()  # every .data path touched by this writer

    def run(self):
        """
        The process run loop. Gets data from the queue, appends one pickled
        DataFrame per item and run to that item's ``.data`` file and, once the
        end token arrives, merges every file into a single DataFrame.

        Args:
            Nil

        Returns:
            Nil
        """
        while self._endToken is None:
            data, mcSimIndex, self._endToken = self._queue.get()
            print("Starting to log: " + str(mcSimIndex))
            if self._endToken:
                continue
            print("Logging Dataframes from run " + str(mcSimIndex))
            # Loops through Messages, Variables, Custom dictionaries in the retention policy
            for dictData in data.values():
                # Loop through all items and their data
                for itemName, itemData in dictData.items():
                    if itemName == "OrbitalElements.Omega":
                        # Protects from OS that aren't case sensitive.
                        itemName = "OrbitalElements.Omega_Capital"

                    filePath = self._logDir + itemName + ".data"
                    self._dataFiles.add(filePath)
                    self._appendFrame(filePath, self._buildRunFrame(itemData, mcSimIndex))

            print("Finished logging dataframes from run" + str(mcSimIndex))

        # Sort by the MultiIndex (first by run number then by variable component)
        print("Starting to concatenate dataframes")
        for filePath in self._dataFiles:
            self._concatenateFile(filePath)
        print("Finished concatenating dataframes")

    def _buildRunFrame(self, itemData, mcSimIndex):
        """Build the DataFrame of a single item for a single run.

        Column 0 of ``itemData`` becomes the index and the remaining columns
        become the data, labelled ``(mcSimIndex, componentIndex)``. Data that
        cannot be sliced that way yields a one-cell NaN frame.
        """
        # Is the data a vector, scalar, or non-existent?
        try:
            variLen = itemData[:, 1:].shape[1]
        except Exception:
            # Not sliceable as a 2-D array (None, scalar, 1-D, ...).
            # KeyboardInterrupt / SystemExit are deliberately not caught here.
            variLen = 0

        # Generate the MultiLabel; data-less items still get one placeholder column.
        innerLabel = list(range(variLen)) or [0]
        labels = pd.MultiIndex.from_product([[mcSimIndex], innerLabel],
                                            names=["runNum", "varIdx"])

        # Generate the individual run's dataframe
        if variLen >= 2:
            df = pd.DataFrame(itemData[:, 1:].tolist(), index=itemData[:, 0], columns=labels)
        elif variLen == 1:
            df = pd.DataFrame(itemData[:, 1].tolist(), index=itemData[:, 0], columns=labels)
        else:
            df = pd.DataFrame([np.nan], columns=labels)

        if self._varCast is not None:
            for i in range(variLen):
                try:
                    # If the data is numeric reduce it to float32 rather than float64
                    # to reduce storage footprint.
                    # Note: You might think you can simplify these three lines into a single:
                    # df.iloc[:,i] = df.iloc[:,i].apply(pandas.to_numeric, downcast="float")
                    # but you'd be wrong.
                    varComp = df.iloc[:, i]
                    varComp = pd.to_numeric(varComp, downcast='float')
                    df.iloc[:, i] = varComp
                except Exception:
                    # Best effort: a column that cannot be downcast is stored as is.
                    continue
        return df

    @staticmethod
    def _appendFrame(filePath, df):
        """Append ``[df]`` as one more pickle record at the end of ``filePath``."""
        # "ab" creates the file when it is missing and appends otherwise, and the
        # context manager guarantees the handle is closed in both cases.
        # NOTE(review): a file left over from an earlier session is appended to,
        # not replaced -- confirm callers clear the log directory beforehand.
        with open(filePath, "ab") as pkl:
            pickle.dump([df], pkl)

    @staticmethod
    def _concatenateFile(filePath):
        """Merge all pickle records in ``filePath`` into one DataFrame, in place."""
        allData = []
        with open(filePath, 'rb') as pkl:
            try:
                while True:
                    allData.extend(pickle.load(pkl))
            except EOFError:
                # Normal termination: every appended record has been read.
                pass

        allData = pd.concat(allData, axis=1)

        # We create a new index so that we populate any missing run data (in the case
        # that a run breaks) with NaNs. The bounds are taken per level: min()/max() on
        # the MultiIndex itself compare tuples lexicographically, which would size the
        # varIdx range from the last run only and drop columns of wider runs.
        runNums = allData.columns.get_level_values(0)
        varIdxs = allData.columns.get_level_values(1)
        newMultInd = pd.MultiIndex.from_product(
            [list(range(int(runNums.min()), int(runNums.max()) + 1)),
             list(range(int(varIdxs.min()), int(varIdxs.max()) + 1))],
            names=["runNum", "varIdx"])
        # TODO: When we dont lose MCs anymore, sorting the columns with
        # sort_index(axis=1, level=[0, 1]) should be enough.
        allData = allData.reindex(columns=newMultInd)
        allData.index.name = 'time[ns]'
        allData.to_pickle(filePath)

    def setLogDir(self, logDir):
        """Set the prefix (normally a directory ending in a path separator)
        that is prepended verbatim to every ``.data`` file name."""
        self._logDir = logDir

    def setVarCast(self, varCast):
        """Enable downcasting of numeric columns; any value other than None turns it on."""
        self._varCast = varCast