"""A collection of common base classes for types that
load data for :class:`~.Scan` objects.
"""
import abc
import logging
import os
from typing import Any, Dict, Hashable, Iterator, List, Optional, Tuple, Union, Generic, TypeVar

from weakref import WeakValueDictionary

from six import string_types as basestring

import numpy as np

from ms_deisotope.utils import add_metaclass

from ms_deisotope.utils import Constant
from ms_deisotope.data_source.metadata.file_information import FileInformation
from ms_deisotope.data_source.metadata.activation import ActivationInformation
from ms_deisotope.data_source.metadata.instrument_components import InstrumentInformation
from ms_deisotope.data_source.metadata.data_transformation import DataProcessingInformation
from ms_deisotope.data_source.metadata.software import Software
from ms_deisotope.data_source.metadata.scan_traits import IsolationWindow, ScanAcquisitionInformation
from ms_deisotope.data_source._compression import MaybeFastRandomAccess

from .base import PrecursorInformation, ScanBase, ScanBunch
from .scan import Scan
from .scan_iterator import (
    _ScanIteratorImplBase,
    _SingleScanIteratorImpl,
    _GroupedScanIteratorImpl,
    _FakeGroupedScanIteratorImpl,
    _InterleavedGroupedScanIteratorImpl,
    ITERATION_MODE_GROUPED,
    ITERATION_MODE_SINGLE,
    MSEIterator)


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


DataPtrType = TypeVar("DataPtrType")
ScanType = TypeVar("ScanType", bound=ScanBase)


@add_metaclass(abc.ABCMeta)
class ScanDataSource(Generic[DataPtrType, ScanType]):
    """An Abstract Base Class describing an object which can provide a consistent
    set of accessors for a particular format of mass spectrometry data.

    Data files come in many shapes and sizes, with different underlying structures.
    This class provides an API that should make features as consistent as possible
    to clients of :class:`Scan` objects.
    """

    def _make_scan(self, data: DataPtrType) -> ScanType:
        return Scan(data, self)

    def _pick_peaks_vendor(self, scan: DataPtrType, *args, **kwargs):
        """Invoke the underlying data access library's peak picking procedure.

        Not available for open format readers, where behavior will default to the
        :mod:`ms_peak_picker` algorithm.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        peak_set: :class:`ms_peak_picker.PeakIndex`

        Raises
        ------
        NotImplementedError:
            When there is no method available for the given scan and/or data source
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _scan_arrays(self, scan: DataPtrType) -> Union[Tuple[np.ndarray, np.ndarray],
                                                       Tuple[np.ndarray, np.ndarray, Dict[str, np.ndarray]]]:
        """Returns raw data arrays for m/z and intensity.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        mz: np.array
            An array of m/z values for this scan
        intensity: np.array
            An array of intensity values for this scan
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _precursor_information(self, scan: DataPtrType) -> Optional[PrecursorInformation]:
        """Returns information about the precursor ion, if any, that this scan
        was derived from.

        Returns `None` if this scan has no precursor ion.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        PrecursorInformation
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _scan_title(self, scan: DataPtrType) -> str:
        """Returns a verbose name for this scan, if one was stored in the file.

        Usually includes both the scan's id string, as well as information about
        the original file and format.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _scan_id(self, scan: DataPtrType) -> str:
        """Returns the scan's id string, a unique identifier for this scan in the
        context of the data file it is recorded in.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _scan_index(self, scan: DataPtrType) -> int:
        """Returns the base 0 offset from the start of the data file, in number of
        scans, needed to reach this scan.

        If the original format does not natively include an index value, this value
        may be computed from the byte offset index.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        int
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _ms_level(self, scan: DataPtrType) -> int:
        """Returns the degree of exponential fragmentation used to produce this scan.

        1 refers to a survey scan of unfragmented ions, 2 refers to a tandem scan
        derived from an ms level 1 ion, and so on.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        int
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _scan_time(self, scan: DataPtrType) -> float:
        """Returns the time in minutes from the start of data acquisition to when
        this scan was acquired.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        float
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _is_profile(self, scan: DataPtrType) -> bool:
        """Returns whether the scan contains profile data (`True`) or centroided
        data (`False`).

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        bool
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _polarity(self, scan: DataPtrType) -> int:
        """Returns whether this scan was acquired in positive mode (+1) or negative
        mode (-1).

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a `dict`

        Returns
        -------
        int
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _activation(self, scan: DataPtrType) -> Optional[ActivationInformation]:
        """Returns information about the activation method used to produce this
        scan, if any.

        Returns :const:`None` for MS1 scans.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage, usually a :class:`dict`

        Returns
        -------
        ActivationInformation
        """
        raise NotImplementedError()

    def _acquisition_information(self, scan: DataPtrType) -> Optional[ScanAcquisitionInformation]:
        return None

    def _isolation_window(self, scan: DataPtrType) -> Optional[IsolationWindow]:
        return None

    def _instrument_configuration(self, scan: DataPtrType) -> Optional[InstrumentInformation]:
        return None

    def _annotations(self, scan: DataPtrType) -> Dict[str, Any]:
        return dict()

    @property
    def source_file_name(self) -> Optional[str]:
        """Return the name of the file that backs this data source, if available.

        Returns
        -------
        :class:`str` or :const:`None`
        """
        try:
            file_ = self.source_file
        except AttributeError:
            return None
        if isinstance(file_, (basestring, os.PathLike)):
            return file_
        try:
            name = file_.name
        except AttributeError:
            return None
        return name

    def close(self):
        """Close the underlying scan data stream, which may be a file or other
        system resource.

        A closed data source may not be able to serve data requests, but not all
        :class:`ScanDataSource` implementations require the data stream be open
        for all operations.
        """

    def _dispose(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, type, exc_value, traceback):
        self.close()
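

# --- Illustrative sketch (not part of the library) --------------------------
# The class below is a hedged example of the accessor contract defined above,
# showing what a hypothetical in-memory reader whose "data pointer" is a plain
# dict might look like. The dict keys ("mz", "intensity", "id", ...) are
# assumptions chosen only for this sketch; real readers map these accessors
# onto their own file format. It reuses the imports already present in this
# module (np, PrecursorInformation, etc.).
class _ExampleDictScanSource(ScanDataSource[dict, Scan]):
    def _scan_arrays(self, scan):
        return np.asarray(scan["mz"]), np.asarray(scan["intensity"])

    def _precursor_information(self, scan):
        # Expected to be a PrecursorInformation instance, or None for MS1 scans
        return scan.get("precursor")

    def _scan_title(self, scan):
        return scan.get("title", scan["id"])

    def _scan_id(self, scan):
        return scan["id"]

    def _scan_index(self, scan):
        return scan["index"]

    def _ms_level(self, scan):
        return scan["ms_level"]

    def _scan_time(self, scan):
        return scan["time"]

    def _is_profile(self, scan):
        return scan.get("is_profile", False)

    def _polarity(self, scan):
        return scan.get("polarity", 1)

    def _activation(self, scan):
        # An ActivationInformation instance, or None when not applicable
        return scan.get("activation")
# -----------------------------------------------------------------------------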


@add_metaclass(abc.ABCMeta)
class ScanIterator(ScanDataSource[DataPtrType, ScanType]):
    """An Abstract Base Class that extends ScanDataSource with additional
    requirements that enable clients of the class to treat the object as an
    iterator over the underlying data file.

    Attributes
    ----------
    iteration_mode: str
        A string denoting :const:`~.ITERATION_MODE_GROUPED` or
        :const:`~.ITERATION_MODE_SINGLE` that controls whether :class:`~.ScanBunch`
        or :class:`~.Scan` are produced by iteration.
    """

    iteration_mode = ITERATION_MODE_GROUPED

    def has_ms1_scans(self) -> bool:
        """Checks if this :class:`ScanDataSource` contains MS1 spectra.

        Returns
        -------
        :class:`bool` or :const:`None`
            Returns a boolean value if the presence of MS1 scans is known for
            certain, or :const:`None` if it cannot be determined in the case of
            missing metadata.
        """
        return True

    def has_msn_scans(self) -> bool:
        """Checks if this :class:`ScanDataSource` contains MSn spectra.

        Returns
        -------
        :class:`bool` or :const:`None`
            Returns a boolean value if the presence of MSn scans is known for
            certain, or :const:`None` if it cannot be determined in the case of
            missing metadata.
        """
        return True

    @abc.abstractmethod
    def next(self) -> Union[ScanType, ScanBunch]:
        """Advance the iterator, fetching the next :class:`~.ScanBunch` or
        :class:`~.ScanBase` depending upon iteration strategy.

        Returns
        -------
        :class:`~.ScanBunch` or :class:`~.ScanBase`
        """
        raise NotImplementedError()

    def __next__(self) -> Union[ScanType, ScanBunch]:
        """Advance the iterator, fetching the next :class:`~.ScanBunch` or
        :class:`~.ScanBase` depending upon iteration strategy.

        Returns
        -------
        :class:`~.ScanBunch` or :class:`~.ScanBase`
        """
        return self.next()

    def __iter__(self) -> Iterator[Union[ScanType, ScanBunch]]:
        return self

    def reset(self):
        """Reset the iterator, if possible, and clear any caches.
        """
        raise NotImplementedError()

    def _dispose(self):
        extant = len(self.scan_cache)
        if extant > 0:
            logger.info("Disposing of %s with %d extant scans attached to it.", self, extant)
        for _key, value in list(self.scan_cache.items()):
            value.clear()
        self.scan_cache.clear()
        # Break reference cycle between the iterator and the iteration strategy
        self._producer = iter([])

    @abc.abstractmethod
    def _make_default_iterator(self):
        """Set up the default iterator for the :class:`ScanIterator`.
        """
        raise NotImplementedError()

    def make_iterator(self, iterator=None, grouped=None, **kwargs) -> 'ScanIterator':
        """Configure the :class:`ScanIterator`'s behavior, selecting its iteration
        strategy over either its default iterator or the provided ``iterator``
        argument.

        Parameters
        ----------
        iterator : Iterator, optional
            The iterator to manipulate. If missing, the default iterator will be used.
        grouped : bool, optional
            Whether the iterator should be grouped and produce :class:`.ScanBunch`
            objects or single :class:`.Scan` objects. If :const:`None` is passed,
            :meth:`has_ms1_scans` will be used instead. Defaults to :const:`None`.
        """
        if grouped is None:
            grouped = self.has_ms1_scans()
        if grouped:
            self._producer = self._scan_group_iterator(iterator, grouped, **kwargs)
            self.iteration_mode = ITERATION_MODE_GROUPED
        else:
            self._producer = self._single_scan_iterator(iterator, grouped, **kwargs)
            self.iteration_mode = ITERATION_MODE_SINGLE
        return self

    def _make_cache_key(self, scan) -> Hashable:
        return scan.id

    def _cache_scan(self, scan):
        key = self._make_cache_key(scan)
        self._scan_cache[key] = scan
        return key

    def _validate(self, scan) -> bool:
        return True

    def _single_scan_iterator(self, iterator: Iterator=None, mode=None, **kwargs) -> _SingleScanIteratorImpl:
        if iterator is None:
            iterator = self._make_default_iterator()
        impl = _SingleScanIteratorImpl.from_scan_source(iterator, self, **kwargs)
        return impl

    def _scan_group_iterator(self, iterator: Iterator=None, mode=None,
                             **kwargs) -> Union[_InterleavedGroupedScanIteratorImpl, _FakeGroupedScanIteratorImpl]:
        if iterator is None:
            iterator = self._make_default_iterator()
        if isinstance(mode, _ScanIteratorImplBase):
            impl = mode.from_scan_source(iterator, self, **kwargs)
            return impl
        elif callable(mode):
            impl = mode(iterator, self, **kwargs)
            return impl
        elif mode == "mse":
            impl = MSEIterator.from_scan_source(iterator, self, **kwargs)
            return impl
        elif self.has_ms1_scans():
            impl = _InterleavedGroupedScanIteratorImpl.from_scan_source(iterator, self, **kwargs)
        else:
            impl = _FakeGroupedScanIteratorImpl.from_scan_source(iterator, self, **kwargs)
        return impl

    def _scan_cleared(self, scan):
        self.scan_cache.pop(self._make_cache_key(scan), None)

    def initialize_scan_cache(self):
        """Initialize a cache which keeps track of which :class:`~.Scan` objects
        are still in memory using a :class:`weakref.WeakValueDictionary`.

        When a scan is requested, if the scan object is found in the cache, the
        existing object is returned rather than re-read from disk.
        """
        self._scan_cache = WeakValueDictionary()

    @property
    def scan_cache(self):
        """A :class:`weakref.WeakValueDictionary` mapping used to retrieve
        scans from memory if available before re-reading them from disk.
        """
        return self._scan_cache

    @scan_cache.setter
    def scan_cache(self, value):
        self._scan_cache = value
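

# --- Illustrative sketch (not part of the library) --------------------------
# A hedged usage example of the iteration API defined above. It assumes
# `reader` is a concrete ScanIterator (for example one returned by
# ms_deisotope.MSFileLoader), that the reader implements reset(), and that
# grouped iteration yields ScanBunch objects with `precursor` and `products`
# attributes as described in the ScanBunch documentation.
def _example_iterate(reader: 'ScanIterator'):
    # Grouped iteration: each item pairs an MS1 scan with its dependent MSn scans.
    reader.make_iterator(grouped=True)
    for bunch in reader:
        precursor = bunch.precursor
        products = bunch.products
        print(precursor.id if precursor is not None else None, len(products))

    # Single-scan iteration: each item is an individual Scan.
    reader.reset()
    reader.make_iterator(grouped=False)
    for scan in reader:
        print(scan.id, scan.ms_level)
# -----------------------------------------------------------------------------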


@add_metaclass(abc.ABCMeta)
class RandomAccessScanSource(ScanIterator[DataPtrType, ScanType]):
    """An Abstract Base Class that extends ScanIterator with additional
    requirements that the implementation support random access to individual
    scans. This should be doable by unique identifier, sequential index, or by
    scan time.
    """

    @property
    def has_fast_random_access(self) -> Constant:
        """Check whether the underlying data stream supports fast random access
        or not.

        Even if the file format supports random access, it may be impractical due
        to overhead in parsing the underlying data stream, e.g. calling
        :meth:`gzip.GzipFile.seek` can force the file to be decompressed from the
        *beginning of the file* on each call. This property can be used to signal
        to the caller whether or not it should use a different strategy.

        Returns
        -------
        :class:`Constant`:
            One of :data:`~.DefinitelyNotFastRandomAccess`,
            :data:`~.MaybeFastRandomAccess`, or :data:`~.DefinitelyFastRandomAccess`.
            The first is a False-y value, the latter two will evaluate to :const:`True`
        """
        return MaybeFastRandomAccess

    @abc.abstractmethod
    def get_scan_by_id(self, scan_id: str) -> ScanType:
        """Retrieve the scan object for the specified scan id.

        If the scan object is still bound and in memory somewhere, a reference to
        that same object will be returned. Otherwise, a new object will be created.

        Parameters
        ----------
        scan_id : str
            The unique scan id value to be retrieved

        Returns
        -------
        Scan
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_scan_by_time(self, time: float) -> ScanType:
        """Retrieve the scan object for the specified scan time.

        This internally calls :meth:`get_scan_by_id` which will use its cache.

        Parameters
        ----------
        time : float
            The time to get the nearest scan from

        Returns
        -------
        Scan
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_scan_by_index(self, index: int) -> ScanType:
        """Retrieve the scan object for the specified scan index.

        This internally calls :meth:`get_scan_by_id` which will use its cache.

        Parameters
        ----------
        index: int
            The index to get the scan for

        Returns
        -------
        Scan
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def start_from_scan(self, scan_id: Optional[str]=None, rt: Optional[float]=None,
                        index: Optional[int]=None, require_ms1: bool=True,
                        grouped=True, **kwargs) -> 'RandomAccessScanSource':
        """Reconstruct an iterator which will start from the scan matching one of
        ``scan_id``, ``rt``, or ``index``. Only one may be provided.

        After invoking this method, the iterator this object wraps will be changed
        to begin yielding scan bunches (or single scans if ``grouped`` is ``False``).

        This method will trigger several random-access operations, making it
        prohibitively expensive for normally compressed files.

        Arguments
        ---------
        scan_id: str, optional
            Start from the scan with the specified id.
        rt: float, optional
            Start from the scan nearest to specified time (in minutes) in the run.
            If no exact match is found, the nearest scan time will be found,
            rounded up.
        index: int, optional
            Start from the scan with the specified index.
        require_ms1: bool, optional
            Whether the iterator must start from an MS1 scan. True by default.
        grouped: bool, optional
            Whether the iterator should yield scan bunches or single scans.
            True by default.
        """
        raise NotImplementedError()

    def _locate_ms1_scan(self, scan: ScanType, search_range: int=150) -> Optional[ScanType]:
        # Search backwards from `scan` for an MS1 scan, then forwards if none
        # was found, giving up after `search_range` steps in each direction.
        i = 0
        initial_scan = scan
        if self.has_ms1_scans() is False:
            raise IndexError('Cannot locate MS1 Scan')
        while scan.ms_level != 1 and i < search_range:
            i += 1
            if scan.index <= 0:
                break
            scan = self.get_scan_by_index(scan.index - 1)
        if scan.ms_level == 1:
            return scan
        scan = initial_scan
        i = 0
        while scan.ms_level != 1 and i < search_range:
            i += 1
            try:
                scan = self.get_scan_by_index(scan.index + 1)
            except IndexError:
                raise IndexError("Cannot locate MS1 Scan")
        return scan

    def find_previous_ms1(self, start_index: int) -> Optional[ScanType]:
        """Locate the MS1 scan preceding ``start_index``, iterating backwards
        through scans until either the first scan is reached or an MS1 scan
        is found.

        Returns
        -------
        :class:`~.ScanBase` or :const:`None` if not found
        """
        if self.has_ms1_scans() is False:
            return None
        index = start_index - 1
        while index >= 0:
            try:
                scan = self.get_scan_by_index(index)
                if scan.ms_level == 1:
                    return scan
                index -= 1
            except (IndexError, KeyError):
                return None
        return None

    def find_next_ms1(self, start_index: int) -> Optional[ScanType]:
        """Locate the MS1 scan following ``start_index``, iterating forwards
        through scans until either the last scan is reached or an MS1 scan
        is found.

        Returns
        -------
        :class:`~.ScanBase` or :const:`None` if not found
        """
        if self.has_ms1_scans() is False:
            return None
        index = start_index + 1
        n = len(self.index)
        while index < n:
            try:
                scan = self.get_scan_by_index(index)
                if scan.ms_level == 1:
                    return scan
                index += 1
            except (IndexError, KeyError):
                return None
        return None

    @abc.abstractmethod
    def __len__(self) -> int:
        raise NotImplementedError()

    def __getitem__(self, i: int) -> Union[ScanType, List[ScanType]]:
        """Retrieve the scan object for the specified scan index.

        This internally calls :meth:`get_scan_by_index` but supports slicing and
        negative indexing.

        Parameters
        ----------
        index: int
            The index to get the scan for

        Returns
        -------
        Scan
        """
        if isinstance(i, slice):
            n = len(self)
            scans = []
            start, stop, step = i.indices(n)
            for i in range(start, stop, step):
                scans.append(self[i])
            return scans
        elif i < 0:
            n = len(self)
            i = n + i
        return self.get_scan_by_index(i)

    @property
    def time(self):
        """An indexer facade that lets you index and slice by scan time.

        Returns
        -------
        TimeIndex
        """
        return TimeIndex(self)
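

# --- Illustrative sketch (not part of the library) --------------------------
# A hedged example of the random-access API defined above, assuming `reader`
# is a concrete RandomAccessScanSource (e.g. an indexed mzML reader). The
# specific indices and times are placeholders for this sketch.
def _example_random_access(reader: 'RandomAccessScanSource'):
    if not reader.has_fast_random_access:
        # e.g. a plain gzip-compressed stream, where each random access may
        # force decompression from the beginning of the file.
        return

    scan = reader.get_scan_by_index(100)       # by sequential index
    same = reader[100]                         # __getitem__ delegates to it
    nearby = reader[95:105]                    # slicing returns a list of scans
    by_time = reader.get_scan_by_time(30.0)    # nearest scan to 30 minutes
    window = reader.time[25.0:35.0]            # TimeIndex slice over a time range

    # Re-seat the wrapped iterator so iteration resumes from around 30 minutes,
    # backing up to the preceding MS1 scan (require_ms1 defaults to True).
    reader.start_from_scan(rt=30.0)
    for bunch in reader:
        ...
# -----------------------------------------------------------------------------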


class TimeIndex(object):
    """A facade that translates ``[x]`` into scan time access, and supports
    slicing over a time range.
    """

    def __init__(self, scan_loader):
        self.scan_loader = scan_loader

    def is_sorted_by_time(self):
        lo = 0
        hi = len(self.scan_loader)
        last = 0.0
        for i in range(lo, hi):
            current = self.scan_loader[i].scan_time
            if last <= current:
                last = current
            else:
                return False
        return True

    def __len__(self):
        return len(self.scan_loader)

    def __getitem__(self, time):
        if isinstance(time, slice):
            start_scan = self.scan_loader.get_scan_by_time(time.start)
            end_scan = self.scan_loader.get_scan_by_time(time.stop)
            return self.scan_loader[start_scan.index:end_scan.index]
        else:
            return self.scan_loader.get_scan_by_time(time)


@add_metaclass(abc.ABCMeta)
class ScanFileMetadataBase(object):
    """Objects implementing this interface can describe the original source
    files, instrument configuration, and data processing parameters used to
    create the current spectral data file.

    Patterned after the provenance features of mzML that could also be mapped
    onto mzXML and other complete vendor readers.
    """

    @abc.abstractmethod
    def file_description(self):
        """Describe the file and its components, as well as any content types
        it has.

        Returns
        -------
        :class:`~.FileInformation`
        """
        return FileInformation()

    @property
    def id_format(self):
        file_desc = self.file_description()
        return file_desc.id_format

    @abc.abstractmethod
    def instrument_configuration(self) -> List[InstrumentInformation]:
        """Describe the different instrument components and configurations used
        to acquire scans in this run.

        Returns
        -------
        :class:`list` of :class:`~.InstrumentInformation`
        """
        return []

    @abc.abstractmethod
    def data_processing(self) -> List[DataProcessingInformation]:
        """Describe any preprocessing steps applied to the data described by
        this instance.

        Returns
        -------
        :class:`list` of :class:`~.DataProcessingInformation`
        """
        return []

    def software_list(self) -> List[Software]:
        """Describe any software used on the data described by this instance.

        Returns
        -------
        :class:`list` of :class:`~.Software`
        """
        return []
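

# --- Illustrative sketch (not part of the library) --------------------------
# A hedged example of querying the provenance interface above, assuming
# `reader` also implements ScanFileMetadataBase, as readers for complete
# formats such as mzML do.
def _example_describe(reader: 'ScanFileMetadataBase'):
    finfo = reader.file_description()            # FileInformation: source files and content types
    configs = reader.instrument_configuration()  # list of InstrumentInformation
    steps = reader.data_processing()             # list of DataProcessingInformation
    tools = reader.software_list()               # list of Software
    return finfo, configs, steps, tools
# -----------------------------------------------------------------------------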