Source code for ms_deisotope.data_source.thermo_raw_net

"""Thermo RAW file reading implementation using the pure .NET
RawFileReader library released in 2017.

This module provides :class:`ThermoRawLoader`, a :class:`~.RandomAccessScanSource`
implementation.

Depends upon the ``pythonnet`` project which provides the :mod:`clr`
module, enabling nearly seamless interoperation with the Common Language
Runtime.

The public interface of this module should be identical to
:mod:`ms_deisotope.data_source.thermo_raw`.

.. note::
    This interface was largely based upon the APIs that ProteoWizard used, both
    in order to understand how the Thermo libraries really worked, and to maintain
    parity with it.
"""

import sys
import os

from collections import OrderedDict
from typing import List

import numpy as np

from pyteomics.auxiliary import unitfloat

from six import string_types as basestring


from ms_peak_picker import PeakSet, PeakIndex, simple_peak

from ms_deisotope.data_source.common import (
    PrecursorInformation, ChargeNotProvided, Scan,
    ActivationInformation, MultipleActivationInformation,
    IsolationWindow, ScanDataSource, ScanEventInformation,
    ScanAcquisitionInformation, ScanWindow, RandomAccessScanSource)

from ms_deisotope.data_source._thermo_helper import (
    _InstrumentMethod, ThermoRawScanPtr, FilterString,
    _make_id, _id_template, _RawFileMetadataLoader, analyzer_map)

from ms_deisotope.data_source.metadata.activation import (
    supplemental_term_map, dissociation_methods_map)

from ms_deisotope.data_source.metadata import software
from ms_deisotope.data_source.metadata.sample import Sample
from ms_deisotope.data_source.metadata.scan_traits import FAIMS_compensation_voltage

def _try_number(string):
    try:
        x = float(string)
        return x
    except (TypeError, ValueError):
        return string


_DEFAULT_DLL_PATH = os.path.join(
    os.path.dirname(
        os.path.realpath(__file__)),
    "_vendor",
    "ThermoRawFileReader_3_0_41",
    "Libraries")


# late binding imports

Business = None
_RawFileReader = None
clr = None
NullReferenceException = Exception
Marshal = None
IntPtr = None
Int64 = None


def _open_raw_file(path):
    if not _test_dll_loaded():
        register_dll()
        if not _test_dll_loaded():
            raise ValueError(
                "Could not load .NET runtime or library. Verify `pythonnet` "
                "is installed correctly and the RawFileReader library is registered")
    raw_file = _RawFileReader.RawFileReaderAdapter.FileFactory(path)
    return raw_file


def is_thermo_raw_file(path):
    """Detect whether or not the file referenced by ``path``
    is a Thermo RAW file.

    Parameters
    ----------
    path: :class:`str`
        The path to test

    Returns
    -------
    :class:`bool`:
        Whether or not the file is a Thermo RAW file.
    """
    if not _test_dll_loaded():
        try:
            register_dll()
        except ImportError:
            return False
    with open(path, 'rb') as fh:
        lead_bytes = fh.read(32)
        decoded = lead_bytes.decode("utf-16")[1:9]
        if decoded == "Finnigan":
            try:
                source = _open_raw_file(path)
                source.SelectInstrument(Business.Device.MS, 1)
                return True
            except NullReferenceException:   # pylint: disable=broad-except
                return False
        return False


def infer_reader(path):
    """If the file referenced by ``path`` is a Thermo RAW
    file, return the callable (:class:`ThermoRawLoader`) to
    open it, otherwise raise an exception.

    Parameters
    ----------
    path: :class:`str`
        The path to test

    Returns
    -------
    :class:`type`:
        The type to use to open the file

    Raises
    ------
    :class:`ValueError`:
        If the file is not a Thermo RAW file
    """
    if is_thermo_raw_file(path):
        return ThermoRawLoader
    raise ValueError("Not Thermo Raw File")


def determine_if_available():
    """Checks whether or not the .NET-based Thermo
    RAW file reading feature is available.

    Returns
    -------
    :class:`bool`:
        Whether or not the feature is enabled.
    """
    try:
        return _register_dll([_DEFAULT_DLL_PATH])
    except (OSError, ImportError):
        return False


def _register_dll(search_paths=None):
    """Start the Common Language Runtime interop service by importing
    the :mod:`clr` module from Pythonnet, and then populate the global
    names referring to .NET entities, and finally attempt to locate the
    ThermoRawFileReader DLLs by searching alogn ``search_paths``.

    Parameters
    ----------
    search_paths: list
        The paths to check along for the ThermoRawFileReader DLL bundle.

    Returns
    -------
    :class:`bool`:
        Whether or not the .NET library successfully loaded
    """
    from ms_deisotope.config import get_config
    if search_paths is None:
        search_paths = []
    search_paths = list(search_paths)
    search_paths.append(_DEFAULT_DLL_PATH)
    # Take user-specified search paths first.
    search_paths = get_config().get('vendor_readers', {}).get('thermo-net', []) + search_paths
    global _RawFileReader, Business, clr, NullReferenceException   # pylint: disable=global-statement
    global Marshal, IntPtr, Int64   # pylint: disable=global-statement
    if _test_dll_loaded():
        return True
    try:
        import clr  # pylint: disable=redefined-outer-name
        from System import NullReferenceException  # pylint: disable=redefined-outer-name
        clr.AddReference("System.Runtime")
        clr.AddReference("System.Runtime.InteropServices")
        from System import IntPtr, Int64  # pylint: disable=redefined-outer-name
        from System.Runtime.InteropServices import Marshal  # pylint: disable=redefined-outer-name
    except ImportError:
        return False
    for path in search_paths:
        sys.path.append(path)
        try:
            clr.AddReference('ThermoFisher.CommonCore.RawFileReader')
            clr.AddReference('ThermoFisher.CommonCore.Data')
        except OSError:
            continue
        try:
            import ThermoFisher.CommonCore.Data.Business as Business  # pylint: disable=redefined-outer-name
            import ThermoFisher.CommonCore.RawFileReader as _RawFileReader  # pylint: disable=redefined-outer-name
        except ImportError:
            continue
    return _test_dll_loaded()


def register_dll(search_paths=None):
    """Register the location of the Thermo RawFileReader DLL bundle with
    the Common Language Runtime interop system and load the .NET symbols
    used by this feature.

    Parameters
    ----------
    search_paths: list
        The paths to check along for the ThermoRawFileReader DLL bundle.

    """
    if search_paths is None:
        search_paths = []
    loaded = _register_dll(search_paths)
    if not loaded:
        msg = """The ThermoFisher.CommonCore libraries could not be located and loaded."""
        raise ImportError(msg)


def _test_dll_loaded():
    return _RawFileReader is not None


def _copy_double_array(src):
    """A quick and dirty implementation of the fourth technique shown in
    https://mail.python.org/pipermail/pythondotnet/2014-May/001525.html for
    copying a .NET Array[Double] to a NumPy ndarray[np.float64] via a raw
    memory copy.

    ``int_ptr_tp`` must be an integer type that can hold a pointer. On Python 2
    this is :class:`long`, and on Python 3 it is :class:`int`.
    """
    # When the input .NET array pointer is None, return an empty array. On Py2
    # this would happen automatically, but not on Py3, and perhaps not safely on
    # all Py2 because it relies on pythonnet and the .NET runtime properly checking
    # for nulls.
    if src is None:
        return np.array([], dtype=np.float64)
    dest = np.empty(len(src), dtype=np.float64)
    Marshal.Copy(
        src, 0,
        IntPtr.__overloads__[Int64](dest.__array_interface__['data'][0]),
        len(src))
    return dest


class RawReaderInterface(ScanDataSource):
    """:class:`~.ScanDataSource` implementation for Thermo's RawFileReader API.
    Not intended for direct instantiation.
    """

    def _scan_arrays(self, scan):
        scan_number = scan.scan_number + 1
        stats = self._source.GetScanStatsForScanNumber(scan_number)
        segscan = self._source.GetSegmentedScanFromScanNumber(scan_number, stats)
        mzs = _copy_double_array(segscan.Positions)
        inten = _copy_double_array(segscan.Intensities)
        return mzs, inten

    def _pick_peaks_vendor(self, scan, *args, **kwargs):
        scan_info = Business.Scan.FromFile(self._source, scan.scan_number + 1)
        if scan_info.HasCentroidStream:
            stream = self._source.GetCentroidStream(scan.scan_number + 1, 0)
            mzs = stream.Masses
            intens = stream.Intensities
            peaks = PeakSet([simple_peak(mzs[i], intens[i], 0.001) for i in range(len(mzs))])
            peaks.reindex()
            arrays = self._scan_arrays(scan)
            return PeakIndex(arrays[0], arrays[1], peaks)
        else:
            raise NotImplementedError()

    def _scan_id(self, scan):
        scan_number = scan.scan_number
        return _make_id(scan_number + 1)

    def _is_profile(self, scan):
        return not self._source.IsCentroidScanFromScanNumber(
            scan.scan_number + 1)

    def _polarity(self, scan):
        filter_string = self._filter_string(scan)
        return filter_string.data['polarity']

    def _scan_title(self, scan):
        return "%s %r" % (self._scan_id(scan), self._filter_string(scan))

    def _filter_string(self, scan):
        if scan.filter_string is None:
            scan_number = scan.scan_number
            scan.filter_string = FilterString(self._source.GetFilterForScanNumber(scan_number + 1).Filter)
        return scan.filter_string

    def _scan_index(self, scan):
        scan_number = scan.scan_number
        return scan_number

    def _scan_time(self, scan):
        scan_number = scan.scan_number
        return self._source.RetentionTimeFromScanNumber(scan_number + 1)

    def _ms_level(self, scan):
        scan_number = scan.scan_number
        f = self._source.GetFilterForScanNumber(scan_number + 1)
        return f.MSOrder

    def _isolation_window(self, scan):
        scan_number = scan.scan_number
        ms_level = self._ms_level(scan)
        width = 0
        trailer = self._trailer_values(scan)
        filt = self._source.GetFilterForScanNumber(scan_number + 1)
        seq_index = filt.MSOrder - 2
        try:
            # Fetch the isolation window width from the old location first, which
            # will be correct on old files, where the new API won't be right.
            width = trailer['MS%d Isolation Width' % ms_level]
        except KeyError:
            # Fall back to the new API, which is akin to our only hope here?
            width = filt.GetIsolationWidth(seq_index)
        width /= 2.0
        offset = filt.GetIsolationWidthOffset(seq_index)
        precursor_mz = filt.GetMass(seq_index)
        return IsolationWindow(width, precursor_mz + offset, width)

    def _trailer_values(self, scan):
        if scan.trailer_values is not None:
            return scan.trailer_values
        scan_number = scan.scan_number
        trailers = self._source.GetTrailerExtraInformation(scan_number + 1)
        scan.trailer_values = OrderedDict(
            zip([label.strip(":") for label in trailers.Labels], map(_try_number, trailers.Values)))
        return scan.trailer_values

    def _precursor_information(self, scan):
        scan_number = scan.scan_number
        filt = self._source.GetFilterForScanNumber(scan_number + 1)
        precursor_mz = filt.GetMass(filt.MSOrder - 2)
        trailers = self._trailer_values(scan)
        _precursor_mz = float(trailers.get("Monoisotopic M/Z", 0))
        if _precursor_mz > 0:
            precursor_mz = _precursor_mz

        # imitate proteowizard's firmware bug correction
        isolation_window = self._isolation_window(scan)
        if (isolation_window.upper + isolation_window.lower) / 2 <= 2.0:
            if (isolation_window.target - 3.0 > precursor_mz) or (isolation_window.target + 2.5 < precursor_mz):
                precursor_mz = isolation_window.target
        elif precursor_mz not in isolation_window:
            precursor_mz = isolation_window.target
        charge = int(trailers.get("Charge State", 0))
        if charge == 0:
            charge = ChargeNotProvided
        inten = 0
        precursor_scan_number = None
        precursor_scan_number = trailers.get('Master Scan Number')
        if precursor_scan_number == 0:
            precursor_scan_number = None
        if precursor_scan_number is not None:
            precursor_scan_number = int(precursor_scan_number) - 1
            if self.get_scan_by_index(precursor_scan_number).ms_level >= self._ms_level(scan):
                precursor_scan_number = None
        if precursor_scan_number is None:
            current_level = self._ms_level(scan)
            # We want to start looking for the most recent spectrum with the next lowest MS
            # level. The expecteation is that we couldn't determine the precursor scan accurately,
            # so we'll probe backwards until the next best candidate.
            current_index = self._scan_index(scan)
            # Use the index of the current scan to check the index of the previous scans at a lower
            # MS level. This may be None, in which case there is no earlier scan recorded.
            lookup = self._get_previous_scan_index_for_ms_level(current_index, current_level - 1)
            if lookup is not None:
                last_index = lookup
            else:
                last_index = current_index - 1
            i = 0
            while last_index >= 0 and i < 100:
                prev_scan = self.get_scan_by_index(last_index)
                if prev_scan.ms_level >= current_level:
                    last_index -= 1
                else:
                    precursor_scan_number = prev_scan._data.scan_number
                    break
                i += 1
        if precursor_scan_number is not None:
            precursor_scan_id = self.get_scan_by_index(precursor_scan_number).id
        else:
            import warnings
            warnings.warn("Could not resolve precursor scan for %s" % (self._scan_id(scan), ))
            precursor_scan_id = None
        return PrecursorInformation(
            precursor_mz, inten, charge, precursor_scan_id,
            source=self, product_scan_id=self._scan_id(scan))

    def _get_scan_segment(self, scan):
        trailer = self._trailer_values(scan)
        try:
            return int(trailer['Scan Segment'])
        except KeyError:
            return 1

    def _get_scan_event(self, scan):
        trailer = self._trailer_values(scan)
        try:
            return int(trailer['Scan Event'])
        except KeyError:
            return 1

    def _activation(self, scan):
        filter_string = self._filter_string(scan)
        tandem_sequence = filter_string.get("tandem_sequence")
        # If the tandem sequence exists, the last entry is the most recent tandem acquisition.
        # It will list contain one or more activation types. Alternatively, multiple activations
        # of the same precursor may exist in the list as separate events in the tandem sequence.
        if tandem_sequence is not None:
            activation_event = tandem_sequence[-1]
            activation_type = list(activation_event.get("activation_type"))
            has_supplemental_activation = filter_string.get("supplemental_activation")

            if activation_type is not None:
                energy = list(activation_event.get("activation_energy"))
                if len(tandem_sequence) > 1:
                    prev_event = tandem_sequence[-2]
                    # Merge previous tandem sequences of the same precursor
                    if abs(prev_event['isolation_mz'] - activation_event['isolation_mz']) < 1e-3:
                        activation_type = list(prev_event.get("activation_type")) + activation_type
                        energy = list(prev_event.get("activation_energy")) + energy
                        has_supplemental_activation = True

                if has_supplemental_activation and len(activation_type) > 1:
                    activation_type.append(supplemental_term_map[
                        dissociation_methods_map[activation_type[-1]]])
                if len(activation_type) == 1:
                    return ActivationInformation(activation_type[0], energy[0])
                else:
                    return MultipleActivationInformation(activation_type, energy)
        return None

    def _acquisition_information(self, scan):
        fline = self._filter_string(scan)
        event = self._get_scan_event(scan)
        trailer_extras = self._trailer_values(scan)
        traits = {
            'preset scan configuration': event,
            'filter string': fline,
        }
        cv = fline.get("compensation_voltage")
        if cv is not None:
            traits[FAIMS_compensation_voltage] = cv
        event = ScanEventInformation(
            self._scan_time(scan),
            injection_time=unitfloat(trailer_extras.get('Ion Injection Time (ms)', 0.0), 'millisecond'),
            window_list=[ScanWindow(
                fline.get("scan_window")[0], fline.get("scan_window")[1])], traits=traits)
        return ScanAcquisitionInformation("no combination", [event])

    def _instrument_configuration(self, scan):
        fline = self._filter_string(scan)
        try:
            confid = self._analyzer_to_configuration_index[analyzer_map[fline.data.get("analyzer")]]
            return self._instrument_config[confid]
        except KeyError:
            return None

    def _annotations(self, scan):
        fline = self._filter_string(scan)
        trailer_extras = self._trailer_values(scan)
        annots = {
            "filter string": fline,
        }
        microscans = trailer_extras.get("Micro Scan Count")
        if microscans is not None:
            annots['[Thermo Trailer Extra]Micro Scan Count'] = float(microscans)
        scan_segment = trailer_extras.get("Scan Segment")
        if scan_segment is not None:
            annots['[Thermo Trailer Extra]Scan Segment'] = int(scan_segment)
        scan_event = trailer_extras.get("Scan Event")
        if scan_event is not None:
            annots['[Thermo Trailer Extra]Scan Event'] = int(scan_event)
        mono_mz = float(trailer_extras.get("Monoisotopic M/Z", 0))
        if mono_mz is not None and mono_mz > 0:
            annots['[Thermo Trailer Extra]Monoisotopic M/Z'] = mono_mz
        hcd_ev = _trailer_float(trailer_extras.get('HCD Energy eV'))
        if hcd_ev is not None and hcd_ev > 0:
            annots['[Thermo Trailer Extra]HCD Energy eV'] = float(hcd_ev)
        hcd_energies = trailer_extras.get('HCD Energy')
        if hcd_energies is not None:
            if isinstance(hcd_energies, basestring) and not hcd_energies.strip():
                pass
            else:
                annots['[Thermo Trailer Extra]HCD Energy'] = hcd_energies
        return annots


def _trailer_float(value):
    if value is None:
        return None
    try:
        value = value.strip()
    except AttributeError:
        return value
    if not value:
        return None
    try:
        return float(value)
    except ValueError:
        return None


[docs]class ThermoRawLoader(RawReaderInterface, RandomAccessScanSource, _RawFileMetadataLoader): """Reads scans from Thermo Fisher RAW files directly. Provides both iterative and random access. """ def __init__(self, source_file, _load_metadata=True, **kwargs): if not _test_dll_loaded(): register_dll() if not _test_dll_loaded(): raise ValueError( "Could not load .NET runtime or library. Verify `pythonnet` " "is installed correctly and the RawFileReader library is registered") self.source_file = source_file self._source_impl = None # self._source = _RawFileReader.RawFileReaderAdapter.FileFactory(source_file) # self._source.SelectInstrument(Business.Device.MS, 1) self._producer = None self._scan_type_index = dict() self._method = None self._analyzer_to_configuration_index = {} self._instrument_config = {} self.make_iterator() self.initialize_scan_cache() self._first_scan_time = self.get_scan_by_index(0).scan_time self._last_scan_time = self.get_scan_by_index( self._source.RunHeaderEx.LastSpectrum - 1).scan_time self._index = self._pack_index() if _load_metadata: self._method = self._parse_method() self._build_scan_type_index() self._get_instrument_info()
[docs] def software_list(self) -> List[software.Software]: inst_data = self._source.GetInstrumentData() sw_list = [ software.Software('Xcalibur', 'Xcalibur', inst_data.SoftwareVersion) ] return sw_list
@property def _source(self): if self._source_impl is None: self._source_impl = _open_raw_file(self.source_file) self._source_impl.SelectInstrument(Business.Device.MS, 1) return self._source_impl @_source.setter def _source(self, value): self._source_impl = value def _has_ms1_scans(self): if self._scan_type_index: return 1 in self._scan_type_index else: # metadata has not been loaded so best to assume there is return True def _has_msn_scans(self): if self._scan_type_index: return max(self._scan_type_index) > 1 else: # metadata has not been loaded so best to assume there is return True
[docs] def has_msn_scans(self): return self._has_msn_scans()
[docs] def has_ms1_scans(self): return self._has_ms1_scans()
def __reduce__(self): reduced = self.__class__, (self.source_file, False), self.__getstate__() self._close_handle() return reduced def __getstate__(self): state = { "method": self._method, "scan_type_index": self._scan_type_index, "analyzer_to_configuration_index": self._analyzer_to_configuration_index, "instrument_config": self._instrument_config, "previous_ms_levels": self._previous_ms_levels, } return state def __setstate__(self, state): self._method = state['method'] self._scan_type_index = state['scan_type_index'] self._analyzer_to_configuration_index = state['analyzer_to_configuration_index'] self._instrument_config = state['instrument_config'] self._previous_ms_levels = state['previous_ms_levels'] @property def index(self): """Accesses the scan index Returns ------- :class:`collections.OrderedDict` Maps scan ID to index """ return self._index def __len__(self): return len(self.index) def __repr__(self): return "ThermoRawLoader(%r)" % (self.source_file) def _close_handle(self): if self._source is not None: self._source.Close() self._source = None
[docs] def close(self): """Close the underlying file reader. """ self._close_handle() self._dispose()
def __del__(self): self.close()
[docs] def reset(self): self.make_iterator(None) self.initialize_scan_cache()
def _pack_index(self): index = OrderedDict() for sn in range(self._source.RunHeaderEx.FirstSpectrum, self._source.RunHeaderEx.LastSpectrum + 1): index[_make_id(sn)] = sn return index def _get_instrument_model_name(self): return self._source.GetInstrumentData().Model def _get_instrument_serial_number(self): return self._source.GetInstrumentData().SerialNumber def _parse_method(self): try: method_count = self._source.InstrumentMethodsCount if method_count == 0: return _InstrumentMethod('') # the data acquisition method should be the last method return _InstrumentMethod(self._source.GetInstrumentMethod(method_count - 1)) except NullReferenceException: # pylint: disable=broad-except return _InstrumentMethod('') def _scan_time_to_scan_number(self, scan_time): scan_number = self._source.ScanNumberFromRetentionTime(scan_time) - 1 return scan_number
[docs] def samples(self): """Describe the sample(s) used to generate the mass spectrometry data contained in this file. Returns ------- :class:`list` of :class:`~.Sample` """ result = [] si = self._source.SampleInformation raw_name = os.path.basename(self.source_file_name) sample = Sample(si.SampleId or raw_name) sample.name = si.SampleName or si.SampleId if si.SampleVolume: sample.parameters['sample volume'] = si.SampleVolume if si.SampleWeight: sample.parameters['sample mass'] = si.SampleWeight if si.Vial: sample.parameters['sample vial'] = si.Vial if si.Barcode: sample.parameters['sample barcode'] = si.Barcode result.append(sample) return result
[docs] def get_scan_by_index(self, index): """Retrieve the scan object for the specified scan index. This internally calls :meth:`get_scan_by_id` which will use its cache. Parameters ---------- index: int The index to get the scan for Returns ------- Scan """ scan_number = int(index) try: return self._scan_cache[scan_number] except KeyError: package = ThermoRawScanPtr(scan_number) if not package.validate(self): raise IndexError(index) scan = Scan(package, self) self._scan_cache[scan_number] = scan return scan
[docs] def get_scan_by_id(self, scan_id): """Retrieve the scan object for the specified scan id. If the scan object is still bound and in memory somewhere, a reference to that same object will be returned. Otherwise, a new object will be created. Parameters ---------- scan_id : str The unique scan id value to be retrieved Returns ------- Scan """ scan_number = int(str(scan_id).replace(_id_template, '')) - 1 try: return self._scan_cache[scan_number] except KeyError: package = ThermoRawScanPtr(scan_number) if not package.validate(self): raise KeyError(str(scan_id)) scan = Scan(package, self) self._scan_cache[scan_number] = scan return scan
[docs] def get_scan_by_time(self, time): """Retrieve the scan object for the specified scan time. This internally calls :meth:`get_scan_by_id` which will use its cache. Parameters ---------- time : float The time to get the nearest scan from Returns ------- Scan """ if time < self._first_scan_time: time = self._first_scan_time elif time > self._last_scan_time: time = self._last_scan_time scan_number = self._scan_time_to_scan_number(time) try: return self._scan_cache[scan_number] except KeyError: package = ThermoRawScanPtr(scan_number) scan = Scan(package, self) self._scan_cache[scan_number] = scan return scan
[docs] def start_from_scan(self, scan_id=None, rt=None, index=None, require_ms1=True, grouped=True, **kwargs): """Reconstruct an iterator which will start from the scan matching one of ``scan_id``, ``rt``, or ``index``. Only one may be provided. After invoking this method, the iterator this object wraps will be changed to begin yielding scan bunchs (or single scans if ``grouped`` is ``False``). Arguments --------- scan_id: str, optional Start from the scan with the specified id. rt: float, optional Start from the scan nearest to specified time (in minutes) in the run. If no exact match is found, the nearest scan time will be found, rounded up. index: int, optional Start from the scan with the specified index. require_ms1: bool, optional Whether the iterator must start from an MS1 scan. True by default. grouped: bool, optional whether the iterator should yield scan bunches or single scans. True by default. """ if scan_id is not None: scan_number = int(str(scan_id).replace(_id_template, '')) - 1 elif index is not None: scan_number = int(index) elif rt is not None: scan_number = self._scan_time_to_scan_number(rt) if require_ms1: start_index = scan_number while start_index != 0: scan = self.get_scan_by_index(start_index) if scan.ms_level > 1: start_index -= 1 else: break scan_number = start_index iterator = self._make_pointer_iterator(start_index=scan_number) if grouped: self._producer = self._scan_group_iterator(iterator, grouped, **kwargs) else: self._producer = self._single_scan_iterator( iterator, grouped, **kwargs) return self
def _make_scan_index_producer(self, start_index=None, start_time=None): if start_index is not None: return range(start_index, self._source.RunHeaderEx.LastSpectrum) elif start_time is not None: start_index = self._scan_time_to_scan_number(start_time) while start_index != 0: scan = self.get_scan_by_index(start_index) if scan.ms_level > 1: start_index -= 1 else: break return range(start_index, self._source.RunHeaderEx.LastSpectrum) else: return range(0, self._source.RunHeaderEx.LastSpectrum) def _make_pointer_iterator(self, start_index=None, start_time=None): iterator = self._make_scan_index_producer(start_index, start_time) for i in iterator: yield ThermoRawScanPtr(i) def _make_default_iterator(self): return self._make_pointer_iterator() def _make_cache_key(self, scan): return scan._data.scan_number
[docs] def next(self): return next(self._producer)