Source code for ms_deisotope.peak_set

import operator
import math
from collections import namedtuple
from typing import Any, List, Optional, Sequence, TYPE_CHECKING

from brainpy import mass_charge_ratio, neutral_mass as calc_neutral_mass, PROTON

from ms_peak_picker import simple_peak, PeakSet as FittedPeakSet
from ms_deisotope.peak_dependency_network.intervals import SimpleInterval

from ms_deisotope.scoring import IsotopicFitRecord

from .utils import Base, ppm_error


class _Index(object):
    """
    Stores the ordered index of an object under sort by `mz` and
    sort by `neutral_mass`

    Attributes
    ----------
    mz : int
        Index of this object (or its parent), ordered by mz
    neutral_mass : int
        Index of this object (or its parent), ordered by neutral mass
    """

    __slots__ = ["neutral_mass", "mz"]

    neutral_mass: int
    mz: int

    def __init__(self, neutral_mass=None, mz=None):
        self.neutral_mass = neutral_mass
        self.mz = mz

    def __reduce__(self):
        return self.__class__, (self.neutral_mass, self.mz)

    def __repr__(self):
        return "%d|%d" % (self.neutral_mass, self.mz)

    def clone(self):
        return self.__class__(self.neutral_mass, self.mz)


EnvelopePair = namedtuple("EnvelopePair", ("mz", "intensity"))


class Envelope(Sequence[EnvelopePair]):
    """
    Represents a sequence of (mz, intensity) pairs which store peak positions
    that were matched. Since these peaks may later be mutated by subtraction,
    this structure stores copies of the numbers

    Supports the Sequence protocol, and is read-only.

    Attributes
    ----------
    pairs : tuple of tuple of (float, float)
        list of (mz, intensity) pairs
    """

    __slots__ = ['pairs']

    def __init__(self, pairs):
        self.pairs = [EnvelopePair(*x) for x in pairs]

    def __reduce__(self):
        return self.__class__, (self.pairs,)

    def __getitem__(self, i):
        return self.pairs[i]

    def __iter__(self):
        return iter(self.pairs)

    def __len__(self):
        return len(self.pairs)

    def __repr__(self):
        return "[%s]" % (', '.join("(%0.4f, %0.2f)" % t for t in self),)

    def clone(self):
        return self.__class__(self)


class DeconvolutedPeak(Base):
    """Represent a single deconvoluted peak which represents an aggregated isotopic
    pattern collapsed to its monoisotopic peak, with a known charge state

    Attributes
    ----------
    a_to_a2_ratio : float
        Ratio of intensities of A peak to A+2 peak
    average_mass : float
        The averaged neutral mass of the composition, the weighted average
        of the envelope peaks.
    charge : int
        The signed charge state of the isotopic pattern
    envelope : Envelope
        The sequence of (mz, intensity) pairs which map to this peak
    full_width_at_half_max : float
        The averaged full width at half max of this isotopic pattern
    index : _Index
        The position of this peak in different orderings
    intensity : float
        The summed height of the peaks in the isotopic pattern this peak captures
    most_abundant_mass : float
        The neutral mass of the most abundant peak in the isotopic pattern this peak captures
    mz : float
        The mass-charge-ratio of the monoisotopic peak
    neutral_mass : float
        The neutral mass of the monoisotopic peak
    score : float
        An assigned value describing the quality of this peak's fit. The semantics of this score
        depends upon the scoring function
    signal_to_noise : float
        The average signal-to-noise ratio of the peaks in the isotopic pattern this peak captures
    """

    __slots__ = [
        "neutral_mass", "intensity", "signal_to_noise",
        "index", "full_width_at_half_max", "charge",
        "a_to_a2_ratio", "most_abundant_mass", "average_mass",
        "score", "envelope", "mz", "fit", "chosen_for_msms", 'area'
    ]

    neutral_mass: float
    intensity: float
    signal_to_noise: float
    index: _Index
    full_width_at_half_max: float
    charge: int
    a_to_a2_ratio: float
    most_abundant_mass: float
    average_mass: float
    score: float
    envelope: Envelope
    mz: float
    fit: Optional[IsotopicFitRecord]
    chosen_for_msms: bool
    area: float

    def __init__(self, neutral_mass, intensity, charge, signal_to_noise, index, full_width_at_half_max,
                 a_to_a2_ratio=0, most_abundant_mass=0, average_mass=0, score=0,
                 envelope=None, mz=0, fit=None, chosen_for_msms=False, area=0):
        if index is None:
            index = _Index()
        if envelope is None:
            envelope = ()
        self.neutral_mass = neutral_mass
        self.intensity = intensity
        self.signal_to_noise = signal_to_noise
        self.index = index
        self.full_width_at_half_max = full_width_at_half_max
        self.charge = charge
        self.a_to_a2_ratio = a_to_a2_ratio
        self.most_abundant_mass = most_abundant_mass
        self.average_mass = average_mass
        self.score = score
        self.envelope = Envelope(envelope)
        self.mz = mz or mass_charge_ratio(self.neutral_mass, self.charge)
        self.fit = fit
        self.chosen_for_msms = chosen_for_msms
        self.area = area

    def __eq__(self, other):
        epsilon = 1e-3
        return (abs(self.neutral_mass - other.neutral_mass) < epsilon) and (
            abs(self.intensity - other.intensity) < epsilon) and (
            self.charge == other.charge) and (
            abs(self.score - other.score) < epsilon)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.mz, self.intensity, self.charge))

    def clone(self):
        return DeconvolutedPeak(self.neutral_mass, self.intensity, self.charge, self.signal_to_noise,
                                self.index, self.full_width_at_half_max, self.a_to_a2_ratio,
                                self.most_abundant_mass, self.average_mass, self.score,
                                self.envelope, self.mz, self.fit, self.chosen_for_msms,
                                self.area)

    def __reduce__(self):
        return DeconvolutedPeak, (self.neutral_mass, self.intensity, self.charge, self.signal_to_noise,
                                  self.index, self.full_width_at_half_max, self.a_to_a2_ratio,
                                  self.most_abundant_mass, self.average_mass, self.score,
                                  self.envelope, self.mz, self.fit, self.chosen_for_msms, self.area)


class IonMobilityDeconvolutedPeak(DeconvolutedPeak):
    drift_time: float

    def __init__(self, neutral_mass, intensity, charge, signal_to_noise, index, full_width_at_half_max,
                 a_to_a2_ratio=0, most_abundant_mass=0, average_mass=0, score=0,
                 envelope=(), mz=0, fit=None, chosen_for_msms=False, area=0, drift_time=0):
        super(IonMobilityDeconvolutedPeak, self).__init__(
            neutral_mass, intensity, charge, signal_to_noise, index, full_width_at_half_max,
            a_to_a2_ratio, most_abundant_mass, average_mass, score,
            envelope, mz, fit, chosen_for_msms, area)
        self.drift_time = drift_time

    def clone(self):
        return IonMobilityDeconvolutedPeak(
            self.neutral_mass, self.intensity, self.charge, self.signal_to_noise,
            self.index, self.full_width_at_half_max, self.a_to_a2_ratio,
            self.most_abundant_mass, self.average_mass, self.score,
            self.envelope, self.mz, self.fit, self.chosen_for_msms, self.area,
            self.drift_time)

    def __reduce__(self):
        return IonMobilityDeconvolutedPeak, (
            self.neutral_mass, self.intensity, self.charge, self.signal_to_noise,
            self.index, self.full_width_at_half_max, self.a_to_a2_ratio,
            self.most_abundant_mass, self.average_mass, self.score,
            self.envelope, self.mz, self.fit, self.chosen_for_msms, self.area,
            self.drift_time)

    def __repr__(self):
        return ("{self.__class__.__name__}(a_to_a2_ratio={self.a_to_a2_ratio}, area={self.area}, "
                "average_mass={self.average_mass}, charge={self.charge}, chosen_for_msms={self.chosen_for_msms}, "
                "envelope={self.envelope}, full_width_at_half_max={self.full_width_at_half_max}, index={self.index}, "
                "intensity={self.intensity}, most_abundant_mass={self.most_abundant_mass}, mz={self.mz}, "
                "neutral_mass={self.neutral_mass}, score={self.score}, signal_to_noise={self.signal_to_noise},"
                "drift_time={self.drift_time})").format(self=self)


class DeconvolutedPeakSolution(DeconvolutedPeak):
    """
    Extends :class:`DeconvolutedPeak` to also include a reference to
    the :class:`IsotopicFitRecord` instance it is derived from, and optionally an
    object which the fit derives from.

    Attributes
    ----------
    fit : IsotpicFitRecord
        The isotopic pattern fit used to construct this peak
    solution : object
        Representation of the "source" of the isotopic fit
    """

    __slots__ = [
        "solution", "fit",
        "neutral_mass", "intensity", "signal_to_noise",
        "index", "full_width_at_half_max", "charge",
        "a_to_a2_ratio", "most_abundant_mass", "average_mass",
        "score", "envelope", "mz", "chosen_for_msms", 'area'
    ]

    solution: Any

    def __init__(self, solution, fit, *args, **kwargs):
        self.solution = solution
        super(DeconvolutedPeakSolution, self).__init__(*args, **kwargs)
        self.fit = fit

    def clone(self):
        return DeconvolutedPeakSolution(
            self.solution, self.fit, self.neutral_mass, self.intensity, self.charge, self.signal_to_noise,
            self.index, self.full_width_at_half_max, self.a_to_a2_ratio,
            self.most_abundant_mass, self.average_mass, self.score,
            self.envelope, self.mz, self.chosen_for_msms, self.area)

    def __reduce__(self):
        return DeconvolutedPeakSolution, (
            self.solution, self.fit, self.neutral_mass, self.intensity, self.charge, self.signal_to_noise,
            self.index, self.full_width_at_half_max, self.a_to_a2_ratio,
            self.most_abundant_mass, self.average_mass, self.score,
            self.envelope, self.mz, self.chosen_for_msms, self.area)

    def __iter__(self):
        yield self.solution
        yield self
        yield self.fit

    @classmethod
    def from_peak(cls, peak, solution):
        inst = cls(
            solution, peak.fit, peak.neutral_mass, peak.intensity,
            peak.charge, peak.signal_to_noise, peak.index, peak.full_width_at_half_max,
            peak.a_to_a2_ratio, peak.most_abundant_mass, peak.average_mass,
            peak.score, peak.envelope, peak.mz, peak.chosen_for_msms, peak.area)
        return inst


class DeconvolutedPeakSet(Base):
    """
    Represents a collection of :class:`DeconvolutedPeak` instances under multiple orderings.

    Attributes
    ----------
    peaks : tuple of DeconvolutedPeak
        Collection of peaks ordered by `neutral_mass`
    _mz_ordered: tuple of DeconvolutedPeak
        Collection of peaks ordered by `mz`
    """

    def __init__(self, peaks):
        self.peaks = peaks
        self._mz_ordered = None

    def reindex(self):
        """
        Updates the :attr:`index` of each peak in `self` and updates the
        sorted order.

        Returns
        -------
        self: DeconvolutedPeakSet
        """
        self.peaks = tuple(
            sorted(self.peaks, key=operator.attrgetter("neutral_mass")))
        self._mz_ordered = tuple(
            sorted(self.peaks, key=operator.attrgetter("mz")))
        for i, peak in enumerate(self.peaks):
            peak.index = _Index()
            peak.index.neutral_mass = i
        for i, peak in enumerate(self._mz_ordered):
            peak.index.mz = i
        return self

    def __len__(self):
        return len(self.peaks)

    def get_nearest_peak(self, neutral_mass, use_mz=False):
        if use_mz:
            return _get_nearest_peak(self._mz_ordered, neutral_mass, use_mz=use_mz)
        else:
            return _get_nearest_peak(self.peaks, neutral_mass, use_mz=use_mz)

    def has_peak(self, neutral_mass, tolerance=1e-5, use_mz=False):
        if use_mz:
            return binary_search(self._mz_ordered, neutral_mass, tolerance, mz_getter)
        return binary_search(self.peaks, neutral_mass, tolerance, neutral_mass_getter)

    def all_peaks_for(self, neutral_mass, tolerance=1e-5):
        lo = neutral_mass - neutral_mass * tolerance
        hi = neutral_mass + neutral_mass * tolerance
        lo_peak, lo_err = self.get_nearest_peak(lo)
        hi_peak, hi_err = self.get_nearest_peak(hi)
        lo_ix = lo_peak.index.neutral_mass
        if abs(ppm_error(lo_peak.neutral_mass, neutral_mass)) > tolerance:
            lo_ix += 1
        hi_ix = hi_peak.index.neutral_mass + 1
        if abs(ppm_error(hi_peak.neutral_mass, neutral_mass)) > tolerance:
            hi_ix -= 1
        return tuple(self[lo_ix:hi_ix])

    def __repr__(self):
        return "<DeconvolutedPeakSet %d Peaks>" % (len(self))

    def __getitem__(self, item):
        if isinstance(item, slice):
            return self.__class__(tuple(p.clone() for p in self.peaks[item]))
        return self.peaks[item]

    def __iter__(self):
        return iter(self.peaks)

    def clone(self):
        return self.__class__(tuple(p.clone() for p in self))

    def copy(self):
        return self.clone()

    def __eq__(self, other):
        try:
            return self.peaks == other.peaks
        except AttributeError:
            return tuple(self) == tuple(other)

    def __ne__(self, other):
        try:
            return self.peaks != other.peaks
        except AttributeError:
            return tuple(self) != tuple(other)

    def between(self, m1, m2, tolerance=1e-5, use_mz=False):
        acc = []
        collecting = False
        if not use_mz:
            getter = operator.attrgetter("neutral_mass")
            iterator = self.peaks
        else:
            getter = operator.attrgetter("mz")
            iterator = self._mz_ordered
        for peak in iterator:
            if not collecting and getter(peak) >= m1:
                collecting = True
            elif collecting and getter(peak) > m2:
                break

            if collecting:
                acc.append(peak.clone())

        return self.__class__(acc)._reindex()


[docs]def merge(peaks_a, *peaks_b, **kwargs): """Combine two :class:`DeconvolutedPeakSet` objects. Parameters ---------- peaks_a: :class:`DeconvolutedPeakSet` peaks_b: :class:`DeconvolutedPeakSet` copy: bool Whether or not to copy the peaks first. If not, the two input peak sets should not be used again as their indices will have been corrupted. Returns ------- :class:`DeconvolutedPeakSet` """ copy = kwargs.get("copy", True) tp = peaks_a.__class__ if copy: peaks = () peaks += tuple(p.clone() for p in peaks_a) for b in peaks_b: peaks += tuple(p.clone() for p in b) else: peaks = tuple(peaks_a) for b in peaks_b: peaks += tuple(b) peaks = tp(peaks) try: peaks.reindex() except AttributeError: pass return peaks
mz_getter = operator.attrgetter('mz') neutral_mass_getter = operator.attrgetter("neutral_mass") def _get_nearest_peak(peaklist, neutral_mass, use_mz=False): lo = 0 hi = len(peaklist) tol = 1 getter = operator.attrgetter("mz") if use_mz else operator.attrgetter("neutral_mass") def sweep(lo, hi): best_error = float('inf') best_index = None for i in range(hi - lo): i += lo v = getter(peaklist[i]) err = abs(v - neutral_mass) if err < best_error: best_error = err best_index = i return peaklist[best_index], best_error def binsearch(lo, hi): if (hi - lo) < 5: return sweep(lo, hi) else: mid = (hi + lo) // 2 v = getter(peaklist[mid]) if abs(v - neutral_mass) < tol: return sweep(lo, hi) elif v > neutral_mass: return binsearch(lo, mid) else: return binsearch(mid, hi) return binsearch(lo, hi) def _sweep_solution(array, value, mid, tolerance, getter=operator.attrgetter('neutral_mass')): best_index = mid best_error = float('inf') n = len(array) i = 0 while mid - i >= 0 and i <= mid: target = array[mid - i] abs_error = abs(ppm_error(value, getter(target))) if abs_error < tolerance: if abs_error < best_error: best_index = mid - i best_error = abs_error else: break i += 1 i = 1 while (mid + i) < (n - 1): target = array[mid + i] abs_error = abs(ppm_error(value, getter(target))) if abs_error < tolerance: if abs_error < best_error: best_index = mid + i best_error = abs_error else: break i += 1 if best_error == float('inf'): return None else: return array[best_index] def binary_search(peak_set, neutral_mass, tolerance, getter=operator.attrgetter('neutral_mass')): lo = 0 hi = len(peak_set) while hi != lo: mid = (hi + lo) // 2 found_peak = peak_set[mid] found_mass = getter(found_peak) if abs(ppm_error(found_mass, neutral_mass)) < tolerance: found_peak = _sweep_solution(peak_set, neutral_mass, mid, tolerance) return found_peak elif hi - lo == 1: return None elif found_mass > neutral_mass: hi = mid else: lo = mid try: has_c = True _Envelope = Envelope __Index = _Index _DeconvolutedPeak = DeconvolutedPeak _DeconvolutedPeakSolution = DeconvolutedPeakSolution _DeconvolutedPeakSet = DeconvolutedPeakSet from ms_deisotope._c.peak_set import ( Envelope, _Index, DeconvolutedPeak, DeconvolutedPeakSolution, DeconvolutedPeakSetIndexed as DeconvolutedPeakSet ) from ms_deisotope._c.peak_set import IonMobilityDeconvolutedPeak except ImportError: has_c = False
[docs]def window_peak_set(peak_set: DeconvolutedPeakSet, window_size: float=100.0, peaks_per_window: int=10) -> List[List[DeconvolutedPeak]]: """Generate non-overlapping windows over a :class:`~.DeconvolutedPeakSet`, selecting the most abundant peaks in each window. Parameters ---------- peak_set : DeconvolutedPeakSet The peak set to generate windows from. window_size : float The width of the window in daltons. The default is 100. peaks_per_window : int The number of peaks to retain per window, by intensity. Default is 10. Returns ------- List[List[DeconvolutedPeak]] """ lower_bound = math.floor(peak_set[0].neutral_mass / window_size) * window_size upper_bound = math.ceil(peak_set[-1].neutral_mass / window_size) * window_size window_count = int(math.ceil((upper_bound - lower_bound) / window_size)) windows = [] window_upper_bound = lower_bound + window_size window_lower_bound = lower_bound for _ in range(window_count): window = peak_set.between(window_lower_bound, window_upper_bound) window = sorted( sorted(window, key=lambda x: x.intensity, reverse=True)[:peaks_per_window], key=lambda x: x.neutral_mass) windows.append(window) window_lower_bound += window_size window_upper_bound += window_size return windows
[docs]def decharge(self, include_envelopes=True, charge_carrier=PROTON, new_charge=1): """Transfomr multiply charged deconvoluted peaks into singly charged deconvoluted peaks. This operation returns a copy of the peak set, the input peak set is unchanged. Parameters ---------- include_envelopes : bool, optional Whether or not to charge-transform the isotopic envelope fitted in :attr:`DeconvolutedPeak.envelope` (the default is True) charge_carrier : float, optional The charge carrier mass to use when recalculating m/z (the default is PROTON) new_charge : int, optional The new charge to set all peaks to, the default being +1 Returns ------- DeconvolutedPeakSet """ peaks = [] for peak in self: mz = mass_charge_ratio(peak.neutral_mass, new_charge, charge_carrier=charge_carrier) if include_envelopes: envelope = Envelope( [EnvelopePair( mass_charge_ratio( calc_neutral_mass(p.mz, peak.charge, charge_carrier), new_charge, charge_carrier), p.intensity) for p in peak.envelope]) else: envelope = peak.envelope new_peak = DeconvolutedPeak( peak.neutral_mass, peak.intensity, new_charge, peak.signal_to_noise, None, peak.full_width_at_half_max, peak.a_to_a2_ratio, peak.most_abundant_mass, peak.average_mass, peak.score, envelope, mz, peak.fit, peak.chosen_for_msms, peak.area) peaks.append(new_peak) new_peak_set = self.__class__(peaks) new_peak_set.reindex() return new_peak_set
[docs]def envelopes_to_peak_set(self): """Convert a set of deconvoluted peaks with fitted isotopic envelopes into a set of centroids representing those envelope peaks. Returns ------- :class:`ms_peak_picker.PeakSet` """ peaks = [] for peak in self: for point in peak.envelope: peaks.append( simple_peak(point.mz, point.intensity, peak.full_width_at_half_max)) new_peak_set = FittedPeakSet(peaks) new_peak_set.reindex() return new_peak_set
class IonMobilityDeconvolutedPeakSet(DeconvolutedPeakSet): def reindex(self): """ Updates the :attr:`index` of each peak in `self` and updates the sorted order. Returns ------- self: DeconvolutedPeakSet """ self.peaks = tuple( sorted(self.peaks, key=lambda x: (x.neutral_mass, x.drift_time))) self._mz_ordered = tuple( sorted(self.peaks, key=lambda x: (x.mz, x.drift_time))) for i, peak in enumerate(self.peaks): peak.index = _Index() peak.index.neutral_mass = i for i, peak in enumerate(self._mz_ordered): peak.index.mz = i return self if TYPE_CHECKING: from ms_deisotope.feature_map.feature_fit import IonMobilityProfileDeconvolutedLCMSFeature class IonMobilityProfileDeconvolutedPeakSolution(DeconvolutedPeakSolution): ion_mobility_interval: SimpleInterval def __init__(self, solution, fit=None, *args, **kwargs): super().__init__(solution, fit=fit, *args, **kwargs) self.ion_mobility_interval = SimpleInterval( solution.start_time, solution.end_time) def overlaps(self, other: 'IonMobilityProfileDeconvolutedPeakSolution') -> bool: return self.ion_mobility_interval.overlaps(other.ion_mobility_interval) @classmethod def from_feature(cls, feature: 'IonMobilityProfileDeconvolutedLCMSFeature', include_mz: bool=False, include_envelope: bool=True) -> 'IonMobilityProfileDeconvolutedPeakSolution': return cls(feature, fit=None, neutral_mass=feature.neutral_mass, intensity=feature.intensity, charge=feature.charge, envelope=feature.sum_envelopes() if include_envelope else None, signal_to_noise=feature.intensity, index=0, full_width_at_half_max=0, a_to_a2_ratio=0, most_abundant_mass=0, average_mass=0, score=feature.score, chosen_for_msms=False, mz=feature.mz if include_mz else 0.0)