# Source code for pisa.utils.jsons

"""
A set of utilities for reading (and instantiating) objects from and writing
objects to JSON files.
"""


from __future__ import absolute_import, division

import bz2
from collections import OrderedDict
from collections.abc import Iterable, Mapping, Sequence
from numbers import Integral, Number, Real
import os
import tempfile

import numpy as np
from packaging import version
import simplejson as json
from six import string_types

from pisa import ureg
from pisa.utils.log import logging, set_verbosity

__all__ = [
    'JSON_EXTS',
    'ZIP_EXTS',
    'XOR_EXTS',
    'json_string',
    'dumps',
    'loads',
    'from_json',
    'to_json',
    'NumpyEncoder',
    'NumpyDecoder',
    'test_to_json_from_json',
]

__author__ = 'S. Boeser, J.L. Lanfranchi'

__license__ = '''Copyright (c) 2014-2019, The IceCube Collaboration

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.'''


JSON_EXTS = ['json']
ZIP_EXTS = ['bz2']
XOR_EXTS = ['xor']


def json_string(string):
    """Decode a JSON-encoded string into the corresponding Python object."""
    decoded = json.loads(string)
    return decoded
def dumps(content, indent=2):
    """Dump object to JSON-encoded string.

    Numpy arrays and other PISA-special objects are serialized via
    :class:`NumpyEncoder`; dictionary key order is preserved (no sorting).
    """
    return json.dumps(
        content,
        cls=NumpyEncoder,
        indent=indent,
        sort_keys=False,
    )
def loads(s):
    """Load (create) object from JSON-encoded string.

    Arrays in the JSON are interpreted as numpy arrays where possible via
    :class:`NumpyDecoder`.
    """
    obj = json.loads(s, cls=NumpyDecoder)
    return obj
def from_json(filename, cls=None):
    """Open a file in JSON format (optionally compressed with bz2 or
    xor-scrambled) and parse the content into Python objects.

    Parameters
    ----------
    filename : str
    cls : class (type) object, optional
        If provided, the class is attempted to be instantiated as described
        in Notes section.

    Returns
    -------
    contents_or_obj : simple Python objects or `cls` instantiated therewith

    Notes
    -----
    If `cls` is provided as a class (type) object, this function attempts to
    instantiate the class with the data loaded from the JSON file, as
    follows:

    * if `cls` has a `from_json` method, that is called directly: .. ::

          cls.from_json(filename)

    * if the data loaded from the JSON file is a non-string sequence: .. ::

          cls(*data)

    * if the data loaded is a Mapping (dict, OrderedDict, etc.): .. ::

          cls(**data)

    * for all other types loaded from the JSON: .. ::

          cls(data)

    Note that this currently only recognizes files by their extensions.
    I.e., the file must be named .. ::

        myfile.json
        myfile.json.bz2
        myfile.json.xor

    represent a basic JSON file, a bzip-compressed JSON, and an
    xor-scrambled JSON, respectively.

    """
    # Import here to avoid circular imports
    from pisa.utils.log import logging
    from pisa.utils.resources import open_resource

    if cls is not None:
        if not isinstance(cls, type):
            raise TypeError(
                "`cls` should be a class object (type); got {} instead".format(
                    type(cls)
                )
            )
        if hasattr(cls, "from_json"):
            return cls.from_json(filename)
        # Otherwise, handle instantiating the class generically (which WILL
        # surely fail for many types) based on the type of the object loaded
        # from JSON file: Mapping is passed via cls(**data), non-string
        # Sequence is passed via cls(*data), and anything else is passed via
        # cls(data)

    _, ext = os.path.splitext(filename)
    ext = ext.replace('.', '').lower()
    assert ext in JSON_EXTS or ext in ZIP_EXTS + XOR_EXTS

    try:
        if ext == 'bz2':
            fobj = open_resource(filename, 'rb')
            try:
                bz2_content = fobj.read()
            finally:
                fobj.close()
            decompressed = bz2.decompress(bz2_content).decode()
            del bz2_content
            content = json.loads(
                decompressed,
                cls=NumpyDecoder,
                object_pairs_hook=OrderedDict,
            )
            del decompressed
        elif ext == 'xor':
            with open(filename, 'rb') as infile:
                encrypted_bytes = infile.read()
            # decrypt with key 42 (symmetric with the scrambling in `to_json`)
            decrypted_bytes = bytearray(byte ^ 42 for byte in encrypted_bytes)
            content = json.loads(
                decrypted_bytes.decode(),
                cls=NumpyDecoder,
                object_pairs_hook=OrderedDict,
            )
        else:
            fobj = open_resource(filename)
            try:
                content = json.load(
                    fobj,
                    cls=NumpyDecoder,
                    object_pairs_hook=OrderedDict,
                )
            finally:
                fobj.close()
    except Exception:
        # narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are not intercepted; error is logged and re-raised either way
        logging.error('Failed to load JSON, `filename`="%s"', filename)
        raise

    if cls is None:
        return content

    if isinstance(content, Mapping):
        return cls(**content)
    # BUGFIX: the original read `not isinstance(string_types)` (missing the
    # object argument), which raised TypeError whenever this branch was
    # reached; the intent is "non-string sequence -> cls(*content)"
    if not isinstance(content, string_types) and isinstance(content, Sequence):
        return cls(*content)
    return cls(content)
def to_json(content, filename, indent=2, overwrite=True, warn=True,
            sort_keys=False):
    """Write `content` to a JSON file at `filename`.

    Uses a custom parser that automatically converts numpy arrays to lists.

    If `filename` has a ".bz2" extension, the contents will be compressed
    (using bz2 and highest-level of compression, i.e., -9).

    If `filename` has a ".xor" extension, the contents will be xor-scrambled
    to make them human-unreadable (this is useful for, e.g., blind fits).

    Parameters
    ----------
    content : obj
        Object to be written to file. Tries making use of the object's own
        `to_json` method if it exists.

    filename : str
        Name of the file to be written to. Extension has to be 'json' or
        'bz2'.

    indent : int
        Pretty-printing. Cf. documentation of json.dump() or json.dumps()

    overwrite : bool
        Set to `True` (default) to allow overwriting existing file. Raise
        exception and quit otherwise.

    warn : bool
        Issue a warning message if a file is being overwritten (`True`,
        default). Suppress warning by setting to `False` (e.g. when
        overwriting is the desired behaviour).

    sort_keys : bool
        Output of dictionaries will be sorted by key if set to `True`.
        Default is `False`. Cf. json.dump() or json.dumps().

    """
    # Import here to avoid circular imports
    from pisa.utils.fileio import check_file_exists
    from pisa.utils.log import logging

    if hasattr(content, 'to_json'):
        return content.to_json(
            filename,
            indent=indent,
            overwrite=overwrite,
            warn=warn,
            sort_keys=sort_keys,
        )

    check_file_exists(fname=filename, overwrite=overwrite, warn=warn)

    _, ext = os.path.splitext(filename)
    ext = ext.replace('.', '').lower()
    assert ext == 'json' or ext in ZIP_EXTS + XOR_EXTS

    # Serialize exactly once; the extension only determines post-processing.
    # BUGFIX: the original bz2 branch called
    # `json.dumps(content, outfile, indent=...)`, passing the file object as
    # simplejson's second positional parameter `skipkeys` — a truthy value
    # that silently enabled skipping of non-string dict keys.
    json_bytes = json.dumps(
        content,
        indent=indent,
        cls=NumpyEncoder,
        sort_keys=sort_keys,
        allow_nan=True,
        ignore_nan=False,
    ).encode()

    with open(filename, 'wb') as outfile:
        if ext == 'bz2':
            # default compresslevel is 9 (highest)
            outfile.write(bz2.compress(json_bytes))
        elif ext == 'xor':
            # scramble with key 42 to make contents human-unreadable
            # (reversed by `from_json`)
            outfile.write(bytearray(byte ^ 42 for byte in json_bytes))
        else:
            outfile.write(json_bytes)
        logging.debug('Wrote %.2f kiB to %s', outfile.tell()/1024., filename)
# TODO: figure out how to serialize / deserialize scalars and arrays with # uncertainties
class NumpyEncoder(json.JSONEncoder):
    """
    Subclass of ::class::`json.JSONEncoder` that overrides `default` method
    to allow writing numpy arrays and other special objects PISA uses to JSON
    files.
    """

    def default(self, obj):  # pylint: disable=method-hidden
        """Encode special objects to be representable as JSON."""
        # Objects exposing `serializable_state` (PISA convention) provide
        # their own JSON-able representation
        if hasattr(obj, 'serializable_state'):
            return obj.serializable_state

        if isinstance(obj, string_types):
            return obj

        # Pint quantities serialize as their (magnitude, units) tuple form
        if isinstance(obj, ureg.Quantity):
            converted = [self.default(x) for x in obj.to_tuple()]
            return converted

        # must have checked for & handled strings prior to this or infinite
        # recursion will result
        if isinstance(obj, Iterable):
            return [self.default(x) for x in obj]

        if isinstance(obj, np.integer):
            return int(obj)

        if isinstance(obj, np.floating):
            return float(obj)

        # NOTE: np.bool_ is *Numpy* scalar bool type
        if isinstance(obj, np.bool_):
            return bool(obj)

        # NOTE: we check for these more generic types _after_ checking for
        # np.bool_ since np.bool_ is considered to be both Integral and Real,
        # but we want boolean values (True or False) written out as such
        if isinstance(obj, Integral):
            return int(obj)

        if isinstance(obj, Real):
            return float(obj)

        # BUGFIX: removed a duplicate `isinstance(obj, string_types)` check
        # that was unreachable here (strings are already returned above).

        # If we get here, we have a type that cannot be serialized. This call
        # should simply raise an exception.
        return super().default(obj)
# simplejson added the `allow_nan` constructor parameter to JSONDecoder in
# version 3.19.1; define NumpyDecoder with or without that parameter so the
# class works across simplejson versions.
if version.parse(json.__version__) >= version.parse("3.19.1"):
    class NumpyDecoder(json.JSONDecoder):
        """Decode JSON array(s) as numpy.ndarray; also returns python strings
        instead of unicode."""

        def __init__(
            self,
            encoding=None,
            object_hook=None,
            parse_float=None,
            parse_int=None,
            parse_constant=None,
            strict=True,
            allow_nan=True,
            object_pairs_hook=None,
        ):
            super().__init__(
                encoding=encoding,
                object_hook=object_hook,
                parse_float=parse_float,
                parse_int=parse_int,
                parse_constant=parse_constant,
                strict=strict,
                allow_nan=allow_nan,
                object_pairs_hook=object_pairs_hook,
            )
            # Only need to override the default array handler
            self.parse_array = self.json_array_numpy
            # NOTE(review): relies on simplejson internals
            # (`json.scanner.py_make_scanner`); the scanner must be rebuilt
            # after swapping `parse_array` for the override to take effect
            self.scan_once = json.scanner.py_make_scanner(self)

        def json_array_numpy(self, s_and_end, scan_once, **kwargs):
            """Interpret arrays (lists by default) as numpy arrays where this
            does not yield a string or object array; also handle conversion
            of particularly-formatted input to pint Quantities."""
            # Use the default array parser to get list-ified version of the
            # data
            values, end = json.decoder.JSONArray(s_and_end, scan_once, **kwargs)

            # Assumption for all below logic is the result is a Sequence
            # (i.e., has attribute `__len__`)
            assert isinstance(values, Sequence), str(type(values)) + "\n" + str(values)

            if len(values) == 0:
                return values, end

            try:
                # -- Check for pint quantity -- #

                # If a nested array was already converted to a Quantity,
                # don't touch it further
                if (
                    isinstance(values, ureg.Quantity)
                    or any(isinstance(val, ureg.Quantity) for val in values)
                ):
                    return values, end

                # Quantity tuple (`quantity.to_tuple()`) with a scalar
                # produces from the raw JSON, e.g.,
                #
                #   [9.8, [['meter', 1.0], ['second', -2.0]]]
                #
                # or an ndarray (here of shape (2, 3)) produces from the raw
                # JSON, e.g.,
                #
                #   [[[0, 1, 2], [2, 3, 4]], [['meter', 1.0], ['second', -2.0]]]
                #
                if (
                    len(values) == 2
                    and isinstance(values[1], Sequence)
                    and all(
                        isinstance(subval, Sequence)
                        and len(subval) == 2
                        and isinstance(subval[0], string_types)
                        and isinstance(subval[1], Number)
                        for subval in values[1]
                    )
                ):
                    values = ureg.Quantity.from_tuple(values)
                    return values, end

                # Units part of quantity tuple (`quantity.to_tuple()[1]`)
                # e.g. m / s**2 is represented as .. ::
                #
                #    [['meter', 1.0], ['second', -2.0]]
                #
                # --> Simply return, don't perform further conversion
                if (
                    isinstance(values[0], Sequence)
                    and all(
                        len(subval) == 2
                        and isinstance(subval[0], string_types)
                        and isinstance(subval[1], Number)
                        for subval in values
                    )
                ):
                    return values, end

                # Individual unit (`quantity.to_tuple()[1][0]`)
                # e.g. s^-2 is represented as .. ::
                #
                #    ['second', -2.0]
                #
                # --> Simply return, don't perform further conversion
                if (
                    len(values) == 2
                    and isinstance(values[0], string_types)
                    and isinstance(values[1], Number)
                ):
                    return values, end

                try:
                    ndarray_values = np.asarray(values)
                except ValueError:
                    # ragged / inhomogeneous lists cannot become an ndarray
                    return values, end

                # Things like lists of dicts, or mixed types, will result in
                # an object array; these are handled in PISA as lists, not
                # numpy arrays, so return the pre-converted (list) version of
                # `values`.
                #
                # Similarly, sequences of strings should stay lists of
                # strings, not become numpy arrays.
                if issubclass(ndarray_values.dtype.type, (np.object_, np.str_, str)):
                    return values, end

                return ndarray_values, end

            except TypeError:
                return values, end

else:
    # without allow_nan parameter (older simplejson); otherwise identical to
    # the class above
    class NumpyDecoder(json.JSONDecoder):
        """Decode JSON array(s) as numpy.ndarray; also returns python strings
        instead of unicode."""

        def __init__(
            self,
            encoding=None,
            object_hook=None,
            parse_float=None,
            parse_int=None,
            parse_constant=None,
            strict=True,
            object_pairs_hook=None,
        ):
            super().__init__(
                encoding=encoding,
                object_hook=object_hook,
                parse_float=parse_float,
                parse_int=parse_int,
                parse_constant=parse_constant,
                strict=strict,
                object_pairs_hook=object_pairs_hook,
            )
            # Only need to override the default array handler
            self.parse_array = self.json_array_numpy
            # Rebuild the scanner so the `parse_array` override is picked up
            self.scan_once = json.scanner.py_make_scanner(self)

        def json_array_numpy(self, s_and_end, scan_once, **kwargs):
            """Interpret arrays (lists by default) as numpy arrays where this
            does not yield a string or object array; also handle conversion
            of particularly-formatted input to pint Quantities."""
            # Use the default array parser to get list-ified version of the
            # data
            values, end = json.decoder.JSONArray(s_and_end, scan_once, **kwargs)

            # Assumption for all below logic is the result is a Sequence
            # (i.e., has attribute `__len__`)
            assert isinstance(values, Sequence), str(type(values)) + "\n" + str(values)

            if len(values) == 0:
                return values, end

            try:
                # -- Check for pint quantity -- #

                # If a nested array was already converted to a Quantity,
                # don't touch it further
                if (
                    isinstance(values, ureg.Quantity)
                    or any(isinstance(val, ureg.Quantity) for val in values)
                ):
                    return values, end

                # Quantity tuple (`quantity.to_tuple()`) with a scalar
                # produces from the raw JSON, e.g.,
                #
                #   [9.8, [['meter', 1.0], ['second', -2.0]]]
                #
                # or an ndarray (here of shape (2, 3)) produces from the raw
                # JSON, e.g.,
                #
                #   [[[0, 1, 2], [2, 3, 4]], [['meter', 1.0], ['second', -2.0]]]
                #
                if (
                    len(values) == 2
                    and isinstance(values[1], Sequence)
                    and all(
                        isinstance(subval, Sequence)
                        and len(subval) == 2
                        and isinstance(subval[0], string_types)
                        and isinstance(subval[1], Number)
                        for subval in values[1]
                    )
                ):
                    values = ureg.Quantity.from_tuple(values)
                    return values, end

                # Units part of quantity tuple (`quantity.to_tuple()[1]`)
                # e.g. m / s**2 is represented as .. ::
                #
                #    [['meter', 1.0], ['second', -2.0]]
                #
                # --> Simply return, don't perform further conversion
                if (
                    isinstance(values[0], Sequence)
                    and all(
                        len(subval) == 2
                        and isinstance(subval[0], string_types)
                        and isinstance(subval[1], Number)
                        for subval in values
                    )
                ):
                    return values, end

                # Individual unit (`quantity.to_tuple()[1][0]`)
                # e.g. s^-2 is represented as .. ::
                #
                #    ['second', -2.0]
                #
                # --> Simply return, don't perform further conversion
                if (
                    len(values) == 2
                    and isinstance(values[0], string_types)
                    and isinstance(values[1], Number)
                ):
                    return values, end

                try:
                    ndarray_values = np.asarray(values)
                except ValueError:
                    # ragged / inhomogeneous lists cannot become an ndarray
                    return values, end

                # Things like lists of dicts, or mixed types, will result in
                # an object array; these are handled in PISA as lists, not
                # numpy arrays, so return the pre-converted (list) version of
                # `values`.
                #
                # Similarly, sequences of strings should stay lists of
                # strings, not become numpy arrays.
                if issubclass(ndarray_values.dtype.type, (np.object_, np.str_, str)):
                    return values, end

                return ndarray_values, end

            except TypeError:
                return values, end
# TODO: include more basic types in testing (strings, etc.)
def test_to_json_from_json():
    """Unit tests for writing various types of objects to and reading from
    JSON files (including bz2-compressed and xor-scrambled files)"""
    # pylint: disable=unused-variable
    # Imported here to keep test-only deps out of module import time
    from shutil import rmtree
    from pisa.utils.comparisons import recursiveEquality

    # Prototype arrays covering special float values and signed ints
    proto_float_array = np.array(
        [-np.inf, np.nan, np.inf, -1.1, 0.0, 1.1], dtype=np.float64
    )
    proto_int_array = np.array([-2, -1, 0, 1, 2], dtype=np.int64)
    proto_str_array = np.array(['a', 'ab', 'abc', '', ' '], dtype=str)

    # All floating and integer scalar types numpy knows about, plus the
    # python builtins
    floating_types = [float] + sorted(
        set(t for _, t in np.sctypeDict.items() if issubclass(t, np.floating)),
        key=str,
    )
    integer_types = [int] + sorted(
        set(t for _, t in np.sctypeDict.items() if issubclass(t, np.integer)),
        key=str,
    )

    test_info = [
        dict(
            proto_array=proto_float_array,
            dtypes=floating_types,
        ),
        dict(
            proto_array=proto_int_array,
            dtypes=integer_types,
        ),
        # TODO: strings currently do not work
        #dict(
        #    proto_array=proto_str_array,
        #    dtypes=[str, np.str0, np.str_, np.string_],
        #),
    ]

    # Build {name: object} pairs for every (proto array, dtype) combination.
    # NOTE(review): entries are keyed by the numpy dtype *string*, so the
    # builtin float/int entries (which lack a `.dtype` attribute) are
    # overwritten by their numpy-scalar equivalents sharing the same dtype
    # string — the `obj.dtype` access below depends on this; confirm if the
    # dtype lists change
    test_data = OrderedDict()
    for info in test_info:
        proto_array = info['proto_array']
        for dtype in info['dtypes']:
            typed_array = proto_array.astype(dtype)
            s_dtype = str(np.dtype(dtype))
            test_data["array_" + s_dtype] = typed_array
            test_data["scalar_" + s_dtype] = dtype(typed_array[0])

    temp_dir = tempfile.mkdtemp()
    try:
        for name, obj in test_data.items():
            # Test that the object can be written / read directly
            base_fname = os.path.join(temp_dir, name + '.json')
            for ext in ['', '.bz2', '.xor']:
                fname = base_fname + ext
                to_json(obj, fname)
                loaded_data = from_json(fname)
                if obj.dtype in floating_types:
                    # equal_nan=True since proto array contains NaN
                    assert np.allclose(
                        loaded_data, obj, rtol=1e-12, atol=0, equal_nan=True
                    ), '{}=\n{}\nloaded=\n{}\nsee file: {}'.format(
                        name, obj, loaded_data, fname
                    )
                else:
                    assert np.all(loaded_data == obj), \
                        '{}=\n{}\nloaded_nda=\n{}\nsee file: {}'.format(
                            name, obj, loaded_data, fname
                        )

            # Test that the same object can be written / read as a value in a
            # dictionary
            orig = OrderedDict([(name, obj), (name + "x", obj)])
            base_fname = os.path.join(temp_dir, 'd.{}.json'.format(name))
            for ext in ['', '.bz2', '.xor']:
                fname = base_fname + ext
                to_json(orig, fname)
                loaded = from_json(fname)
                assert recursiveEquality(loaded, orig), \
                    'orig=\n{}\nloaded=\n{}\nsee file: {}'.format(
                        orig, loaded, fname
                    )
    finally:
        # Always clean up the temp dir, even on assertion failure
        rmtree(temp_dir)

    logging.info('<< PASS : test_to_json_from_json >>')
if __name__ == '__main__':
    # Run the module self-test when executed as a script
    set_verbosity(1)
    test_to_json_from_json()