Source code for ms_deisotope.deconvolution.utils

# -*- coding: utf-8 -*-

import logging

import numpy as np

from ms_peak_picker import (
    FittedPeak, PeakSet, PeakIndex, simple_peak, is_peak)

from ms_deisotope.averagine import (isotopic_shift, neutral_mass)
from ms_deisotope.peak_set import DeconvolutedPeak, Envelope
from ms_deisotope.constants import ERROR_TOLERANCE


logger = logging.getLogger("ms_deisotope.deconvolution")
logger.addHandler(logging.NullHandler())
info = logger.info
debug = logger.debug
error = logger.error


def _is_parallel_arrays(peaks):
    if len(peaks) == 2:
        if isinstance(peaks[0], np.ndarray) and isinstance(peaks[1], np.ndarray) and \
                len(peaks[0]) == len(peaks[1]):
            return True
    return False


[docs]def prepare_peaklist(peaks): """Ensure ``peaks`` is a :class:`~.PeakSet` object, converting from other compatible types as needed. Additionally, make a deep copy of the peaks as signal subtraction methods will modify peaks in place. This function ensures that any of the following common input types are coerced to the appropriate type: 1. :class:`ms_peak_picker.PeakSet` will be copied and indexed 2. :class:`ms_peak_picker.PeakIndex` will have its peaks extracted and copied 3. Any other *sequence* of :class:`PeakLike` objects (objects having an mz and intensity attribute) will be converted into a :class:`ms_peak_picker.PeakSet` 4. A pair of parallel sequences or NumPy arrays corresponding to m/z and intensity values for a series of peaks. 5. Any *sequence* of :class:`tuple` or :class:`list` having at least two entries will be converted into a :class:`ms_peak_picker.PeakSet` with the m/z value of each peak being the the `p[0]` of each entry and the intensity `p[1]`. Any other entries will be ignored. Parameters ---------- peaks: Sequence Any sequence of :class:`~.FittedPeak` objects, objects with ``mz`` and ``intensity`` attributes, or :class:`list` / :class:`tuple` objects containing paired values for ``mz`` and ``intensity`` Returns ------- :class:`~.PeakSet` """ if isinstance(peaks, PeakIndex): peaks = PeakSet(peaks.peaks).clone() else: peaks = tuple(peaks) if len(peaks) == 0: return PeakSet([]) elif _is_parallel_arrays(peaks): peaks = [simple_peak(x, y) for x, y in zip(*peaks)] elif not isinstance(peaks[0], FittedPeak): if is_peak(peaks[0]): peaks = [simple_peak(p.mz, p.intensity, 0.01) for p in peaks] elif isinstance(peaks[0], (list, tuple)): peaks = [simple_peak(p[0], p[1], 0.01) for p in peaks] else: raise TypeError("Cannot convert peaks into a PeakSet") peaks = PeakSet(peaks).clone() peaks.reindex() return peaks
def from_fitted_peak(peak, charge=1): """Convert a :class:`~.FittedPeak` into a :class:`~.DeconvolutedPeak` at the specified charge state. Parameters ---------- peak : :class:`~.FittedPeak` The fitted peak to use as the template charge : int, optional The charge state to use, defaults to 1+ Returns ------- :class:`~.DeconvolutedPeak` """ mass = neutral_mass(peak.mz, charge) dpeak = DeconvolutedPeak( mass, peak.intensity, charge, peak.signal_to_noise, -1, peak.full_width_at_half_max, 0, mass, mass, 0, Envelope([(peak.mz, peak.intensity)]), peak.mz, area=peak.area) return dpeak def mean(numbers): """quick and dirty mean calculation without converting to a NumPy array Parameters ---------- numbers: Iterable Returns ------- float """ n = 0. total = 0 for x in numbers: n += 1. total += x return total / n def has_previous_peak_at_charge(peak_collection, peak, charge=2, step=1, error_tolerance=ERROR_TOLERANCE): """Get the `step`th *preceding* peak from `peak` in a isotopic pattern at charge state `charge`, or return `None` if it is missing. Parameters ---------- peak_collection : DeconvoluterBase Peak collection to look up peaks in. Calls :meth:`has_peak` peak : ms_peak_picker.FittedPeak The peak to use as a point of reference charge : int, optional The charge state to interpolate from. Defaults to 2. step : int, optional The number of peaks along the isotopic pattern to search. Returns ------- FittedPeak """ prev = peak.mz - isotopic_shift(charge) * step return peak_collection.has_peak(prev, error_tolerance) def has_successor_peak_at_charge(peak_collection, peak, charge=2, step=1, error_tolerance=ERROR_TOLERANCE): """Get the `step`th *succeeding* peak from `peak` in a isotopic pattern at charge state `charge`, or return `None` if it is missing. Parameters ---------- peak_collection : DeconvoluterBase Peak collection to look up peaks in. Calls :meth:`has_peak` peak : ms_peak_picker.FittedPeak The peak to use as a point of reference charge : int, optional The charge state to interpolate from. Defaults to 2. step : int, optional The number of peaks along the isotopic pattern to search. Returns ------- FittedPeak """ nxt = peak.mz + isotopic_shift(charge) * step return peak_collection.has_peak(nxt, error_tolerance) def first_peak(peaks): """Get the first non-placeholder peak in a list of peaks Parameters ---------- peaks : Iterable of FittedPeak Returns ------- FittedPeak """ for peak in peaks: if peak.intensity > 1 and peak.mz > 1: return peak def drop_placeholders(peaks): """Removes all placeholder peaks from an iterable of peaks Parameters ---------- peaks : Iterable of FittedPeak Returns ------- list """ return [peak for peak in peaks if peak.mz > 1 and peak.intensity > 1] def count_placeholders(peaks): """Counts the number of placeholder peaks in an iterable of FittedPeaks Parameters ---------- peaks : Iterable of FittedPeak Returns ------- int Number of placeholder peaks """ i = 0 for peak in peaks: if peak.intensity <= 1: i += 1 return i def drop_placeholders_parallel(peaks, otherpeaks): """Given two parallel iterables of Peak objects, `peaks` and `otherpeaks`, for each position that is not a placeholder in `peaks`, include that Peak object and its counterpart in `otherpeaks` in a pair of output lists. Parameters ---------- peaks : Iterable of FittedPeak Peak collection to filter against otherpeaks : Iterable Collection of objects (Peak-like) to include based upon contents of `peaks` Returns ------- list Filtered form of `peaks` list Filtered form of `otherpeaks` """ new_peaks = [] new_otherpeaks = [] for i, peak in enumerate(peaks): peak = peaks[i] if peak.intensity > 1: new_peaks.append(peak) new_otherpeaks.append(otherpeaks[i]) return new_peaks, new_otherpeaks def quick_charge(peak_set, index, min_charge, max_charge): """An implementation of Hoopman's :title-reference:`QuickCharge` [1] algorithm for quickly capping charge state queries Parameters ---------- peak_set : :class:`ms_peak_picker.PeakSet The centroided peak set to search index : int The index of the peak to start the search from min_charge : int The minimum charge state to consider max_charge : int The maximum charge state to consider Returns ------- np.ndarray The list of feasible charge states References ---------- [1] Hoopmann, M. R., Finney, G. L., MacCoss, M. J., Michael R. Hoopmann, Gregory L. Finney, and, MacCoss*, M. J., … MacCoss, M. J. (2007). "High-speed data reduction, feature detection and MS/MS spectrum quality assessment of shotgun proteomics data sets using high-resolution Mass Spectrometry". Analytical Chemistry, 79(15), 5620–5632. https://doi.org/10.1021/ac0700833 """ size = len(peak_set) if size == 0: raise IndexError("peak_set cannot be empty!") if index > size: first_index = peak_set[0].peak_count if (index - first_index) < size and (index - first_index) >= 0: index -= first_index else: raise IndexError("%d is out of bounds for peak list of size %d in quick_charge" % (index, size)) min_intensity = peak_set[index].intensity / 4. charges = np.zeros(max_charge, dtype=int) for j in range(index + 1, size): if peak_set[j].intensity < min_intensity: continue diff = peak_set[j].mz - peak_set[index].mz if diff > 1.1: break raw_charge = 1 / diff charge = int(raw_charge + 0.5) remain = raw_charge - int(raw_charge) if 0.2 < remain < 0.8: continue if charge < min_charge or charge > max_charge: continue charges[charge] = 1 if not np.any(charges): return np.array([], dtype=int) for j in range(index - 1, -1, -1): diff = peak_set[index].mz - peak_set[j].mz if diff > 1.1: break raw_charge = 1 / diff charge = int(raw_charge + 0.5) remain = raw_charge - int(raw_charge) if 0.2 < remain < 0.8: continue if charge < min_charge or charge > max_charge: continue charges[charge] = 1 return np.where(charges)[0] try: _quick_charge = quick_charge from ms_deisotope._c.deconvoluter_base import quick_charge except ImportError: pass def charge_range_(lo, hi, step=None): """Generate a sequence of successive charge states, automatically deducing the polarity of the charge range to infer the step. Parameters ---------- lo : int The smallest magnitude charge (positive or negative) hi : int The largest magnitude charge (positive or negative) step : int, optional The step size and sign to incrementally alter the charge by. Automatically inferred if omitted. Yields ------ int """ sign = -1 if lo < 0 else 1 abs_lo, abs_hi = abs(lo), abs(hi) upper = max(abs_lo, abs_hi) lower = min(abs_lo, abs_hi) for c in range(upper, lower - 1, -1): yield c * sign class ChargeIterator(object): def __init__(self, lo, hi): self.set_bounds(lo, hi) self.make_sequence() def set_bounds(self, lo, hi): self.sign = -1 if lo < 0 else 1 abs_lo, abs_hi = abs(lo), abs(hi) if abs_lo < abs_hi: self.lower = abs_lo self.upper = abs_hi else: self.lower = abs_hi self.upper = abs_lo self.size = self.upper - self.lower + 1 def make_sequence(self): self.index = 0 self.size = self.upper - self.lower + 1 self.values = [self.sign * (self.upper - i) for i in range(self.size)] def __len__(self): return self.size def reset(self): self.index = 0 def sequence_from_quickcharge(self, peak_set, peak): charges = quick_charge(peak_set, peak.peak_count, abs(self.lower), abs(self.upper)) n = charges.shape[0] self.index = 0 if n == 0: self.size = 1 self.values = [1 * self.sign] elif charges[0] != 1: self.size = n + 1 self.values = [1 * self.sign] + list(self.sign * charges) else: self.size = n self.values = self.sign * charges def __iter__(self): return self def __next__(self): if self.index == self.size: raise StopIteration() value = self.values[self.index] self.index += 1 return value def next(self): return self.__next__()