import os
import sys
import re
import logging
from urllib.request import urlopen, Request
from typing import Any, Dict, Hashable, Mapping, Callable, Optional, Union
from six import PY2
from psims.utils import ensure_iterable
from psims.controlled_vocabulary.entity import Entity
from psims.controlled_vocabulary.relationship import Reference
from .obo import OBOParser
from . import unimod
from .vendor import (
_use_vendored_bto_obo, _use_vendored_gno_obo, _use_vendored_go_obo,
_use_vendored_pato_obo, _use_vendored_psimod_obo, _use_vendored_psims_obo,
_use_vendored_unimod_xml, _use_vendored_unit_obo, _use_vendored_xlmod_obo)
# Library-style logging: attach a NullHandler so that importing this module
# never emits "no handler" warnings; applications opt in to the output.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# Maps every known (current or historical) URL of a controlled vocabulary to
# a zero-argument callable that opens the copy vendored with this package.
# It is consulted when the remote source cannot be fetched.
fallback = {
    ("http://psidev.cvs.sourceforge.net/*checkout*/"
     "psidev/psi/psi-ms/mzML/controlledVocabulary/psi-ms.obo"): _use_vendored_psims_obo,
    ("http://psidev.cvs.sourceforge.net/viewvc/*checkout*/"
     "psidev/psi/psi-ms/mzML/controlledVocabulary/psi-ms.obo"): _use_vendored_psims_obo,
    ("https://raw.githubusercontent.com/HUPO-PSI/psi-ms-CV/master/psi-ms.obo"): _use_vendored_psims_obo,
    "http://purl.obolibrary.org/obo/ms/psi-ms.obo": _use_vendored_psims_obo,
    ("http://obo.cvs.sourceforge.net/*checkout*/"
     "obo/obo/ontology/phenotype/unit.obo"): _use_vendored_unit_obo,
    ("http://ontologies.berkeleybop.org/uo.obo"): _use_vendored_unit_obo,
    "http://purl.obolibrary.org/obo/uo.obo": _use_vendored_unit_obo,
    ("http://ontologies.berkeleybop.org/pato.obo"): _use_vendored_pato_obo,
    # ``load_pato`` resolves the purl address, so it needs a fallback entry
    # of its own, like the purl addresses of the other vocabularies.
    "http://purl.obolibrary.org/obo/pato.obo": _use_vendored_pato_obo,
    ("https://raw.githubusercontent.com/HUPO-PSI/mzIdentML/master/cv/XLMOD.obo"): _use_vendored_xlmod_obo,
    ("http://www.brenda-enzymes.info/ontology/tissue/tree/update/update_files/BrendaTissueOBO"
     ): _use_vendored_bto_obo,
    "http://purl.obolibrary.org/obo/go.obo": _use_vendored_go_obo,
    "https://raw.githubusercontent.com/HUPO-PSI/psi-mod-CV/master/PSI-MOD.obo": _use_vendored_psimod_obo,
    "http://purl.obolibrary.org/obo/gno.obo": _use_vendored_gno_obo,
}
def _default_import_resolver(url: str) -> Optional['ControlledVocabulary']:
    """Resolve an imported vocabulary through the module-level ``obo_cache``.

    Parameters
    ----------
    url : str
        The address of the vocabulary to import.

    Returns
    -------
    ControlledVocabulary or None
        The parsed vocabulary when `url` ends with ``"obo"``, otherwise
        :const:`None`.
    """
    if not url.endswith("obo"):
        return None
    return ControlledVocabulary.from_obo(obo_cache.resolve(url))
def is_curie(text: Union[str, Reference]) -> bool:
    """Test whether `text` looks like a CURIE, i.e. ``prefix:local``.

    Parameters
    ----------
    text : str or Reference
        The value to test. A :class:`Reference` is tested by its
        ``accession`` attribute.

    Returns
    -------
    bool
        :const:`True` when the text starts with a run of non-whitespace
        characters, a colon, and another run of non-whitespace characters.
        Anything that is not a string yields :const:`False`.
    """
    if isinstance(text, Reference):
        text = text.accession
    if not isinstance(text, str):
        return False
    # Return a real bool, as annotated, rather than leaking the match object.
    return re.match(r"(\S+):(\S+)", text) is not None
class ControlledVocabulary(Mapping):
    """A Controlled Vocabulary is a collection of terms or entities with
    controlled meanings and semantics.

    This object makes entities resolvable by name, accession number, or
    synonym.

    This object implements the :class:`~collections.abc.Mapping` protocol.

    Attributes
    ----------
    id : str
        An identifier for this controlled vocabulary that is unique within
        a particular context
    name : str
        A human-friendly name for this controlled vocabulary
    version : str
        A string describing the version of this controlled vocabulary.
        Not all vocabularies are versioned the same way, so this value
        is not interpreted further automatically.
    metadata : dict
        A mapping of metadata describing this controlled vocabulary
    terms : dict
        The primary mapping from term ID to terms
    type_definitions : dict
        Starts out empty; this class never fills it itself.
    import_resolver : Callable
        Called with the URL of an imported vocabulary to load it on demand.
    imports : dict
        Imported vocabularies loaded so far, keyed by URL. A value of
        :const:`None` records an import that could not be loaded.
    """
    version: str
    name: str
    id: str
    metadata: Dict[str, Any]
    import_resolver: Callable[[str], Optional['ControlledVocabulary']]
    terms: Dict[str, 'Entity']
    type_definitions: Dict[str, Any]
    imports: Dict[str, Optional['ControlledVocabulary']]

    @classmethod
    def from_obo(cls, handle, **kwargs):
        '''Construct a new instance from an OBO format stream.

        Parameters
        ----------
        handle : file-like
            A file-like object over an OBO format.
        **kwargs
            Forwarded to the constructor.

        Returns
        -------
        ControlledVocabulary

        Raises
        ------
        ValueError:
            When the controlled vocabulary produced contains no terms
        '''
        parser = OBOParser(handle)
        # Fail before indexing anything when the stream held no terms.
        if len(parser.terms) == 0:
            raise ValueError("Empty Vocabulary")
        return cls(parser.terms, metadata=parser.header, version=parser.version,
                   name=parser.name, **kwargs)

    def __init__(self, terms, id=None, metadata=None, version=None, name=None,
                 import_resolver: Optional[Callable[[str], 'ControlledVocabulary']]=None):
        if metadata is None:
            metadata = dict()
        if version is None:
            version = 'unknown'
        if import_resolver is None:
            import_resolver = _default_import_resolver
        self.version = version
        self.name = name
        self.id = id
        self.metadata = metadata
        self.type_definitions = dict()
        # Assigning ``terms`` below re-indexes and passes ``self`` to every
        # value type, so all other attributes must already exist by then.
        self.import_resolver = import_resolver
        self.imports = {}
        self._terms = dict()
        self.terms = terms

    def __getitem__(self, key):
        '''A wrapper for :meth:`query`
        '''
        return self.query(key)

    def query(self, key):
        '''Search for a term whose id or name matches `key`, or if it is a synonym.

        This search is case-insensitive, but case-matching is preferred.
        When nothing matches locally and `key` looks like a CURIE, the
        imported vocabularies are searched as well.

        Parameters
        ----------
        key : str
            The key to look up.

        Returns
        -------
        term : :class:`~.Entity`
            The found entity, if any.

        Raises
        ------
        KeyError :
            If there is no match to any term in this vocabulary

        See Also
        --------
        search
        __getitem__
        '''
        if isinstance(key, Reference):
            key = key.accession
        if key in self.terms:
            return self.terms[key]
        if key in self._names:
            return self._names[key]
        # The remaining strategies are all case-folding. A non-string key
        # cannot match them, and must raise KeyError, not AttributeError,
        # so that ``in`` and ``get`` from the Mapping protocol keep working.
        if not isinstance(key, str):
            raise KeyError(key)
        lower_key = key.lower()
        try:
            normalized_key = self.normalize_name(key)
        except KeyError:
            # Just to have a value to show in the error message.
            normalized_key = lower_key
        else:
            if normalized_key in self._names:
                return self._names[normalized_key]
        for index in (self._synonyms, self.terms, self._obsolete_names):
            if lower_key in index:
                return index[lower_key]
        if is_curie(key):
            result = self._query_imported(key)
            if result is not None:
                return result
        raise KeyError("%s and %s were not found." % (key, normalized_key)) from None

    def search(self, query):
        '''Search for any term containing the query in its id, name, or synonyms.

        This algorithm uses substring containment and may return multiple hits,
        and can be ambiguous when given a common or short substring. For exact
        string matches, use :meth:`query`

        Parameters
        ----------
        query : str
            The search query

        Returns
        -------
        matched : list
            The matched terms, sorted by id.

        See Also
        --------
        query
        '''
        matched = {}
        query = query.lower()
        for index in (self.terms, self._names, self._synonyms):
            for key, term in index.items():
                # Names only need to be hashable, so skip non-text keys.
                if isinstance(key, str) and query in key.lower():
                    matched[term.id] = term
        return sorted(matched.values(), key=lambda x: x.id)

    def __repr__(self):
        template = ("{self.__class__.__name__}(terms={size}, id={self.id}, "
                    "name={self.name}, version={self.version})")
        return template.format(self=self, size=len(self.terms))

    def __iter__(self):
        return iter(self.terms)

    def __len__(self):
        return len(self.terms)

    @property
    def terms(self):
        return self._terms

    @terms.setter
    def terms(self, value):
        # Copy the mapping, then rebuild every secondary index from it.
        self._terms = dict(value or {})
        self._reindex()

    def _reindex(self):
        # Rebuild all lookup tables and re-bind the terms to this vocabulary.
        self._build_names()
        self._build_case_normalized()
        self._build_synonyms()
        self._bind_terms()

    def _build_names(self):
        # ``_names`` maps exact names of live terms to the term, while
        # ``_obsolete_names`` maps lower-cased names of obsolete terms.
        names = {}
        obsolete_names = {}
        for term in self.terms.values():
            name = term['name']
            if term.get("is_obsolete", False):
                # Lower-casing needs a string; a hashable non-string name
                # would otherwise abort the whole indexing pass.
                if isinstance(name, str):
                    obsolete_names[name.lower()] = term
            elif isinstance(name, Hashable):
                names[name] = term
        self._names = names
        self._obsolete_names = obsolete_names

    def _bind_terms(self):
        # Point every term back at this vocabulary and let its value types
        # bind to it. The module uses f-strings, so it cannot be imported
        # before Python 3.6 and needs no separate legacy code path here.
        for term in self.terms.values():
            term.vocabulary = self
            value_types = term.get('has_value_type')
            if value_types:
                for value_type in value_types:
                    value_type.make_value_type(self)

    def _build_synonyms(self):
        # Maps each lower-cased synonym to its term.
        self._synonyms = {}
        for term in self.terms.values():
            for synonym in term.get('synonym') or ():
                self._synonyms[synonym.lower()] = term

    def _build_case_normalized(self):
        # Maps each lower-cased name to the name as originally written.
        self._normalized = {
            v['name'].lower(): v['name']
            for v in self.terms.values()
            if isinstance(v['name'], str)
        }

    def keys(self):
        return self.terms.keys()

    def names(self):
        '''A key-view over all the names in this controlled vocabulary, distinct
        from accessions.

        Returns
        -------
        collections.KeysView
        '''
        return self._names.keys()

    def items(self):
        return self.terms.items()

    def normalize_name(self, name):
        '''Return the original spelling of `name`, matched case-insensitively.

        Raises
        ------
        KeyError :
            If no term carries this name.
        '''
        return self._normalized[name.lower()]

    def _query_imported(self, query):
        '''Look `query` up in each vocabulary this one imports.

        Imports are loaded on first use and kept in :attr:`imports`.

        Returns
        -------
        term : :class:`~.Entity` or :const:`None`
            The first match, or :const:`None` if no import has the term.
        '''
        try:
            import_urls = self.metadata['import']
        except KeyError:
            # No imports declared, so there is nothing more to search.
            return None
        for url in ensure_iterable(import_urls):
            if url in self.imports:
                cv = self.imports[url]
            else:
                try:
                    logger.debug("Importing %s for %s", url, self.name)
                    cv = self.imports[url] = self.import_resolver(url)
                except ValueError as err:
                    logger.debug("Could not import %s: %s", url, err)
                    # Remember the failure so the import is not retried.
                    cv = self.imports[url] = None
            if cv is None:
                continue
            try:
                return cv.query(query)
            except KeyError:
                continue
        return None
# Browser-like User-Agent header value, sent with download requests when
# user-agent emulation is switched on.
DEFAULT_USER_AGENT = (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like'
' Gecko) Chrome/68.0.3440.106 Safari/537.36')
class VocabularyResolverBase(Callable):
    """Interface for objects that turn a vocabulary URI into a vocabulary.

    Subclasses supply :meth:`load`, :meth:`resolve` and :meth:`fallback`.
    Calling the instance is shorthand for :meth:`resolve`.
    """

    def load(self, uri: str):
        """Load the vocabulary at `uri`; must be overridden."""
        raise NotImplementedError()

    def resolve(self, uri: str):
        """Resolve `uri`; must be overridden."""
        raise NotImplementedError()

    def fallback(self, uri: str):
        """Provide a backup source for `uri`; must be overridden."""
        raise NotImplementedError()

    def __call__(self, uri: str):
        """Delegate to :meth:`resolve`."""
        resolved = self.resolve(uri)
        return resolved
class OBOCache(VocabularyResolverBase):
    """A cache for retrieved ontology sources stored on the file system, and an
    abstraction layer to make registered controlled vocabularies constructable
    from a URI even if they are not in the same format.

    Attributes
    ----------
    cache_exists : bool
        Whether the cache directory exists
    cache_path : str
        The path to the cache directory
    enabled : bool
        Whether the cache will be used or not
    resolvers : dict
        A mapping from ontology URL to a function which will be called instead of
        opening the URL to retrieve the :class:`ControlledVocabulary` object. A
        resolver is any callable that takes only an :class:`OBOCache` instance as
        a single argument.
    use_remote : bool
        Whether or not to try to access remote repositories over the network to
        retrieve controlled vocabularies. If not, will automatically default to
        either the cached copy or use the fallback value.
    user_agent_emulation : bool
        Whether or not to try to emulate a web browser's user agent when trying
        to download a controlled vocabulary.
    """
    # Class-wide registry of resolvers that every new instance starts with.
    default_resolvers = {}

    def __init__(self, cache_path='.obo_cache', enabled=True, resolvers=None, use_remote=True,
                 user_agent_emulation=True):
        self._cache_path = None
        self.cache_path = cache_path
        self.enabled = enabled
        self.resolvers = resolvers or {}
        self.use_remote = use_remote
        self.user_agent_emulation = user_agent_emulation
        self._register_default_resolvers()

    def _register_default_resolvers(self):
        # Fill in the class-wide defaults, but keep any resolver the caller
        # passed to the constructor for the same URI.
        for uri, resolver in self.default_resolvers.items():
            if not self.has_custom_resolver(uri):
                self.set_resolver(uri, resolver)

    @property
    def cache_path(self):
        return self._cache_path

    @cache_path.setter
    def cache_path(self, value):
        self._cache_path = value
        self.cache_exists = os.path.exists(self.cache_path)

    def path_for(self, name, setext=False):
        '''Construct a path for a given controlled vocabulary file
        in the cache on the file system.

        .. note::
            If the cache directory does not exist, this will create it.

        Parameters
        ----------
        name : str
            The name of the controlled vocabulary file
        setext : bool
            Whether or not to enforce the .obo extension

        Returns
        -------
        path : str
            The path in the file system cache to use for this name.
        '''
        if not self.cache_exists:
            # The flag was computed when ``cache_path`` was set and may be
            # stale, so tolerate a directory that has appeared since then.
            os.makedirs(self.cache_path, exist_ok=True)
            self.cache_exists = True
        name = os.path.basename(name)
        if not name.endswith(".obo") and setext:
            name += '.obo'
        return os.path.join(self.cache_path, name)

    @staticmethod
    def _close_quietly(handle):
        # Close a stream if it can be closed; not every handle type can.
        close = getattr(handle, 'close', None)
        if close is not None:
            close()

    def _fetch_remote(self, uri):
        # Open `uri` over the network and insist on an HTTP 200 response.
        headers = {}
        if self.user_agent_emulation:
            headers['User-Agent'] = DEFAULT_USER_AGENT
        f = urlopen(Request(uri, headers=headers))
        # The keepalive library monkey patches urllib2's urlopen and returns
        # an object with a different API. First handle the normal case, then
        # the patched case.
        if hasattr(f, 'getcode'):
            code = f.getcode()
        elif hasattr(f, "code"):
            code = f.code
        else:
            self._close_quietly(f)
            raise ValueError("Can't understand how to get HTTP response code from %r" % f)
        if code != 200:
            self._close_quietly(f)
            raise ValueError("%s did not resolve" % uri)
        return f

    def _open_url(self, uri):
        '''Open a stream for `uri`, from the network if allowed, otherwise
        (or on any failure) from the vendored fallback.

        Raises
        ------
        ValueError :
            If the remote source is unavailable and no fallback is registered.
        '''
        cause = None
        if self.use_remote:
            try:
                return self._fetch_remote(uri)
            except Exception as err:
                # Any network failure is recoverable through the fallback.
                logger.debug("Could not fetch %r remotely: %s", uri, err)
                cause = err
        if uri in fallback:
            return fallback[uri]()
        raise ValueError(uri) from cause

    def fallback(self, uri):
        '''Obtain a stream for the vocabulary specified by `uri`
        from the packaged bundle distributed with :mod:`psims`.

        Parameters
        ----------
        uri : str
            The URI to retrieve a fallback stream for.

        Returns
        -------
        result : file-like or :const:`None`
            Returns a backup stream, or :const:`None` if no fallback exists.
        '''
        # Inside a method the bare name refers to the module-level table.
        if uri in fallback:
            return fallback[uri]()
        logger.warning("Failed to locate fallback for %r", uri)
        return None

    def has_custom_resolver(self, uri):
        '''Test if `uri` has a resolver function.

        Parameters
        ----------
        uri : str
            The URI to test

        Returns
        -------
        bool
        '''
        return uri in self.resolvers

    def _download_to(self, uri, path):
        # Copy the stream for `uri` into the cache file at `path`. A failed
        # or implausibly short download must not leave a file behind,
        # because any non-empty file is later treated as a valid cache hit.
        stream = self._open_url(uri)
        try:
            try:
                n_chars = 0
                with open(path, 'wb') as cache_f:
                    for line in stream.readlines():
                        n_chars += len(line)
                        cache_f.write(line)
                if n_chars < 5:
                    raise ValueError("No bytes written")
            except BaseException:
                try:
                    os.remove(path)
                except OSError:
                    pass
                raise
        finally:
            self._close_quietly(stream)

    def resolve(self, uri):
        '''Get a readable file-like object for the controlled vocabulary referred
        to by `uri`.

        If `uri` has a custom resolver, by :meth:`has_custom_resolver`, the custom
        resolver function will be called instead.

        Parameters
        ----------
        uri : str
            The URI for the controlled vocabulary to access

        Returns
        -------
        fp : object
            If `uri` has a custom resolver, any type may be returned, otherwise a readable
            file-like object in binary mode over the requested controlled vocabulary.

        Raises
        ------
        ValueError :
            If the vocabulary could be neither downloaded nor found among the
            fallbacks.
        '''
        if self.has_custom_resolver(uri):
            return self.resolvers[uri](self)
        try:
            if not self.enabled:
                return self._open_url(uri)
            name = self.path_for(uri)
            # An empty file is not a usable cache entry; fetch it again.
            if not (os.path.exists(name) and os.path.getsize(name) > 0):
                self._download_to(uri, name)
            return open(name, 'rb')
        except ValueError:
            # Report through logging instead of printing to stderr; callers
            # such as :meth:`load` treat this failure as recoverable.
            logger.warning("Failed to resolve %r", uri, exc_info=True)
            raise

    def load(self, uri: str):
        '''Load the vocabulary for `uri`, falling back to the packaged copy
        when it cannot be resolved.

        Parameters
        ----------
        uri : str
            The URI for the controlled vocabulary to load

        Returns
        -------
        object
            The result of the custom resolver when `uri` has one, otherwise
            a :class:`ControlledVocabulary` which uses this method to load
            its own imports.

        Raises
        ------
        ValueError :
            If `uri` cannot be obtained at all, or does not end with "obo".
        '''
        if self.has_custom_resolver(uri):
            return self.resolvers[uri](self)
        try:
            fh = self.resolve(uri)
        except ValueError:
            fh = self.fallback(uri)
        if fh is None:
            raise ValueError(f"Failed to resolve {uri} or via its fall-back")
        if not uri.endswith("obo"):
            raise ValueError(f"Don't know how to load {uri}")
        return ControlledVocabulary.from_obo(fh, import_resolver=self.load)

    def set_resolver(self, uri, resolver):
        '''Register a resolver callable for `uri`

        Parameters
        ----------
        uri : str
            The URI to register the custom resolver for
        resolver : Callable
            A resolver is any callable that takes only an :class:`OBOCache` instance as
            a single argument.
        '''
        self.resolvers[uri] = resolver

    def __repr__(self):
        return "OBOCache(cache_path=%r, enabled=%r, resolvers=%s)" % (
            self.cache_path, self.enabled, self.resolvers)
def _make_relative_sqlite_sqlalchemy_uri(path):
return "sqlite:///%s" % path
def resolve_unimod(cache):
    """Custom resolver that builds the Unimod database object.

    With the cache enabled, the database lives in ``unimod.db`` inside the
    cache directory; otherwise :class:`unimod.Unimod` is built without a
    path. In both cases an :class:`IOError` triggers a retry using the
    vendored Unimod XML.

    Parameters
    ----------
    cache : OBOCache
        The cache this resolver was invoked by.
    """
    if not cache.enabled:
        try:
            return unimod.Unimod()
        except IOError:
            return unimod.Unimod(None, _use_vendored_unimod_xml())

    db_file = cache.path_for("unimod.db", False)
    db_uri = _make_relative_sqlite_sqlalchemy_uri(db_file)
    try:
        return unimod.Unimod(db_uri)
    except IOError:
        return unimod.Unimod(db_uri, _use_vendored_unimod_xml())
# Register the Unimod resolver in the class-wide defaults, unless something
# was already registered for that URL.
OBOCache.default_resolvers.setdefault("http://www.unimod.org/obo/unimod.obo", resolve_unimod)
# Module-wide cache instance used by the loader functions below; the
# on-disk cache is switched off until `configure_obo_store` is given a path.
obo_cache = OBOCache(enabled=False)
def configure_obo_store(path):
    """Point the module-level cache at `path`, or disable it.

    Parameters
    ----------
    path : str or None
        The cache directory to use. :const:`None` disables the cache and
        leaves its current path untouched.
    """
    use_cache = path is not None
    if use_cache:
        obo_cache.cache_path = path
    obo_cache.enabled = use_cache
def register_resolver(name, fn):
    """Register `fn` as the custom resolver for the URI `name` on the
    module-level cache."""
    obo_cache.set_resolver(uri=name, resolver=fn)
def load_psims():
    """Load the PSI-MS controlled vocabulary.

    The vocabulary is resolved through the module-level cache. If that
    attempt raises :class:`TypeError`, the vendored copy is parsed instead.
    """
    try:
        stream = obo_cache.resolve("http://purl.obolibrary.org/obo/ms/psi-ms.obo")
        return ControlledVocabulary.from_obo(stream)
    except TypeError:
        return ControlledVocabulary.from_obo(_use_vendored_psims_obo())
def load_uo():
    """Load the unit ontology (``uo.obo``) through the module-level cache."""
    source_url = "http://purl.obolibrary.org/obo/uo.obo"
    return ControlledVocabulary.from_obo(obo_cache.resolve(source_url))
def load_pato():
    """Load the ``pato.obo`` vocabulary through the module-level cache."""
    source_url = "http://purl.obolibrary.org/obo/pato.obo"
    return ControlledVocabulary.from_obo(obo_cache.resolve(source_url))
def load_xlmod():
    """Load the ``XLMOD.obo`` vocabulary through the module-level cache."""
    source_url = "https://raw.githubusercontent.com/HUPO-PSI/mzIdentML/master/cv/XLMOD.obo"
    return ControlledVocabulary.from_obo(obo_cache.resolve(source_url))
def load_unimod():
    """Resolve the Unimod URL through the module-level cache.

    Unlike the other loaders, the resolved object is returned as-is rather
    than being parsed as OBO.
    """
    source_url = "http://www.unimod.org/obo/unimod.obo"
    return obo_cache.resolve(source_url)
def load_bto():
    """Load the BRENDA tissue vocabulary through the module-level cache."""
    source_url = "http://www.brenda-enzymes.info/ontology/tissue/tree/update/update_files/BrendaTissueOBO"
    return ControlledVocabulary.from_obo(obo_cache.resolve(source_url))
def load_go():
    """Load the ``go.obo`` vocabulary through the module-level cache."""
    source_url = "http://purl.obolibrary.org/obo/go.obo"
    return ControlledVocabulary.from_obo(obo_cache.resolve(source_url))
def load_psimod():
    """Load the ``PSI-MOD.obo`` vocabulary through the module-level cache."""
    source_url = "https://raw.githubusercontent.com/HUPO-PSI/psi-mod-CV/master/PSI-MOD.obo"
    return ControlledVocabulary.from_obo(obo_cache.resolve(source_url))
def load_gno():
    """Load the ``gno.obo`` vocabulary through the module-level cache."""
    source_url = "http://purl.obolibrary.org/obo/gno.obo"
    return ControlledVocabulary.from_obo(obo_cache.resolve(source_url))