import re
import warnings
from contextlib import contextmanager
from collections import deque
from typing import Any, Dict, Iterable, Optional, OrderedDict, Union
from lxml import etree
from six import string_types as basestring, add_metaclass, text_type, PY3
from psims.controlled_vocabulary.controlled_vocabulary import ControlledVocabulary
from . import controlled_vocabulary
from .controlled_vocabulary import obj_to_xsdtype, VocabularyResolverBase
from .validation import validate
def make_counter(start=1):
'''
Create a functor whose only internal piece of data is a mutable container
with a reference to an integer, `start`. When the functor is called, it returns
current `int` value of `start` and increments the mutable value by one.
Parameters
----------
start: int, optional
The number to start counting from. Defaults to `1`.
Returns
-------
int:
The next number in the count progression.
'''
start = [start]
def count_up():
'''A closure returning incrementally larger integers'''
ret_val = start[0]
start[0] += 1
return ret_val
return count_up
def camelize(name):
"""Adapts an attribute name from "snake_case" to "camelCase"
to make lookups on Element.attrib easier.
Parameters
----------
name : str
Attribute name
Returns
-------
str
transformed name
"""
parts = name.split("_")
if len(parts) > 1:
return ''.join([parts[0]] + [part.title() if part != "ref" else "_ref" for part in parts[1:]])
else:
return name
def id_maker(type_name, id_number):
'''Generate a consistent ID that is unique within a document,
assuming `id_number` is unique within the tag type.
'''
return "%s_%s" % (type_name.upper(), str(id_number))
def sanitize_id(string):
"""Remove characters from a string which would be invalid
in XML identifiers
Parameters
----------
string : str
Returns
-------
str
"""
string = re.sub(r"\s", '_', string)
string = re.sub(r"\\|/", '', string)
return string
NO_TRACK = object()
class ElementType(type):
"""A metaclass to keep a count of the number of times
an instance of each derived class is created.
"""
_cache = {}
def __new__(mcs, name, parents, attrs):
new_type = type.__new__(mcs, name, parents, attrs)
tag_name = attrs.get("tag_name")
if attrs.get("_track") is NO_TRACK:
return new_type
if not hasattr(mcs, "_cache"):
mcs._cache = dict()
mcs._cache[name] = new_type
if tag_name is not None:
mcs._cache[tag_name] = new_type
return new_type
def attrencode(o):
"""A simple function to convert most
basic python types to a string form
which is safe to serialize in XML
attributes
Parameters
----------
o : object
Attribute value to encode
Returns
-------
str:
The encoded value
"""
if isinstance(o, bool):
return text_type(o).lower()
else:
return text_type(o)
[docs]@add_metaclass(ElementType)
class TagBase(object):
"""Represent a single XML element with arbitrary attributes.
Mocks the :class:`Mapping` interface
Attributes
----------
attrs : dict
The attributes of the element. :meth:`__getattr__` falls back
to querying :attr:`attrs`, trying both requested name and ``camelCase``
versions of the name
is_open : bool
Whether the element has been opened as a context manager
tag_name : str
The name of the element
text : str
The body text of the element
type_attrs : dict
Attributes common to all elements of this type
id : str
The @id attribute of the element.
"""
type_attrs = {}
def __init__(self, tag_name=None, text="", **attrs):
self.tag_name = tag_name or self.tag_name
_id = attrs.pop('id', None)
_id_formatter = attrs.pop('id_formatter', id_maker)
self.attrs = {}
self.attrs.update(self.type_attrs)
self.text = text
self.attrs.update(attrs)
# When passing through a XMLWriterMixin.element() call, tags may be reconstructed
# and any set ids will be passed through the attrs dictionary, but the `with_id`
# flag won't be propagated. `_force_id` preserves this.
self._force_id = True
self._id_formatter = _id_formatter
if _id is None:
self._id_number = None
self._id_string = None
self._force_id = False
elif isinstance(_id, int):
self._id_number = _id
self._id_string = None
else:
self._id_number = None
self._id_string = _id
self.is_open = False
def __getattr__(self, key):
try:
return self.attrs[key]
except KeyError:
try:
return self.attrs[camelize(key)]
except KeyError:
raise AttributeError("%s has no attribute %s" % (self.__class__.__name__, key))
# Support Mapping Interface
def __getitem__(self, key):
try:
return getattr(self, key)
except AttributeError:
raise KeyError(key)
def keys(self):
if self.id is not None:
yield "id"
for key in self.attrs:
yield key
def get(self, name, default):
return getattr(self, name, default)
@property
def id(self):
if self._id_string is None and self._id_number is not None:
self._id_string = self._id_formatter(self.tag_name, self._id_number)
return self._id_string
def element(self, xml_file=None, with_id=False):
"""Create an XML element using either a materialized :class:`lxml.etree.Element`
if ``xml_file`` is :const:`None` or the ephemeral context manager element
:class:`lxml.etree._FileWriterElement`.
The element will be constructed with all attributes in :attr:`attrs` where the
value is not :const:`None`.
Parameters
----------
xml_file : :class:`XMLWriterMixin`, optional
The XML writer to build the element for
with_id : bool, optional
Whether to require the ID attribute be present and rendered
Returns
-------
:class:`lxml.etree.Element` or :class:`lxml.etree._FileWriterElement`
Description
Raises
------
ValueError
Description
"""
with_id = with_id or self._force_id
attrs = {k: attrencode(v) for k, v in self.attrs.items() if v is not None}
if with_id:
if self.id is None:
raise ValueError("Required id for %r but id was None" % (self,))
attrs['id'] = self.id
if xml_file is None:
elt = etree.Element(self.tag_name, attrs)
if self.text:
elt.text = self.text
return elt
else:
return xml_file.element(self.tag_name, **attrs)
def write(self, xml_file, with_id=False):
"""Write this element to file
Parameters
----------
xml_file : :class:`XMLWriterMixin`
The XML writer to build the element for
with_id : bool, optional
Whether to require the ID attribute be present and rendered
See Also
--------
:meth:`element`
"""
el = self.element(with_id=with_id)
xml_file.write(el)
def bind(self, xml_file):
self._xml_file = xml_file
@contextmanager
def begin(self, xml_file, with_id=False):
with self.element(xml_file, with_id):
self.is_open = True
yield
self.is_open = False
def __call__(self, xml_file=None, with_id=False):
"""Alias of :meth:`element`
"""
return self.element(xml_file, with_id)
def __repr__(self):
return "<%s id=\"%s\" %s>" % (self.tag_name, self.id, " ".join("%s=\"%s\"" % (
k, attrencode(v)) for k, v in self.attrs.items()))
def __eq__(self, other):
try:
return self.attrs == other.attrs
except AttributeError:
return False
def __ne__(self, other):
try:
return self.attrs != other.attrs
except AttributeError:
return True
def __hash__(self):
return hash((self.tag_name, frozenset(self.attrs.items())))
def identity(x):
return x
def _make_tag_type(name, **attrs):
"""Creates a new TagBase-derived class dynamically at runtime.
The new type will be cached.
Parameters
----------
name : str
The tag name
**attrs : dict
Any class-wide attributes to include
Returns
-------
type
A TagBase subclass
"""
return type(name, (TagBase,), {"tag_name": name, "type_attrs": attrs})
[docs]def _element(_tag_name, *args, **kwargs):
"""Construct a subclass instance of :class:`TagBase` with the given
tag name. All other arguments are forwarded to the :class:`TagBase`
constructor
Parameters
----------
_tag_name : str
The name of the tag type to create
*args
Arbitrary arguments for the tag
**kwargs
Key word arguments for the tag
Returns
-------
:class:`TagBase`
"""
try:
eltype = ElementType._cache[_tag_name]
except KeyError:
eltype = _make_tag_type(_tag_name)
return eltype(*args, **kwargs)
[docs]def element(xml_file, _tag_name, *args, **kwargs):
"""Construct and immediately write a subclass instance of
:class:`TagBase` with the given tag name. All other arguments
are forwarded to the :class:`TagBase` constructor
Parameters
----------
xml_file : :class:`XMLWriterMixin`
The XML writer to write to
_tag_name : str
The name of the tag type to create
*args
Arbitrary arguments for the tag
**kwargs
Key word arguments for the tag
"""
with_id = kwargs.pop("with_id", False)
if isinstance(_tag_name, basestring):
el = _element(_tag_name, *args, **kwargs)
else:
el = _tag_name
return el.element(xml_file=xml_file, with_id=with_id)
class AttrProperty(object):
def __init__(self, name, transform=None):
if transform is None:
transform = self._default_transform
self.name = name
self.transform = transform
def _default_transform(self, value):
return value
def __get__(self, inst, cls=None):
return inst.attrs.get(self.name)
def __set__(self, inst, value):
inst.attrs[self.name] = self.transform(value)
def __delete__(self, inst):
try:
inst.attrs.pop(self.name)
except KeyError:
pass
class XSDTypingProperty(AttrProperty):
def __get__(self, inst, cls=None):
value = inst.attrs.get(self.name)
xsdtype = obj_to_xsdtype(value)
if xsdtype:
inst.attrs['type'] = xsdtype
return value
def __set__(self, inst, value):
inst.attrs[self.name] = self.transform(value)
value = inst.attrs[self.name]
xsdtype = obj_to_xsdtype(value)
if xsdtype:
inst.attrs['type'] = xsdtype
[docs]class CVParam(TagBase):
"""Represents a ``<cvParam />``
.. note::
This element holds additional data or annotation. Only controlled values
are allowed here
Attributes
----------
name: str
The human-readable name of the parameter
accession: str
The within-vocabulary unique identifier of the parameter
value: object
The value of the parameter, to be converted to text
"""
tag_name = "cvParam"
_track = NO_TRACK
@classmethod
def param(cls, name, value=None, **attrs):
if isinstance(name, cls):
return name
elif isinstance(name, (tuple, list)):
name, value = name
else:
if value is None:
return cls(name=name, **attrs)
else:
return cls(name=name, value=value, **attrs)
@staticmethod
def _normalize_units(attrs):
if 'unit_cv_ref' in attrs:
attrs["unitCvRef"] = attrs.pop("unit_cv_ref")
if 'unit_accession' in attrs:
attrs['unitAccession'] = attrs.pop("unit_accession")
if 'unit_name' in attrs:
attrs['unitName'] = attrs.pop("unit_name")
return attrs
def __init__(self, accession=None, name=None, ref=None, value=None, **attrs):
if ref is not None:
attrs["cvRef"] = ref
if accession is not None:
attrs["accession"] = accession
if name is not None:
attrs["name"] = name
if value is not None:
attrs['value'] = value
else:
attrs['value'] = ''
attrs = self._normalize_units(attrs)
super(CVParam, self).__init__(self.tag_name, **attrs)
self.patch_accession(accession, ref)
value = AttrProperty("value")
ref = AttrProperty("cvRef")
name = AttrProperty("name")
accession = AttrProperty("accession")
unit_name = AttrProperty("unitName")
unit_accession = AttrProperty("unitAccession")
unit_cv_ref = AttrProperty("unitCvRef")
def __call__(self, *args, **kwargs):
self.write(*args, **kwargs)
def __repr__(self):
return "<%s %s>" % (self.tag_name, " ".join("%s=\"%s\"" % (
k, str(v)) for k, v in self.attrs.items()))
def patch_accession(self, accession, ref):
if accession is not None:
if isinstance(accession, int):
accession = "%s:%d" % (ref, accession)
self.attrs['accession'] = accession
else:
self.attrs['accession'] = accession
[docs]class UserParam(CVParam):
"""Represents a ``<userParam />`` element
.. note::
Uncontrolled user parameters (essentially allowing free text). Before
using these, one should verify whether there is an appropriate CV term
available, and if so, use the CV term instead
"""
tag_name = "userParam"
accession = None
value = XSDTypingProperty('value')
type = AttrProperty('type')
def __init__(self, accession=None, name=None, ref=None, value=None, **attrs):
super(UserParam, self).__init__(
accession=accession, name=name, ref=ref, value=value, **attrs)
# get XSDTypingProperty to force property __get__
self.value
[docs]class ParamGroupReference(TagBase):
tag_name = "referenceableParamGroupRef"
def __init__(self, ref):
self.ref = ref
super(ParamGroupReference, self).__init__(self.tag_name, ref=ref)
def __call__(self, *args, **kwargs):
self.write(*args, **kwargs)
CVTypes = Union['CV', 'ProvidedCV']
class CVCollection(object):
"""A partially unique collection of :class:`CV` objects.
"""
storage: OrderedDict[str, CVTypes]
def __init__(self, cvs=None):
self.storage = OrderedDict()
if cvs:
self.update(cvs)
def add(self, cv: CVTypes):
"""Add `cv` to the collection.
If the :attr:`CV.id` is aleady present in the collection, a warning
will be issued.
Parameters
----------
cv : :class:`CV`
The controlled vocabulary reference
"""
if cv.id in self.storage:
old = self.storage[cv.id]
warnings.warn(
"Replacing {cv.id} {old.version}@{old.uri} with {cv.version}@{cv.uri}".format(
cv=cv, old=old))
self.storage[cv.id] = cv
return self
def update(self, collection: Iterable[CVTypes]):
"""Add each element of `collection` to `self`, calling :meth:`add` on each
element.
Parameters
----------
collection : :class:`~.Iterable`
Any iterable collection of :class:`CV`
See Also
--------
:meth:`add`
"""
for cv in collection:
self.add(cv)
append = add
extend = update
def copy(self):
"""Create a copy of `self`
Returns
-------
:class:`CVCollection`
"""
return self.__class__(self)
def __add__(self, other: Iterable[CVTypes]):
copy = self.copy()
copy.update(other)
return copy
def __iadd__(self, other):
self.update(other)
return self
def __getitem__(self, key):
return self.storage[key]
def __iter__(self):
return iter(self.storage.values())
def __len__(self):
return len(self.storage)
def __repr__(self):
template = "{self.__class__.__name__}({members})"
members = list(self)
return template.format(self=self, members=members)
[docs]class CV(object):
"""Represent a Controlled Vocabulary associated with the current document.
The controlled vocabulary referenced must specify a URI that will be used
to either download the definitions from, to be matched to a cache of pre-built
vocabularies, or to be matched with a set of special resolution rules for
handled by the :attr:`resolver`.
This object acts as a lazy-loading proxy for
:class:`~.controlled_vocabulary.ControlledVocabulary`.
Attributes
----------
full_name : str
The full name of the controlled vocabulary, which may or may not
be identical to the :attr:`id`
id : str
A short unique identifier for the controlled vocabulary
options : :class:`dict`
Additional information that may be used during resolution
resolver : :class:`~.VocabularyResolver`
The resolver which will handle all requests for this controlled vocabulary
uri : str
The location where the definition can be found
version : str
The version of the vocabulary resolved
vocabulary : :class:`~.ControlledVocabulary`
The parsed term graph defining this vocabulary
"""
full_name: str
id: str
uri: str
resolver: VocabularyResolverBase
options: Dict[str, Any]
_version: Optional[str]
_vocabulary: ControlledVocabulary
def __init__(self, full_name, id, uri, version=None, resolver=None, **kwargs):
self.full_name = full_name
self.id = id
self.uri = uri
self._version = version
self.options = kwargs
self._vocabulary = None
self.resolver = resolver
def __hash__(self):
return hash(self.uri)
def match(self, other):
return self.uri == other.uri and self.id == other.id and self.full_name == other.full_name
def __eq__(self, other):
return self.match(other) and self.version == other.version
def __ne__(self, other):
return not self == other
@property
def version(self):
if self._version is None:
try:
self._version = self.vocabulary.version
except AttributeError:
pass
return self._version
@property
def vocabulary(self):
if self._vocabulary is None:
self._vocabulary = self.load()
return self._vocabulary
def load(self, handle=None):
"""Load the vocabulary definition from source
Assumes that the definition is in OBO format
Parameters
----------
handle : file-like, optional
An optional file handle which can be read from directly.
Returns
-------
:class:`~.ControlledVocabulary`
The parsed vocabulary
Raises
------
KeyError
When all fallback mechanisms fail, a KeyError is raised
"""
resolver = self.resolver or controlled_vocabulary.obo_cache
if handle is None:
cv = resolver.load(self.uri)
else:
cv = controlled_vocabulary.ControlledVocabulary.from_obo(handle, import_handler=self.resolver.load)
try:
cv.id = self.id
except Exception:
import traceback
traceback.print_exc()
return cv
def __getitem__(self, key):
return self.vocabulary[key]
def query(self, *args, **kwargs):
return self.vocabulary.query(*args, **kwargs)
def __repr__(self):
template = "{self.__class__.__name__}({self.full_name!r}, {self.id!r}, {self.uri!r}, {version!r})"
version = self._version
if version is None:
version = "?"
return template.format(self=self, version=version)
[docs]class ProvidedCV(CV):
"""A wrapper around another object that provides the same basic interface
as :class:`CV` from that object, provided through the :attr:`converter`
function
Attributes
----------
converter : Callable
A function that converts elements of the provided vocabulary into
something matching the :class:`~.controlled_vocabulary.Entity` interface.
"""
def __init__(self, id, uri, converter=identity, **kwargs):
self.converter = converter
super(ProvidedCV, self).__init__(id=id, uri=uri, **kwargs)
def load(self, handle=None):
resolver = self.resolver or controlled_vocabulary.obo_cache
try:
cv = resolver.resolve(self.uri)
except ValueError:
cv = resolver.fallback(self.uri)
if cv is None:
raise LookupError(self.uri)
try:
cv.id = self.id
except Exception:
pass
return cv
def __getitem__(self, key):
return self.converter(super(ProvidedCV, self).__getitem__(key), self)
[docs]class XMLWriterMixin(object):
"""A mixin class to provide methods for writing
XML elements and aggregate Components.
Attributes
----------
verbose : bool
Controls debug printing
writer: lxml.etree._IncrementalFileWriter
The low-level XML writer used by :attr:`xmlfile`
"""
verbose = False
[docs] @contextmanager
def element(self, element_name, **kwargs):
"""Construct and immediately open a subclass instance of
:class:`TagBase` with the given tag name. All other arguments
are forwarded to the :class:`TagBase` constructor.
Parameters
----------
element_name: str
The name of the tag type to create
*args
Arbitrary arguments for the tag
**kwargs
Key word arguments for the tag
See Also
--------
:func:`element`
"""
if self.verbose:
print("In XMLWriterMixin.element", element_name, kwargs)
try:
if isinstance(element_name, basestring):
with element(self.writer, element_name, **kwargs):
yield
else:
with element_name.element(self.writer, **kwargs):
yield
except AttributeError:
if self.writer is None:
raise ValueError(
"This writer has not yet been created."
" Make sure to use this object as a context manager using the "
"`with` notation or by explicitly calling its __enter__ and "
"__exit__ methods.")
else:
raise
[docs] def write(self, *args, **kwargs):
"""Either write a complete XML sub-tree or add free text to the file stream
Parameters
----------
arg: str or :class:`lxml.etree.Element`
The entity to be written out.
"""
if self.verbose:
print(args[0])
try:
self.writer.write(*args, **kwargs)
except AttributeError:
if self.writer is None:
raise ValueError(
"This writer has not yet been created."
" Make sure to use this object as a context manager using the "
"`with` notation or by explicitly calling its __enter__ and "
"__exit__ methods.")
else:
raise
def flush(self):
self.writer.flush()
class XMLFormattingStreamWriter(object):
"""Wraps a writable stream with a layer emulating the
class:`lxml.etree.xmlfile` interface, save that it automatically
formats and indents the XML generated without requiring a separate
pass through the XML pretty printing processing
Attributes
----------
indent_chars : str
The characters to indent with
indent_level : int
The current indentation level
stream : :class:`io.IOBase`
The stream to wrap
writer : :class:`lxml.etree._IncrementalFileWriter`
The actual XML writer
wrote_text_stack : :class:`collections.deque`
A stack to track if a layer wrote text in one of its children and should not
have its end tag written on a new line
xmlfile : :class:`lxml.etree.xmlfile`
The actual XML writer's controller
"""
def __init__(self, stream, encoding=None, indent=' ', **kwargs):
self.stream = stream
self.xmlfile = etree.xmlfile(self.stream, encoding=encoding, buffered=False, **kwargs)
self.writer = None
self.indent_level = 0
self.indent_chars = indent
self.wrote_text_stack = deque()
def __enter__(self):
self.writer = self.xmlfile.__enter__()
return self
def __exit__(self, *args):
self.xmlfile.__exit__(*args)
def _indent_tag(self):
self.writer.write('\n' + (self.indent_chars * self.indent_level))
@contextmanager
def element(self, *args, **kwargs):
if self.indent_level > 0:
self._indent_tag()
elt = self.writer.element(*args, **kwargs)
elt.__enter__()
self.wrote_text_stack.append(False)
self.indent_level += 1
yield
self.indent_level -= 1
if not self.wrote_text_stack[-1]:
self._indent_tag()
elt.__exit__(None, None, None)
self.wrote_text_stack.pop()
def write(self, *args, **kwargs):
if isinstance(args[0], basestring if not PY3 else (str, bytes)):
self.wrote_text_stack[-1] = True
if not self.wrote_text_stack[-1]:
if self.indent_level > 0:
self._indent_tag()
self.writer.write(*args, **kwargs)
def flush(self):
self.writer.flush()
def write_doctype(self):
self.writer.write_doctype()
def write_declaration(self):
self.writer.write_declaration()
[docs]class XMLDocumentWriter(XMLWriterMixin):
"""A base class for types which are used to
write complete XML documents.
Attributes
----------
outfile : file
A writable file object
toplevel : TagBase
The top-level XML tag
xmlfile : lxml.etree.xmlfile
The XML formatter
writer : lxml.etree._IncrementalFileWriter
The low-level XML writer used by :attr:`xmlfile`
"""
[docs] @staticmethod
def toplevel_tag():
"""Overridable method to construct the appropriate
tag for :attr:`toplevel`
Returns
-------
TagBase
"""
return _element("xmldocument")
def __init__(self, outfile, close=None, encoding=None, **kwargs):
if encoding is None:
encoding = 'utf-8'
self.outfile = outfile
self.encoding = encoding
self.xmlfile = XMLFormattingStreamWriter(outfile, encoding=encoding, **kwargs)
self._writer = None
self.toplevel = None
self._ended = False
self._close = close
def _should_close(self):
if self._close is None:
return hasattr(self.outfile, 'close')
return bool(self._close)
@property
def writer(self):
if self._writer is None:
raise ValueError(
"The writer hasn't been started yet. Either use the object as a context manager"
" or call the begin() method")
return self._writer
@writer.setter
def writer(self, value):
self._writer = value
[docs] def begin(self):
"""Writes the doctype and starts the low-level writing machinery
"""
if self._has_begun():
return
self.writer = self.xmlfile.__enter__()
self.writer.write_declaration()
toplevel = self.toplevel_tag()
if isinstance(toplevel, TagBase):
toplevel_element = element(self.writer, toplevel)
else:
toplevel_element = toplevel
self.toplevel = toplevel_element
self.toplevel.__enter__()
def _has_begun(self):
return self.toplevel is not None
def __enter__(self):
"""Begins writing, opening the top-level tag
"""
self.begin()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Closes the top-level tag, the XML formatter,
and the file itself.
"""
self.end(exc_type, exc_value, traceback)
[docs] def end(self, exc_type=None, exc_value=None, traceback=None):
"""Ends the XML document, and flushes and closes the file
if appropriate.
"""
self.toplevel.__exit__(exc_type, exc_value, traceback)
self.writer.flush()
try:
self.xmlfile.__exit__(exc_type, exc_value, traceback)
except etree.SerialisationError as err:
# There seems to be a bug when `__exit__`ing xmlfile objects, usually
# when they get large see here: https://bugs.launchpad.net/lxml/+bug/1570388
# https://github.com/mobiusklein/psims/issues/7
if str(err).startswith('unknown error'):
warnings.warn("Error closing file: {}".format(err))
else:
raise
try:
self.flush()
except Exception:
pass
self._ended = True
if self._should_close():
self._do_close()
[docs] def controlled_vocabularies(self):
"""Write out the `<cvList>` element and all its children,
including both this format's default controlled vocabularies
and those passed as arguments to this method.this
This method requires writing to have begun.
"""
cvlist = self.CVList(self.vocabularies)
cvlist.write(self.writer)
def _do_close(self):
try:
self.outfile.close()
except AttributeError:
pass
def close(self):
if not self._ended:
self.end()
# end() will close the file if _should_close() returns True,
# otherwise the user must be asking to close explicitly.
if not self._should_close():
self._do_close()
def flush(self):
try:
self.outfile.flush()
except AttributeError:
self.writer.flush()
[docs] def validate(self):
"""Attempt to perform XSD validation on the XML document
this writer wrote
Returns
-------
bool:
Whether or not the document was valid
lxml.etree.XMLSchema:
The schema object where errors are logged
Raises
------
TypeError
When the file cannot be recovered from the writer object,
a :class:`TypeError` is thrown
"""
prev = None
try:
if isinstance(self.outfile, basestring):
fname = self.outfile
else:
fname = self.outfile.name
except AttributeError:
if hasattr(self.outfile, 'seek'):
prev = self.outfile.tell()
self.outfile.seek(0)
fname = self.outfile
else:
raise TypeError("Can't get file from %r" % (self.outfile,))
result, schema = validate(fname)
if prev is not None:
self.outfile.seek(prev)
return result, schema