# Source code for ms_deisotope.data_source.mzxml

"""**mzXML** is a standard XML-format for raw mass spectrometry data storage created
by the Institute for Systems Biology, intended to be replaced with **mzML**.
This module provides :class:`MzXMLLoader`, a :class:`~.RandomAccessScanSource`
implementation.

The parser is based on :mod:`pyteomics.mzxml`.
"""
from typing import List
from six import string_types as basestring

import numpy as np
from pyteomics import mzxml
from .common import (
    PrecursorInformation, ScanDataSource, ChargeNotProvided,
    ActivationInformation, IsolationWindow, ScanAcquisitionInformation,
    ScanEventInformation, ScanWindow,
    ComponentGroup, component, InstrumentInformation,
    FileInformation, ScanFileMetadataBase)
from .metadata import data_transformation, file_information, software
from .xml_reader import (
    XMLReaderBase, iterparse_until)


class _MzXMLParser(mzxml.MzXML):
    """Module-local subclass of :class:`pyteomics.mzxml.MzXML`.

    Adds no behavior of its own; it exists so this module has its own
    parser type to instantiate and reference.
    """


# Id-format term looked up by the accession "MS:1000776"; this is the value
# reported by ``_MzXMLMetadataLoader.id_format`` for every mzXML file.
# NOTE(review): the variable name implies this is the "scan number only"
# native id format -- confirm against the controlled vocabulary.
scan_number_only_id_format = file_information.id_format("MS:1000776")


class _MzXMLMetadataLoader(ScanFileMetadataBase):
    """Read file-level metadata out of an mzXML document.

    Each public reader looks for one kind of element through
    ``self.source`` and resets the source afterwards. The shared mechanics
    live in :meth:`_mzxml_header_elements`.
    """

    def _mzxml_header_elements(self, tag_name):
        """Collect the parsed form of every ``tag_name`` element.

        Parameters
        ----------
        tag_name : str
            The XML tag name to search for.

        Returns
        -------
        list
            The result of ``self.source._get_info_smart`` applied to every
            element produced by ``iterparse_until(self.source, tag_name, "scan")``.
        """
        # presumably ``iterparse_until`` stops at the first <scan> tag, so only
        # the document header is visited -- verify against ``xml_reader``.
        elements = map(
            self.source._get_info_smart,
            iterparse_until(self.source, tag_name, "scan"))
        # NOTE(review): ``map`` is lazy on Python 3, so the conversion (and the
        # parsing, if ``iterparse_until`` is lazy too) only happens in ``list``
        # below, i.e. *after* this reset. The statement order is deliberately
        # kept as it always was; confirm whether a second reset is wanted.
        self.source.reset()
        return list(elements)

    @staticmethod
    def _mzxml_entry_value(configuration, key):
        """Return ``configuration[key]['value']`` or `None`.

        `None` is returned when the entry is missing, has no ``'value'``, or
        is not a mapping (the ``AttributeError`` case).
        """
        try:
            return configuration.get(key, {}).get('value')
        except AttributeError:
            return None

    def _mzxml_component(self, configuration, key):
        """Wrap the value stored under ``key`` with :func:`component`, or
        return `None` when there is no such value.
        """
        value = self._mzxml_entry_value(configuration, key)
        if value is None:
            return None
        return component(value)

    def file_description(self):
        """Read the file provenance from the ``<parentFile>`` tags
        if any are present.

        This returns no information about the file's contents as this
        was not part of the mzXML schema

        Returns
        -------
        FileInformation
            The description of the file's sources
        """
        fi = FileInformation({}, [])
        for parent in self._mzxml_header_elements("parentFile"):
            path = parent.get("fileName")
            if path is None:
                continue
            # Drop the URI scheme prefix so a bare path is recorded.
            fi.add_file(path.replace("file:///", ''), check=False)
        return fi

    def instrument_configuration(self):
        """Read the instrument configurations settings from the
        ``<msInstrument>`` elements.

        Returns
        -------
        list of InstrumentInformation
            One entry per ``<msInstrument>`` element, in document order
        """
        return [
            self._convert_instrument(ic)
            for ic in self._mzxml_header_elements("msInstrument")
        ]

    def _convert_instrument(self, configuration):
        """Convert one parsed ``<msInstrument>`` mapping into an
        :class:`InstrumentInformation`.

        Parameters
        ----------
        configuration : Mapping
            The parsed element, as produced by ``self.source._get_info_smart``

        Returns
        -------
        InstrumentInformation
        """
        # Same evaluation order as the component groups are numbered by hand
        # below is not required; detector is resolved first as before.
        detector = self._mzxml_component(configuration, 'msDetector')
        ionisation = self._mzxml_component(configuration, 'msIonisation')
        analyzer = self._mzxml_component(configuration, 'msMassAnalyzer')
        parts = [
            ComponentGroup("source", [ionisation], 1),
            ComponentGroup("analyzer", [analyzer], 2),
            ComponentGroup("detector", [detector], 3)
        ]
        # Guarded exactly like the component lookups above, so a non-mapping
        # manufacturer entry yields `None` instead of an AttributeError.
        inst_model = self._mzxml_entry_value(configuration, "msManufacturer")
        inst_software_conf = configuration.get("software", {})
        inst_software = None
        if inst_software_conf:
            # The software name is passed for both of the first two arguments.
            inst_software = software.Software(
                inst_software_conf.get('name'),
                inst_software_conf.get('name'),
                inst_software_conf.get('version')
            )
        return InstrumentInformation(
            configuration.get('msInstrumentID', 1),
            parts,
            inst_model,
            software=inst_software)

    def software_list(self) -> List[software.Software]:
        """Read the ``<software>`` elements.

        Returns
        -------
        list of software.Software
            One entry per ``<software>`` element, built from its ``name``
            (used for both of the first two arguments) and ``version``
        """
        return [
            software.Software(
                sw.get('name'),
                sw.get('name'),
                sw.get('version')
            )
            for sw in self._mzxml_header_elements("software")
        ]

    def data_processing(self):
        """Read the ``<dataProcessing>`` elements.

        Returns
        -------
        list of data_transformation.DataProcessingInformation
            One entry per ``<dataProcessing>`` element, whose ``id`` is the
            element's position in the document. Each holds a single
            :class:`~.ProcessingMethod` carrying that element's
            ``processingOperation`` value(s)
        """
        operation_groups = []
        for i, group in enumerate(self._mzxml_header_elements("dataProcessing")):
            software_id = group.get("software", {}).get("name", "")
            method_group = data_transformation.ProcessingMethod(order=1, software_id=software_id)
            operations = group.get("processingOperation", [])
            # The parsed value is either a list of operations or a lone one.
            if isinstance(operations, list):
                for op in operations:
                    method_group.add(op)
            else:
                method_group.add(operations)
            operation_groups.append(
                data_transformation.DataProcessingInformation([method_group], id=i))
        return operation_groups

    @property
    def id_format(self):
        """The module-level ``scan_number_only_id_format`` term, returned
        unconditionally.
        """
        return scan_number_only_id_format


class MzXMLDataInterface(ScanDataSource):
    """Implements the :class:`ScanDataSource` accessor methods for scans read
    from mzXML files. Meant to be mixed into a reader class rather than
    instantiated on its own.
    """

    def _scan_arrays(self, scan):
        """Fetch the raw m/z and intensity arrays of a scan.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        mz: np.array
            The scan's ``'m/z array'`` entry
        intensity: np.array
            The scan's ``'intensity array'`` entry

        Two empty arrays are returned when either entry is missing.
        """
        try:
            mz_array = scan['m/z array']
            intensity_array = scan["intensity array"]
        except KeyError:
            return np.array([]), np.array([])
        return (mz_array, intensity_array)

    def _precursor_information(self, scan):
        """Describe the precursor ion this scan was derived from, if any.

        Only the first ``'precursorMz'`` entry of the scan is consulted.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        PrecursorInformation
            `None` when a required key is missing (e.g. the scan has no
            ``'precursorMz'`` entry)
        """
        try:
            precursor = scan['precursorMz'][0]
            parent_scan_id = precursor.get('precursorScanNum')
            mz = float(precursor['precursorMz'])
            intensity = float(precursor.get('precursorIntensity', 0.0))
            raw_charge = precursor.get('precursorCharge')
            # A missing or falsy charge is reported as "not provided".
            if raw_charge:
                charge = int(raw_charge)
            else:
                charge = ChargeNotProvided
            return PrecursorInformation(
                mz=mz,
                intensity=intensity,
                charge=charge,
                precursor_scan_id=parent_scan_id,
                source=self,
                product_scan_id=self._scan_id(scan))
        except KeyError:
            return None

    def _scan_title(self, scan):
        """Give the scan's title, which for this format is simply whatever
        :meth:`_scan_id` returns.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        str
        """
        title = self._scan_id(scan)
        return title

    def _scan_id(self, scan):
        """Give the scan's identifier, read from its ``'num'`` entry.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        str
        """
        scan_number = scan["num"]
        return scan_number

    def _scan_index(self, scan):
        """Look up the position of this scan in ``self._scan_index_lookup``.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        int
            The stored index, ``-1`` when the scan's id is not in the
            lookup, or ``-2`` when no lookup has been built
        """
        lookup = self._scan_index_lookup
        if lookup is None:
            # The index was never built.
            return -2
        try:
            return lookup[self._scan_id(scan)]
        except KeyError:
            return -1
        except ValueError:
            return -2

    def _ms_level(self, scan):
        """Give the MS level of the scan, read from ``'msLevel'``.

        1 refers to a survey scan of unfragmented ions, 2
        refers to a tandem scan derived from an ms level 1
        ion, and so on.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        int
        """
        level = scan['msLevel']
        return int(level)

    def _scan_time(self, scan):
        """Give the acquisition time of the scan, read from
        ``'retentionTime'`` without any conversion.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        float
            Expected to be in minutes (no unit handling happens here);
            `None` when the scan has no ``'retentionTime'`` entry
        """
        try:
            time = scan['retentionTime']
        except KeyError:
            return None
        return time

    def _is_profile(self, scan):
        """Tell whether the scan holds profile data (`True`) or centroided
        data (`False`), based on its ``'centroided'`` flag.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        bool
            `True` as well when the flag is absent
        """
        try:
            centroided_flag = scan['centroided']
        except KeyError:
            return True
        return not bool(int(centroided_flag))

    def _polarity(self, scan):
        """Give the polarity of the scan: ``1`` when ``'polarity'`` is
        ``'+'``, ``-1`` for any other value.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        int
            `None` when the scan has no ``'polarity'`` entry
        """
        try:
            return 1 if scan['polarity'] == '+' else -1
        except KeyError:
            return None

    def _activation(self, scan):
        """Describe the activation used to produce this scan, built from
        the first precursor's ``'activationMethod'`` and the scan's
        ``'collisionEnergy'``.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        ActivationInformation
            `None` when any of the required keys is missing
        """
        try:
            method = scan['precursorMz'][0]['activationMethod']
            energy = scan['collisionEnergy']
            return ActivationInformation(method, energy)
        except KeyError:
            return None

    def _isolation_window(self, scan):
        """Describe the isolation window around the first precursor.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        IsolationWindow
            Centered on ``'precursorMz'`` with half of ``'windowWideness'``
            on each side; an empty window at the target when no width is
            recorded; `None` when there is no precursor m/z at all
        """
        try:
            precursor = scan['precursorMz'][0]
            target = float(precursor['precursorMz'])
        except KeyError:
            return None
        try:
            half_width = float(precursor['windowWideness']) / 2
        except KeyError:
            return IsolationWindow.make_empty(target)
        return IsolationWindow(half_width, target, half_width)

    def _instrument_configuration(self, scan):
        """Look up the scan's ``'msInstrumentID'`` in
        ``self._instrument_config``.

        Returns `None` when the scan carries no instrument id or the id is
        not a known configuration.
        """
        try:
            instrument_id = scan['msInstrumentID']
            return self._instrument_config[instrument_id]
        except KeyError:
            return None

    def _acquisition_information(self, scan):
        """Build the acquisition description of the scan: a single scan
        event at ``'retentionTime'`` with one window spanning ``'lowMz'`` to
        ``'highMz'``, marked as ``"no combination"``.

        Unlike :meth:`_scan_time`, a missing ``'retentionTime'`` raises
        `KeyError` here.
        """
        start_time = scan['retentionTime']
        window = ScanWindow(scan.get("lowMz"), scan.get("highMz"))
        event = ScanEventInformation(start_time, window_list=[window])
        return ScanAcquisitionInformation("no combination", [event])


class MzXMLLoader(MzXMLDataInterface, XMLReaderBase, _MzXMLMetadataLoader):
    """Reads scans from mzXML files. Provides both iterative and
    random access.

    Attributes
    ----------
    source_file: str
        Path to file to read from.
    source: pyteomics.mzxml.MzXML
        Underlying scan data source
    """

    _parser_cls = _MzXMLParser

    def __init__(self, source_file, use_index=True, **kwargs):
        """Open ``source_file`` and prepare it for iteration.

        Parameters
        ----------
        source_file : str
            Path to file to read from.
        use_index : bool, optional
            Forwarded to the parser; when true, the scan id to sequential
            index lookup is also built up front.
        **kwargs
            Accepted but not used by this constructor.
        """
        self.source_file = source_file
        self._source = _MzXMLParser(
            source_file, read_schema=True, iterative=True,
            huge_tree=True, use_index=use_index)
        self.initialize_scan_cache()
        self._use_index = use_index
        self._scan_index_lookup = None
        if self._use_index:
            self._build_scan_index_lookup()
        # Keyed by configuration id so per-scan lookups by 'msInstrumentID'
        # can be answered without re-reading the header.
        self._instrument_config = {
            k.id: k for k in self.instrument_configuration()
        }
        self.reset()
        self.make_iterator()

    @property
    def index(self):
        """The ``'scan'`` entry of the underlying parser's index."""
        return self._source.index['scan']

    def _get_scan_by_id_raw(self, scan_id):
        """Fetch the raw scan whose ``num`` attribute equals ``scan_id``
        from the underlying parser.
        """
        return self._source.get_by_id(scan_id, "num")

    def _build_scan_index_lookup(self):
        """Populate ``self._scan_index_lookup`` with a mapping from scan id
        to its 0-based position in :attr:`index`.

        Raises
        ------
        ValueError
            If the loader was created with ``use_index`` false.
        """
        if not self._use_index:
            raise ValueError("Must index the entire file before sequential indices may be computed.")
        self._scan_index_lookup = {
            scan_id: i for i, scan_id in enumerate(self.index)
        }

    def _validate(self, scan):
        """Report whether ``scan`` carries an ``"m/z array"`` entry in its
        ``_data``.
        """
        return "m/z array" in scan._data

    def _yield_from_index(self, scan_source, start=None):
        """Yield scans from ``scan_source`` in the order of its ``'scan'``
        offset index.

        Parameters
        ----------
        scan_source
            The parser to read from; its ``_offset_index['scan']`` keys are
            the scan ids to fetch.
        start : str or int, optional
            Where to begin: a scan id (its position in the key list is
            used), a position in the key list, or `None` for the beginning.

        Yields
        ------
        The result of ``scan_source.get_by_id(key, "num")`` for each key

        Raises
        ------
        TypeError
            If ``start`` is neither `None`, a string nor an `int`.
        ValueError
            If ``start`` is a string that is not one of the keys.
        """
        offset_provider = scan_source._offset_index['scan']
        keys = list(offset_provider.keys())
        if start is None:
            start = 0
        elif isinstance(start, basestring):
            start = keys.index(start)
        elif not isinstance(start, int):
            raise TypeError("Cannot start from object %r" % start)
        for key in keys[start:]:
            yield scan_source.get_by_id(key, "num")