# Source code for ms_deisotope.data_source.scan.scan

"""Represent the basic structures of a mass spectrum and its processed contents,
and provide an interface for manipulating that data.
"""
from typing import Any, Dict, Iterator, List, Optional, Union, TYPE_CHECKING
import warnings

from ms_deisotope.peak_set import DeconvolutedPeak, DeconvolutedPeakSet
try:
    from collections.abc import Sequence
except ImportError:
    from collections import Sequence
import logging
import numpy as np

from ms_peak_picker import (
    pick_peaks, reprofile, average_signal,
    scan_filter, PeakIndex, PeakSet, FittedPeak)

from ms_deisotope.utils import decimal_shift
from ms_deisotope.deconvolution import deconvolute_peaks

from ms_deisotope.data_source.metadata.scan_traits import (
    IsolationWindow,
    ScanAcquisitionInformation)

from ms_deisotope.data_source.metadata.activation import ActivationInformation
from ms_deisotope.data_source.metadata.instrument_components import InstrumentInformation


from .base import (ScanBase, RawDataArrays, PrecursorInformation)


if TYPE_CHECKING:
    from .loader import ScanDataSource


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


DEFAULT_DX: float = 0.002


class Scan(ScanBase):
    """Container for mass spectral data and associated descriptive information.

    A :class:`Scan` object is a generic object intended to be created by a
    :class:`ScanDataSource` and describes a mass spectrum at each level of
    processing (Profile --> Peak Fitted --> Deconvoluted). The raw object
    provided by the source is wrapped and queried lazily when an attribute is
    requested, delegated through :attr:`source`.

    To access the raw mass spectral signal, see :attr:`arrays`. This can be
    converted to a centroided peak list using :meth:`pick_peaks`, populating
    :attr:`peak_set`. After picking peaks, :meth:`deconvolute` can be used to
    deconvolute the spectrum, populating :attr:`deconvoluted_peak_set`.

    To apply signal processing filters to the spectrum, :meth:`reprofile` can
    create a copy of the object replacing :attr:`arrays` with a synthetic
    profile created from the fitted peaks in :attr:`peak_set`. Multiple
    spectra may be averaged together to form a new synthetic profile spectrum
    using :meth:`average_with` (when the scans are already chosen) or
    :meth:`average` (when selecting nearby scans automatically, MS1-only).
    The :meth:`denoise` method can remove local unstructured noise.
    :meth:`transform` can apply any filter from
    :mod:`ms_peak_picker.scan_filter`, returning a copy.

    Attributes
    ----------
    deconvoluted_peak_set : :class:`ms_deisotope.DeconvolutedPeakSet` or None
        Deconvoluted peaks resulting from charge state deconvolution and
        deisotoping. Will be `None` if deconvolution has not been done.
    peak_set : :class:`ms_peak_picker.PeakSet` or None
        Picked peaks and (possibly) associated raw data points as produced by
        :meth:`pick_peaks`. Will be `None` if peak picking has not been done.
    product_scans : list
        A list of :class:`Scan` instances which were produced by fragmenting
        ions from this one. This attribute is not guaranteed to be populated
        depending upon how the scan is loaded.
    source : :class:`ScanDataSource`
        The object which produced this scan and which defines the methods for
        retrieving common attributes from the underlying data structures.
    precursor_information : :class:`PrecursorInformation` or None
        Descriptive metadata for the ion which was chosen for fragmentation,
        and a reference to the precursor scan
    arrays : :class:`RawDataArrays`
        A pair of :class:`numpy.ndarray` objects corresponding to the raw m/z
        and intensity data points
    id : str
        The unique identifier for this scan as given by the source
    title : str
        The human-readable display string for this scan as shown in some
        external software
    ms_level : int
        The degree of fragmentation performed. 1 corresponds to a MS1 or
        "Survey" scan, 2 corresponds to MS/MS, and so on. If
        :attr:`ms_level` > 1, the scan is considered a "tandem scan" or
        "MS^n" scan
    scan_time : float
        The time the scan was acquired during data acquisition. The unit of
        time will always be minutes.
    drift_time : float or None
        The time measured by the ion mobility spectrometer for this scan or
        frame. This quantity is None if the scan does not have ion mobility
        information associated with it, which is usually recorded in
        :attr:`acquisition_information`
    index : int
        The integer number indicating how many scans were acquired prior to
        this scan.
    is_profile : bool
        Whether this scan's raw data points corresponds to a profile scan or
        whether the raw data was pre-centroided.
    polarity : int
        If the scan was acquired in positive mode, the value ``+1``. If the
        scan was acquired in negative mode, the value ``-1``. May be used to
        indicating how to calibrate charge state determination methods.
    activation : :class:`.ActivationInformation` or None
        If this scan is an MS^n scan, this attribute will contain information
        about the process used to produce it from its parent ion.
    instrument_configuration : :class:`~.InstrumentInformation`
        The instrument configuration used to acquire this scan.
    acquisition_information : :class:`.ScanAcquisitionInformation` or None
        Describes the type of event that produced this scan, as well as the
        scanning method used.
    isolation_window : :class:`.IsolationWindow` or None
        Describes the range of m/z that were isolated from a parent scan to
        create this scan
    annotations : dict
        A set of key-value pairs describing the scan not part of the standard
        interface
    """

    def __init__(self, data, source, peak_set=None, deconvoluted_peak_set=None,
                 product_scans=None, annotations=None):
        if product_scans is None:
            product_scans = []
        if annotations is None:
            annotations = dict()
        self.source = source
        self.peak_set = peak_set
        self.deconvoluted_peak_set = deconvoluted_peak_set
        # Opaque reference to the source-specific raw scan record; all of the
        # lazily cached attributes below are resolved from it via `source`.
        self._data = data
        self._arrays = None
        self._id = None
        self._title = None
        self._ms_level = None
        self._scan_time = None
        self._precursor_information = None
        self._index = None
        self._is_profile = None
        self._polarity = None
        self._activation = None
        self._acquisition_information = None
        self._isolation_window = None
        self._instrument_configuration = None
        self._annotations = None
        # User-supplied annotations are kept separately so they survive
        # cache invalidation and are merged over source-derived annotations.
        self._external_annotations = annotations
        self.product_scans = product_scans
[docs] def clone(self, deep=True) -> 'Scan': """Return a copy of the :class:`Scan` object wrapping the same reference data, potentially a deep one Parameters ---------- deep: :class:`bool` Returns ------- :class:`Scan` """ dup = self.__class__( self._data, self.source, self.peak_set.clone() if self.peak_set is not None else None if deep else self.peak_set, self.deconvoluted_peak_set.clone() if self.deconvoluted_peak_set is not None else None if deep else self.deconvoluted_peak_set, [s.clone(deep=deep) for s in self.product_scans], self._external_annotations.copy()) return dup
def _load(self): _ = self.arrays _ = self.id _ = self.title _ = self.ms_level _ = self.scan_time _ = self.index _ = self.polarity _ = self.precursor_information _ = self.activation _ = self.acquisition_information _ = self.isolation_window _ = self.is_profile _ = self.instrument_configuration _ = self.annotations _ = None del _ return self def _unload(self): self._arrays = None self._id = None self._title = None self._ms_level = None self._scan_time = None self._precursor_information = None self._index = None self._is_profile = None self._polarity = None self._activation = None self._acquisition_information = None self._isolation_window = None self._instrument_configuration = None
[docs] def clear(self, full=False): """Releases all associated in-memory data and clears the cached attributes. The data reference attribute :attr:`_data` is retained and unchanged. Parameters ---------- full: bool Whether to clear more attributes to aggressively free memory. """ if self.source is not None: self.source._scan_cleared(self) self._unload() if full: self.peak_set = None self.deconvoluted_peak_set = None self.product_scans = [] self._external_annotations = {}
    # --- lazily-resolved descriptive metadata -------------------------------
    # Each property caches its value on first access, delegating resolution
    # to the `source` object's private hooks; setters validate and overwrite
    # the cache directly.

    @property
    def ms_level(self) -> int:
        """The degree of fragmentation performed. 1 corresponds to a MS1 or
        "Survey" scan, 2 corresponds to MS/MS, and so on. If
        :attr:`ms_level` > 1, the scan is considered a "tandem scan" or
        "MS^n" scan

        Returns
        -------
        :class:`int`
        """
        if self._ms_level is None:
            self._ms_level = self.source._ms_level(self._data)
        return self._ms_level

    @ms_level.setter
    def ms_level(self, value):
        self._ms_level = int(value)

    @property
    def is_profile(self) -> bool:
        """Whether this scan's raw data points corresponds to a profile scan
        or whether the raw data was pre-centroided.

        Returns
        -------
        :class:`bool`
        """
        if self._is_profile is None:
            self._is_profile = self.source._is_profile(self._data)
        return self._is_profile

    @is_profile.setter
    def is_profile(self, value):
        self._is_profile = bool(value)

    @property
    def polarity(self) -> int:
        """If the scan was acquired in positive mode, the value ``+1``. If
        the scan was acquired in negative mode, the value ``-1``. May be used
        to indicating how to calibrate charge state determination methods.

        Returns
        -------
        :class:`int`
        """
        if self._polarity is None:
            self._polarity = self.source._polarity(self._data)
        return self._polarity

    @polarity.setter
    def polarity(self, value):
        self._polarity = int(value)

    @property
    def scan_time(self) -> float:
        """The time the scan was acquired during data acquisition. The unit
        of time will always be minutes.

        Returns
        -------
        :class:`float`
        """
        if self._scan_time is None:
            self._scan_time = self.source._scan_time(self._data)
        return self._scan_time

    @scan_time.setter
    def scan_time(self, value):
        self._scan_time = float(value)

    @property
    def arrays(self) -> RawDataArrays:
        """A pair of :class:`numpy.ndarray` objects corresponding to the raw
        m/z and intensity data points.

        These arrays are wrapped in a :class:`~.RawDataArrays` instance, which
        provides additional methods.

        Returns
        -------
        :class:`~.RawDataArrays`
        """
        if self._arrays is None:
            self._arrays = RawDataArrays(*self.source._scan_arrays(self._data))
        return self._arrays

    @arrays.setter
    def arrays(self, value):
        if isinstance(value, RawDataArrays) or value is None:
            self._arrays = value
        elif isinstance(value, Sequence):
            # Accept (mz, intensity) or (mz, intensity, extra-arrays-dict).
            if len(value) == 2:
                self._arrays = RawDataArrays(*map(np.asanyarray, value))
            elif len(value) == 3:
                self._arrays = RawDataArrays(
                    *map(np.asanyarray, value[:2]), arrays=dict(value[2]))
            else:
                raise ValueError("Too many values to convert. Please provide two arrays, "
                                 "or two arrays and a dictionary of additional arrays.")
        else:
            raise TypeError(
                "arrays must be an instance of RawDataArrays or a pair of numpy arrays")

    @property
    def title(self) -> str:
        """The human-readable display string for this scan as shown in some
        external software.

        Returns
        -------
        :class:`str`
        """
        if self._title is None:
            self._title = self.source._scan_title(self._data)
        return self._title

    @title.setter
    def title(self, value):
        self._title = value

    @property
    def id(self) -> str:
        """The within run unique scan identifier.

        Returns
        -------
        :class:`str`
        """
        if self._id is None:
            self._id = self.source._scan_id(self._data)
        return self._id

    @id.setter
    def id(self, value):
        self._id = value

    # Alias so callers can use the less builtin-shadowing name.
    scan_id = id

    @property
    def index(self) -> int:
        """The integer number indicating how many scans were acquired prior
        to this scan.

        Returns
        -------
        :class:`int`
        """
        if self._index is None:
            self._index = self.source._scan_index(self._data)
        return self._index

    @index.setter
    def index(self, value):
        self._index = int(value)

    @property
    def precursor_information(self) -> Optional[PrecursorInformation]:
        """Descriptive metadata for the ion which was chosen for
        fragmentation, and a reference to the precursor scan.

        Returns
        -------
        :class:`~.PrecursorInformation`
        """
        # MS1 scans have no precursor by definition.
        if self.ms_level < 2:
            return None
        if self._precursor_information is None:
            self._precursor_information = self.source._precursor_information(
                self._data)
        return self._precursor_information

    @precursor_information.setter
    def precursor_information(self, value):
        if not isinstance(value, PrecursorInformation) and value is not None:
            raise TypeError("precursor_information must be a %r instance" % (
                PrecursorInformation, ))
        self._precursor_information = value

    @property
    def activation(self) -> Optional[ActivationInformation]:
        """If this scan is an MS^n scan, this attribute will contain
        information about the process used to produce it from its parent ion.

        Returns
        -------
        :class:`~.ActivationInformation`
        """
        if self.ms_level < 2:
            return None
        if self._activation is None:
            self._activation = self.source._activation(self._data)
        return self._activation

    @activation.setter
    def activation(self, value):
        if not isinstance(value, ActivationInformation) and value is not None:
            raise TypeError(
                "activation must be an %r instance" % (ActivationInformation, ))
        self._activation = value

    @property
    def isolation_window(self) -> Optional[IsolationWindow]:
        """Describes the range of m/z that were isolated from a parent scan
        to create this scan.

        Returns
        -------
        :class:`~.IsolationWindow`
        """
        if self.ms_level < 2:
            return None
        if self._isolation_window is None:
            self._isolation_window = self.source._isolation_window(self._data)
        return self._isolation_window

    @isolation_window.setter
    def isolation_window(self, value):
        if isinstance(value, IsolationWindow) or value is None:
            self._isolation_window = value
        elif isinstance(value, Sequence):
            if len(value) == 2:
                # (low, high) pair: derive the center and symmetric offsets.
                lo, hi = value
                width = (hi - lo) / 2.
                center = lo + width
                self._isolation_window = IsolationWindow(center, width, width)
            elif len(value) == 3:
                # (lower offset, center, upper offset) triple, passed through.
                lo, center, hi = value
                self._isolation_window = IsolationWindow(lo, center, hi)
            else:
                raise ValueError("Could not convert %r to an %r" % (value, IsolationWindow))
        else:
            raise TypeError(
                "isolation_window must be an either an %r instance or a sequence of two or three elements" % (
                    IsolationWindow))

    @property
    def acquisition_information(self) -> ScanAcquisitionInformation:
        """Describes the type of event that produced this scan, as well as
        the scanning method used."""
        if self._acquisition_information is None:
            self._acquisition_information = self.source._acquisition_information(
                self._data)
        return self._acquisition_information

    @acquisition_information.setter
    def acquisition_information(self, value):
        if not isinstance(value, ScanAcquisitionInformation) and value is not None:
            raise TypeError("acquisition_information must be an instance of %r" % (
                ScanAcquisitionInformation, ))
        self._acquisition_information = value

    @property
    def instrument_configuration(self) -> InstrumentInformation:
        """The instrument configuration used to acquire this scan."""
        if self._instrument_configuration is None:
            self._instrument_configuration = self.source._instrument_configuration(
                self._data)
        return self._instrument_configuration

    @instrument_configuration.setter
    def instrument_configuration(self, value):
        if not isinstance(value, InstrumentInformation) and value is not None:
            raise TypeError("instrument_configuration must be an instance of %r" % (
                InstrumentInformation, ))
        self._instrument_configuration = value

    @property
    def annotations(self) -> Dict[str, Any]:
        """A set of key-value pairs describing the scan not part of the
        standard interface"""
        if self._annotations is None:
            self._annotations = self.source._annotations(self._data)
            # Externally supplied annotations win over source-derived ones.
            self._annotations.update(self._external_annotations)
        return self._annotations

    @annotations.setter
    def annotations(self, value):
        self._external_annotations = dict(value)
        self._annotations = self._external_annotations.copy()
[docs] def bind(self, source: 'ScanDataSource'): super(Scan, self).bind(source) self.source = source return self
[docs] def unbind(self): super(Scan, self).unbind() self.source = None return self
    def __repr__(self):
        try:
            precursor_information = self.precursor_information
        except KeyError:
            # NOTE(review): presumably some sources raise KeyError when the
            # precursor record is missing — treat it as "no precursor".
            precursor_information = None
        return "Scan(%r, index=%d, time=%0.4f, ms_level=%r%s)" % (
            self.id,
            (self.index if self.index is not None else -1),
            (self.scan_time if self.scan_time is not None else -1),
            self.ms_level,
            ", " + repr(precursor_information) if precursor_information else '')

    # peak manipulation

    def __iter__(self) -> Iterator[FittedPeak]:
        # Iteration is over fitted peaks, so peak picking must happen first.
        if self.peak_set is None:
            raise ValueError("Cannot iterate over peaks in a scan that has not been "
                             "centroided. Call `pick_peaks` first.")
        return iter(self.peak_set)

    def __getitem__(self, i) -> FittedPeak:
        if self.peak_set is None:
            raise ValueError("Cannot retrieve peaks in a scan that has not been "
                             "centroided. Call `pick_peaks` first.")
        return self.peak_set[i]

    def __len__(self):
        if self.peak_set is None:
            raise ValueError("Cannot count peaks in a scan that has not been "
                             "centroided. Call `pick_peaks` first.")
        return len(self.peak_set)

    def __bool__(self):
        # Truthiness reflects whether a backing raw data object is present.
        return bool(self._data)

    def __nonzero__(self):
        # Python 2 spelling of __bool__.
        return self.__bool__()
[docs] def has_peak(self, *args, **kwargs) -> FittedPeak: """A wrapper around :meth:`ms_peak_picker.PeakSet.has_peak` to query the :class:`ms_peak_picker.FittedPeak` objects picked for this scan. Parameters ---------- mz: float The m/z to search for error_tolerance: float The parts per million mass error tolerance to use Returns ------- :class:`ms_peak_picker.FittedPeak` or None The peak closest to the query m/z within the error tolerance window or None if there are no peaks satisfying the requirements Raises ------ ValueError: If the scan has not yet had peaks picked yet See Also -------- :meth:`.Scan.pick_peaks` """ if self.peak_set is None: raise ValueError("Cannot search for peaks in a scan that has not been " "centroided. Call `pick_peaks` first.") return self.peak_set.has_peak(*args, **kwargs)
[docs] def pick_peaks(self, *args, **kwargs): """A wrapper around :func:`ms_peak_picker.pick_peaks` which will populate the :attr:`peak_set` attribute of this scan. Parameters ---------- fit_type : str, optional The name of the peak model to use. One of "quadratic", "gaussian", "lorentzian", or "apex" signal_to_noise_threshold : int, optional Minimum signal-to-noise measurement to accept a peak intensity_threshold : int, optional Minimum intensity measurement to accept a peak threshold_data : bool, optional Whether to apply thresholds to the data target_envelopes : list, optional A sequence of (start m/z, end m/z) pairs, limiting peak picking to only those intervals transforms : list, optional A list of :class:`scan_filter.FilterBase` instances or callable that accepts (mz_array, intensity_array) and returns (mz_array, intensity_array) or `str` matching one of the premade names in `scan_filter.filter_register` verbose : bool, optional Whether to log extra information while picking peaks start_mz : float, optional A minimum m/z value to start picking peaks from stop_mz : float, optional A maximum m/z value to stop picking peaks after *args : Passed along to :func:`ms_peak_picker.pick_peaks` **kwargs : Passed along to :func:`ms_peak_picker.pick_peaks` Returns ------- Scan Returns self """ # Check to see if the user requested one of the ms_peak_picker fits or wanted # to use the vendor peak picker if provided. 
fit_type_k = kwargs.get("fit_type") if len(args) > 0: fit_type_a = args[0] else: fit_type_a = None if fit_type_k == 'vendor' or fit_type_a == 'vendor': try: peaks = self.source._pick_peaks_vendor(self._data, *args, **kwargs) self.peak_set = peaks return self except NotImplementedError: pass # Prepare the peak picking parameters mzs, intensities = self.arrays if len(mzs) == 0: self.peak_set = PeakIndex(mzs, intensities, PeakSet([])) return self if self.is_profile: peak_mode = 'profile' else: peak_mode = 'centroid' kwargs.setdefault('peak_mode', peak_mode) self.peak_set = pick_peaks(mzs, intensities, *args, **kwargs) return self
[docs] def deconvolute(self, *args, **kwargs): """A wrapper around :func:`ms_deisotope.deconvolution.deconvolute_peaks`. The scan must have had its peaks picked before it can be deconvoluted. Parameters ---------- decon_config : dict, optional Parameters to use to initialize the deconvoluter instance produced by ``deconvoluter_type`` charge_range : tuple of integers, optional The range of charge states to consider. error_tolerance : float, optional PPM error tolerance to use to match experimental to theoretical peaks priority_list : list, optional The set of peaks to target for deconvolution to be able to enforce external constraints on, such as selected precursors for fragmentation. left_search_limit : int, optional The maximum number of neutron shifts to search to the left (decrease) from each query peak right_search_limit : int, optional The maximum number of neutron shifts to search to the right (increase) from each query peak left_search_limit_for_priorities : int, optional The maximum number of neutron shifts to search to the left (decrease) from each query peak for priority targets right_search_limit_for_priorities : None, optional The maximum number of neutron shifts to search to the right (increase) from each query peak for priority targets charge_carrier : float, optional The mass of the charge carrier. Defaults to PROTON truncate_after : float, optional The percentage of the isotopic pattern to include. Defaults to TRUNCATE_AFTER deconvoluter_type : type or callable, optional A callable returning a deconvoluter. 
Defaults to :class:`~.AveraginePeakDependenceGraphDeconvoluter` **kwargs Additional keywords passed to :func:`~.deconvolute_peaks` Returns ------- Scan Returns self Raises ------ ValueError If :attr:`peak_set` is None, a :class:`ValueError` will be raised indicating that a scan must be centroided before it can be deconvoluted See Also -------- :func:`~.deconvolute_peaks` """ if self.peak_set is None: raise ValueError("Cannot deconvolute a scan that has not been " "centroided. Call `pick_peaks` first.") charge_range = kwargs.get("charge_range", (1, 8)) if self.polarity < 0 and max(charge_range) > 0: charge_range = tuple(c * self.polarity for c in charge_range) kwargs['charge_range'] = charge_range decon_results = deconvolute_peaks(self.peak_set, *args, **kwargs) self.deconvoluted_peak_set = decon_results.peak_set return self
[docs] def pack(self, bind=False) -> 'ProcessedScan': """Pack the (dispersed) representation of the data in this :class:`Scan` into a packed :class:`ProcessedScan` object. .. note:: A reference to :attr:`precursor_information` is passed to the returned :class:`ProcessedScan`, so both objects share it. Because the :attr:`~.PrecursorInformation.product` property works by looking up the scan in :attr:`source`, it's not possible to retrieve the :class:`ProcessedScan` this way. Parameters ---------- bind: bool Whether or not the :class:`ProcessedScan` object should also be bound to :attr:`source` Returns ------- :class:`ProcessedScan` """ precursor_info = self.precursor_information scan = ProcessedScan( self.id, self.title, precursor_info, self.ms_level, self.scan_time, self.index, self.peak_set.pack() if self.peak_set is not None else None, self.deconvoluted_peak_set, self.polarity, self.activation, self.acquisition_information, self.isolation_window, self.instrument_configuration, self.product_scans, self.annotations, source=self.source if bind else None) return scan
# signal transformation
[docs] def reprofile(self, max_fwhm=0.2, dx=0.005, model_cls=None, override_fwhm=None): """Use the picked peaks in :attr:`peak_set` to create a new profile mass spectrum using a peak shape model. Parameters ---------- max_fwhm : float, optional Maximum peak width above which peaks will be ignored dx : float, optional The distance between each new point in m/z space in the reprofiled spectrum model_cls : ms_peak_picker.peak_statistics.PeakShapeModel, optional The peak shape model to use to generate the profile data from the centroided peaks. Defaults a Gaussian model Returns ------- Scan A shallow copy of this scan with its :attr:`arrays` replaced with the new reprofiled arrays Raises ------ ValueError A scan that has not been centroided and is already in profile mode must have its peaks picked before it can be reprofiled. """ if self.peak_set is None and self.is_profile: raise ValueError( "Cannot reprofile a scan that has not been centroided") elif self.peak_set is None and not self.is_profile: self.pick_peaks() if not self.peak_set: arrays = (np.array([], dtype=float), np.array([], dtype=float)) else: arrays = reprofile(self.peak_set, max_fwhm, dx, model_cls, override_fwhm=override_fwhm) scan = WrappedScan( self._data, self.source, arrays, list(self.product_scans), is_profile=True, annotations=self._external_annotations) return scan
[docs] def denoise(self, scale=5.0, window_length=2.0, region_width=10): """Create a shallow copy of the scan with a noise reduction transformation applied. This method uses the scan filter :class:`ms_peak_picker.scan_filter.FTICRBaselineRemoval` which uses the MasSpike noise reduction algorithm. Parameters ---------- scale : float, optional The multiplier of the local noise window to remove window_length : float, optional The width (in m/z) of each window region_width : int, optional The width (in m/z) of each region of windows Returns ------- Scan The denoised version of this scan """ mzs, intensities = self.arrays mzs = mzs.astype(float) intensities = intensities.astype(float) transform = scan_filter.FTICRBaselineRemoval( window_length=window_length, scale=scale, region_width=region_width) mzs, intensities = transform(mzs, intensities) return WrappedScan(self._data, self.source, (mzs, intensities), list(self.product_scans), is_profile=self.is_profile, annotations=self._external_annotations)
[docs] def transform(self, filters=None): """Applies a series of :class:`ms_peak_picker.scan_filter.FilterBase`, or strings that are recognized by :func:`ms_peak_picker.scan_filter.transform` Arguments --------- filters: :class:`Iterable` An iterable of transformations of :class:`ms_peak_picker.scan_filter.FilterBase` or strings. Returns ------- :class:`WrappedScan` """ mzs, intensities = self.arrays mzs = mzs.astype(float) intensities = intensities.astype(float) mzs, intensities = scan_filter.transform( mzs, intensities, filters=filters) return WrappedScan(self._data, self.source, (mzs, intensities), list(self.product_scans), is_profile=self.is_profile, annotations=self._external_annotations)
    def average_with(self, scans, dx=None, weight_sigma=None, num_threads=None):
        r"""Average together multiple scans' raw data arrays to create a
        composite intensity profile for a common m/z axis.

        Parameters
        ----------
        scans : list
            A list of :class:`Scan` objects
        dx : float, optional
            The distance between each point in the generated common m/z axis.
        weight_sigma : float, optional
            When this value is not None, scans are weighted according to a
            gaussian distribution with a $\sigma$ equal to this value
        num_threads : int, optional
            The maximum number of threads to use while averaging signal.
            Defaults to the number of spectra being averaged or the maximum
            available from the hardware, whichever is smaller.

        Returns
        -------
        :class:`AveragedScan`
            A shallow copy of this scan with its :attr:`arrays` attribute
            replaced with the averaged array
        """
        default_dx = False
        if dx is None:
            dx = DEFAULT_DX
            # default_dx = True
        # else:
        #     default_dx = False
        # NOTE(review): `default_dx` is never set to True (the assignment
        # above is commented out), so the empirical-spacing adjustment below
        # is currently dead code — confirm whether this is intentional.
        scans = [self] + list(scans)
        arrays = []
        for scan in scans:
            if scan.is_profile:
                # Skip scans with no signal at all.
                if scan.arrays.mz.size == 0:
                    continue
                arrays.append(scan.arrays)
            else:
                # Centroided scans are converted to synthetic profiles first.
                scan_arrays = scan.reprofile(dx=dx).arrays
                if scan_arrays.mz.size == 0:
                    continue
                arrays.append(scan_arrays)
        if weight_sigma:
            if weight_sigma == 1:
                # Treat the flag value 1 as the default smoothing width.
                weight_sigma = 0.025
            weights = self._compute_smoothing_weights(
                scans, mean=self.scan_time, sigma=weight_sigma)
        else:
            weights = None
        if default_dx and arrays:
            # Estimate a grid spacing from a representative scan's point
            # spacing, never coarser than the requested dx.
            if len(arrays) > 2:
                reference = arrays[len(arrays) // 2 + 1]
            else:
                reference = arrays[0]
            empirical_dx = decimal_shift(2 * np.median(np.diff(reference.mz)))
            dx = min(dx, empirical_dx)
        new_arrays = average_signal(
            arrays, dx=dx, weights=weights, num_threads=num_threads)
        indices = [scan.index for scan in scans]
        return AveragedScan(
            self._data, self.source, new_arrays, indices,
            list(self.product_scans), is_profile=True,
            annotations=self._external_annotations)
    def _get_adjacent_scans(self, index_interval=None, rt_interval=None):
        """Collect the MS1 scans surrounding this one, either by a fixed
        count (``index_interval``) or by a retention time window
        (``rt_interval``).

        Returns
        -------
        tuple of (list, list)
            ``(before, after)`` lists of scans in acquisition order.

        Raises
        ------
        ValueError
            If neither interval is given, if this is an MSn scan, or if the
            scan is not bound to a source.
        """
        if index_interval is None and rt_interval is None:
            raise ValueError(
                "One of `index_interval` or `rt_interval` must be provided")
        if self.ms_level > 1:
            raise ValueError("Cannot average MSn scans at this time")
        if not self.source:
            raise ValueError("Can't average an unbound scan")
        before = []
        after = []
        if index_interval is not None:
            # Walk backwards a fixed number of MS1 scans.
            before = []
            current_index = self.index
            for _ in range(index_interval):
                next_scan = self.source.find_previous_ms1(current_index)
                if next_scan is None:
                    break
                before.append(next_scan)
                current_index = next_scan.index
            # The backwards walk collected newest-first; restore order.
            before = before[::-1]
            after = []
            current_index = self.index
            for _ in range(index_interval):
                try:
                    next_scan = self.source.find_next_ms1(current_index)
                except ValueError:
                    # NOTE(review): presumably raised when walking past the
                    # end of the run — confirm against the source class.
                    break
                if next_scan is None:
                    break
                after.append(next_scan)
                current_index = next_scan.index
        elif rt_interval is not None:
            # Walk outwards until the time difference exceeds the window.
            reference_time = self.scan_time
            before = []
            current_index = self.index
            current_time = self.scan_time
            while abs(reference_time - current_time) < rt_interval and current_index > 0:
                next_scan = self.source.find_previous_ms1(current_index)
                if next_scan is None:
                    break
                before.append(next_scan)
                current_index = next_scan.index
                current_time = next_scan.scan_time
            before = before[::-1]
            after = []
            current_index = self.index
            current_time = self.scan_time
            while abs(reference_time - current_time) < rt_interval and current_index > 0:
                try:
                    next_scan = self.source.find_next_ms1(current_index)
                except ValueError:
                    break
                if next_scan is None:
                    break
                after.append(next_scan)
                current_index = next_scan.index
                current_time = next_scan.scan_time
        else:
            raise ValueError(
                "One of `index_interval` or `rt_interval` must be provided")
        return before, after

    def _compute_smoothing_weights(self, scans, mean, sigma=0.025):
        """Gaussian weights over the scans' times, centered at *mean* with
        standard deviation *sigma*."""
        sigma_sqrd_2 = (2 * sigma ** 2)
        time_array = np.array([s.scan_time for s in scans])
        weights = np.exp((-(time_array - mean) ** 2) / sigma_sqrd_2)
        return weights
    def average(self, index_interval=None, rt_interval=None, dx=None,
                weight_sigma=None):
        r"""Average together multiple scans' raw data arrays to create a
        composite intensity profile for a common m/z axis.

        Only MS1 scans will be averaged with this method.

        Either an absolute number of scans before and after can be specified
        using ``index_interval`` or a time window may be specified using
        ``rt_interval``.

        Parameters
        ----------
        index_interval : int, optional
            The number of scans preceding and proceding to average with.
        rt_interval : float, optional
            The range of time (in minutes) preceding and proceding to look
            for other scans to average with.
        dx : float, optional
            The distance between each point in the generated common m/z axis.
        weight_sigma : float, optional
            When this value is not None, scans are weighted according to a
            gaussian distribution with a $\sigma$ equal to this value

        Returns
        -------
        :class:`AveragedScan`
            A shallow copy of this scan with its :attr:`arrays` attribute
            replaced with the averaged array
        """
        default_dx = False
        if dx is None:
            dx = DEFAULT_DX
            # default_dx = True
        else:
            default_dx = False
        # NOTE(review): `default_dx` is never set to True (the assignment
        # above is commented out), so the empirical-spacing adjustment below
        # is currently dead code — confirm whether this is intentional.
        before, after = self._get_adjacent_scans(index_interval, rt_interval)
        scans = before + [self] + after
        arrays = []
        for scan in scans:
            if scan.is_profile:
                scan_arrays = scan.arrays
            else:
                # Centroided scans are converted to synthetic profiles first.
                scan_arrays = scan.reprofile(dx=dx).arrays
            if len(scan_arrays.mz) > 0:
                arrays.append(scan_arrays)
        if weight_sigma:
            if weight_sigma == 1:
                # Treat the flag value 1 as the default smoothing width.
                weight_sigma = 0.025
            weights = self._compute_smoothing_weights(
                scans, mean=self.scan_time, sigma=weight_sigma)
        else:
            weights = None
        if default_dx:
            # Estimate a grid spacing from a representative scan's point
            # spacing, never coarser than the requested dx.
            if len(arrays) > 2:
                reference = arrays[len(arrays) // 2 + 1]
            else:
                reference = arrays[0]
            empirical_dx = decimal_shift(2 * np.median(np.diff(reference.mz)))
            dx = min(dx, empirical_dx)
        if arrays:
            new_arrays = average_signal(arrays, dx=dx, weights=weights)
        else:
            # No usable signal anywhere; fall back to this scan's own arrays.
            new_arrays = self.arrays[:]
        indices = [scan.index for scan in scans]
        return AveragedScan(
            self._data, self.source, new_arrays, indices,
            list(self.product_scans), is_profile=True,
            annotations=self._external_annotations)
class WrappedScan(Scan):
    """A wrapper around a :class:`Scan` object with one or more attributes overridden.

    The overriding values are recorded in :attr:`_overrides` so that they can be
    re-applied when the scan is cloned.
    """

    # Private attribute names which may be replaced through keyword overrides.
    overridable_keys = [
        "_arrays",
        "_id",
        "_title",
        "_ms_level",
        "_scan_time",
        "_precursor_information",
        "_index",
        "_is_profile",
        "_polarity",
        "_activation",
        "_acquisition_information",
        "_isolation_window",
        "_instrument_configuration"
    ]

    def __init__(self, data, source, array_data, product_scans=None,
                 annotations=None, **overrides):
        """
        Parameters
        ----------
        data :
            The raw scan object from ``source`` being wrapped.
        source : :class:`ScanDataSource`
            The data source the wrapped scan came from.
        array_data :
            A pair of m/z and intensity arrays used to replace :attr:`arrays`.
        product_scans : list, optional
            Product scans derived from this scan.
        annotations : dict, optional
            Extra key-value metadata for the scan.
        **overrides
            Attribute values to replace. Keys may be given with or without the
            leading underscore and must appear in :attr:`overridable_keys`;
            unrecognized keys produce a :class:`UserWarning`.
        """
        super(WrappedScan, self).__init__(
            data, source, peak_set=None,
            deconvoluted_peak_set=None,
            annotations=annotations,
            product_scans=product_scans)
        self._arrays = RawDataArrays(*array_data)
        self._overrides = overrides
        for key, value in overrides.items():
            # Normalize public names like "ms_level" to the private "_ms_level" form.
            if not key.startswith("_"):
                key = "_" + key
            if key in self.overridable_keys:
                setattr(self, key, value)
            else:
                warnings.warn("Cannot override attribute %s" % (key,))

    def clone(self, deep=True):
        """Return a copy of this scan, re-applying the recorded attribute overrides.

        Parameters
        ----------
        deep : bool
            Whether to clone the peak sets as well, or to share them with the copy.

        Returns
        -------
        :class:`WrappedScan`
        """
        dup = self.__class__(
            self._data, self.source, self.arrays,
            [s.clone(deep=deep) for s in self.product_scans],
            annotations=self._external_annotations,
            **self._overrides)
        if deep:
            dup.peak_set = self.peak_set.clone() if self.peak_set is not None else None
            dup.deconvoluted_peak_set = self.deconvoluted_peak_set.clone() \
                if self.deconvoluted_peak_set is not None else None
        else:
            dup.peak_set = self.peak_set
            dup.deconvoluted_peak_set = self.deconvoluted_peak_set
        return dup


class AveragedScan(WrappedScan):
    """An averaged :class:`Scan` object, storing additional information
    for retrieving the spectra that were averaged together.

    Attributes
    ----------
    scan_indices: list
        The :attr:`index` values for all of the scans that were averaged together
    """

    def __init__(self, data, source, array_data, scan_indices,
                 product_scans=None, annotations=None, **overrides):
        super(AveragedScan, self).__init__(
            data, source, array_data,
            product_scans=product_scans, annotations=annotations, **overrides)
        self.scan_indices = scan_indices

    def clone(self, deep=True):
        """Return a copy of this averaged scan, preserving :attr:`scan_indices`.

        Parameters
        ----------
        deep : bool
            Whether to clone the peak sets as well, or to share them with the copy.

        Returns
        -------
        :class:`AveragedScan`
        """
        dup = self.__class__(
            self._data, self.source, self.arrays,
            self.scan_indices,
            [s.clone(deep=deep) for s in self.product_scans],
            annotations=self._external_annotations,
            **self._overrides)
        if deep:
            dup.peak_set = self.peak_set.clone() if self.peak_set is not None else None
            dup.deconvoluted_peak_set = self.deconvoluted_peak_set.clone() \
                if self.deconvoluted_peak_set is not None else None
        else:
            dup.peak_set = self.peak_set
            dup.deconvoluted_peak_set = self.deconvoluted_peak_set
        return dup


class ProcessedScan(ScanBase):
    """Container for mass spectral data and associated descriptive information
    that has been processed and reduced.

    A :class:`ProcessedScan` object has been processed and compacted. It does not carry
    a :class:`~.RawDataArrays` :attr:`arrays` attribute, only at least one of :attr:`peak_set`
    and :attr:`deconvoluted_peak_set`. The rest of its attributes are all loaded in memory
    directly without needing to reconstruct them from a specific format lazily through
    a :class:`~.ScanDataSource`.

    Attributes
    ----------
    deconvoluted_peak_set : :class:`~.DeconvolutedPeakSet` or None
        Deconvoluted peaks resulting from charge state deconvolution and deisotoping.
        Will be `None` if deconvolution has not been done.
    peak_set : :class:`ms_peak_picker.PeakSet` or None
        Picked peaks and (possibly) associated raw data points as produced by
        :meth:`pick_peaks`. Will be `None` if peak picking has not been done.
    product_scans : list
        A list of :class:`Scan` instances which were produced by fragmenting ions
        from this one. This attribute is not guaranteed to be populated depending
        upon how the scan is loaded.
    source : :class:`ScanDataSource`
        The object which produced this scan and which defines the methods for
        retrieving common attributes from the underlying data structures.
    precursor_information: :class:`PrecursorInformation` or None
        Descriptive metadata for the ion which was chosen for fragmentation, and a
        reference to the precursor scan
    id: str
        The unique identifier for this scan as given by the source
    title: str
        The human-readable display string for this scan as shown in some external
        software
    ms_level: int
        The degree of fragmentation performed. 1 corresponds to a MS1 or "Survey" scan,
        2 corresponds to MS/MS, and so on. If :attr:`ms_level` > 1, the scan is considered
        a "tandem scan" or "MS^n" scan
    scan_time: float
        The time the scan was acquired during data acquisition. The unit of time will
        always be minutes.
    drift_time: float or None
        The time measured by the ion mobility spectrometer for this scan or frame.
        This quantity is None if the scan does not have ion mobility information
        associated with it, which is usually recorded in :attr:`acquisition_information`
    index: int
        The integer number indicating how many scans were acquired prior to this
        scan.
    is_profile: bool
        Whether this scan's raw data points corresponds to a profile scan or whether the
        raw data was pre-centroided.
    polarity: int
        If the scan was acquired in positive mode, the value ``+1``. If the scan was
        acquired in negative mode, the value ``-1``. May be used to indicating how
        to calibrate charge state determination methods.
    activation: :class:`.ActivationInformation` or None
        If this scan is an MS^n scan, this attribute will contain information about
        the process used to produce it from its parent ion.
    instrument_configuration: :class:`~.InstrumentInformation`
        The instrument configuration used to acquire this scan.
    acquisition_information: :class:`.ScanAcquisitionInformation` or None
        Describes the type of event that produced this scan, as well as the scanning
        method used.
    isolation_window: :class:`.IsolationWindow` or None
        Describes the range of m/z that were isolated from a parent scan to create
        this scan
    annotations: dict
        A set of key-value pairs describing the scan not part of the standard interface
    """

    id: str
    title: str
    ms_level: int
    scan_time: float
    index: int
    polarity: int

    precursor_information: Optional[PrecursorInformation]
    activation: Optional[ActivationInformation]
    isolation_window: Optional[IsolationWindow]
    acquisition_information: ScanAcquisitionInformation
    instrument_configuration: InstrumentInformation
    annotations: Dict[str, Any]

    peak_set: Optional[PeakSet]
    deconvoluted_peak_set: Optional[DeconvolutedPeakSet]

    def __init__(self, id, title, precursor_information,
                 ms_level, scan_time, index, peak_set, deconvoluted_peak_set,
                 polarity=None, activation=None, acquisition_information=None,
                 isolation_window=None, instrument_configuration=None,
                 product_scans=None, annotations=None, source=None):
        if product_scans is None:
            product_scans = []
        if annotations is None:
            annotations = {}
        self.id = id
        self.title = title
        self.precursor_information = precursor_information
        self.ms_level = ms_level
        self.scan_time = scan_time
        self.index = index
        self.peak_set = peak_set
        self.deconvoluted_peak_set = deconvoluted_peak_set
        self.polarity = polarity
        self.activation = activation
        self.acquisition_information = acquisition_information
        self.isolation_window = isolation_window
        self.instrument_configuration = instrument_configuration
        self.product_scans = product_scans
        self.annotations = annotations
        self.source = source

    def clear(self, full=False):
        """Clear storage-heavy attribute values

        Parameters
        ----------
        full: bool
            Whether to clear attributes more aggressively to free up space.
        """
        # The peak data is always the heaviest payload; drop it unconditionally.
        self.peak_set = None
        self.deconvoluted_peak_set = None
        if full:
            # Only discard the descriptive metadata when explicitly asked to,
            # as the docstring promises. Previously these were cleared
            # unconditionally, making the ``full`` flag a no-op.
            self.activation = None
            self.acquisition_information = None
            self.isolation_window = None
            self.instrument_configuration = None
            self.product_scans = None

    @property
    def scan_id(self) -> str:
        """An alias for :attr:`id`."""
        return self.id

    @property
    def is_profile(self) -> bool:
        """Whether this scan's raw data points corresponds to a profile scan
        or whether the raw data was pre-centroided.

        A :class:`ProcessedScan` carries no raw arrays, so this is always
        :const:`False`.
        """
        return False

    def _resolve_peaks(self) -> Union[PeakSet, DeconvolutedPeakSet, List[DeconvolutedPeak]]:
        # Prefer deconvoluted peaks, then centroided peaks, and fall back to
        # an empty list when neither has been computed.
        if self.deconvoluted_peak_set is not None:
            return self.deconvoluted_peak_set
        elif self.peak_set is not None:
            return self.peak_set
        else:
            return []

    def __iter__(self) -> Union[Iterator[DeconvolutedPeak], Iterator[FittedPeak]]:
        return iter(self._resolve_peaks())

    def __getitem__(self, index) -> Union[DeconvolutedPeak, FittedPeak]:
        return self._resolve_peaks()[index]

    def __len__(self):
        return len(self._resolve_peaks())

    def has_peak(self, mass, error_tolerance=2e-5) -> Optional[Union[DeconvolutedPeak, FittedPeak]]:
        """A wrapper around :meth:`~.DeconvolutedPeakSet.has_peak` to query the
        :class:`~.DeconvolutedPeak` objects picked for this scan.

        If no deconvoluted peaks are available, but centroided peaks are, this method
        will instead behave like :class:`Scan.has_peak`

        Parameters
        ----------
        mass: float
            The mass to search for
        error_tolerance: float
            The parts per million mass error tolerance to use, expressed as a
            fraction (the default ``2e-5`` corresponds to 20 ppm)

        Returns
        -------
        :class:`~.PeakBase`

        Raises
        ------
        ValueError
            If neither :attr:`deconvoluted_peak_set` nor :attr:`peak_set`
            has been populated.
        """
        if self.deconvoluted_peak_set is not None:
            return self.deconvoluted_peak_set.has_peak(mass, error_tolerance)
        elif self.peak_set is not None:
            return self.peak_set.has_peak(mass, error_tolerance)
        else:
            raise ValueError("No peaks available")

    def __repr__(self):
        if self.deconvoluted_peak_set is not None:
            peaks = self.deconvoluted_peak_set
        elif self.peak_set is not None:
            peaks = self.peak_set
        else:
            peaks = []

        pinfo = self.precursor_information
        if pinfo:
            pinfo_string = ", %s" % pinfo
        else:
            pinfo_string = ""

        return "ProcessedScan(id=%s, ms_level=%d, %d peaks%s)" % (
            self.id, self.ms_level, len(peaks), pinfo_string)

    def bind(self, source: 'ScanDataSource'):
        """Attach this scan to ``source`` and return ``self``."""
        super(ProcessedScan, self).bind(source)
        self.source = source
        return self

    def unbind(self):
        """Detach this scan from its :attr:`source` and return ``self``."""
        super(ProcessedScan, self).unbind()
        self.source = None
        return self

    def clone(self, deep=True) -> 'ProcessedScan':
        """Return a copy of the :class:`ProcessedScan` object, potentially a deep one

        Parameters
        ----------
        deep: :class:`bool`
            Whether to clone the peak sets, or to share them with the copy.

        Returns
        -------
        :class:`ProcessedScan`
        """
        if deep:
            peak_set = self.peak_set.clone() if self.peak_set is not None else None
            deconvoluted_peak_set = self.deconvoluted_peak_set.clone() \
                if self.deconvoluted_peak_set is not None else None
        else:
            # BUG FIX: the previous single-expression form parsed as
            # ``x.clone() if x is not None else (None if deep else x)`` due to
            # conditional-expression associativity, so non-None peak sets were
            # cloned even when ``deep`` was False. Now a shallow clone shares
            # the peak sets, matching WrappedScan.clone / AveragedScan.clone.
            peak_set = self.peak_set
            deconvoluted_peak_set = self.deconvoluted_peak_set
        dup = self.__class__(
            self.id, self.title, self.precursor_information, self.ms_level,
            self.scan_time, self.index,
            peak_set,
            deconvoluted_peak_set,
            self.polarity, self.activation,
            self.acquisition_information, self.isolation_window,
            self.instrument_configuration, list(self.product_scans),
            self.annotations.copy())
        return dup