"""MGF is a simple human-readable format for MS/MS data. It
allows storing MS/MS peak lists and experimental parameters.

This module provides :class:`MGFLoader`, a :class:`~.RandomAccessScanSource`
implementation.

The parser is based on :mod:`pyteomics.mgf`.
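
A minimal usage sketch (``"example.mgf"`` is a hypothetical path)::

    >>> reader = MGFLoader("example.mgf")
    >>> scan = reader.get_scan_by_index(0)
    >>> mzs, intensities = scan.arrays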
"""

import os
from pyteomics import mgf
from pyteomics.auxiliary import OffsetIndex
import numpy as np

from six import string_types as basestring

from .scan import (
    ScanFileMetadataBase, RandomAccessScanSource, ScanDataSource,
    PrecursorInformation, _FakeGroupedScanIteratorImpl,
    ChargeNotProvided)

from .metadata.file_information import (
    FileInformation, MS_MSn_Spectrum)

from ._compression import test_if_file_has_fast_random_access


class _MGFParser(mgf.IndexedMGF):

    def parse_charge(self, charge_text, list_only=False):
        """Pyteomics _parse_charge is very general-purpose, and
        can't be sped up, so we specialize it here."""
        try:
            if not list_only:
                return int(charge_text.replace('+', ''))
            return list(map(self.parse_charge, charge_text.split(" ")))
        except Exception:
            if '-' in charge_text:
                return int(charge_text.replace("-", '')) * -1
            raise

    def parse_peak_charge(self, charge_text, list_only=False):
        return self.parse_charge(charge_text, list_only=list_only)
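
    # Behavior sketch (hypothetical inputs):
    #   parse_charge("2+")                      -> 2
    #   parse_charge("3-")                      -> -3
    #   parse_charge("2+ 3+", list_only=True)   -> [2, 3]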


class _MGFMetadata(ScanFileMetadataBase):
    """Objects implementing this interface can describe the original source
    files, instrument configuration, and data processing parameters used to
    create the current spectral data file.

    Patterned after the provenance features of mzML that could also be mapped
    onto mzXML and other complete vendor readers.
    """

    def file_description(self):
        """Describe the file and its components, as well
        as any content types it has.

        Returns
        -------
        :class:`~.FileInformation`
        """
        finfo = FileInformation()
        finfo.add_content("centroid spectrum")
        finfo.add_content(MS_MSn_Spectrum)
        if isinstance(self.source_file, (basestring, os.PathLike)):
            finfo.add_file(self.source_file)
        elif hasattr(self.source_file, 'name'):
            finfo.add_file(self.source_file.name)
        return finfo

    def instrument_configuration(self):
        """Describe the different instrument components and configurations used
        to acquire scans in this run.

        Returns
        -------
        :class:`list` of :class:`~.InstrumentInformation`
        """
        return super(_MGFMetadata, self).instrument_configuration()

    def data_processing(self):
        """Describe any preprocessing steps applied to the data described by this
        instance.

        Returns
        -------
        :class:`list` of :class:`~.DataProcessingInformation`
        """
        return super(_MGFMetadata, self).data_processing()


class MGFInterface(ScanDataSource):
    """Provides a basic set of widely used MASCOT Generic File (MGF)
    data accessor mechanisms. Because the MGF format is only loosely
    standardized, no strong guarantees of correctness can be made.

    This dialect does not know how to use the charge column of the peak
    data section; for that, see :class:`~.ProcessedMGFLoader`.
    """

    def _scan_arrays(self, scan):
        """Returns raw data arrays for m/z and intensity

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        mz: np.array
            An array of m/z values for this scan
        intensity: np.array
            An array of intensity values for this scan
        """
        try:
            return scan['m/z array'], scan["intensity array"]
        except KeyError:
            return np.array([]), np.array([])

    def _ms_level(self, scan):
        return 2

    def _scan_title(self, scan):
        """Returns a verbose name for this scan, if one
        were stored in the file. Usually includes both the
        scan's id string, as well as information about the
        original file and format.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        str
        """
        return scan['params']["title"].strip('.')

    def _scan_id(self, scan):
        """Returns the scan's id string, a unique
        identifier for this scan in the context of
        the data file it is recorded in

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        str
        """
        return scan['params']["title"].strip('.')

    def _scan_time(self, scan):
        try:
            return float(scan['params']['rtinseconds']) / 60.0
        except KeyError:
            return -1

    def _is_profile(self, scan):
        return False

    def _precursor_information(self, scan):
        mz, intensity = scan['params']['pepmass']
        charge = scan['params'].get('charge', [ChargeNotProvided])[0]
        pinfo = PrecursorInformation(
            mz, intensity, charge, source=self,
            product_scan_id=self._scan_id(scan),
            defaulted=True, orphan=True)
        return pinfo
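
    # Sketch with hypothetical values: pyteomics parses an MGF header such as
    #   PEPMASS=445.12 11300.5
    #   CHARGE=2+
    # into params['pepmass'] == (445.12, 11300.5) and params['charge'] == [2],
    # so the PrecursorInformation above would carry mz=445.12,
    # intensity=11300.5, and charge=2.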

    def _polarity(self, scan):
        pinfo = self._precursor_information(scan)
        if pinfo is not None:
            if pinfo.charge:
                if pinfo.charge == ChargeNotProvided or pinfo.charge > 0:
                    return 1
                return -1
            return 1
        return 1

    def _activation(self, scan):
        return None

    def _scan_index(self, scan):
        """Returns the base 0 offset from the start
        of the data file in number of scans to reach
        this scan.

        If the original format does not natively include
        an index value, this value may be computed from
        the byte offset index.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        int
        """
        try:
            return self._title_to_index[self._scan_title(scan)]
        except KeyError:
            try:
                return self._title_to_index[self._scan_title(scan) + '.']
            except KeyError:
                return -1
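    # The trailing-'.' retry above mirrors _scan_title/_scan_id, which strip
    # periods from titles: a spectrum indexed under "TITLE=scan 1." (a
    # hypothetical title) is still found via the normalized id "scan 1".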

    def _annotations(self, scan):
        annots = dict()
        params = scan['params']
        for key, value in params.items():
            if key in ("pepmass", "charge", "title", "rtinseconds"):
                continue
            else:
                try:
                    value = float(value)
                except ValueError:
                    if value == 'None':
                        value = None
            annots[key] = value
        return annots
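
    # Sketch with hypothetical values: params of
    #   {'title': 't1', 'pepmass': (445.1, 0.0), 'charge': [2], 'scans': '102'}
    # yield {'scans': 102.0}; the reserved keys are consumed elsewhere, and any
    # remaining value that parses as a number is coerced to float.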


class MGFLoader(MGFInterface, RandomAccessScanSource, _MGFMetadata):
    """Reads scans from MASCOT Generic File (MGF) Format files. Provides both
    iterative and random access.

    .. note::
        If the file is not sorted by retention time, :meth:`get_scan_by_time`
        and any other time-based accessors will fail.

    Attributes
    ----------
    source_file: str
        Path to file to read from.
    source: pyteomics.mgf.MGFBase
        Underlying scan data source
    header: dict
        Any top-of-the-file parameters
    """

    def __init__(self, source_file, encoding='utf-8', use_index=True, **kwargs):
        self.source_file = source_file
        self.encoding = encoding
        self._use_index = use_index
        self._source = self._create_parser()
        self.initialize_scan_cache()
        self.make_iterator()
        self._title_to_index = self._prepare_index_lookup()

    @property
    def has_fast_random_access(self):
        return test_if_file_has_fast_random_access(self.source.file)

    def _prepare_index_lookup(self):
        title_to_index = dict()
        for i, key in enumerate(self.index):
            title_to_index[key] = i
        return title_to_index

    @property
    def header(self):
        """Any top-of-the-file parameters

        Returns
        -------
        dict
        """
        return self._source.header

    def __reduce__(self):
        return self.__class__, (self.source_file, self.encoding, self._use_index, )

    def has_msn_scans(self):
        return True

    def has_ms1_scans(self):
        return False

    def _create_parser(self):
        if self._use_index:
            return _MGFParser(self.source_file, read_charges=False,
                              convert_arrays=1, encoding=self.encoding)
        simple_reader = mgf.MGF(
            self.source_file, read_charges=False,
            convert_arrays=1, encoding=self.encoding)
        simple_reader.index = OffsetIndex()
        return simple_reader
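
    # When use_index=False, the plain streaming mgf.MGF reader is used and an
    # empty OffsetIndex is attached so that code paths touching ``self.index``
    # (such as __len__) still work rather than raising AttributeError.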

    def get_scan_by_id(self, scan_id):
        """Retrieve the scan object for the specified scan id.

        If the scan object is still bound and in memory somewhere,
        a reference to that same object will be returned. Otherwise,
        a new object will be created.

        Parameters
        ----------
        scan_id : str
            The unique scan id value to be retrieved

        Returns
        -------
        Scan
        """
        try:
            return self.scan_cache[scan_id]
        except KeyError:
            pass
        try:
            scan = self.source.get_spectrum(scan_id)
        except KeyError:
            scan = self.source.get_spectrum(scan_id + '.')
        scan = self._make_scan(scan)
        self.scan_cache[scan_id] = scan
        return scan

    def get_scan_by_index(self, index):
        """Retrieve the scan object for the specified scan index.

        This internally calls :meth:`get_scan_by_id` which will
        use its cache.

        Parameters
        ----------
        index: int
            The index to get the scan for

        Returns
        -------
        Scan
        """
        if not self._use_index:
            raise TypeError("This method requires the index. Please pass `use_index=True` during initialization")
        id_str = self.index.from_index(index)
        return self.get_scan_by_id(id_str)

    def get_scan_by_time(self, time):
        """Retrieve the scan object for the specified scan time.

        This internally calls :meth:`get_scan_by_id` which will
        use its cache.

        Parameters
        ----------
        time : float
            The time to get the nearest scan from

        Returns
        -------
        Scan
        """
        if not self._use_index:
            raise TypeError("This method requires the index. Please pass `use_index=True` during initialization")
        scan_ids = tuple(self.index)
        lo = 0
        hi = len(scan_ids)
        best_match = None
        best_error = float('inf')
        if time == float('inf'):
            return self.get_scan_by_id(scan_ids[-1])
        while hi != lo:
            mid = (hi + lo) // 2
            sid = scan_ids[mid]
            scan = self.get_scan_by_id(sid)
            scan_time = scan.scan_time
            err = abs(scan_time - time)
            if err < best_error:
                best_error = err
                best_match = scan
            if scan_time == time:
                return scan
            elif (hi - lo) == 1:
                return best_match
            elif scan_time > time:
                hi = mid
            else:
                lo = mid
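
    # Usage sketch (hypothetical time value): ``reader.get_scan_by_time(1.5)``
    # bisects the offset index, materializing one scan per probe, and returns
    # the scan whose scan_time (in minutes) is nearest to 1.5.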

    @property
    def source(self):
        """The file parser that this reader consumes.
        """
        return self._source

    @property
    def index(self):
        """The byte offset index used to achieve fast random access.

        Maps :class:`~.ScanBase` IDs to the byte offsets, implying
        the order the scans reside in the file.

        Returns
        -------
        :class:`~pyteomics.auxiliary.OffsetIndex`
        """
        return self.source.index

    def __len__(self):
        return len(self.index)

    def close(self):
        """Close the underlying reader.
        """
        self._source.close()
        self._dispose()

    def reset(self):
        """Reset the object, clearing out any existing state.

        This resets the underlying file iterator, then calls
        :meth:`make_iterator`, and clears the scan cache.
        """
        self._source.reset()
        try:
            self.source.seek(0)
        except (IOError, AttributeError):
            pass
        self.make_iterator(None)
        self.initialize_scan_cache()

    def _make_default_iterator(self):
        return iter(self._source)

    def make_iterator(self, iterator=None, grouped=False):
        """Configure the iterator's behavior.

        Parameters
        ----------
        iterator : Iterator, optional
            The iterator to manipulate. If missing, the default
            iterator will be used.
        grouped : bool, optional
            Whether the iterator should be grouped and produce
            :class:`.ScanBunch` objects or single :class:`.Scan`.
            Defaults to False
        """
        return super(MGFLoader, self).make_iterator(iterator, grouped)

    def _yield_from_index(self, scan_source, start):
        offset_provider = self.index
        keys = list(offset_provider.keys())
        if start is not None:
            if isinstance(start, basestring):
                try:
                    start = keys.index(start)
                except ValueError:
                    start = keys.index(start + '.')
            elif isinstance(start, int):
                pass  # already a positional index
            else:
                raise TypeError("Cannot start from object %r" % start)
        else:
            start = 0
        for key in keys[start:]:
            yield scan_source.get_by_id(key)

    def start_from_scan(self, scan_id=None, rt=None, index=None, require_ms1=True, grouped=True):
        """Reconstruct an iterator which will start from the scan matching one of
        ``scan_id``, ``rt``, or ``index``. Only one may be provided.

        After invoking this method, the iterator this object wraps will be changed
        to begin yielding scan bunches (or single scans if ``grouped`` is ``False``).

        This method will trigger several random-access operations, making it
        prohibitively expensive for normally compressed files.

        Arguments
        ---------
        scan_id: str, optional
            Start from the scan with the specified id.
        rt: float, optional
            Start from the scan nearest to the specified time (in minutes) in
            the run. If no exact match is found, the nearest scan time will be
            found, rounded up.
        index: int, optional
            Start from the scan with the specified index.
        require_ms1: bool, optional
            Whether the iterator must start from an MS1 scan. True by default.
        grouped: bool, optional
            Whether the iterator should yield scan bunches or single scans.
            True by default.
        """
        if scan_id is None:
            if rt is not None:
                scan = self.get_scan_by_time(rt)
            elif index is not None:
                try:
                    scan = self.get_scan_by_index(index)
                except IndexError:
                    if index > len(self.index):
                        index = len(self.index) - 1
                    else:
                        index = 0
                    scan = self.get_scan_by_index(index)
            else:
                raise ValueError("Must provide a scan locator, one of (scan_id, rt, index)")
            scan_id = scan.id
        else:
            scan = self.get_scan_by_id(scan_id)
        # MGF files do not contain MS1 scans, so require_ms1 has no effect here.
        if require_ms1:
            pass
        iterator = self._yield_from_index(self._source, scan_id)
        self.make_iterator(iterator, grouped=grouped)
        return self

    def _scan_group_iterator(self, iterator=None, mode=None):
        if iterator is None:
            iterator = self._make_default_iterator()
        impl = _FakeGroupedScanIteratorImpl(
            iterator, self._make_scan, self._validate, self._cache_scan)
        return impl

    def next(self):
        return next(self._producer)

    def _validate(self, scan):
        return True
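

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch: "example.mgf" is a hypothetical path, and the
    # printed attributes assume the Scan interface from ms_deisotope.
    import sys

    reader = MGFLoader(sys.argv[1] if len(sys.argv) > 1 else "example.mgf")
    scan = reader.next()
    print(scan.id, scan.scan_time, scan.precursor_information)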