Source code for psims.mzmlb.writer

import logging
import io
import numbers
import warnings

from typing import Dict
from collections import Counter

import numpy as np

# Give hdf5plugin's logger a no-op handler so that importing it below does not
# emit "no handler" warnings when the application has not configured logging
logging.getLogger('hdf5plugin').addHandler(logging.NullHandler())

import h5py
try:
    import hdf5plugin
except ImportError:
    warnings.warn(
        "hdf5plugin is missing! Only the slower GZIP compression scheme will be available! "
        "Please install hdf5plugin to be able to use Blosc.")
    hdf5plugin = None

from ..mzml.binary_encoding import encode_array_direct, encoding_map, compression_map, dtype_to_encoding, COMPRESSION_NONE
from ..mzml.writer import PlainMzMLWriter as _MzMLWriter, NON_STANDARD_ARRAY, ARRAY_TYPES, Mapping
from ..mzml.index import IndexingStream

from . import components


HDF5_COMPRESSORS = {}

DEFAULT_COMPRESSOR = 'gzip'

if hdf5plugin is not None:
    HDF5_COMPRESSORS = {
        "blosc": hdf5plugin.Blosc(),
        "blosc:lz4": hdf5plugin.Blosc('lz4'),
        "blosc:lz4hc": hdf5plugin.Blosc('lz4hc'),
        "blosc:zlib": hdf5plugin.Blosc('zlib'),
        "blosc:zstd": hdf5plugin.Blosc('zstd'),
    }
    # Convert the hdf5plugin filter objects into plain keyword-argument dicts
    # with 'compression' and 'compression_opts' keys
    HDF5_COMPRESSORS = {k: dict(v) for k, v in HDF5_COMPRESSORS.items()}
    DEFAULT_COMPRESSOR = 'blosc'

HDF5_COMPRESSORS['zlib'] = HDF5_COMPRESSORS['gzip'] = {'compression': 'gzip', 'compression_opts': 4}


# Map each HDF5 compression filter ID back to a human-readable scheme name
HDF5_COMPRESSOR_MAGIC_NUMBERS_TO_NAME = {
    v['compression']: k for k, v in HDF5_COMPRESSORS.items()
}
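
# A minimal sketch (illustrative, not part of the original module): a scheme name
# resolves through HDF5_COMPRESSORS into the `compression`/`compression_opts`
# keyword arguments passed to :meth:`h5py.File.create_dataset`. The dataset name
# here is a hypothetical stand-in; the file is held in memory only.
def _compression_settings_example():
    settings = HDF5_COMPRESSORS['gzip']
    # settings == {'compression': 'gzip', 'compression_opts': 4}
    with h5py.File('demo', 'w', driver='core', backing_store=False) as f:
        f.create_dataset('example_array', data=np.arange(10), **settings)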


class ArrayBuffer(object):
    '''Accumulate raw array bytes in memory and flush them to a resizable
    HDF5 dataset in large batches, reducing the number of dataset resize
    and write operations.
    '''

    def __init__(self, dataset, dtype, size=1024 ** 2):
        self.dataset = dataset
        self.dtype = dtype
        self.size = size
        self.buffer = io.BytesIO()
        self.offset = 0

    def add(self, array):
        self.buffer.write(array.tobytes())
        self.check()

    def check(self):
        v = self.buffer.tell()
        if v >= self.size:
            self.flush()

    def flush(self):
        array = np.frombuffer(self.buffer.getvalue(), dtype=self.dtype)
        n = len(array)
        total_size = self.offset + n
        if self.dataset.size < total_size:
            self.dataset.resize((total_size, ))
        self.dataset[self.offset:total_size] = array
        self.offset += n
        self.buffer.seek(0)
        self.buffer.truncate(0)


class MzMLbWriter(_MzMLWriter):
    '''A high level API for generating mzMLb HDF5 files from simple Python objects.

    This class's public interface is identical to :class:`~.IndexedMzMLWriter`,
    with the exception of the options related to HDF5 compression described below.

    .. note::
        Although :mod:`h5py` can read and write through Python file-like objects,
        if they are used they must be opened in read+write mode to allow the file
        to be partially re-read during an update to an existing block.

    Attributes
    ----------
    h5_compression : str
        A valid HDF5 compressor ID or compression scheme name or :const:`None`.
        Available compression schemes are "gzip"/"zlib", and if :mod:`hdf5plugin`
        is installed, "blosc", "blosc:lz4", "blosc:lz4hc", "blosc:zlib", and
        "blosc:zstd". All Blosc-based compressors enable byte shuffling.
    h5_compression_options : int or tuple
        The options to provide to the compressor designated by
        :attr:`h5_compression`. For "gzip", this is a single integer setting the
        compression level, while Blosc takes a tuple of integers.
    h5_blocksize : int
        The number of bytes to include in a single HDF5 data block. Smaller blocks
        improve random access speed at the expense of compression efficiency and
        space. Defaults to 2 ** 20, 1MB.
    buffer_blocks : int
        The number of array blocks to buffer in memory before syncing to disk to
        reduce the number of resize operations. This applies to each array
        independently. Defaults to 10.
    '''

    buffer_blocks: int
    h5_blocksize: int
    h5_compression: str
    h5_compression_options: Dict

    array_name_cache: Dict
    array_buffers: Dict[str, ArrayBuffer]
    offset_tracker: Counter

    def __init__(self, h5_file, close=None, vocabularies=None, missing_reference_is_error=False,
                 vocabulary_resolver=None, id=None, accession=None, h5_compression=DEFAULT_COMPRESSOR,
                 h5_compression_options=None, h5_blocksize: int=2**20, buffer_blocks: int=10, **kwargs):
        if h5_compression in HDF5_COMPRESSORS:
            key = h5_compression
            h5_compression = HDF5_COMPRESSORS[key]['compression']
            if h5_compression_options is None:
                h5_compression_options = HDF5_COMPRESSORS[key]['compression_opts']
        if h5_compression_options is None:
            h5_compression_options = 4
        self.compressor_name = HDF5_COMPRESSOR_MAGIC_NUMBERS_TO_NAME[h5_compression]

        if not isinstance(h5_file, h5py.File):
            h5_file = h5py.File(h5_file, 'w')
        self.xml_buffer = io.BytesIO()
        outfile = IndexingStream(self.xml_buffer)
        super(MzMLbWriter, self).__init__(
            outfile, close, vocabularies, missing_reference_is_error,
            vocabulary_resolver, id, accession, **kwargs)
        self.index_builder = outfile

        self.h5_file = h5_file
        self.h5_blocksize = h5_blocksize
        self.h5_compression = h5_compression
        self.h5_compression_options = h5_compression_options
        self.buffer_blocks = buffer_blocks

        self.array_name_cache = {}
        self.array_buffers = {}
        self.offset_tracker = Counter()

    def begin(self):
        self.h5_file.attrs['compression'] = self.compressor_name
        return super(MzMLbWriter, self).begin()

    def end(self, type=None, value=None, traceback=None):
        close_ = self._close
        self._close = False
        super(MzMLbWriter, self).end(type, value, traceback)
        xml_bytes = self.xml_buffer.getvalue()
        n = self.create_buffer("mzML", xml_bytes)
        self.h5_file['mzML'].attrs['version'] = "mzMLb 1.0"

        # Flush any buffered array data and trim each dataset down to the
        # number of bytes actually written
        for array, z in self.offset_tracker.items():
            buff = self.array_buffers[array]
            buff.flush()
            if buff.dataset.size != z:
                self.h5_file[array].resize((z, ))

        for index in self.index_builder.indices:
            self._prepare_offset_index(index, index.name, n)

        self._close = close_
        self.h5_file.flush()
        if self._should_close():
            self.close()

    def _generate_array_name(self, array_type, is_non_standard=False, scope='spectrum', dtype=None):
        if not is_non_standard:
            cv_ref, name, accession, term = self.context._resolve_cv_ref(array_type, None, None)
            key = accession.replace(":", "_")
        else:
            key = array_type.replace(" ", "_")
        tag_name = "{scope}_{key}".format(scope=scope, key=key)
        if dtype is not None:
            tag_name += '_' + dtype.__name__
        dset = self.h5_file.create_dataset(
            tag_name, chunks=(self.h5_blocksize, ),
            shape=(self.h5_blocksize, ), dtype=dtype,
            compression=self.h5_compression,
            compression_opts=self.h5_compression_options,
            maxshape=(None, ))
        self.array_buffers[tag_name] = ArrayBuffer(
            dset, dtype, self.h5_blocksize * self.buffer_blocks)
        return tag_name

    def _prepare_array(self, array, encoding=32, compression=COMPRESSION_NONE, array_type=None,
                       default_array_length=None, scope='spectrum'):
        if isinstance(encoding, numbers.Number):
            _encoding = int(encoding)
        else:
            _encoding = encoding
        dtype = encoding_map[_encoding]
        array = np.array(array, dtype=dtype)
        encoded_array = encode_array_direct(
            array, compression=compression, dtype=dtype)
        if default_array_length is not None and len(array) != default_array_length:
            override_length = True
        else:
            override_length = False
        is_non_standard = False
        params = []
        if array_type is not None:
            params.append(array_type)
            if isinstance(array_type, Mapping):
                array_type_ = array_type['name']
            else:
                array_type_ = array_type
            if array_type_ not in ARRAY_TYPES:
                is_non_standard = True
                params.append(NON_STANDARD_ARRAY)
        params.append(compression_map[compression])
        params.append(dtype_to_encoding[dtype])

        if isinstance(array_type, dict):
            if len(array_type) == 1:
                array_name = next(iter(array_type.keys()))
            else:
                array_name = array_type['name']
        else:
            array_name = array_type
        try:
            storage_name = self.array_name_cache[array_name, is_non_standard, scope, dtype]
        except KeyError:
            storage_name = self._generate_array_name(array_name, is_non_standard, scope, dtype)
            self.array_name_cache[array_name, is_non_standard, scope, dtype] = storage_name

        length = len(encoded_array)
        offset = self.offset_tracker[storage_name]
        buff = self.array_buffers[storage_name]
        buff.add(encoded_array)
        self.offset_tracker[storage_name] += length
        return self.ExternalBinaryDataArray(
            external_dataset_name=storage_name,
            offset=offset, array_length=length,
            params=params)

    def _prepare_offset_index(self, index, name, last):
        '''Prepare an offset index.

        Parameters
        ----------
        index : :class:`Iterable` of :class:`~.Offset`
            The offset index records to store
        name : str
            The name of the indexed entity, e.g. "spectrum" or "chromatogram"
        last : int
            The final offset value at the end of the file or list
        '''
        offset_index_key = "mzML_{name}Index".format(name=name)
        offset_index_id_ref = "mzML_{name}Index_idRef".format(name=name)
        id_ref_array = []
        offset_array = []
        for i, o in index:
            id_ref_array.append(i)
            offset_array.append(o.offset)
        id_ref_array.append(b'')
        offset_array.append(last)

        id_ref_array_enc = bytearray(b'\x00'.join(id_ref_array))
        self.h5_file.create_dataset(
            offset_index_key, data=np.array(offset_array),
            compression=self.h5_compression,
            compression_opts=self.h5_compression_options)
        self.h5_file.create_dataset(
            offset_index_id_ref, data=id_ref_array_enc,
            compression=self.h5_compression,
            compression_opts=self.h5_compression_options)

    def create_array(self, data, name, last=None, dtype=np.float32, chunks=True):
        '''Store a typed data array as a named dataset in the HDF5 file.

        .. note::
            The array should not be textual unless it has already been translated
            into a byte array with terminal null bytes.

        Parameters
        ----------
        data : :class:`Iterable`
            The data to be stored.
        name : str
            The name to store the dataset by.
        last : object, optional
            A value to associate with the final entry of the array.
        dtype : type
            The type of the entries in the array.
        chunks : bool
            Whether or not to store the dataset in chunks.
        '''
        n = len(data)
        value = np.empty(n + 1, dtype=dtype)
        value[:n] = data
        value[n] = dtype(last) if last is not None else dtype()
        self.h5_file.create_dataset(
            name, data=value,
            compression=self.h5_compression,
            compression_opts=self.h5_compression_options,
            chunks=min(self.h5_blocksize, n) if chunks else None)

    def create_buffer(self, name, content):
        '''Create a compressed binary buffer with a name and fixed length in the
        HDF5 file.

        Parameters
        ----------
        name : str
            The name of the HDF5 dataset
        content : bytes-like object
            The data to store. Must be convertible into a :class:`bytearray`,
            e.g. through the buffer interface.

        Returns
        -------
        n : int
            The size of the buffer written
        '''
        n = len(content)
        self.h5_file.create_dataset(
            name, shape=(n, ),
            chunks=(min(self.h5_blocksize, n), ),
            dtype=np.int8, data=bytearray(content),
            compression=self.h5_compression,
            compression_opts=self.h5_compression_options)
        return n
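

# A minimal usage sketch (illustrative, not part of the original module), assuming
# the spectrum-writing API inherited from the plain mzML writer
# (`controlled_vocabularies`, `run`, `spectrum_list`, `write_spectrum`); the file
# name, the `scans` objects, and their attributes are hypothetical stand-ins.
def _mzmlb_writer_example(scans):
    scheme = 'blosc' if hdf5plugin is not None else 'gzip'
    with MzMLbWriter('example.mzMLb', close=True, h5_compression=scheme) as writer:
        writer.controlled_vocabularies()
        with writer.run(id='example_run'):
            with writer.spectrum_list(count=len(scans)):
                for scan in scans:
                    writer.write_spectrum(
                        scan.mz_array, scan.intensity_array,
                        id=scan.id, params=[{'ms level': scan.ms_level}])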