Source code for ms_deisotope.scoring

# -*- coding: utf-8 -*-

import math
import numpy as np
import operator

from .utils import Base

eps = 1e-4


class IsotopicFitRecord(object):
    """Describes a single isotopic pattern fit, comparing how well an
    experimentally observed sequence of peaks matches a theoretical isotopic
    pattern.

    IsotopicFitRecord instances are hashable and orderable (by score).

    Attributes
    ----------
    charge : int
        The charge state used to generate the theoretical pattern
    data : object
        An arbitrary Python object containing extra information
    experimental : list of FittedPeak
        The observed experimental peaks to be fitted
    missed_peaks : int
        The number of peaks in the theoretical pattern that do not
        have a matching experimental peak
    monoisotopic_peak : FittedPeak
        The fitted peak which corresponds to the monoisotopic peak
    score : float
        The score assigned to the fit by an IsotopicFitter object
    seed_peak : FittedPeak
        The peak that was used to initiate the fit. This may be unused
        if not using an Averagine method
    theoretical : list of TheoreticalPeak
        The theoretical isotopic pattern being fitted on the experimental data
    """

    __slots__ = ["seed_peak", "score", "charge", "experimental", "theoretical",
                 "monoisotopic_peak", "data", "missed_peaks"]

    def __init__(self, seed_peak, score, charge, theoretical, experimental, data=None,
                 missed_peaks=0, **kwargs):
        self.seed_peak = seed_peak
        self.score = score
        self.charge = charge
        self.experimental = experimental
        self.theoretical = theoretical
        self.monoisotopic_peak = experimental[0]
        self.data = data
        self.missed_peaks = missed_peaks

    def clone(self) -> 'IsotopicFitRecord':
        """Make a shallow copy of this object.

        Returns
        -------
        IsotopicFitRecord
        """
        return self.__class__(
            self.seed_peak, self.score, self.charge, self.theoretical, self.experimental, self.data, self.missed_peaks)

    def __reduce__(self):
        return self.__class__, (
            self.seed_peak, self.score, self.charge, self.theoretical, self.experimental, self.data, self.missed_peaks)

    def __eq__(self, other):
        val = (self.score == other.score and
               self.charge == other.charge and
               self.experimental == other.experimental and
               self.theoretical == other.theoretical)
        if self.data is not None or other.data is not None:
            val = val and (self.data == other.data)
        return val

    def __ne__(self, other):
        return not (self == other)

    def __lt__(self, other):
        return self.score < other.score

    def __gt__(self, other):
        return self.score > other.score

    def __hash__(self):
        return hash((self.monoisotopic_peak.mz, self.charge))

    def __iter__(self):
        yield self.score
        yield self.charge
        yield self.experimental
        yield self.theoretical

    @property
    def npeaks(self):
        """The number of experimental peaks covered in the fit.

        Returns
        -------
        int
        """
        return len(self.experimental)

    def __repr__(self):
        return "IsotopicFitRecord(score=%0.5f, charge=%d, npeaks=%d, monoisotopic_mz=%0.5f)" % (
            self.score, self.charge, self.npeaks, self.monoisotopic_peak.mz)


[docs]class FitSelectorBase(Base): """An object that controls the filtering and selection of IsotopicFitRecord Attributes ---------- minimum_score : int The minimum score needed to be a candidate for selection. If the FitSelector is `minimizing` it is the maximal score to be a candidate. """ minimum_score = 0 def __init__(self, minimum_score=0): self.minimum_score = minimum_score def best(self, results): raise NotImplementedError() def __call__(self, *args, **kwargs): return self.best(*args, **kwargs) def reject(self, fit): raise NotImplementedError() def reject_score(self, score): raise NotImplementedError() def is_maximizing(self): return False
[docs]class MinimizeFitSelector(FitSelectorBase): """A FitSelector which tries to minimize the score of the best fit.""" def best(self, results): """Returns the IsotopicFitRecord with the smallest score Parameters ---------- results : list of IsotopicFitRecord List of isotopic fits to select the most optimal case from Returns ------- IsotopicFitRecord The most optimal fit """ return min(results, key=operator.attrgetter("score")) def reject(self, fit): """Decide whether the fit should be discarded for having too large a score. Compares against :attr:`minimum_score` Parameters ---------- fit : IsotopicFitRecord Returns ------- bool """ return fit.score > self.minimum_score def reject_score(self, score): return score > self.minimum_score def is_maximizing(self): """Returns that this is *not* a maximizing selector Returns ------- False """ return False
[docs]class MaximizeFitSelector(FitSelectorBase): """A FitSelector which tries to maximize the score of the best fit.""" def best(self, results): """Returns the IsotopicFitRecord with the largest score Parameters ---------- results : list of IsotopicFitRecord List of isotopic fits to select the most optimal case from Returns ------- IsotopicFitRecord The most optimal fit """ return max(results, key=operator.attrgetter("score")) def reject(self, fit): """Decide whether the fit should be discarded for having too small a score. Compares against :attr:`minimum_score` Parameters ---------- fit : IsotopicFitRecord Returns ------- bool """ return fit.score < self.minimum_score def reject_score(self, score): return score < self.minimum_score def is_maximizing(self): """Returns that this *is* a maximizing selector Returns ------- False """ return True
[docs]class IsotopicFitterBase(Base): """A base class for Isotopic Pattern Fitters, objects which given a set of experimental peaks and a set of matching theoretical peaks, returns a fit score describing how good the match is. An IsotopicFitter may be optimal when the score is small (minimizing) or when the score is large (maximizing), and the appropriate :class:`FitSelectorBase` type will be used for :attr:`select`. This will also be reflected by :meth:`is_maximizing`. """ def __init__(self, score_threshold=0.5): self.select = MinimizeFitSelector(score_threshold)
[docs] def evaluate(self, peaklist, observed, expected, **kwargs): """Evaluate a pair of peak lists for goodness-of-fit. Parameters ---------- peaklist : :class:`~.PeakSet` The full set of all experimental peaks observed : list The list of experimental peaks that are part of this fit expected : list The list of theoretical peaks that are part of this fit **kwargs Returns ------- float The score """ return NotImplemented
def _evaluate(self, peaklist, observed, expected, **kwargs): return self.evaluate(peaklist, observed, expected, **kwargs)
[docs] def __call__(self, *args, **kwargs): """Invokes :meth:`evaluate` Parameters ---------- *args Forwarded to :meth:`evaluate` **kwargs Forwarded to :meth:`evaluate` Returns ------- float The score """ return self.evaluate(*args, **kwargs)
[docs] def reject(self, fit): """Test whether this fit is too poor to be used Parameters ---------- fit : :class:`~IsotopicFitRecord` The fit to test Returns ------- bool """ return self.select.reject(fit)
[docs] def is_maximizing(self): """Whether or not this fitter's score gets better as it grows Returns ------- bool Whether or not this fitter is a maximizing fitter """ return self.select.is_maximizing()
def configure(self, deconvoluter, **kwargs): return self
[docs]class GTestFitter(IsotopicFitterBase): r"""Evaluate an isotopic fit using a G-test .. math:: G = 2\sum_i^n{o_i * ({log}o_i - {log}e_i)} where :math:`o_i` is the intensity of the ith experimental peak and :math:`e_i` is the intensity of the ith theoretical peak. """ def evaluate(self, peaklist, observed, expected, **kwargs): g_score = 2 * sum([obs.intensity * np.log( obs.intensity / theo.intensity) for obs, theo in zip(observed, expected)]) return g_score
g_test = GTestFitter() class ScaledGTestFitter(IsotopicFitterBase): r"""Evaluate an isotopic fit using a G-test after normalizing the list of experimental and theoretical peaks to both sum to 1. .. math:: G = 2\sum_i^n{o_i * ({log}o_i - {log}e_i)} where :math:`o_i` is the intensity of the ith experimental peak and :math:`e_i` is the intensity of the ith theoretical peak. """ def evaluate(self, peaklist, observed, expected, **kwargs): total_observed = sum(p.intensity for p in observed) total_expected = sum(p.intensity for p in expected) total_expected += eps normalized_observed = [obs.intensity / total_observed for obs in observed] normalized_expected = [theo.intensity / total_expected for theo in expected] g_score = 2 * sum([obs * np.log(obs / theo) for obs, theo in zip( normalized_observed, normalized_expected)]) return g_score g_test_scaled = ScaledGTestFitter() class ChiSquareFitter(IsotopicFitterBase): def evaluate(self, peaklist, observed, expected, **kwargs): score = sum([(obs.intensity - theo.intensity)**2 / theo.intensity for obs, theo in zip(observed, expected)]) return score chi_sqr_test = ChiSquareFitter() class LeastSquaresFitter(IsotopicFitterBase): r"""Evaluate an isotopic fit using least squares coefficient of determination :math:`R^2`. .. math:: {\hat e_i} &= e_i / max(e) {\hat t_i} &= t_i / max(t) {\hat t} &= \sum_i^n {\hat t_i}^2 R^2 &= \frac{1}{{\hat t}}\sum_i^n ({\hat e_i} - {\hat t_i})^2 where :math:`e_i` is the ith experimental peak intensity and :math:`t_i` is the ith theoretical peak intensity """ def evaluate(self, peaklist, observed, expected, **kwargs): exp_max = max(p.intensity for p in observed) theo_max = max(p.intensity for p in expected) sum_of_squared_errors = 0 sum_of_squared_theoreticals = 0 for e, t in zip(observed, expected): normed_expr = e.intensity / exp_max normed_theo = t.intensity / theo_max sum_of_squared_errors += (normed_theo - normed_expr) ** 2 sum_of_squared_theoreticals += normed_theo ** 2 return sum_of_squared_errors / sum_of_squared_theoreticals least_squares = LeastSquaresFitter() class MSDeconVFitter(IsotopicFitterBase): """An implementation of the scoring function used in :title-reference:`MSDeconV` References ---------- Liu, X., Inbar, Y., Dorrestein, P. C., Wynne, C., Edwards, N., Souda, P., … Pevzner, P. A. (2010). Deconvolution and database search of complex tandem mass spectra of intact proteins: a combinatorial approach. Molecular & Cellular Proteomics : MCP, 9(12), 2772–2782. https://doi.org/10.1074/mcp.M110.002766 """ def __init__(self, minimum_score=10, mass_error_tolerance=0.02): self.select = MaximizeFitSelector() self.select.minimum_score = minimum_score self.mass_error_tolerance = mass_error_tolerance def calculate_minimum_signal_to_noise(self, observed): snr = 0 n = 0 for obs in observed: if obs.signal_to_noise < 1: continue snr += obs.signal_to_noise n += 1 return (snr / n) * 0.05 def reweight(self, obs, theo, obs_total, theo_total): norm_obs = obs.intensity / obs_total norm_theo = theo.intensity / theo_total return norm_obs * np.log(norm_obs / norm_theo) def score_peak(self, obs, theo, mass_error_tolerance=0.02, minimum_signal_to_noise=1): if obs.signal_to_noise < minimum_signal_to_noise: return 0. mass_error = np.abs(obs.mz - theo.mz) if mass_error <= mass_error_tolerance: mass_accuracy = 1 - mass_error / mass_error_tolerance else: mass_accuracy = 0 if obs.intensity < theo.intensity and (((theo.intensity - obs.intensity) / obs.intensity) <= 1): abundance_diff = 1 - \ ((theo.intensity - obs.intensity) / obs.intensity) elif obs.intensity >= theo.intensity and (((obs.intensity - theo.intensity) / obs.intensity) <= 1): abundance_diff = np.sqrt( 1 - ((obs.intensity - theo.intensity) / obs.intensity)) else: abundance_diff = 0. score = np.sqrt(theo.intensity) * mass_accuracy * abundance_diff return score def evaluate(self, peaklist, observed, expected, **kwargs): score = 0 for obs, theo in zip(observed, expected): inc = self.score_peak(obs, theo, self.mass_error_tolerance, 1) score += inc return score class PenalizedMSDeconVFitter(IsotopicFitterBase): r"""An Isotopic Fitter which uses the :class:`MSDeconVFitter` score weighted by 1 - :attr:`penalty_factor` * :class:`ScaledGTestFitter` score .. math:: S(e, t) = M(e, t) * (1 - G(e, t)) where :math:`e` is the experimental peak list and :math:`t` is the theoretical peak list """ def __init__(self, minimum_score=10, penalty_factor=1., mass_error_tolerance=0.02): self.select = MaximizeFitSelector(minimum_score) self.msdeconv = MSDeconVFitter(mass_error_tolerance=mass_error_tolerance) self.penalizer = ScaledGTestFitter() self.penalty_factor = penalty_factor def evaluate(self, peaklist, observed, expected, **kwargs): score = self.msdeconv.evaluate(peaklist, observed, expected) penalty = abs(self.penalizer.evaluate(peaklist, observed, expected)) return score * (1 - penalty * self.penalty_factor) def decon2ls_chisqr_test(peaklist, observed, expected, **kwargs): fit_total = 0 sum_total = 0 for obs, theo in zip(observed, expected): intensity_diff = obs.intensity - theo.intensity fit_total += (intensity_diff ** 2) / (theo.intensity + obs.intensity) sum_total += theo.intensity * obs.intensity return fit_total / (sum_total + 0.01) class InterferenceDetection(object): def __init__(self, peaklist): self.peaklist = peaklist def __call__(self, experimental_peaks, lower=None, upper=None): return self.detect_interference(experimental_peaks, lower, upper) def detect_interference(self, experimental_peaks, lower=None, upper=None): min_peak = experimental_peaks[0] max_peak = experimental_peaks[-1] if lower is None: try: lower = min_peak.mz - min_peak.full_width_at_half_max except AttributeError: lower = min_peak.mz if upper is None: try: upper = max_peak.mz + max_peak.full_width_at_half_max except AttributeError: upper = max_peak.mz region = self.peaklist.between(lower, upper) included_intensity = sum(p.intensity for p in experimental_peaks) region_intensity = sum(p.intensity for p in region) if region_intensity == 0: return 0. score = 1 - (included_intensity / region_intensity) return score class DistinctPatternFitter(IsotopicFitterBase): def __init__(self, minimum_score=0.3, peak_count_scale=1.5, domain_scale=100.): self.select = MinimizeFitSelector(minimum_score) self.interference_detector = None self.g_test_scaled = ScaledGTestFitter() self.peak_count_scale = peak_count_scale self.domain_scale = domain_scale def evaluate(self, peaklist, experimental, theoretical, **kwargs): npeaks = float(len(experimental)) if self.interference_detector is None: self.interference_detector = InterferenceDetection(peaklist) score = self.g_test_scaled(peaklist, experimental, theoretical) score *= abs((self.interference_detector.detect_interference(experimental) + 0.00001) / ( npeaks * self.peak_count_scale)) * self.domain_scale return score def percentile(N, percent): if not N: return None k = (len(N) - 1) * percent f = math.floor(k) c = math.ceil(k) if f == c: return N[int(k)] d0 = N[int(f)] * (c - k) d1 = N[int(c)] * (k - f) return d0 + d1 class DotProductFitter(IsotopicFitterBase): def evaluate(self, peaklist, observed, expected, **kwargs): total = 0 for e, t in zip(observed, expected): total += e.intensity * t.intensity return total try: _has_c = True _IsotopicFitRecord = IsotopicFitRecord _LeastSquaresFitter = LeastSquaresFitter _MSDeconVFitter = MSDeconVFitter _ScaledGTestFitter = ScaledGTestFitter _PenalizedMSDeconVFitter = PenalizedMSDeconVFitter _DistinctPatternFitter = DistinctPatternFitter _DotProductFitter = DotProductFitter from ._c.scoring import ( IsotopicFitRecord, LeastSquaresFitter, MSDeconVFitter, ScaledGTestFitter, PenalizedMSDeconVFitter, DistinctPatternFitter, DotProductFitter, FunctionScorer) except ImportError as e: _has_c = False msdeconv = MSDeconVFitter() least_squares = LeastSquaresFitter() g_test_scaled = ScaledGTestFitter() penalized_msdeconv = PenalizedMSDeconVFitter() distinct_pattern_fitter = DistinctPatternFitter() class MassShiftSupportPostProcessorBase(object): def __init__(self, shifts=None): if shifts is None: shifts = [] self.shifts = shifts self._fits = None @property def fits(self): return self._fits @fits.setter def fits(self, value): self._fits = value def reweight(self, fit, shift): pass