"""Represent the basic structures of a mass spectrum and its processed contents,
and provide an interface for manipulating that data.
"""
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
import warnings
from collections import namedtuple
from ms_deisotope.qc.isolation import CoIsolation
try:
from collections.abc import Sequence as _SequenceABC
except ImportError:
from collections import Sequence as _SequenceABC
from numbers import Number
import numpy as np
from ms_peak_picker import average_signal, FittedPeak, PeakSet
from ms_peak_picker.base import PeakLike
from ms_deisotope.averagine import neutral_mass, mass_charge_ratio
from ms_deisotope.peak_set import DeconvolutedPeak, DeconvolutedPeakSet
from ms_deisotope.deconvolution import deconvolute_peaks
from ms_deisotope.data_source.metadata.scan_traits import _IonMobilityMixin
try:
from ms_deisotope.plot import annotate_scan as _annotate_precursors, draw_raw
except ImportError:
def _missing_matplotlib(*args, **kwargs):
raise ImportError(
"This method requires matplotlib. Please install it.")
_annotate_precursors = _missing_matplotlib
draw_raw = _missing_matplotlib
from ms_deisotope.utils import Constant
if TYPE_CHECKING:
from ms_deisotope.data_source.scan.loader import ScanDataSource
from ms_deisotope.qc.isolation import CoIsolation
DEFAULT_CHARGE_WHEN_NOT_RESOLVED = 1
ChargeNotProvided = Constant("ChargeNotProvided")
[docs]class ScanBunch(namedtuple("ScanBunch", ["precursor", "products"])):
"""Represents a single MS1 scan and all MSn scans derived from it,
or a collection of related MSn scans.
Attributes
----------
precursor: :class:`~.ScanBase`
A single MS1 scan which may have undergone MSn
products: list
A list of 0 or more :class:`~.ScanBase` objects which were derived
from :attr:`precursor` or another element of this list derived
from it.
"""
precursor: 'ScanBase'
produces: List['ScanBase']
_id_map: Dict[str, 'ScanBase']
def __new__(cls, *args, **kwargs):
inst = super(ScanBunch, cls).__new__(cls, *args, **kwargs)
inst._id_map = {}
if inst.precursor is not None:
inst._id_map[inst.precursor.id] = inst.precursor
for scan in inst.products:
inst._id_map[scan.id] = scan
return inst
[docs] def precursor_for(self, scan: 'ScanBase') -> Optional['ScanBase']:
"""Find the precursor :class:`~.ScanBase` instance
for the given scan object
Parameters
----------
scan : :class:`~.ScanBase`
The MSn scan to look for the MSn-1 scan for
Returns
-------
:class:`~.ScanBase`
"""
if scan.precursor_information is not None:
scan_id = scan.precursor_information.precursor_scan_id
return self.get_scan_by_id(scan_id)
return None
[docs] def get_scan_by_id(self, scan_id: str) -> 'ScanBase':
"""Retrieve the scan object for the specified scan id from this
group in memory.
Parameters
----------
scan_id : str
The unique scan id value to be retrieved
Returns
-------
:class:`~.ScanBase`
"""
return self._id_map[scan_id]
[docs] def annotate_precursors(self, nperrow=4, ax=None):
"""Plot the spectra in this group as a grid, with the full
MS1 spectrum in profile in the top row, and each MSn spectrum's
precursor ion revealed in a grid panel below, with isolation
window and selected ion/monoisotopic peak annotated.
Parameters
----------
nperrow: :class:`int`
The number of precursors to annotate per row
in the grid.
ax: :class:`matplotlib._axes.Axes`, optional
The axis to draw on. If not provided, a new figure
will be created, along with a new axis.
Returns
-------
:class:`matplotlib._axes.Axes`
"""
return _annotate_precursors(
self.precursor, self.products, nperrow=nperrow, ax=ax)
def _repr_pretty_(self, p, cycle): # pragma: no cover
if cycle:
p.text("ScanBunch(...)")
return
p.text("ScanBunch(\n")
with p.group(2):
with p.group(4, "precursor=\n"):
p.pretty(self.precursor)
with p.group(4, ",\nproducts=\n"):
p.pretty(self.products)
p.text(")")
[docs] def pack(self):
"""Build a new :class:`ScanBunch` where each scan in it is returned by calling
:meth:`~.Scan.pack`
Returns
-------
:class:`ScanBunch`
"""
precursor = self.precursor.pack()
products = [p.pack() for p in self.products]
inst = self.__class__(precursor, products)
precursor.product_scans = products
return inst
[docs]class RawDataArrays(namedtuple("RawDataArrays", ['mz', 'intensity'])):
"""Represent the m/z and intensity arrays associated with a raw
mass spectrum.
Supports scaling and summing, as well as low level m/z search.
Thin wrapper around a ``namedtuple``, so this object supports
the same interfaces as a tuple.
Attributes
----------
mz: :class:`np.ndarray`
The m/z axis of a mass spectrum
intensity: :class:`np.ndarray`
The intensity measured at the corresponding m/z of a mass spectrum
"""
mz: np.ndarray
intensity: np.ndarray
data_arrays: Dict[str, np.ndarray]
def __new__(cls, mz, intensity, data_arrays=None):
inst = super(RawDataArrays, cls).__new__(cls, mz, intensity)
inst.data_arrays = dict()
if data_arrays:
inst.data_arrays.update(data_arrays)
return inst
def __copy__(self) -> 'RawDataArrays':
inst = self.__class__(self.mz.copy(), self.intensity.copy(), {
k: v.copy() for k, v in self.data_arrays.items()
})
return inst
[docs] def has_array(self, array_type):
"""Check if this array set contains an array of the
requested type.
This method uses the semantic lookup mechanism to test
"is-a" relationships so if a more abstract term is used,
a wider range of terms may be matched.
Parameters
----------
array_type : str or :class:`~.Term`
The array type name to test.
Returns
-------
bool
"""
from ms_deisotope.data_source.metadata.scan_traits import binary_data_arrays
try:
term = binary_data_arrays[array_type]
except KeyError:
warnings.warn("Array type %r could not be resolved, treating as a plain string" % (array_type, ))
return array_type in self.binary_data_arrays
if self.mz is not None and len(self.mz):
k = binary_data_arrays['m/z array']
if term.is_a(k):
return k
if self.intensity is not None and len(self.intensity):
k = binary_data_arrays['intensity array']
if term.is_a(k):
return k
for k in self.data_arrays:
try:
k = binary_data_arrays[k]
if term.is_a(k):
return k
except KeyError:
if term == k:
return k
return False
[docs] def copy(self):
"""Make a deep copy of this object.
Returns
-------
:class:`RawDataArray`
"""
return self.__copy__()
def _slice(self, i):
inst = self.__class__(self.mz[i], self.intensity[i], {
k: v[i] for k, v in self.data_arrays.items()
})
return inst
[docs] def plot(self, *args, **kwargs):
"""Draw the profile spectrum described by the
contained arrays.
Parameters
----------
ax: :class:`matplotlib._axes.Axes`
The figure axes onto which to draw the plot. If not provided,
this will default to the current figure interactively.
**kwargs
All keywords are forwarded to :meth:`plot` on ``ax``.
Returns
-------
:class:`matplotlib._axes.Axes`
The axes drawn on
"""
ax = draw_raw(self, *args, **kwargs)
return ax
def __eq__(self, other):
try:
return np.allclose(
self[0], other[0]) and np.allclose(
self[1], other[1])
except ValueError:
return False
def __ne__(self, other):
return not (self == other)
def __mul__(self, i):
return self.__class__(self.mz, self.intensity * i)
def __div__(self, d):
return self.__class__(self.mz, self.intensity / d)
def __add__(self, other):
if len(self.mz) == len(other.mz) and np.allclose(self.mz, other.mz):
return self.__class__(self.mz, self.intensity + other.intensity)
else:
return self.__class__(*average_signal([self, other])) * 2
[docs] def find_mz(self, mz: float) -> int:
"""Find the nearest index to the query ``mz``
Parameters
----------
mz : float
The m/z value to search for
Returns
-------
int
The index nearest to the query m/z
"""
n = len(self.mz)
lo = 0
hi = n
while hi != lo:
mid = int((hi + lo) // 2)
y = self.mz[mid]
err = y - mz
if abs(err) < 0.1:
best_index = mid
best_err = abs(err)
i = mid
while i >= 0:
y = self.mz[i]
err = y - mz
if err <= -0.1:
break
abs_err = abs(err)
if abs_err < best_err:
best_err = abs_err
best_index = i
i -= 1
i = mid
while i < n:
y = self.mz[i]
err = y - mz
if err >= 0.1:
break
abs_err = abs(err)
if abs_err < best_err:
best_err = abs_err
best_index = i
i += 1
return best_index
elif hi - lo == 1:
return mid
elif err > 0:
hi = mid
else:
lo = mid
return 0
[docs] def between_mz(self, low: float, high: float) -> 'RawDataArrays':
"""Returns a slice of the arrays between ``low`` and ``high``
m/z
Parameters
----------
low : float
The lower bound m/z
high : float
The upper bound m/z
Returns
-------
:class:`.RawDataArrays`
"""
i = self.find_mz(low)
j = self.find_mz(high) + 1
if not (low <= self.mz[i] <= high):
i += 1
return self.__class__(self.mz[i:j], self.intensity[i:j])
[docs] @classmethod
def empty(cls) -> 'RawDataArrays':
"""Create a new, empty instance.
Returns
-------
:class:`RawDataArrays`
"""
return cls(np.array([]), np.array([]))
def __getitem__(self, i):
if isinstance(i, int):
return super(RawDataArrays, self).__getitem__(i)
elif isinstance(i, (slice, list, tuple, np.ndarray)):
return self._slice(i)
else:
return self.data_arrays[i]
@property
def size(self) -> int:
return self.mz.size
class ScanBase(object):
"""Abstract base class for Scan-like objects
"""
source: 'ScanDataSource'
peak_set: Optional[PeakSet]
deconvoluted_peak_set: Optional[DeconvolutedPeakSet]
def has_ion_mobility(self) -> bool:
"""Check whether this scan has drift time information associated with
it.
If this scan has been aggregated, it will only check the first scan in
the aggregate.
"""
acq = self.acquisition_information
if acq is None:
return False
scan_event = acq[0]
return scan_event.has_ion_mobility()
@property
def ion_mobility_type(self):
"""Fetch the ion mobility type of the scan.
Returns
-------
ims_type: :class:`ScanAttribute` or :const:`None`
The ion mobility type, or :const:`None`
"""
acq = self.acquisition_information
if acq is None:
return None
scan_event = acq[0]
return scan_event.ion_mobility_type
@property
def drift_time(self) -> Optional[float]:
"""A convenience method to access the first
scan event to retrieve its drift time.
Returns
-------
float or None
"""
acq = self.acquisition_information
if acq is None:
return None
scan_event = acq[0]
return scan_event.drift_time
@property
def scan_id(self) -> str:
"""An alias for :attr:`id`
"""
return self.id
@property
def tic(self) -> 'TICMethods':
"""A facade function for calculating the total ion current (TIC) of a spectrum.
This exposes a facade object of type :class:`TICMethods` to take care of the different
ways in which the TIC may be calculated.
Returns
-------
:class:`TICMethods`
Examples
--------
Just directly calling the `tic` attribute will use the most refined data source
to calculate the TIC. This means that if the TIC is recalculated after refinement,
the number may change.
>>> from ms_deisotope.test.common import example_scan_bunch
>>> bunch = example_scan_bunch()
>>> bunch.precursor.tic()
8886549.0
>>> bunch.precursor.tic.raw()
8886549.0
The picked peaks can be used through :meth:`TICMethods.centroided`, which take
priority over the raw signal when calling :meth:`tic` directly.
>>> bunch.precursor.pick_peaks()
>>> bunch.precursor.tic.centroided()
8886548.890350103
The deconvoluted peaks can be used through :meth:`TICMethods.deconvoluted`.
>>> bunch.precursor.deconvolute(use_quick_charge=True)
>>> bunch.precursor.tic.deconvoluted()
8195619.241884331
>>> bunch.precursor.tic()
8195619.241884331
"""
return TICMethods(self)
@property
def base_peak(self) -> 'BasePeakMethods':
"""A facade function for calculating the base peak, the most abundant peak,
of a spectrum.
This exposes a facade object of type :class:`BasePeakMethods` to take care of
the different ways in which the base peak may be calculated. The interface of
this object is the same as the interface exposed by the :attr:`tic` attribute,
but instead of returning a scalar float, it returns a :class:`~.PeakLike` object.
Returns
-------
:class:`BasePeakMethods`
See Also
--------
:attr:`tic`
"""
return BasePeakMethods(self)
@property
def plot(self) -> 'PlottingMethods':
"""A facade function for plotting different layers of signal in the scan from raw,
centroided, and deconvoluted representation, as well as limited precursor annotation.
Returns
-------
:class:`PlottingMethods`
"""
return PlottingMethods(self)
@property
def peaks(self) -> 'PeakSetMethods':
"""Automatically locate the most refined representation of the signal from the :class:`Scan`
object.
Returns
-------
:class:`PeakSetMethods`
"""
return PeakSetMethods(self)
def copy(self, deep=True) -> 'ScanBase':
"""Return a deep copy of the :class:`Scan` object
wrapping the same reference data.
Returns
-------
:class:`ScanBase`
"""
return self.clone(deep)
def __copy__(self) -> 'ScanBase':
return self.clone()
def __eq__(self, other) -> bool:
if other is None:
return False
if not isinstance(other, ScanBase):
return False
try:
eq = (self.scan_id == other.scan_id) and (
abs(self.scan_time - other.scan_time) < 1e-3) and (
self.index == other.index) and (
self.ms_level == other.ms_level)
if not eq:
return False
except AttributeError:
return False
try:
eq = self.arrays == other.arrays
if not eq:
return False
except AttributeError:
# ProcessedScan doesn't have an arrays attribute
pass
try:
eq = self.peak_set == other.peak_set
if not eq:
return False
except AttributeError:
if ((self.peak_set is None and other.peak_set is not None) or (
self.peak_set is not None and other.peak_set is None)):
pass
else:
return False
try:
eq = self.deconvoluted_peak_set == other.deconvoluted_peak_set
if not eq:
return False
except AttributeError:
if ((self.deconvoluted_peak_set is None and other.deconvoluted_peak_set is not None) or (
self.deconvoluted_peak_set is not None and other.deconvoluted_peak_set is None)):
pass
else:
return False
eq = self.precursor_information == other.precursor_information
if not eq:
return False
eq = self.isolation_window == other.isolation_window
if not eq:
return False
try:
a = self.acquisition_information
b = other.acquisition_information
if a is not None and b is not None:
eq = a == b
else:
eq = True
if not eq:
return False
except AttributeError:
pass
try:
a = self.activation
b = other.activation
if a is not None and b is not None:
eq = a == b
else:
eq = True
if not eq:
return False
except AttributeError:
pass
return True
def __ne__(self, other):
return not (self == other)
__hash__ = None
# Scan objects shouldn't be hashed.
def bind(self, source: 'ScanDataSource') -> 'ScanBase':
"""Attach this object and its other referent members
to ``source``, letting them load information.
"""
if self.precursor_information is not None:
self.precursor_information.bind(source)
return self
def unbind(self) -> 'ScanBase':
"""Detattch this object and its other referent members
from their currently bound :attr:`source`.
This may cause errors if more information is requested but is not
cached, or if requesting another :class:`ScanBase` be loaded.
"""
if self.precursor_information is not None:
self.precursor_information.unbind()
return self
@property
def source_file_name(self) -> Optional[str]:
"""Get the name of the source file this :class:`~.ScanBase` object is bound to
Returns
-------
:class:`str`
"""
source = self.source
if source is None:
return None
return source.source_file_name
class TICMethods(object):
"""A helper class that will figure out the most refined signal source to
calculate the total ion current from.
"""
def __init__(self, scan):
self.scan = scan
def _peak_sequence_tic(self, peaks) -> float:
total = 0
for peak in peaks:
total += peak.intensity
return total
def _simple_tic(self, points) -> float:
return sum(points)
def _tic_raw_data_arrays(self, arrays) -> float:
return arrays.intensity.sum()
def __call__(self) -> float:
return self._guess()
def _guess(self) -> float:
"""Guess which strategy to use to calculate the most refined representation of
the TIC.
Returns
-------
float
"""
try:
return self.deconvoluted()
except (AttributeError, TypeError):
pass
try:
return self.centroided()
except (AttributeError, TypeError):
pass
try:
return self.raw()
except (AttributeError, TypeError):
pass
points = list(self.scan)
if points:
# This may not work if PeakLike is recognizing an external peak-like object
# that is not derived from PeakBase and C-extensions are enabled?
if isinstance(points[0], PeakLike):
return self._peak_sequence_tic(points)
elif isinstance(points[0], Number):
return self._simple_tic(points)
else:
raise TypeError(
"Cannot determine how to calculate a TIC from %r of type %r" % (
self.scan, type(self.scan)))
else:
raise TypeError(
"Cannot determine how to calculate a TIC from %r of type %r" % (
self.scan, type(self.scan)))
def raw(self) -> float:
"""Calculate the TIC from the raw intensity signal of the spectrum with no processing.
Returns
-------
float
"""
return self._tic_raw_data_arrays(self.scan.arrays)
def centroided(self) -> float:
"""Calculate the TIC from the picked peak list of the spectrum.
Returns
-------
float
"""
return self._peak_sequence_tic(self.scan.peak_set)
def deconvoluted(self) -> float:
"""Calculate the TIC from the deconvoluted peak list of the spectrum.
Returns
-------
float
"""
return self._peak_sequence_tic(self.scan.deconvoluted_peak_set)
class BasePeakMethods(object):
"""A helper class that will figure out the most refined signal source to
calculate the base peak from.
"""
base_peak_t = namedtuple("BasePeak", ("mz", "intensity"))
def __init__(self, scan):
self.scan = scan
def _peak_sequence_bp(self, peaks) -> float:
if not peaks:
return None
peak = max(peaks, key=lambda x: x.intensity)
return peak
def _bp_raw_data_arrays(self, arrays) -> float:
i = np.argmax(arrays.intensity)
return self.base_peak_t(arrays.mz[i], arrays.intensity[i])
def __call__(self) -> float:
return self._guess()
def _guess(self) -> float:
"""Guess which strategy to use to produce the most refined representation
of the base peak.
Returns
-------
:class:`~.PeakLike`
"""
try:
return self.deconvoluted()
except (AttributeError, TypeError):
pass
try:
return self.centroided()
except (AttributeError, TypeError):
pass
try:
return self.raw()
except (AttributeError, TypeError):
pass
points = list(self.scan)
if points:
if isinstance(points[0], PeakLike):
return self._peak_sequence_bp(points)
raise TypeError(
"Cannot determine how to calculate a base peak from %r of type %r" % (
self.scan, type(self.scan)))
else:
raise TypeError(
"Cannot determine how to calculate a base peak from %r of type %r" % (
self.scan, type(self.scan)))
def raw(self) -> float:
"""Calculate the base peak from the raw intensity signal of the spectrum with no processing.
Returns
-------
:class:`~.PeakLike`
"""
return self._bp_raw_data_arrays(self.scan.arrays)
def centroided(self) -> float:
"""Calculate the base peak from the picked peak list of the spectrum.
Returns
-------
:class:`~.FittedPeak`
"""
return self._peak_sequence_bp(self.scan.peak_set)
def deconvoluted(self) -> float:
"""Calculate the base peak from the deconvoluted peak list of the spectrum.
Returns
-------
:class:`~.DeconvolutedPeak`
"""
return self._peak_sequence_bp(self.scan.deconvoluted_peak_set)
class PeakSetMethods(_SequenceABC):
"""A facade to simplify determining how to call common peak-set methods
on an object like :class:`~.Scan` which may have multiple tiers of
representation of the peaks it contains, while still supporting directly
using real PeakSet-like objects.
In addition to the three common peak searching methods, this type supports
:class:`~.Sequence` operations.
Attributes
----------
scan: object
The scan containing peaks to manipulate
is_scan: bool
Whether or not :attr:`scan` is an instance of :class:`~.ScanBase`
is_deconvoluted: bool
Whether or not :attr:`scan` has been deconvoluted.
is_centroided: bool
Whether or not :attr:`scan` has been centroided (undergone peak picking).
is_raw_sequence: bool
Whether or not :attr:`scan` is of a known type or just a sequence.
"""
def __init__(self, scan):
self.scan = scan
self.is_scan = self._is_scan()
self.is_deconvoluted = self._is_deconvoluted()
self.is_centroided = self._is_centroided()
self.is_raw_sequence = self._is_raw_sequence()
def _is_scan(self):
return isinstance(self.scan, ScanBase)
def _is_deconvoluted(self):
if self.is_scan:
return self.scan.deconvoluted_peak_set is not None
else:
from ms_deisotope.peak_set import DeconvolutedPeakSet
return isinstance(self.scan, DeconvolutedPeakSet)
def deconvoluted(self):
"""Get the deconvoluted peak set
Returns
-------
:class:`~.DeconvolutedPeakSet`
"""
if self.is_deconvoluted:
return self._get_deconvoluted()
return None
def centroided(self):
"""Get the centroided peak set
Returns
-------
:class:`~.PeakSet` or :class:`~.PeakIndex`
"""
if self.is_centroided:
return self._get_centroided()
def raw(self):
"""Get the raw signal, or lacking that, the simplest representation available.
Returns
-------
objects
"""
if self.is_scan:
return self.scan.arrays
elif self.is_raw_sequence:
return self.scan
else:
return None
def _get_deconvoluted(self):
if self.is_scan:
return self.scan.deconvoluted_peak_set
elif self.is_deconvoluted:
return self.scan
else:
return None
def _is_centroided(self):
if self.is_scan:
return self.scan.peak_set is not None
else:
from ms_peak_picker import PeakIndex, PeakSet
return isinstance(self.scan, (PeakIndex, PeakSet))
def _get_centroided(self):
if self.is_scan:
return self.scan.peak_set
elif self.is_centroided:
return self.scan
else:
return None
def _is_raw_sequence(self):
if self.is_scan:
return False
else:
if hasattr(self.scan, 'has_peak'):
return False
return isinstance(self.scan, _SequenceABC)
def get_nearest_peak(self, m):
if self.is_deconvoluted:
return self._get_deconvoluted().get_nearest_peak(m)
elif self.is_centroided:
return self._get_centroided().get_nearest_peak(m)
elif self.is_scan:
self.scan.pick_peaks()
self.is_centroided = self._is_centroided()
return self._get_centroided().get_nearest_peak(m)
elif self.is_raw_sequence:
if not hasattr(self.scan, 'get_nearest_peak'):
raise NotImplementedError()
return self.scan.get_nearest_peak(m)
else:
raise NotImplementedError()
def has_peak(self, m, error_tolerance=2e-5):
"""Search the most refined representation available for a peak at the
given mass dimension coordinates with the specified parts-per-million
error tolerance.
This method invokes the underlying collection's ``has_peak`` method,
and may not expose all specialized parameters. If the underlying collection
cannot support this operation, an error will be raised.
Parameters
----------
m : float
The mass dimension coordinates to search for. Depending upon if the
peaks have been deconvoluted, this may be m/z or neutral mass.
error_tolerance : float, optional
The parts-per-million error tolerance to apply (the default is 2e-5)
Returns
-------
:class:`~.PeakBase`
See Also
--------
:meth:`~.DeconvolutedPeakSet.has_peak`
:meth:`~.PeakSet.has_peak`
Raises
------
NotImplementedError
When the underlying collection does not support calling ``has_peak``
"""
if self.is_deconvoluted:
return self._get_deconvoluted().has_peak(m, error_tolerance)
elif self.is_centroided:
return self._get_centroided().has_peak(m, error_tolerance)
elif self.is_scan:
self.scan.pick_peaks()
self.is_centroided = self._is_centroided()
return self._get_centroided().has_peak(m, error_tolerance)
elif self.is_raw_sequence:
if not hasattr(self.scan, 'has_peak'):
raise NotImplementedError()
return self.scan.has_peak(m, error_tolerance)
else:
raise NotImplementedError()
def all_peaks_for(self, m, error_tolerance=2e-5):
"""Search the most refined representation available for all peaks at the
given mass dimension coordinates with the specified parts-per-million
error tolerance.
This method invokes the underlying collection's ``all_peaks_for`` method,
and may not expose all specialized parameters. If the underlying collection
cannot support this operation, an error will be raised.
Parameters
----------
m : float
The mass dimension coordinates to search for. Depending upon if the
peaks have been deconvoluted, this may be m/z or neutral mass.
error_tolerance : float, optional
The parts-per-million error tolerance to apply (the default is 2e-5)
Returns
-------
:class:`tuple` of :class:`~.PeakBase`
See Also
--------
:meth:`~.DeconvolutedPeakSet.all_peaks_for`
:meth:`~.PeakSet.all_peaks_for`
Raises
------
NotImplementedError
When the underlying collection does not support calling ``all_peaks_for``
"""
if self.is_deconvoluted:
return self._get_deconvoluted().all_peaks_for(m, error_tolerance)
elif self.is_centroided:
return self._get_centroided().all_peaks_for(m, error_tolerance)
elif self.is_scan:
self.scan.pick_peaks()
self.is_centroided = self._is_centroided()
return self._get_centroided().all_peaks_for(m, error_tolerance)
elif self.is_raw_sequence:
if not hasattr(self.scan, 'all_peaks_for'):
raise NotImplementedError()
return self.scan.all_peaks_for(m, error_tolerance)
else:
raise NotImplementedError()
def between(self, lo, hi, **kwargs):
"""Search the most refined representation available for all peaks at the
between the given low and high mass dimension coordinates.
This method invokes the underlying collection's ``between`` method,
and may not expose all specialized parameters. If the underlying collection
cannot support this operation, an error will be raised.
Parameters
----------
lo : float
The lower bound mass dimension coordinates to search for.
Depending upon if the peaks have been deconvoluted, this may be m/z
or neutral mass.
hi : float
The upper bound mass dimension coordinates to search for.
Depending upon if the peaks have been deconvoluted, this may be m/z
or neutral mass.
**kwargs
Forwarded to the underlying collection's ``between`` method.
Returns
-------
object
See Also
--------
:meth:`~.DeconvolutedPeakSet.between`
:meth:`~.PeakSet.between`
Raises
------
NotImplementedError
When the underlying collection does not support calling ``between``
"""
if self.is_deconvoluted:
return self._get_deconvoluted().between(lo, hi, **kwargs)
elif self.is_centroided:
return self._get_centroided().between(lo, hi, **kwargs)
elif self.is_scan:
self.scan.pick_peaks()
self.is_centroided = self._is_centroided()
return self._get_centroided().between(lo, hi, **kwargs)
elif self.is_raw_sequence:
if not hasattr(self.scan, 'between'):
raise NotImplementedError()
return self.scan.between(lo, hi, **kwargs)
else:
raise NotImplementedError()
def __len__(self):
if self.is_raw_sequence:
return len(self.scan)
elif self.is_deconvoluted:
return len(self._get_deconvoluted())
elif self.is_centroided:
return len(self._get_centroided())
elif self.is_scan:
self.scan.pick_peaks()
self.is_centroided = self._is_centroided()
return len(self._get_centroided())
elif self.is_raw_sequence:
return len(self.scan)
else:
raise NotImplementedError()
def __getitem__(self, i):
if self.is_raw_sequence:
return (self.scan)[i]
elif self.is_deconvoluted:
return (self._get_deconvoluted())[i]
elif self.is_centroided:
return (self._get_centroided())[i]
elif self.is_scan:
self.scan.pick_peaks()
self.is_centroided = self._is_centroided()
return (self._get_centroided())[i]
elif self.is_raw_sequence:
return self.scan[i]
else:
raise NotImplementedError()
def __call__(self):
if self.is_deconvoluted:
return self.deconvoluted()
elif self.is_centroided:
return self.centroided()
else:
return self.raw()
class PlottingMethods(object):
"""A plotting method facade that knows how to draw different facets of a spectrum.
When called directly, the behavior is the same as calling :meth:`raw`, :meth:`centroided`,
and meth:`deconvoluted` with a shared ``ax`` argument and common ``**kwargs``.
"""
def __init__(self, scan):
self.scan = scan
self._plot_api = None
from ms_deisotope import plot
self._plot_api = plot
def raw(self, *args, **kwargs):
"""Draws un-centroided profile data, visualizing continuous
data points
Parameters
----------
ax : :class:`matplotlib.Axes`, optional
The axis to draw the plot on. If missing, a new one will be created using
:func:`matplotlib.pyplot.subplots`
pretty: bool, optional
If `True`, will call :func:`_beautify_axes` on `ax`
normalize: bool, optional
if `True`, will normalize the abundance dimension to be between 0 and 100%
**kwargs
Passed to :meth:`matplotlib.Axes.plot`
Returns
-------
:class:`~.Axes`
See Also
--------
:func:`ms_deisotope.plot.draw_raw`
"""
return self._plot_api.draw_raw(self.scan.arrays, *args, **kwargs)
def centroided(self, *args, **kwargs):
"""Draws centroided peak data, visualizing peak apexes.
If :attr:`scan.peak_set` is not populated, this will fail.
Parameters
----------
ax : matplotlib.Axes, optional
The axis to draw the plot on. If missing, a new one will be created using
:func:`matplotlib.pyplot.subplots`
pretty: bool, optional
If `True`, will call :func:`_beautify_axes` on `ax`
normalize: bool, optional
if `True`, will normalize the abundance dimension to be between 0 and 100%
**kwargs
Passed to :meth:`matplotlib.Axes.plot`
Returns
-------
matplotlib.Axes
See Also
--------
:func:`ms_deisotope.plot.draw_peaklist`
"""
return self._plot_api.draw_peaklist(self.scan.peak_set, *args, **kwargs)
def deconvoluted(self, *args, **kwargs):
"""Draws deconvoluted peak data, visualizing collapsed envelope monoisotopic peaks.
If :attr:`scan.peak_set` is not populated, this will fail.
Parameters
----------
ax : matplotlib.Axes, optional
The axis to draw the plot on. If missing, a new one will be created using
:func:`matplotlib.pyplot.subplots`
pretty: bool, optional
If `True`, will call :func:`_beautify_axes` on `ax`
normalize: bool, optional
if `True`, will normalize the abundance dimension to be between 0 and 100%
**kwargs
Passed to :meth:`matplotlib.Axes.plot`
Returns
-------
matplotlib.Axes
See Also
--------
:func:`ms_deisotope.plot.draw_peaklist`
"""
return self._plot_api.draw_peaklist(self.scan.deconvoluted_peak_set, *args, **kwargs)
def annotate_precursor(self, *args, **kwargs):
"""Draw a zoomed-in view of the precursor scan of :attr:`scan` surrounding the
area around each precursor ion that gave rise to :attr:`scan`
with monoisotopic peaks and isolation windows marked.
Parameters
----------
ax: :class:`matplotlib._axes.Axes`
An :class:`~.Axes` object to draw the plot on
Returns
-------
:class:`matplotlib._axes.Axes`
See Also
--------
:func:`ms_deisotope.plot.annotate_scan_single`
"""
precursor = kwargs.pop("precursor", None)
if precursor is None:
pinfo = self.scan.precursor_information
if pinfo is None:
return
precursor = pinfo.precursor
return self._plot_api.annotate_scan_single(precursor, self.scan, *args, **kwargs)
def label_peaks(self, *args, **kwargs):
"""Label a region of the peak list, marking centroids with their m/z or mass. If the peaks
of `scan` have been deconvoluted, the most abundant peak will be annotated with
"<neutral mass> (<charge>)", otherwise just "<m/z>".
Parameters
----------
scan : :class:`~.ScanBase`
The scan to annotate
min_mz : float, optional
The minimum m/z to annotate
max_mz : float, optional
The maximum m/z to annotate
ax: :class:`matplotlib._axes.Axes`
An :class:`~.Axes` object to draw the plot on
is_deconvoluted : bool, optional
Whether or not to always use :attr:`Scan.deconvoluted_peak_set`
threshold : float, optional
The intensity threshold under which peaks will be ignored
Returns
-------
ax: :class:`matplotlib._axes.Axes`
The axes the plot was drawn on
annotations: :class:`list` of :class:`matplotlib.text.Text`
The list of :class:`matplotlib.text.Text` annotations
"""
return self._plot_api.label_peaks(self.scan, *args, **kwargs)
def annotate_isotopic_peaks(self, *args, **kwargs):
"""Mark distinct isotopic peaks from the :class:`~.DeconvolutedPeakSet`
in ``scan``.
Parameters
----------
color_cycle: :class:`~.Iterable`
An iterable to draw isotopic cluster colors from
ax: :class:`matplotlib._axes.Axes`
An :class:`~.Axes` object to draw the plot on
Returns
-------
:class:`matplotlib._axes.Axes`
See Also
--------
:func:`ms_deisotope.plot.annotate_isotopic_peaks`
"""
return self._plot_api.annotate_isotopic_peaks(self.scan, *args, **kwargs)
def _guess(self, ax=None, **kwargs):
try:
if self.scan.arrays:
if self.scan.is_profile:
ax = self.raw(ax=ax)
except (AttributeError, TypeError):
pass
try:
if self.scan.peak_set:
ax = self.centroided(ax=ax)
except (AttributeError, TypeError):
pass
try:
if self.scan.deconvoluted_peak_set:
ax = self.deconvoluted(ax=ax)
except (AttributeError, TypeError):
pass
if ax is None:
try:
if not self.scan.is_profile:
self.scan.pick_peaks()
ax = self.centroided(ax=ax)
except (AttributeError, TypeError):
pass
return ax
def __call__(self, ax=None, **kwargs):
return self._guess(ax=ax, **kwargs)
try:
from ms_deisotope._c.utils import _peak_sequence_tic, _peak_sequence_bp
TICMethods._peak_sequence_tic = _peak_sequence_tic
BasePeakMethods._peak_sequence_bp = _peak_sequence_bp
except ImportError:
pass