# ms_deisotope.peak_dependency_network.peak_network

import operator
import logging

from collections import defaultdict

from .subgraph import ConnectedSubgraph
from .intervals import SpanningMixin, IntervalTreeNode
from ..utils import Base, TargetedDeconvolutionResultBase
from ..task import LogUtilsMixin

logger = logging.getLogger("ms_deisotope.scan_processor")


def ident(x):
    return x


class PeakNode(Base):
    """
    Represent a single FittedPeak and the junction of multiple
    IsotopicFitRecords which depend upon it.

    Attributes
    ----------
    links : dict
        Mapping from each IsotopicFitRecord which depends upon this peak to that fit's score
    peak : ms_peak_picker.FittedPeak
        The peak being depended upon
    """

    def __init__(self, peak, links=None):
        if links is None:
            links = {}
        self.peak = peak
        self.links = links
        self._hash = hash(self.peak)

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        try:
            return self.peak == other.peak
        except AttributeError:
            return False

    def __ne__(self, other):
        return not self == other

    def __contains__(self, fit):
        return fit in self.links

    def __repr__(self):
        return "PeakNode(%s, %s)" % (self.peak, self.links)


class DependenceCluster(SpanningMixin):
    """
    Represent a cluster of overlapping isotopic fits

    Attributes
    ----------
    dependencies : set
        The set of all fits which are codependent
    start: float
        The smallest mz of all elements of this cluster
    end: float
        The largest mz of all elements of this cluster
    maximize: bool
        Whether the objective is to maximize or minimize the
        isotopic fit score
    """

    def __init__(self, parent=None, dependencies=None, maximize=True):
        if parent is None:
            parent = self
        if dependencies is None:
            dependencies = []
        else:
            dependencies = sorted(dependencies, key=lambda x: x.score, reverse=maximize)
        self.parent = parent
        self.dependencies = dependencies
        self.rank = 0
        self.maximize = maximize
        self._reset()

    def _reset(self):
        self.start = self._start()
        self.end = self._end()
        self.best_fit = self._best_fit()

    def add(self, fit):
        """
        Adds a new IsotopicFitRecord to this cluster, and ensures the sorted
        property still holds

        Parameters
        ----------
        fit : IsotopicFitRecord
            The fit being added to the record

        """
        self.dependencies.append(fit)
        self.dependencies.sort(key=lambda x: x.score, reverse=self.maximize)
        self._reset()

    def disjoint_subset(self):
        graph = self.build_graph()
        return graph.find_heaviest_path()

    def build_graph(self):
        graph = ConnectedSubgraph(self.dependencies, maximize=self.maximize)
        return graph

    def _best_fit(self):
        """
        Retrieve the absolute best single fit for the cluster

        Returns
        -------
        IsotopicFitRecord
            The best scoring IsotopicFitRecord in this cluster
        """
        return self.dependencies[0]

    def disjoint_best_fits(self):
        """
        Compute the best set of disjoint isotopic fits spanning this cluster

        Returns
        -------
        list of IsotopicFitRecord
        """
        fit_sets = tuple(self.disjoint_subset())
        best_fits = fit_sets
        return [node.fit for node in best_fits]

    def _start(self):
        """
        Determine the first mz coordinate for members of this cluster

        Returns
        -------
        float
        """
        return min([member.experimental[0].mz for member in self.dependencies])

    def _end(self):
        """
        Determine the last mz coordinate for members of this cluster

        Returns
        -------
        float
        """
        return max([member.experimental[-1].mz for member in self.dependencies])

    def __len__(self):
        return len(self.dependencies)

    def __iter__(self):
        return iter(self.dependencies)

    def __contains__(self, fit):
        return fit in self.dependencies

    def __eq__(self, other):
        return self.dependencies == other.dependencies

    def __repr__(self):
        return "DependenceCluster(dependencies=[%s], start=%0.4f, end=%0.4f)" % (
            '\n'.join(map(str, self.dependencies[:10])), self.start, self.end)

    def __getitem__(self, i):
        return self.dependencies[i]

    def uses_mz(self, mz):
        for fit in self:
            for peak in fit.experimental:
                if peak.mz == mz:
                    return True
        return False

    def fits_using_mz(self, mz):
        fits = []
        for fit in self:
            for peak in fit.experimental:
                if peak.mz == mz:
                    fits.append(fit)
                    break
        return fits

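
# A minimal sketch of `DependenceCluster` bookkeeping using hypothetical fit
# stand-ins. Real clusters hold `IsotopicFitRecord` objects whose
# `experimental` attributes are sequences of `FittedPeak`; only `score` and
# the first/last experimental m/z matter for this illustration.
def _dependence_cluster_usage_sketch():
    from collections import namedtuple
    _FakePeak = namedtuple("_FakePeak", ("mz",))

    class _FakeFit(object):
        def __init__(self, score, mzs):
            self.score = score
            self.experimental = [_FakePeak(mz) for mz in mzs]

    low = _FakeFit(5.0, [100.0, 101.0])
    high = _FakeFit(9.0, [100.5, 101.5])
    cluster = DependenceCluster(dependencies=[low, high])
    # Fits are kept sorted best-first, so `best_fit` is the top scorer
    assert cluster.best_fit is high
    # The cluster spans the smallest and largest experimental m/z
    assert (cluster.start, cluster.end) == (100.0, 101.5)
    return cluster
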
try:
    _has_c = True
    from ms_deisotope._c.peak_dependency_network.peak_network import (
        PeakNode, DependenceCluster, PeakDependenceGraphBase)
except ImportError:
    _has_c = False

    class PeakDependenceGraphBase(object):

        def reset(self):
            # Keep a record of all clusters from previous iterations
            self._all_clusters.extend(
                self.clusters if self.clusters is not None else [])
            self.nodes = dict()
            self.dependencies = set()
            self._interval_tree = None
            self._populate_initial_graph()

        def _populate_initial_graph(self):
            for peak in self.peaklist:
                self.nodes[peak.index] = PeakNode(peak)

        def add_fit_dependence(self, fit_record):
            """Add the relatoinship between the experimental peaks
            in `fit_record` to the graph, expressed as a hyper-edge
            denoted by `fit_record`.

            This adds `fit_record` to :attr:`PeakNode.links` for each
            node corresponding to the :class:`~.FittedPeak` instance in
            :attr:`IsotopicFitRecord.experimental` of `fit_record`. It also
            adds `fit_record to :attr:`dependencies`

            Parameters
            ----------
            fit_record: :class:`~.IsotopicFitRecord`
            """
            for peak in fit_record.experimental:
                # Check for null peak
                if peak.index == 0:
                    continue
                self.nodes[peak.index].links[fit_record] = fit_record.score
            self.dependencies.add(fit_record)

        def nodes_for(self, fit_record, cache=None):
            if cache is None:
                return [self.nodes[p.index] for p in fit_record.experimental if p.peak_count >= 0]
            else:
                try:
                    return cache[fit_record]
                except KeyError:
                    cache[fit_record] = value = [
                        self.nodes[p.index] for p in fit_record.experimental if p.peak_count >= 0]
                    return value

        def drop_fit_dependence(self, fit_record):
            """Remove this fit from the graph, deleting all
            hyper-edges.
            """
            for node in self.nodes_for(fit_record):
                try:
                    del node.links[fit_record]
                except KeyError:
                    pass

        def best_exact_fits(self):
            """For each distinct group of experimental peaks, retain only
            the best scoring fit using exactly those peaks.
            """
            by_peaks = defaultdict(list)
            best_fits = []
            for fit in self.dependencies:
                by_peaks[tuple(fit.experimental)].append(fit)
            for _peak_tuple, fits in by_peaks.items():
                fits = sorted(fits, key=lambda x: x.score,
                              reverse=not self.maximize)
                for fit in fits[:-1]:
                    self.drop_fit_dependence(fit)
                best_fits.append(fits[-1])
            self.dependencies = set(best_fits)

        def _gather_independent_clusters(self, nodes_for_cache=None):
            clusters = defaultdict(set)
            if nodes_for_cache is None:
                nodes_for_cache = dict()
            seed_nodes = self.nodes.values()
            for seed_node in seed_nodes:
                # This peak is depended upon by each fit in `dependencies`
                dependencies = set(seed_node.links.keys())

                if len(dependencies) == 0:
                    continue

                # These fits also depend upon other peaks, and those peaks are depended
                # upon by other fits in turn, which depend upon the assignment of this peak.
                for dep in list(dependencies):
                    for node in self.nodes_for(dep, nodes_for_cache):
                        dependencies |= clusters[node]

                # Create a fresh copy to share out again, to eliminate possible errors of
                # shared storage. This is likely unnecessary under the current implementation.
                dependencies = set(dependencies)

                # Update all co-depended nodes with the full set of all fits which depend upon them
                for dep in dependencies:
                    for node in self.nodes_for(dep, nodes_for_cache):
                        clusters[node] = dependencies
            return clusters


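# A standalone sketch of the grouping idea behind `best_exact_fits` above:
# fits over the *same* tuple of experimental peaks compete directly, and only
# the best scorer survives (shown here for the maximizing case). The `fits`
# argument is hypothetical: any objects with `experimental` and `score`
# attributes will do.
def _best_exact_fit_grouping_sketch(fits):
    by_peaks = defaultdict(list)
    for fit in fits:
        by_peaks[tuple(fit.experimental)].append(fit)
    # Keep only the highest scoring fit for each distinct peak tuple
    return [max(group, key=lambda f: f.score) for group in by_peaks.values()]

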
def dependency_cluster_uses_mz(self, mz):
    for fit in self:
        for peak in fit.experimental:
            if peak.mz == mz:
                return True
    return False


def dependency_cluster_fits_using_mz(self, mz):
    fits = []
    for fit in self:
        for peak in fit.experimental:
            if peak.mz == mz:
                fits.append(fit)
                break
    return fits


class PeakDependenceGraph(PeakDependenceGraphBase, LogUtilsMixin):

    def __init__(self, peaklist, nodes=None, dependencies=None, max_missed_peaks=1,
                 use_monoisotopic_superceded_filtering=True, maximize=True):
        if nodes is None:
            nodes = {}
        if dependencies is None:
            dependencies = set()
        self.peaklist = peaklist
        self.nodes = nodes
        self.dependencies = dependencies
        self.max_missed_peaks = max_missed_peaks
        self.use_monoisotopic_superceded_filtering = use_monoisotopic_superceded_filtering
        self.maximize = maximize
        if len(self.nodes) == 0:
            self._populate_initial_graph()
        self.clusters = None
        self._interval_tree = None
        self._solution_map = {}
        self._all_clusters = []

    def add_solution(self, key, solution):
        self._solution_map[key] = solution

    @property
    def interval_tree(self):
        """An :class:`~.IntervalTreeNode` built over all dependency clusters
        that the :class:`PeakDependenceGraph` has seen over the course of all
        iterations.

        Raises
        ------
        NoIsotopicClustersError:
            When no isotopic clusters, and in turn no
            :class:`DependenceCluster` instances, are found

        Returns
        -------
        :class:`~.IntervalTreeNode`
        """
        if self._interval_tree is None:
            # Build the tree from both the current set of clusters
            # and all clusters from previous iterations
            self._interval_tree = IntervalTreeNode.build(
                self.clusters + self._all_clusters)
            if self._interval_tree is None:
                raise NoIsotopicClustersError(
                    "Could not build intervals for peak retrieval with %d clusters" % len(
                        self.clusters), self)
        return self._interval_tree

    def _find_fuzzy_solution_for(self, peak, shift=0.5):
        tree = self.interval_tree
        clusters = tree.contains_point(peak.mz + shift)
        if len(clusters) == 0:
            # No dependency clusters span this point. No signal?
            return None
        else:
            best_fits = [cluster.disjoint_best_fits() for cluster in clusters]
            acc = []
            for fits in best_fits:
                acc.extend(fits)
            best_fits = acc

            index = 0
            error = float('inf')
            for i, fit in enumerate(best_fits):
                err = abs(fit.monoisotopic_peak.mz - peak.mz)
                if err < error:
                    error = err
                    index = i
            fit = best_fits[index]
            try:
                return self._solution_map[fit]
            except KeyError:
                best_fits.sort(key=lambda f: abs(f.monoisotopic_peak.mz - peak.mz))
                # The first index is the minimum error, which the loop above found,
                # but by virtue of the KeyError getting us here, we know it is not
                # present in the solution map.
                for f in best_fits[1:]:
                    try:
                        return self._solution_map[f]
                    except KeyError:
                        continue
                # No solution could be found
                return None

    def find_cluster_for(self, peak):
        tree = self.interval_tree
        if tree is None:
            raise TypeError(
                "This network hasn't constructed clusters yet. "
                "Call find_non_overlapping_intervals first!")
        clusters = tree.contains_point(peak.mz)
        for cluster in clusters:
            # By construction, only one cluster contains a particular peak
            if dependency_cluster_uses_mz(cluster, peak.mz):
                return cluster

    def find_solution_for(self, peak):
        """Find the best isotopic pattern fit which includes ``peak`` or
        is close enough to ``peak`` that it might have been selected instead.

        Parameters
        ----------
        peak: :class:`~.FittedPeak`
            The peak to find the best fit for.

        Returns
        -------
        :class:`~.DeconvolutedPeak`
        """
        peak_node = self.nodes[peak.index]
        tree = self.interval_tree
        clusters = tree.contains_point(peak.mz)
        if not clusters:
            return self._find_fuzzy_solution_for(peak)
        best_fits = [cluster.disjoint_best_fits() for cluster in clusters]
        acc = []
        for fits in best_fits:
            acc.extend(fits)
        best_fits = acc
        self.debug("For query m/z %f has candidates %r" % (peak.mz, best_fits))
        # Extract only fits that use the query peak
        common = tuple(set(best_fits) & set(peak_node.links))
        used_common = False
        if len(common) > 1 or not common:
            if len(common) > 1:
                self.debug("There is not exactly one solution for %r (%d)" % (
                    peak, len(common)))
            # If there were no fits for this peak, then it may be that this peak
            # was not included in a fit. Try to find the nearest solution.
            fit = self._find_best_fit_by_weighted_distance(peak, best_fits)
        else:
            used_common = True
            fit = common[0]
        try:
            return self._solution_map[fit]
        except KeyError:
            self.debug(
                "The closest common fit was not chosen. The closest fit was common? %r" % (
                    used_common))
            candidates = list(set(best_fits) & set(self._solution_map))
            fit = self._find_best_fit_by_weighted_distance(peak, candidates)
            return self._solution_map[fit]

    def _find_best_fit_by_weighted_distance(self, peak, candidates):
        """Locate the fit that is nearest to ``peak`` in m/z space, weighted
        by the score of the fit such that given two fits with the same m/z,
        the better scoring fit is chosen.

        Parameters
        ----------
        peak: :class:`~.FittedPeak`
            The peak to find solutions with reference to.
        candidates: :class:`list`
            A set of candidate :class:`~.IsotopicFitRecord` to compare between

        Returns
        -------
        :class:`~.IsotopicFitRecord`
        """
        i = 0
        err = float('inf')
        for j, case in enumerate(candidates):
            case_err = abs(case.monoisotopic_peak.mz - peak.mz)
            if self.maximize:
                case_err /= case.score
            else:
                case_err /= (1. / (case.score + 1e-6))
            if case_err < err:
                i = j
                err = case_err
        fit = candidates[i]
        return fit

    def claimed_nodes(self):
        """Return all nodes with at least one fit connecting them.

        Returns
        -------
        :class:`set`
        """
        peaks = set()
        for fit in self.dependencies:
            for peak in fit.experimental:
                if peak.peak_count < 0:
                    continue
                peaks.add(self.nodes[peak.index])
        return peaks

    def drop_superceded_fits(self):
        """Drop all fits where there is a better fit using the first fit's
        monoisotopic peak at the same charge state.
        """
        suppressed = []
        keep = []
        if self.maximize:
            comparator = operator.lt
        else:
            comparator = operator.gt
        for dep in self.dependencies:
            mono_peak = dep.monoisotopic_peak
            if mono_peak.peak_count < 0:
                continue
            mono_peak_node = self.nodes[mono_peak.index]
            suppress = False
            for candidate in mono_peak_node.links:
                if dep.charge == candidate.charge:
                    if mono_peak in candidate.experimental:
                        if comparator(dep.score, candidate.score):
                            suppress = True
                            break
            if suppress:
                suppressed.append(dep)
            else:
                keep.append(dep)
        for dropped in suppressed:
            self.drop_fit_dependence(dropped)
        self.dependencies = set(keep)

    def drop_gapped_fits(self, n=None):
        """Discard any fit with more missed peaks than
        :attr:`max_missed_peaks`.
        """
        if n is None:
            n = self.max_missed_peaks
        keep = []
        for dep in self.dependencies:
            if dep.missed_peaks > n:
                self.drop_fit_dependence(dep)
            else:
                keep.append(dep)
        self.dependencies = set(keep)

    def find_non_overlapping_intervals(self):
        self.drop_gapped_fits()
        self.best_exact_fits()
        if self.use_monoisotopic_superceded_filtering:
            self.drop_superceded_fits()

        clusters = self._gather_independent_clusters()

        # Use an `id`-keyed dictionary over these copied sets to ensure we have
        # exactly one reference to each set of inter-dependent fits, and then
        # convert each set into an instance of `DependenceCluster`
        clusters = [DependenceCluster(dependencies=c, maximize=self.maximize) for c in
                    {id(v): v for v in clusters.values() if v}.values()]
        clusters = sorted(clusters, key=operator.attrgetter("start"))
        self.clusters = clusters
        return clusters

    def __iter__(self):
        return iter(self.clusters)

    def __getitem__(self, i):
        return self.clusters[i]

    def __repr__(self):
        return "PeakDependenceGraph(%s, %d)" % (
            self.peaklist, len(self.dependencies))
PeakDependenceGraph.log_with_logger(logger)
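

# A minimal end-to-end sketch of the intended workflow, assuming a peak list
# and a collection of fit records produced elsewhere (e.g. by a deconvoluter).
# The `peaklist` and `fit_records` parameters are placeholders, not real data.
def _peak_dependence_workflow_sketch(peaklist, fit_records):
    graph = PeakDependenceGraph(peaklist)
    for fit in fit_records:
        # Each fit becomes a hyper-edge over the peaks it explains
        graph.add_fit_dependence(fit)
    # Prune gapped, duplicate, and superceded fits, then partition the
    # survivors into m/z-disjoint `DependenceCluster` instances
    clusters = graph.find_non_overlapping_intervals()
    # Within each cluster, select the best mutually consistent set of fits
    return [cluster.disjoint_best_fits() for cluster in clusters]
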
class NetworkedTargetedDeconvolutionResult(TargetedDeconvolutionResultBase):
    """Stores the necessary information to retrieve the local optimal solution
    for a single peak for a deconvolution algorithm, from the optimal fit
    containing :attr:`query_peak` in the set of disjoint best fits for the
    enclosing connected component.

    Attributes
    ----------
    query_peak : FittedPeak
        The peak that was queried
    solution_peak : DeconvolutedPeak
        The optimal solution peak
    """

    def __init__(self, deconvoluter, peak, *args, **kwargs):
        super(NetworkedTargetedDeconvolutionResult, self).__init__(
            deconvoluter, *args, **kwargs)
        self.query_peak = peak
        self.solution_peak = None

    def _get_solution(self):
        try:
            self.solution_peak = self.deconvoluter.peak_dependency_network.find_solution_for(
                self.query_peak)
        except IndexError:
            pass

    def get(self):
        """Fetch the optimal solution after the computation has finished.

        Returns
        -------
        DeconvolutedPeak
        """
        self._get_solution()
        return self.solution_peak
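

# A sketch of how a deconvoluter might hand back a deferred result for a
# targeted peak. It assumes `deconvoluter` exposes a `peak_dependency_network`
# attribute (a `PeakDependenceGraph`), as `_get_solution` above requires;
# the arguments themselves are hypothetical.
def _targeted_result_usage_sketch(deconvoluter, peak):
    result = NetworkedTargetedDeconvolutionResult(deconvoluter, peak)
    # ... the deconvoluter runs here, populating the network's solution map ...
    return result.get()  # None when no fit could be matched to `peak`
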

class NoIsotopicClustersError(ValueError):
    def __init__(self, msg, graph=None, scan_id=None):
        ValueError.__init__(self, msg)
        self.graph = graph
        self.scan_id = scan_id