Source code for ms_deisotope.deconvolution.exhaustive

"""A collection of base classes for "exhaustive" search strategies.
These strategies attempt to assign *every* peak, but they are not meant
to be used on their own.

For complete implementations see :class:`~.AveragineDeconvoluter` and
:class:`~.AveraginePeakDependenceGraphDeconvoluter`.
"""

import operator

from ms_deisotope.constants import (
    ERROR_TOLERANCE, TRUNCATE_AFTER, IGNORE_BELOW, MAX_ITERATION,
    CONVERGENCE)

from ms_deisotope.peak_set import (
    DeconvolutedPeak,
    DeconvolutedPeakSet)

from ms_deisotope.averagine import PROTON, neutral_mass

from ms_deisotope.envelope_statistics import (
    average_mz,
    a_to_a2_ratio,
    most_abundant_mz)

from ms_deisotope.utils import (
    TrivialTargetedDeconvolutionResult)

from ms_deisotope.peak_dependency_network import (
    PeakDependenceGraph,
    NetworkedTargetedDeconvolutionResult)


from .base import DeconvoluterBase
from .utils import (
    ChargeIterator,
    has_previous_peak_at_charge,
    has_successor_peak_at_charge,
    drop_placeholders,
    count_placeholders,
    first_peak,
    mean,
    info,
    debug)


[docs]class ExhaustivePeakSearchDeconvoluterBase(DeconvoluterBase): """Provides common methods for algorithms which attempt to find a deconvolution for every peak in a spectrum. This assumes no dependence between different peaks, instead it relies on subtraction, breadth of search, and order of encounter to avoid artefactual fits. This is usually not reasonable, so instead please use this class's extension, :class:`PeakDependenceGraphDeconvoluterBase` which can express dependence of fits on common resources. This class is not meant to be instantiated, but instead used as a mixin for classes that also inherit from :class:`DeconvoluterBase` and provide methods `fit_theoretical_distribution` and `_fit_peaks_at_charges` Attributes ---------- use_quick_charge: bool Whether or not to use the :title-reference:`QuickCharge` algorithm when generating putative charge states. incremental_truncation: float or :const:`None` If not :const:`None`, the isotopic pattern truncation lower bound to contract each fit to incrementally, generating new fits for each dropped peak using :meth:`~.DeconvoluterBase.fit_incremental_truncation` """ def __init__(self, peaklist, *args, **kwargs): super(ExhaustivePeakSearchDeconvoluterBase, self).__init__(peaklist, *args, **kwargs) self.use_quick_charge = kwargs.get("use_quick_charge", False) self.incremental_truncation = kwargs.get("incremental_truncation", None) def _get_all_peak_charge_pairs(self, peak, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=3, right_search_limit=3, recalculate_starting_peak=True, use_quick_charge=False): """Construct the set of all unique candidate (monoisotopic peak, charge state) pairs using the provided search parameters. The search is performed using :func:`has_previous_peak_at_charge`, :func:`has_successor_peak_at_charge`, :meth:`_find_previous_putative_peak`, and :meth:`_find_next_putative_peak`. Parameters ---------- peak : :class:`~.FittedPeak` The peak to start the search from error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 3 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 3 recalculate_starting_peak : bool, optional Whether or not to re-calculate the putative starting peak m/z based upon nearby peaks close to where isotopic peaks for `peak` should be. Defaults to True Returns ------- set The set of all unique candidate (monoisotopic peak, charge state) """ target_peaks = set() charges = ChargeIterator(*charge_range) if use_quick_charge: charges.sequence_from_quickcharge(self.peaklist, peak) for charge in charges: target_peaks.add((peak, charge)) # Look Left for i in range(1, left_search_limit): prev_peak = has_previous_peak_at_charge( self, peak, charge, i) if prev_peak is None: continue target_peaks.add((prev_peak, charge)) if recalculate_starting_peak: target_peaks.update(self._find_previous_putative_peak( peak.mz, charge, i, 2 * error_tolerance)) # Look Right for i in range(1, right_search_limit): nxt_peak = has_successor_peak_at_charge( self, peak, charge, i) if nxt_peak is None: continue target_peaks.add((nxt_peak, charge)) if recalculate_starting_peak: target_peaks.update(self._find_next_putative_peak( peak.mz, charge, i, 2 * error_tolerance)) if recalculate_starting_peak: for i in range(min(left_search_limit, 2)): target_peaks.update(self._find_next_putative_peak( peak.mz, charge, step=i, tolerance=2 * error_tolerance)) return target_peaks def _fit_all_charge_states(self, peak, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=3, right_search_limit=3, recalculate_starting_peak=True, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW): """Carry out the fitting process for `peak`. This method calls :meth:`_get_all_peak_charge_pairs` to collect all hypothetical solutions for `peak`, and invokes :meth:`_fit_peaks_at_charges` to evaluate them. The method :meth:`_fit_peaks_at_charges` is required by this interface, but is not defined by it, as it depends upon the underlying isotopic pattern fitting algorithm. See one of the Averagine-based algorithms for an implementation, such as :class:`AveragineDeconvoluterBase`, a complementary ancestor with this class. Parameters ---------- peak : :class:`~.FittedPeak` The peak to start the search from error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 3 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 3 recalculate_starting_peak : bool, optional Whether or not to re-calculate the putative starting peak m/z based upon nearby peaks close to where isotopic peaks for `peak` should be. Defaults to True charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. Returns ------- set The set of :class:`~.IsotopicFitRecord` instances produced """ target_peaks = self._get_all_peak_charge_pairs( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, recalculate_starting_peak=recalculate_starting_peak, use_quick_charge=self.use_quick_charge) results = self._fit_peaks_at_charges( target_peaks, error_tolerance, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) return (results)
[docs] def charge_state_determination(self, peak, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=3, right_search_limit=3, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW): """Determine the optimal isotopic fit for `peak`, extracting it's charge state and monoisotopic peak. This method invokes :meth:`_fit_all_charge_states`, and then uses :attr:`scorer`'s `select` method to choose the optimal solution. Parameters ---------- peak : :class:`~.FittedPeak` The peak to start the search from error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 3 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 3 charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. Returns ------- :class:`~.IsotopicFitRecord` The best scoring isotopic fit """ results = self._fit_all_charge_states( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) if self.verbose: info("Fits for %r", peak) for rec in sorted(results)[-10:]: info(rec) try: result = self.scorer.select(results) return result except ValueError: return None
def _make_deconvoluted_peak(self, fit, charge_carrier): score, charge, eid, tid = fit rep_eid = drop_placeholders(eid) total_abundance = sum(p.intensity for p in rep_eid) monoisotopic_mass = neutral_mass( tid.monoisotopic_mz, charge, charge_carrier) reference_peak = first_peak(eid) dpeak = DeconvolutedPeak( neutral_mass=monoisotopic_mass, intensity=total_abundance, charge=charge, signal_to_noise=mean(p.signal_to_noise for p in rep_eid), index=reference_peak.index, full_width_at_half_max=mean( p.full_width_at_half_max for p in rep_eid), a_to_a2_ratio=a_to_a2_ratio(tid), most_abundant_mass=neutral_mass( most_abundant_mz(eid), charge), average_mass=neutral_mass(average_mz(eid), charge), score=score, envelope=[(p.mz, p.intensity) for p in eid], mz=tid.monoisotopic_mz, fit=fit, area=sum(e.area for e in eid)) return dpeak
[docs] def deconvolute_peak(self, peak, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=3, right_search_limit=3, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW): """Perform a deconvolution for `peak`, generating a new :class:`ms_deisotope.peak_set.DeconvolutedPeak` instance corresponding to the optimal solution. This new peak has an m/z matching the monoisotopic peak of the pattern containing `peak`, and its intensity is the sum of all the matched peaks in its isotopic pattern. Its charge, isotopic fit, and other qualities are derived from the :class:`ms_deisotope.scoring.IsotopicFitRecord` instance corresponding to its best solution. Parameters ---------- peak : :class:`~.FittedPeak` The peak to start the search from error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 3 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 3 charge_carrier : float, optional The mass of the charge carrier, or more specifically, the moiety which is added for each incremental change in charge state. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. Returns ------- :class:`~.DeconvolutedPeak` """ fit = self.charge_state_determination( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) if fit is None: return tid = fit.theoretical dpeak = self._make_deconvoluted_peak(fit, charge_carrier) self._deconvoluted_peaks.append(dpeak) if self.use_subtraction: self.subtraction(tid, error_tolerance) return dpeak
[docs] def targeted_deconvolution(self, peak, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=3, right_search_limit=3, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW): """Express the intent that this peak's deconvolution solution will be retrieved at a later point in the process and that it should be deconvoluted, and return a handle to retrieve the results with. This algorithm's implementation is simple enough that this is equivalent to just performing the deconvolution now and storing the result in a :class:`~.TrivialTargetedDeconvolutionResult` instance. Otherwise identical to :meth:`deconvolute_peak`. Parameters ---------- peak : :class:`~.FittedPeak` The peak to start the search from error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 3 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 3 charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. Returns ------- :class:`~.TrivialTargetedDeconvolutionResult` """ dpeak = self.deconvolute_peak( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) result = TrivialTargetedDeconvolutionResult(self, dpeak, peak) return result
[docs] def deconvolute(self, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), order_chooser=operator.attrgetter('index'), left_search_limit=3, right_search_limit=3, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW, **kwargs): """Completely deconvolute the spectrum. Visit each peak in the order chosen by `order_chooser`, and call :meth:`deconvolute_peak` on it with the provided arguments. This assumes all overlaps in isotopic pattern are captured by the search limits. This is usually not the case. For an alternative see :class:`PeakDependenceGraphDeconvoluterBase` Parameters ---------- error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) order_chooser : callable, optional: A callable used as a key function for sorting peaks into the order they will be visited during deconvolution. Defaults to :obj:`operator.attrgetter("index")` left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 3 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 3 charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. Returns ------- :class:`~.DeconvolutedPeakSet` """ i = 0 for peak in sorted(self.peaklist, key=order_chooser, reverse=True): if peak.mz < 2 or peak.intensity < self.minimum_intensity: continue self.deconvolute_peak( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) i += 1 if self.merge_isobaric_peaks: self._deconvoluted_peaks = self._merge_peaks( self._deconvoluted_peaks) return DeconvolutedPeakSet(self._deconvoluted_peaks).reindex()
try: from ms_deisotope._c.deconvoluter_base import ( _get_all_peak_charge_pairs as _c_get_all_peak_charge_pairs, _make_deconvoluted_peak as _c_make_deconvoluted_peak) ExhaustivePeakSearchDeconvoluterBase._get_all_peak_charge_pairs = _c_get_all_peak_charge_pairs ExhaustivePeakSearchDeconvoluterBase._make_deconvoluted_peak = _c_make_deconvoluted_peak except ImportError as e: print(e)
[docs]class PeakDependenceGraphDeconvoluterBase(ExhaustivePeakSearchDeconvoluterBase): """Extends the concept of :class:`ExhaustivePeakSearchDeconvoluterBase` to include a way to handle conflicting solutions which claim the same experimental peak. This lets the Deconvoluter assign a single peak only once, and to the "best" solution to use it. To do this, the Deconvoluter constructs a graph where peaks are nodes, and isotopic fits are hyperedges connecting multiple nodes. Rather than deconvoluting the spectrum step by step, assigning signal as it explores the spectrum, the Deconvoluter instead inserts each considered isotopic fit into the graph. After completely traversing the spectrum, the Deconvoluter solves the dependence graph attempting to maximize some criterion and produce a set of disjoint isotopic fits. These fits are then assigned signal and added to the deconvoluted spectrum as normal. The criterion used is currently a greedy maximization (or minimization) of each connected component of the peak dependence graph. Attributes ---------- max_missed_peaks : int The maximum number of missing peaks to tolerate in an isotopic fit peak_dependency_network : :class:`~.PeakDependenceGraph` The peak dependence graph onto which isotopic fit dependences on peaks are constructed and solved. """ def __init__(self, peaklist, *args, **kwargs): max_missed_peaks = kwargs.get("max_missed_peaks", 1) self.subgraph_solver_type = kwargs.get("subgraph_solver", 'top') super(PeakDependenceGraphDeconvoluterBase, self).__init__(peaklist, *args, **kwargs) self.peak_dependency_network = PeakDependenceGraph( self.peaklist, maximize=self.scorer.is_maximizing()) self.max_missed_peaks = max_missed_peaks self.fit_postprocessor = kwargs.get("fit_postprocessor", None) self._priority_map = {} @property def max_missed_peaks(self): """The maximum number of missed peaks per isotopic fit record permitted. This property directly mirrors :attr:`PeakDependenceGraph.max_missed_peaks` Returns ------- int """ return self.peak_dependency_network.max_missed_peaks @max_missed_peaks.setter def max_missed_peaks(self, value): self.peak_dependency_network.max_missed_peaks = value def _explore_local(self, peak, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=1, right_search_limit=0, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW): """Given a peak, explore the local neighborhood for candidate isotopic fits and add each fit above a threshold to the peak dependence graph. The threshold assumes that a single peak's neighborhood will contain many, many fits, but that only the top `n` scoring fits are worth considering. For now, `n` is fixed at 100 or the half number of fits returned, whichever is larger. This is to prevent the fit graph from growing out of control and wasting time storing impractical fits. Any fit added to the graph will have to pass :attr:`scorer.select` as well, so weak fits will never be added, regardless of how many fits are allowed to be inserted. Parameters ---------- peak : :class:`~.FittedPeak` The peak to start the search from error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 1 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 0 charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. Returns ------- int The number of fits added to the graph """ results = self._fit_all_charge_states( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) hold = set() for fit in results: if fit.charge > 1 and len(drop_placeholders(fit.experimental)) == 1: continue hold.add(fit) results = hold n = len(results) stop = max(min(n // 2, 100), 10) if n == 0: return 0 i = 0 for i in range(stop): if len(results) == 0: break candidate = self.scorer.select(results) if candidate is None: break self.peak_dependency_network.add_fit_dependence(candidate) results.discard(candidate) return i def populate_graph(self, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=1, right_search_limit=0, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW): """Visit each experimental peak and execute :meth:`_explore_local` on it with the provided parameters, populating the peak dependence graph with all viable candidates. Parameters ---------- peak : :class:`~.FittedPeak` error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 1 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 0 charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. """ for peak in self.peaklist: if peak in self._priority_map or peak.intensity < self.minimum_intensity: if self.verbose: debug("Skipping %r (%r)", peak, peak.intensity < self.minimum_intensity) continue n = self._explore_local( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) if self.verbose: debug("Exlporing Area Around %r Yielded %d Fits", peak, n)
[docs] def postprocess_fits(self, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), charge_carrier=PROTON, *args, **kwargs): """Postprocesses fits before solving the peak dependence graph. Currently a no-op. Parameters ---------- error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| """ if self.fit_postprocessor is None: return
[docs] def select_best_disjoint_subgraphs(self, error_tolerance=ERROR_TOLERANCE, charge_carrier=PROTON): """Construct connected envelope graphs from :attr:`peak_dependency_network` and extract the best disjoint isotopic pattern fits in each envelope graph. This in turn produces one or more :class:`DeconvolutedPeak` instances from each disjoint fit, which are processed and added to the results set. Parameters ---------- error_tolerance : float, optional The error tolerance to use when performing subtraction, if subtraction is being performed. charge_carrier : float, optional The mass of the charge carrier as used for the deconvolution. Required to back-out the neutral mass of the deconvoluted result """ disjoint_envelopes = self.peak_dependency_network.find_non_overlapping_intervals() i = 0 if self.subgraph_solver_type == 'disjoint': solver = self._solve_subgraph_disjoint elif self.subgraph_solver_type == 'iterative': solver = self._solve_subgraph_iterative elif self.subgraph_solver_type == 'top': solver = self._solve_subgraph_top else: raise ValueError("Unknown solver type %r" % (self.subgraph_solver_type, )) for cluster in disjoint_envelopes: solutions = solver(cluster, error_tolerance, charge_carrier) for dpeak in solutions: self.peak_dependency_network.add_solution(dpeak.fit, dpeak) self._deconvoluted_peaks.append(dpeak) i += 1
def _solve_subgraph_top(self, cluster, error_tolerance=ERROR_TOLERANCE, charge_carrier=PROTON): """Given a :class:`~.DependenceCluster`, return the single best fit from the collection of co-dependent fits. Parameters ---------- cluster : :class:`~.DependenceCluster` The connected subgraph whose nodes will be searched error_tolerance : float, optional The error tolerance to use when performing subtraction, if subtraction is being performed. charge_carrier : float, optional The mass of the charge carrier as used for the deconvolution. Required to back-out the neutral mass of the deconvoluted result Returns ------- list of :class:`~DeconvolutedPeak` The solved deconvolution solutions """ fit = cluster[0] _, _, eid, tid = fit rep_eid = drop_placeholders(eid) if len(rep_eid) == 0: return [] dpeak = self._make_deconvoluted_peak(fit, charge_carrier) if self.use_subtraction: self.subtraction(tid, error_tolerance) return [dpeak] def _solve_subgraph_disjoint(self, cluster, error_tolerance=ERROR_TOLERANCE, charge_carrier=PROTON): """Given a :class:`~.DependenceCluster`, find a greedy disjoint set of isotopic fits. Parameters ---------- cluster : :class:`~.DependenceCluster` The connected subgraph whose nodes will be searched error_tolerance : float, optional The error tolerance to use when performing subtraction, if subtraction is being performed. charge_carrier : float, optional The mass of the charge carrier as used for the deconvolution. Required to back-out the neutral mass of the deconvoluted result Returns ------- list of :class:`~DeconvolutedPeak` The solved deconvolution solutions """ disjoint_best_fits = cluster.disjoint_best_fits() i = 0 solutions = [] for fit in disjoint_best_fits: _, _, eid, tid = fit rep_eid = drop_placeholders(eid) if len(rep_eid) == 0: continue dpeak = self._make_deconvoluted_peak(fit, charge_carrier) solutions.append(dpeak) i += 1 if self.use_subtraction: self.subtraction(tid, error_tolerance) return solutions def _solve_subgraph_iterative(self, cluster, error_tolerance=ERROR_TOLERANCE, charge_carrier=PROTON): """Given a :class:`~.DependenceCluster`, build a :class:`~.ConnectedSubgraph` and incrementally subtract the best fitting solution and update its overlapping envelopes. Parameters ---------- cluster : :class:`~.DependencyCluster` The connected subgraph whose nodes will be searched error_tolerance : float, optional The error tolerance to use when performing subtraction, if subtraction is being performed. charge_carrier : float, optional The mass of the charge carrier as used for the deconvolution. Required to back-out the neutral mass of the deconvoluted result Returns ------- list of :class:`~DeconvolutedPeak` The solved deconvolution solutions """ print(cluster) subgraph = cluster.build_graph() solutions = [] mask = set() best_node = subgraph[0] peak = self._make_deconvoluted_peak(best_node.fit, charge_carrier) solutions.append(peak) if self.use_subtraction: self.subtraction(best_node.fit.theoretical, error_tolerance) mask.add(best_node.index) print("Masking %d (%0.3f, %d)" % ( best_node.index, best_node.fit.monoisotopic_peak.mz, best_node.fit.charge)) overlapped_nodes = list(best_node.overlap_edges) maximize = subgraph.maximize n = len(subgraph) while len(mask) != n: best_node = None best_score = 0 if maximize else float('inf') retained = [] for node in overlapped_nodes: if node.index in mask: continue missed_peaks = node.fit.missed_peaks = count_placeholders( node.fit.experimental) total_peaks = len(node.fit.experimental) invalid_peak_count = (missed_peaks >= total_peaks - 1 and abs(node.fit.charge) > 1 ) or missed_peaks == total_peaks if invalid_peak_count or missed_peaks > self.max_missed_peaks: mask.add(node.index) print("Masking %d (%0.3f, %d). Invalidated Peak Count %r | Missed Peaks %d" % ( node.index, node.fit.monoisotopic_peak.mz, node.fit.charge, invalid_peak_count, missed_peaks)) continue fit = node.fit fit.theoretical.normalize().scale(fit.experimental, self.scale_method) fit.score = self.scorer.evaluate( self.peaklist, fit.experimental, fit.theoretical.peaklist) if self.scorer.reject(fit): mask.add(node.index) continue else: retained.append(node) if maximize: if fit.score > best_score: best_node = node best_score = fit.score else: if fit.score < best_score: best_node = node best_score = fit.score if best_node is not None: peak = self._make_deconvoluted_peak( best_node.fit, charge_carrier) solutions.append(peak) if self.use_subtraction: self.subtraction( best_node.fit.theoretical, error_tolerance) mask.add(best_node.index) print("Masking %d (%0.3f, %d)" % ( best_node.index, best_node.fit.monoisotopic_peak.mz, best_node.fit.charge)) overlapped_nodes = [ node for node in best_node.overlap_edges if node.index not in mask] else: overlapped_nodes = [] if not overlapped_nodes and len(mask) != n: overlapped_nodes = [ node for node in subgraph if node.index not in mask] return solutions
[docs] def targeted_deconvolution(self, peak, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), left_search_limit=3, right_search_limit=3, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW): """Express the intent that this peak's deconvolution solution will be retrieved at a later point in the process and that it should be deconvoluted, and return a handle to retrieve the results with. As the operation does not immediately result in a deconvoluted peak but just adds the resulting fits to :attr:`peak_dependency_network`, this method constructs an instance of :class:`~.NetworkedTargetedDeconvolutionResult` which holds all the required information for recovering the best fit containing `peak`. Parameters ---------- peak : :class:`~.FittedPeak` The peak to start the search from error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) left_search_limit : int, optional The number of steps to search to the left of `peak`. Defaults to 3 right_search_limit : int, optional The number of steps to search to the right of `peak`. Defaults to 3 charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| Returns ------- :class:`~.NetworkedTargetedDeconvolutionResult` """ self._explore_local( peak, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) result = NetworkedTargetedDeconvolutionResult(self, peak) self._priority_map[peak] = result return result
def _deconvolution_step(self, iteration_step, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), # pylint: disable=arguments-differ left_search_limit=1, right_search_limit=0, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW, **kwargs): if iteration_step != 0: self.peak_dependency_network.reset() self.populate_graph( error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below) self.postprocess_fits( charge_range=charge_range, charge_carrier=charge_carrier, error_tolerance=error_tolerance) self.select_best_disjoint_subgraphs(error_tolerance, charge_carrier) self._slice_cache.clear()
[docs] def deconvolute(self, error_tolerance=ERROR_TOLERANCE, charge_range=(1, 8), # pylint: disable=arguments-differ left_search_limit=1, right_search_limit=0, iterations=MAX_ITERATION, charge_carrier=PROTON, truncate_after=TRUNCATE_AFTER, ignore_below=IGNORE_BELOW, convergence=CONVERGENCE, **kwargs): """Completely deconvolute the spectrum. For each iteration, clear :attr:`peak_depencency_network`, then invoke :meth:`populate_graph` followed by :meth:`select_best_disjoint_subgraphs` to populate the resulting :class:`~.DeconvolutedPeakSet` Parameters ---------- error_tolerance : float, optional The parts-per-million error tolerance in m/z to search with. Defaults to |ERROR_TOLERANCE| charge_range : tuple, optional The range of charge states to consider. Defaults to (1, 8) order_chooser : callable, optional: A callable used as a key function for sorting peaks into the order they will be visited during deconvolution. Defaults to :obj:`operator.attrgetter("index")` left_search_limit : int, optional The number of steps to search to the left of :obj:`peak`. Defaults to 1 right_search_limit : int, optional The number of steps to search to the right of :obj:`peak`. Defaults to 0 charge_carrier : float, optional The mass of the charge carrier. Defaults to |PROTON| truncate_after : float, optional The percent of intensity to ensure is included in a theoretical isotopic pattern starting from the monoisotopic peak. This will cause theoretical isotopic patterns to be truncated, excluding trailing peaks which do not contribute substantially to the overall shape of the isotopic pattern. ignore_below : float, optional The minimum relative abundance to consider a peak in a theoretical isotopic pattern convergence : float, optional The threshold of the below which after the `(sum(intensity_before) - sum( intensity_after)) / sum(intensity_after)` Returns ------- :class:`~.DeconvolutedPeakSet` """ if not self.use_subtraction: iterations = 1 begin_signal = sum([p.intensity for p in self.peaklist]) i = 0 for i in range(iterations): if self.verbose: info("<== Starting Iteration %d ===================>", (i, )) self._deconvolution_step( i, error_tolerance=error_tolerance, charge_range=charge_range, left_search_limit=left_search_limit, right_search_limit=right_search_limit, charge_carrier=charge_carrier, truncate_after=truncate_after, ignore_below=ignore_below, **kwargs) end_signal = sum([p.intensity for p in self.peaklist]) + 1 if (begin_signal - end_signal) / end_signal < convergence: if self.verbose: info("(%0.4e - %0.4e) / %0.4e < %0.2g, Converged!", begin_signal, end_signal, end_signal, convergence) break begin_signal = end_signal else: if self.verbose: info("Did Not Converge.") if self.verbose: info("Finished Deconvolution in %d Iterations", (i + 1, )) if self.merge_isobaric_peaks: self._deconvoluted_peaks = self._merge_peaks( self._deconvoluted_peaks) return DeconvolutedPeakSet(list(self._deconvoluted_peaks)).reindex()
try: _has_c = True from ms_deisotope._c.deconvoluter_base import ( _explore_local as _c_explore_local, populate_graph as cpopulate_graph) PeakDependenceGraphDeconvoluterBase._explore_local = _c_explore_local PeakDependenceGraphDeconvoluterBase.populate_graph = cpopulate_graph except ImportError as e: _has_c = False