# Source code for pisa.utils.hdf

"""Set of utilities for handling HDF5 file I/O"""


from __future__ import absolute_import

from collections.abc import Mapping
from collections import OrderedDict
import os

import numpy as np
import h5py
from six import string_types

from pisa.utils.log import logging, set_verbosity
from pisa.utils.hash import hash_obj
from pisa.utils.resources import find_resource
from pisa.utils.comparisons import recursiveEquality


__all__ = ['HDF5_EXTS', 'from_hdf', 'to_hdf', 'test_hdf']

__author__ = 'S. Boeser, J.L. Lanfranchi'

__license__ = '''Copyright (c) 2014-2017, The IceCube Collaboration

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.'''


# File-name extensions (given without the leading dot) associated with HDF5
# files; exported as part of the module's public API via `__all__`.
HDF5_EXTS = ['hdf', 'h5', 'hdf5']


# TODO: convert to allow reading of icetray-produced HDF5 files


def from_hdf(val, return_node=None, choose=None):
    """Load an HDF5 file or node into a nested OrderedDict.

    Any HDF5 attributes attached to the entry-level entity are returned as
    well, as an OrderedDict stored on the `attrs` attribute of the result.

    Parameters
    ----------
    val : string or h5py.Group
        Entry-level entity to read.

        * A string is taken to be a filename; it is located via
          `find_resource`, opened read-only as an h5py.File, and closed again
          before returning.
        * Anything else is used as-is and is expected to behave like an
          h5py.Group; it is left open.

    return_node : None or string
        Not yet implemented; must be None.

    choose : None or list
        If given, only datasets whose (final path component) name is in this
        list are read; all other datasets are skipped, saving time & memory.
        Groups are always traversed, so a group may come back empty.

    Returns
    -------
    data : OrderedDict with additional attr of type OrderedDict named `attrs`
        Nested dictionary; keys are HDF5 node names and values hold the
        contents of the corresponding node. `data.attrs` holds the attributes
        of the entry-level entity, or is an empty OrderedDict if it has none.

    Raises
    ------
    NotImplementedError
        If `return_node` is not None.

    """
    if return_node is not None:
        raise NotImplementedError('`return_node` is not yet implemented.')

    def collect(entity, dest, wanted=None):
        """Recursively copy `entity` (dataset or group) into dict `dest`"""
        # Only the last path component is used as the dictionary key
        leaf = entity.name.rsplit('/', 1)[-1]

        if isinstance(entity, h5py.Dataset):
            if wanted is None or leaf in wanted:
                # `[()]` reads the full dataset contents into memory
                dest[leaf] = entity[()]
            return

        if isinstance(entity, (h5py.Group, h5py.File)):
            branch = OrderedDict()
            dest[leaf] = branch
            for child in entity.values():
                collect(child, branch, wanted)

    # Only close the handle at the end if it was opened in this function
    opened_here = isinstance(val, str)
    if opened_here:
        try:
            root = h5py.File(find_resource(val), 'r')
        except Exception:
            logging.error('Failed to load HDF5 file, `val`="%s"', val)
            raise
    else:
        root = val

    logging.trace('root = %s, root.values() = %s', root, root.values())

    data = OrderedDict()
    attrs = OrderedDict()
    try:
        # Pick up entry-level attributes, if the entity has any
        if hasattr(root, 'attrs'):
            attrs = OrderedDict(root.attrs)
        # Walk everything below the entry-level entity
        for top_level in root.values():
            collect(top_level, data, choose)
    finally:
        if opened_here:
            root.close()

    data.attrs = attrs
    return data
def to_hdf(data_dict, tgt, attrs=None, overwrite=True, warn=True):
    """Store a (possibly nested) dictionary to an HDF5 file or branch node
    within an HDF5 file (an h5py Group).

    This creates hardlinks for duplicate non-trivial leaf nodes (h5py Datasets)
    to minimize storage space required for redundant datasets. Duplication is
    detected via object hashing.

    NOTE: Branch nodes are sorted before storing (by name) for consistency in
    the generated file despite Python dictionaries having no defined ordering
    among keys.

    Parameters
    ----------
    data_dict : Mapping
        Dictionary, OrderedDict, or other Mapping to be stored

    tgt : str or h5py.Group
        Target for storing data. If `tgt` is a str, it is interpreted as a
        filename; a file is created with that name (overwriting an existing
        file, if present). After writing, the file is closed. If `tgt` is an
        h5py.Group, the data is written inside that Group (names are resolved
        relative to it) and it is left open at function return.

    attrs : Mapping
        Attributes to apply to the top-level entity being written (the file's
        root group, or `tgt` itself if it is an h5py.Group). See
        http://docs.h5py.org/en/latest/high/attr.html

    overwrite : bool
        Set to `True` (default) to allow overwriting existing file. Raise
        exception and quit otherwise.

    warn : bool
        Issue a warning message if a file is being overwritten. Suppress
        warning by setting to `False` (e.g. when overwriting is the desired
        behaviour).

    Raises
    ------
    TypeError
        If `data_dict` is not a Mapping, or if `tgt` is neither a str nor an
        h5py.Group.

    """
    if not isinstance(data_dict, Mapping):
        raise TypeError('`data_dict` only accepts top-level'
                        ' dict/OrderedDict/etc.')

    def store_recursively(fhandle, node, path=None, attrs=None,
                          node_hashes=None):
        """Function for iteratively doing the work"""
        path = [] if path is None else path
        # Names handed to h5py are *relative* to `fhandle`: a name starting
        # with '/' is resolved from the file root, which would make writes to
        # a sub-Group escape that Group. `full_path` is kept for log messages.
        rel_path = '/'.join(path)
        full_path = '/' + rel_path
        node_hashes = OrderedDict() if node_hashes is None else node_hashes

        if attrs is None:
            sorted_attr_keys = []
        elif isinstance(attrs, OrderedDict):
            # Respect the caller's explicit ordering
            sorted_attr_keys = list(attrs.keys())
        else:
            sorted_attr_keys = sorted(attrs.keys())

        if isinstance(node, Mapping):
            logging.trace(' creating Group "%s"', full_path)
            if path:
                try:
                    group = fhandle.create_group(rel_path)
                except ValueError:
                    # Typically: the group already exists. Reuse it if it can
                    # be looked up; otherwise carry on best-effort as before.
                    group = fhandle.get(rel_path)
            else:
                # The top-level entity is `fhandle` itself; it already exists,
                # so there is nothing to create (trying to would raise and
                # cause `attrs` to be dropped).
                group = fhandle

            if group is not None:
                for key in sorted_attr_keys:
                    group.attrs[key] = attrs[key]

            for key in sorted(node.keys()):
                if isinstance(key, str):
                    key_str = key
                else:
                    key_str = str(key)
                    logging.warning(
                        'Making string from key "%s", %s for use as'
                        ' name in HDF5 file', key_str, type(key)
                    )
                store_recursively(fhandle=fhandle, node=node[key],
                                  path=path + [key_str],
                                  node_hashes=node_hashes)
            return

        # -- Leaf node: becomes an h5py Dataset (or a hardlink to one) -- #

        # Check for existing node
        node_hash = hash_obj(node)
        if node_hash in node_hashes:
            existing = node_hashes[node_hash]
            logging.trace(' creating hardlink for Dataset: "%s" -> "%s"',
                          full_path, '/' + existing)
            # Hardlink the matching existing dataset
            fhandle[rel_path] = fhandle[existing]
            return

        # For now, convert None to np.nan since h5py appears to not handle
        # None
        if node is None:
            node = np.nan
            logging.warning(
                ' encountered `None` at node "%s"; converting to'
                ' np.nan', full_path
            )

        # "Scalar datasets don't support chunk/filter options". Shuffling
        # is a good idea otherwise since subsequent compression will
        # generally benefit; shuffling requires chunking. Compression is
        # not done here since it is slow, but can be done by
        # post-processing the generated file(s).
        if np.isscalar(node):
            shuffle = False
            chunks = None
        else:
            shuffle = True
            chunks = True
            # Remember where this non-scalar dataset lives so that later
            # duplicates can be hardlinked to it instead of stored again
            node_hashes[node_hash] = rel_path

        # -- Handle special types -- #

        # See h5py docs at
        #
        #   https://docs.h5py.org/en/stable/strings.html#how-to-store-text-strings
        #
        # where using `bytes` objects (i.e., in numpy, np.bytes_) is
        # deemed the most compatible way to encode objects, but apparently
        # we don't have pytables compatibility right now.
        #
        # For boolean support, see
        #
        #   https://docs.h5py.org/en/stable/faq.html#faq

        # TODO: make written hdf5 files compatible with pytables
        # see docs at https://www.pytables.org/usersguide/datatypes.html

        # NOTE: `np.bytes_` is used rather than its former alias `np.string_`,
        # which was removed in NumPy 2.0.
        if isinstance(node, string_types):
            node = np.bytes_(node)
        elif isinstance(node, bool):
            node = np.bool_(node)
        elif isinstance(node, np.ndarray):
            if issubclass(node.dtype.type, string_types):
                node = node.astype(np.bytes_)
            elif node.dtype.type is bool:
                node = node.astype(np.bool_)

        logging.trace(' creating dataset at path "%s", hash %s',
                      full_path, node_hash)
        try:
            dset = fhandle.create_dataset(
                name=rel_path, data=node, chunks=chunks, compression=None,
                shuffle=shuffle, fletcher32=False
            )
        except TypeError:
            # Retry without chunking/shuffling, which some data (e.g. anything
            # h5py ends up treating as scalar) does not support
            try:
                shuffle = False
                chunks = None
                dset = fhandle.create_dataset(
                    name=rel_path, data=node, chunks=chunks,
                    compression=None, shuffle=shuffle, fletcher32=False
                )
            except Exception:
                logging.error(' full_path: "%s"', full_path)
                logging.error(' chunks : %s', str(chunks))
                logging.error(' shuffle : %s', str(shuffle))
                logging.error(' node : "%s"', str(node))
                raise

        for key in sorted_attr_keys:
            dset.attrs[key] = attrs[key]

    # Perform the actual operation using the dict passed in by user
    if isinstance(tgt, str):
        from pisa.utils.fileio import check_file_exists
        fpath = check_file_exists(fname=tgt, overwrite=overwrite, warn=warn)
        # The context manager guarantees the file is closed, even on error
        with h5py.File(fpath, 'w') as h5file:
            if attrs is not None:
                h5file.attrs.update(attrs)
            store_recursively(fhandle=h5file, node=data_dict)

    elif isinstance(tgt, h5py.Group):
        store_recursively(fhandle=tgt, node=data_dict, attrs=attrs)

    else:
        raise TypeError('to_hdf: Invalid `tgt` type: %s' % type(tgt))
def test_hdf():
    """Unit tests for hdf module"""
    from shutil import rmtree
    from tempfile import mkdtemp

    # Fixed-width NumPy integer types, used both as dataset values and attrs
    sized_ints = [
        ('int8', np.int8), ('int16', np.int16),
        ('int32', np.int32), ('int64', np.int64),
        ('uint8', np.uint8), ('uint16', np.uint16),
        ('uint32', np.uint32), ('uint64', np.uint64),
    ]

    # -- Build the nested test payload, one second-level branch at a time -- #

    branch1 = OrderedDict()
    branch1['thirdlvl11'] = np.linspace(1, 100, 10000).astype(np.float64)
    branch1['thirdlvl12'] = b"this is a string"
    branch1['thirdlvl13'] = b"this is another string"
    branch1['thirdlvl14'] = 1
    branch1['thirdlvl15'] = 1.1
    branch1['thirdlvl16'] = np.float32(1.1)
    branch1['thirdlvl17'] = np.float64(1.1)
    # Yields keys 'thirdlvl18' ... 'thirdlvl115', in that order
    for suffix, (_, int_type) in enumerate(sized_ints, start=8):
        branch1['thirdlvl1%d' % suffix] = int_type(1)

    branch2 = OrderedDict()
    branch2['thirdlvl21'] = np.linspace(1, 100, 10000).astype(np.float32)
    branch2['thirdlvl22'] = b"this is a string"
    branch2['thirdlvl23'] = b"this is another string"

    branch3 = OrderedDict()
    branch3['thirdlvl31'] = np.array(range(1000)).astype(int)
    branch3['thirdlvl32'] = b"this is a string"

    # Branches 4 and 5 carry identical arrays (duplicate leaf content)
    branch4 = OrderedDict()
    branch4['thirdlvl41'] = np.linspace(1, 100, 10000)
    branch4['thirdlvl42'] = b"this is a string"

    branch5 = OrderedDict()
    branch5['thirdlvl51'] = np.linspace(1, 100, 10000)
    branch5['thirdlvl52'] = b"this is a string"

    branch6 = OrderedDict()
    branch6['thirdlvl61'] = np.linspace(100, 1000, 10000)
    branch6['thirdlvl62'] = b"this is a string"

    top = OrderedDict()
    top['secondlvl1'] = branch1
    top['secondlvl2'] = branch2
    top['secondlvl3'] = branch3
    top['secondlvl4'] = branch4
    top['secondlvl5'] = branch5
    top['secondlvl6'] = branch6

    data = OrderedDict()
    data['top'] = top

    temp_dir = mkdtemp()
    try:
        # -- Round trip without attributes -- #

        fpath = os.path.join(temp_dir, 'to_hdf_noattrs.hdf5')
        to_hdf(data, fpath, overwrite=True, warn=False)
        loaded_data1 = from_hdf(fpath)
        assert data.keys() == loaded_data1.keys()
        assert recursiveEquality(data, loaded_data1), \
            str(data) + "\n" + str(loaded_data1)

        # -- Round trip with attributes -- #

        attrs = OrderedDict()
        attrs['float'] = 9.98237
        attrs['float32'] = np.float32(1.)
        attrs['float64'] = np.float64(1.)
        attrs['pi'] = np.float64(np.pi)
        attrs['string'] = "string attribute!"
        attrs['int'] = 1
        for label, int_type in sized_ints:
            attrs[label] = int_type(1)
        attrs['bool'] = True
        attrs['bool_'] = np.bool_(True)

        def is_instance_of(expected_type):
            """Make a checker testing `isinstance(x, expected_type)`"""
            return lambda x: isinstance(x, expected_type)

        def has_dtype(expected_dtype):
            """Make a checker testing `x.dtype == expected_dtype`"""
            return lambda x: x.dtype == expected_dtype

        attr_type_checkers = {
            "float": is_instance_of(float),
            "float32": has_dtype(np.float32),
            "float64": has_dtype(np.float64),
            "pi": has_dtype(np.float64),
            "string": is_instance_of(string_types),
            "int": is_instance_of(int),
            "bool": is_instance_of(bool),
            "bool_": has_dtype(np.bool_),
        }
        for label, int_type in sized_ints:
            attr_type_checkers[label] = has_dtype(int_type)

        fpath = os.path.join(temp_dir, 'to_hdf_withattrs.hdf5')
        to_hdf(data, fpath, attrs=attrs, overwrite=True, warn=False)
        loaded_data2 = from_hdf(fpath)
        loaded_attrs = loaded_data2.attrs
        assert data.keys() == loaded_data2.keys()
        assert attrs.keys() == loaded_attrs.keys(), \
            '\n' + str(attrs.keys()) + '\n' + str(loaded_attrs.keys())
        assert recursiveEquality(data, loaded_data2)
        assert recursiveEquality(attrs, loaded_attrs)

        # NOTE(review): the checker is applied to the value that was written
        # (`val`), whereas the failure message reports the type of the value
        # that was loaded -- confirm which of the two is intended.
        for key, val in attrs.items():
            tgt_type_checker = attr_type_checkers[key]
            assert tgt_type_checker(val), \
                "key '%s': val '%s' is type '%s'" % \
                (key, val, type(loaded_attrs[key]))
    finally:
        # Always clean up the scratch directory, pass or fail
        rmtree(temp_dir)

    logging.info('<< PASS : test_hdf >>')
if __name__ == "__main__":
    # Executed as a script: set log verbosity level 1, then run the
    # module's unit tests.
    set_verbosity(1)
    test_hdf()