Source code for pisa.utils.hdfchain

"""
Classes to access HDF5 files chained together.
"""
from __future__ import print_function


import numpy as n
import tables
from glob import glob
from collections import defaultdict


# Public names exported by a star-import of this module.
__all__ = ['HDFTableProxy', 'TableAccessor', 'HDFChain']


class HDFTableProxy(object):
    """Proxy for one table stored under the same path in several HDF5 files.

    The proxy remembers the node path and row dtype of a template table and
    reads the node at that path from every file in `files`, presenting the
    per-file tables as one chained table.

    Files that do not contain the node are reported with a warning on stdout
    and treated as contributing zero rows by every reading method.

    Parameters
    ----------
    table : table node
        Template table; only `table._v_pathname` and
        `table.description._v_dtype` are read.
    files : sequence of open HDF5 file handles
        Each handle must provide `get_node(path)` (or the older camelCase
        `getNode(path)`) and a `filename` attribute.
    """

    def __init__(self, table, files):
        self.path = str(table._v_pathname)
        self._v_dtype = table.description._v_dtype
        self.files = files

    def _node(self, h5file):
        """Return the node at `self.path` in `h5file`.

        Prefers the PEP8-style `get_node` accessor and falls back to the
        camelCase `getNode` when the handle does not provide it.
        """
        getter = getattr(h5file, 'get_node', None)
        if getter is None:
            getter = h5file.getNode
        return getter(self.path)

    def _node_or_none(self, h5file):
        """Like `_node`, but warn and return None if the node is missing."""
        try:
            return self._node(h5file)
        except tables.NoSuchNodeError:
            print("WARN: node %s does not exist in file %s"
                  % (self.path, h5file.filename))
            return None

    def _row_counts(self):
        """Return an int array holding the number of rows found per file."""
        counts = n.zeros(len(self.files), dtype=int)
        for i, h5file in enumerate(self.files):
            node = self._node_or_none(h5file)
            if node is not None:
                counts[i] = len(node)
        return counts

    def _chain(self, dtype, fetch):
        """Concatenate `fetch(node)` over all files into one array of `dtype`.

        A first pass counts the rows so the result is allocated once; the
        second pass copies each file's data into its slice.
        """
        counts = self._row_counts()
        # Running totals give every file's slice bounds in a single pass.
        stops = n.cumsum(counts)
        starts = stops - counts
        result = n.zeros(counts.sum(), dtype=dtype)
        for h5file, start, stop in zip(self.files, starts, stops):
            if start == stop:
                # Empty or missing table: nothing to copy.
                continue
            result[start:stop] = fetch(self._node(h5file))
        return result

    def read(self):
        """Return all rows of the chained table as one structured array."""
        return self._chain(self._v_dtype, lambda node: node.read())

    def read_iter(self):
        """Yield the table contents file by file, one array per file.

        A file without the node yields an empty array of the table dtype, so
        the yielded items stay aligned with `self.files`.
        """
        for h5file in self.files:
            node = self._node_or_none(h5file)
            if node is None:
                yield n.zeros(0, dtype=self._v_dtype)
            else:
                yield node.read()

    def col_iter(self, colname):
        """Yield column `colname` file by file, one array per file.

        A file without the node yields an empty array of the column dtype.
        """
        for h5file in self.files:
            node = self._node_or_none(h5file)
            if node is None:
                yield n.zeros(0, dtype=self._v_dtype[colname])
            else:
                yield node.col(colname)

    def col(self, colname):
        """Return column `colname` of the chained table as one array.

        Raises KeyError if the table dtype has no field named `colname`.
        """
        # Look up the dtype first so an unknown column fails before any I/O.
        dtype = self._v_dtype[colname]
        return self._chain(dtype, lambda node: node.col(colname))

    def __len__(self):
        """Return the total number of rows over all files."""
        return int(self._row_counts().sum())

    def __repr__(self):
        header = "chained table with %d files:\n" % len(self.files)
        return header + repr(self._node(self.files[0]))
class TableAccessor(object):
    """Attribute-style namespace built from a mapping of names to proxies.

    Every key of `tabledict` becomes an instance attribute bound to the
    corresponding value.
    """

    def __init__(self, tabledict):
        # Install all (name, proxy) pairs directly into the instance namespace.
        vars(self).update(tabledict.items())

    def __repr__(self):
        # List only attributes that are exactly HDFTableProxy instances.
        names = []
        for attr, obj in vars(self).items():
            if type(obj) is HDFTableProxy:
                names.append(attr)
        return ", ".join(names)
class HDFChain(object):
    """Chain of HDF5 files that all contain the same set of tables.

    The tables found in the first file (down to `maxdepth`) are exposed as
    `HDFTableProxy` objects, both as attributes of `self.root` (by table
    name) and through `getNode` (by node path).
    """

    def __init__(self, files, maxdepth=1, verbose=False, **kwargs):
        """Open all files of the chain and index the tables of the first one.

        Parameters
        ----------
        files : list or tuple of str, or str
            Either a sequence of filenames or a glob string. Glob matches
            are opened in sorted order.
        maxdepth : int
            Tables whose `_v_depth` exceeds this value are ignored.
        verbose : bool
            If true, print progress messages.
        **kwargs
            Passed on to the PyTables file opener (e.g. NODE_CACHE_SLOTS).

        Raises
        ------
        ValueError
            If `files` is an empty sequence, a glob string matching no file,
            or neither a sequence nor a string.
        """
        self.files = list()
        # Plain dict: a missing table name must raise KeyError on lookup.
        self._tables = dict()
        self.verbose = verbose
        self.pathes = dict()

        if self.verbose:
            print("opening files in chain...")

        if isinstance(files, (list, tuple)):
            if len(files) == 0:
                raise ValueError("provided file list is empty!")
            fnames = list(files)
        elif isinstance(files, str):
            fnames = sorted(glob(files))
            if len(fnames) == 0:
                raise ValueError("glob string matches no file!")
        else:
            raise ValueError("parameter files must be either a list of filenames or a globstring")

        try:
            for fname in fnames:
                self.files.append(self._open_file(fname, **kwargs))
        except Exception:
            # Do not leak the handles that were opened before the failure.
            for h5file in self.files:
                h5file.close()
            del self.files[:]
            raise

        first = self.files[0]
        if self.verbose:
            print("walking through first file %s" % first.filename)

        # Prefer the PEP8-style walker, fall back to the camelCase name.
        walker = getattr(first, 'walk_nodes', None)
        if walker is None:
            walker = first.walkNodes

        for table in walker(classname="Table"):
            if table._v_depth > maxdepth:
                continue
            if table.name in self._tables:
                print("WARN: skipping additional occurence of table %s at %s (using %s)!"
                      % (table.name, table._v_pathname,
                         self._tables[table.name].path))
                continue
            proxy = HDFTableProxy(table, self.files)
            self._tables[table.name] = proxy
            self.pathes[table._v_pathname] = proxy

        self.root = TableAccessor(self._tables)

    @staticmethod
    def _open_file(fname, **kwargs):
        """Open `fname` with PyTables, using whichever opener name exists."""
        opener = getattr(tables, 'open_file', None)
        if opener is None:
            opener = tables.openFile
        return opener(fname, **kwargs)

    def __del__(self):
        # NOTE(review): proxies keep their handles under `files`, so this
        # assignment does not release anything; kept as in the original.
        for tabproxy in self._tables.values():
            tabproxy.file = None
        for h5file in self.files:
            h5file.close()

    def getNode(self, path):
        """Return the table proxy registered under node path `path`.

        Raises KeyError if no table with that path was indexed.
        """
        return self.pathes[path]