psims

Source code for psims.transform.mzml

'''
Transforming mzML Files
-----------------------

Often, we start with an mzML file we want to manipulate or change, but don't want to explicitly
write out the code for unpacking it and re-packing it.

The :class:`MzMLTransformer` class is intended to give you a way to wrap an input file-like object
over an mzML file and an output file-like object to write the manipulated mzML file to, along with
a transformation function to modify spectra, and have it do the rest of the work. It uses :mod:`pyteomics.mzml`
to do the parsing internally.


Transformation Function Semantics
=================================

The transformation function receives a :class:`dict` object representing
the spectrum as parsed by :mod:`pyteomics.mzml`, and is expected to return
the dictionary, possibly modified, or :const:`None` (in which case the spectrum is not written out).

You are free to modify existing keys in the spectrum dictionary, but *new* keys that are
intended to be recognized as either ``<cvParam />`` or ``<userParam />`` elements must
be instances of :class:`pyteomics.auxiliary.cvstr`, or otherwise have an "``accession``"
attribute to be picked up. Alternatively, the converter will make an effort to coerce keys
whose values are scalars, or :class:`dict` objects which look like parameters (having a "name"
or "accession" key, at least).

Alternatively, you can inherit from :class:`MzMLTransformer` and override :meth:`~.MzMLTransformer.format_spectrum`
to modify the spectrum before or after conversion (letting you directly append to the "params" key of the
converted spectrum and avoid needing to mark new params with :class:`cvstr`). Additionally, you
can override all other ``format_`` methods to customize how other elements are converted.


Usage and Examples
==================

In its simplest form, we would use the :class:`MzMLTransformer` like so:

.. code-block:: python

    from psims.transform.mzml import MzMLTransformer, cvstr

    def transform_drop_ms2(spectrum):
        if spectrum['ms level'] > 1:
            return None
        return spectrum

    with open("input.mzML", 'rb') as in_stream, open("ms1_only.mzML", 'wb') as out_stream:
        MzMLTransformer(in_stream, out_stream, transform_drop_ms2).write()



'''
from numbers import Number
from pyteomics import mzml

from psims import MzMLWriter, MzMLbWriter
from psims.utils import ensure_iterable

from .utils import TransformerBase, cvstr


class MzMLParser(mzml.MzML):
    """A :class:`pyteomics.mzml.MzML` reader that tolerates param elements
    lacking a ``value`` attribute and seeks back to offset 0 on :meth:`reset`.
    """

    def _handle_param(self, element, **kwargs):
        """Ensure `element` carries a ``value`` attribute (defaulting to the
        empty string) before delegating to the parent implementation.
        """
        attributes = element.attrib
        if "value" not in attributes:
            attributes["value"] = ""
        return super(MzMLParser, self)._handle_param(element, **kwargs)

    def reset(self):
        """Run the parent's reset and then seek the reader to position 0."""
        super(MzMLParser, self).reset()
        self.seek(0)


def identity(x):
    """Return `x` unchanged.

    Used as the default spectrum transformation when none is supplied.
    """
    return x


class MzMLTransformer(TransformerBase):
    """Reads an mzML file stream from :attr:`input_stream`, copying its metadata to :attr:`output_stream`,
    and then copies its spectra, applying :attr:`transform` to each spectrum object as it goes.

    If :attr:`sort_by_scan_time` is :const:`True`, then prior to writing spectra, a first pass will
    be made over the mzML file and the spectra will be written out ordered by
    ``MS:1000016|scan start time``.

    Attributes
    ----------
    input_stream : file-like
        A byte stream from an mzML format data buffer
    output_stream : file-like
        A writable binary stream to copy the contents of :attr:`input_stream` into
    sort_by_scan_time : :class:`bool`
        Whether or not to sort spectra by scan time prior to writing
    transform : :class:`Callable`, optional
        A function to call on each spectrum, passed as a :class:`dict` object as
        read by :class:`pyteomics.mzml.MzML`. A spectrum will be skipped if this function
        returns :const:`None`.
    transform_description : :class:`str`
        A description of the transformation to include in the written metadata
    reader : :class:`MzMLParser`
        The parser wrapping :attr:`input_stream`
    writer : :class:`MzMLWriter`
        The writer wrapping :attr:`output_stream`
    psims_cv
        The ``PSI-MS`` vocabulary obtained from :attr:`writer`, used to look up terms

    Parameters
    ----------
    input_stream : path or file-like
        A byte stream from an mzML format data buffer
    output_stream : path or file-like
        A writable binary stream to copy the contents of :attr:`input_stream` into
    transform : :class:`Callable`, optional
        A function to call on each spectrum, passed as a :class:`dict` object as
        read by :class:`pyteomics.mzml.MzML`.
    transform_description : :class:`str`
        A description of the transformation to include in the written metadata
    sort_by_scan_time : :class:`bool`
        Whether or not to sort spectra by scan time prior to writing
    """

    def __init__(self, input_stream, output_stream, transform=None, transform_description=None,
                 sort_by_scan_time=False):
        if transform is None:
            transform = identity
        self.input_stream = input_stream
        self.output_stream = output_stream
        self.transform = transform
        self.transform_description = transform_description
        self.sort_by_scan_time = sort_by_scan_time
        self.reader = MzMLParser(input_stream, iterative=True)
        self.writer = MzMLWriter(output_stream)
        self.psims_cv = self.writer.get_vocabulary('PSI-MS').vocabulary

    def format_referenceable_param_groups(self):
        """Read the ``referenceableParamGroupList`` and convert each group for the writer.

        Returns
        -------
        list
            The converted groups, or an empty list when the element is absent.
        """
        self.reader.reset()
        try:
            # The keyword was previously misspelled ("retrive_refs") and therefore
            # silently ignored by the reader.
            param_list = next(self.reader.iterfind(
                "referenceableParamGroupList", recursive=True, retrieve_refs=False))
            param_groups = ensure_iterable(param_list.get("referenceableParamGroup", []))
        except StopIteration:
            param_groups = []
        return [self.writer.ReferenceableParamGroup.ensure(d) for d in param_groups]

    def format_instrument_configuration(self):
        """Read the ``instrumentConfigurationList`` and convert each configuration.

        Only ``source``, ``analyzer`` and ``detector`` components are carried over,
        sorted by their ``order`` attribute.

        Returns
        -------
        list
            One writer ``InstrumentConfiguration`` per configuration read.
        """
        self.reader.reset()
        configuration_list = next(self.reader.iterfind("instrumentConfigurationList", recursive=True))
        configurations = []
        for config_dict in configuration_list.get("instrumentConfiguration", []):
            components = []
            for key, members in config_dict.pop('componentList', {}).items():
                if key == 'source':
                    components.extend(self.writer.Source.ensure(m) for m in members)
                elif key == "analyzer":
                    components.extend(self.writer.Analyzer.ensure(m) for m in members)
                elif key == 'detector':
                    components.extend(self.writer.Detector.ensure(m) for m in members)
                # any other key is not a recognized component type and is ignored
            components.sort(key=lambda x: x.order)
            software_reference = config_dict.pop("softwareRef", {}).get("ref")
            configuration = self.writer.InstrumentConfiguration(
                component_list=components,
                software_reference=software_reference,
                **config_dict)
            configurations.append(configuration)
        return configurations

    def format_data_processing(self):
        """Read the ``dataProcessingList`` and convert each entry for the writer.

        Each ``processingMethod`` has its ``softwareRef`` key renamed to
        ``software_reference`` before conversion.

        Returns
        -------
        list
            One writer ``DataProcessing`` per entry read.
        """
        self.reader.reset()
        dpl = next(self.reader.iterfind("dataProcessingList", recursive=True))
        data_processing = []
        for dp_dict in dpl.get("dataProcessing", []):
            methods = []
            for pm in dp_dict.pop("processingMethod", []):
                pm['software_reference'] = pm.pop("softwareRef")
                methods.append(self.writer.ProcessingMethod.ensure(pm))
            dp_dict['processing_methods'] = methods
            data_processing.append(self.writer.DataProcessing.ensure(dp_dict))
        return data_processing

    def copy_metadata(self):
        """Copy the file-level metadata from :attr:`reader` to :attr:`writer`.

        Writes, in order: the file description, the referenceable param groups
        (if any), the software list, the instrument configurations and the data
        processing list. An entry describing this transformer is appended to
        both the software list and the data processing list.
        """
        self.reader.reset()
        file_description = next(self.reader.iterfind("fileDescription"))
        # Tolerate a file description without a sourceFileList element.
        source_files = file_description.get("sourceFileList", {}).get('sourceFile')
        self.writer.file_description(file_description.get("fileContent", {}).items(), source_files)

        param_groups = self.format_referenceable_param_groups()
        if param_groups:
            self.writer.reference_param_group_list(param_groups)

        self.reader.reset()
        software_list = next(self.reader.iterfind("softwareList"))
        software_list = software_list.get("software", [])
        software_list.append(self._make_software())
        self.writer.software_list(software_list)

        configurations = self.format_instrument_configuration()
        self.writer.instrument_configuration_list(configurations)

        # include transformation description here
        data_processing = self.format_data_processing()
        data_processing.append(self._make_data_processing_entry())
        self.writer.data_processing_list(data_processing)

    def _make_software(self):
        """Build the software entry describing this transformer."""
        description = {
            "id": "psims-MzMLTransformer",
            "params": [
                self.writer.param("python-psims"),
            ]
        }
        return description

    def _make_data_processing_entry(self):
        """Build the data processing entry describing this transformation.

        :attr:`transform_description`, when set, is included as a param ahead of
        ``'conversion to mzML'``.
        """
        description = {
            "id": "psims-MzMLTransformer-processing",
            "processing_methods": [
                {
                    "order": 1,
                    "software_reference": "psims-MzMLTransformer",
                    "params": ([self.transform_description] if self.transform_description else []) +
                              ['conversion to mzML'],
                }
            ]
        }
        return description

    def format_scan(self, scan):
        """Split a parsed ``scan`` dictionary into the pieces the writer expects.

        Parameters
        ----------
        scan : dict
            A scan as parsed by the reader. It is not modified.

        Returns
        -------
        scan_start_time : dict or None
            The ``scan start time`` param, if one was found
        scan_params : list of dict
            The remaining scan attribute params and any params without an accession
        scan_window_list : list of tuple
            ``(low, high)`` value pairs built from the first scan window, or an
            empty list if an odd number of window values was found
        """
        scan_params = []
        scan_start_time = None
        for key, value in scan.items():
            if not hasattr(key, 'accession'):
                continue
            accession = key.accession
            if accession == '' or accession is None:
                # No accession to look up: pass the param through under its own name.
                scan_params.append({key: value})
                if hasattr(value, 'unit_info'):
                    scan_params[-1]['unit_name'] = value.unit_info
                continue
            term = self.psims_cv[accession]
            if term.is_of_type("scan attribute"):
                if term.name == 'scan start time':
                    scan_start_time = {
                        "name": term.id,
                        "value": value,
                        "unit_name": getattr(value, 'unit_info', None)
                    }
                else:
                    scan_params.append({"name": term.id, "value": value})
                    if hasattr(value, 'unit_info'):
                        scan_params[-1]['unit_name'] = value.unit_info

        # Only the first scan window is considered.
        window = scan.get('scanWindowList', {}).get('scanWindow', [{}])[0]
        window_params = []
        for key, value in window.items():
            if not hasattr(key, 'accession'):
                continue
            accession = key.accession
            if accession == '' or accession is None:
                # Same guard as above: there is no vocabulary term to look up.
                continue
            term = self.psims_cv[accession]
            if term.is_of_type("selection window attribute"):
                window_params.append({"name": term.id, "value": value})
                if hasattr(value, 'unit_info'):
                    window_params[-1]['unit_name'] = value.unit_info
        window_params.sort(key=lambda x: x['value'])

        if len(window_params) % 2 == 0:
            # Pair consecutive sorted values as (low, high) bounds.
            scan_window_list = [
                (lo['value'], hi['value'])
                for lo, hi in zip(window_params[::2], window_params[1::2])
            ]
        else:
            scan_window_list = []
        return scan_start_time, scan_params, scan_window_list

    def format_spectrum(self, spectrum):
        """Convert a parsed spectrum into keyword arguments for ``writer.write_spectrum``.

        Override this method to adjust the spectrum before or after conversion.

        Parameters
        ----------
        spectrum : dict
            A spectrum as parsed by the reader. The ``m/z array``,
            ``intensity array`` and ``charge array`` keys are removed from it,
            and recognized selected ion entries are removed from its precursors.

        Returns
        -------
        dict
            The keyword arguments describing the spectrum.
        """
        spec_data = dict()
        spec_data["mz_array"] = spectrum.pop("m/z array", None)
        spec_data["intensity_array"] = spectrum.pop("intensity array", None)
        spec_data["charge_array"] = spectrum.pop("charge array", None)
        # Preserve each array's element type; None when the array is absent.
        arrays = (
            ("m/z array", spec_data["mz_array"]),
            ("intensity array", spec_data["intensity_array"]),
            ("charge array", spec_data["charge_array"]),
        )
        spec_data['encoding'] = {
            name: (array.dtype.type if array is not None else None)
            for name, array in arrays
        }

        spec_data['id'] = spectrum["id"]
        params = []
        if "positive scan" in spectrum:
            spec_data['polarity'] = 1
        elif "negative scan" in spectrum:
            spec_data['polarity'] = -1
        else:
            spec_data['polarity'] = None

        attrs_to_skip = {'id', 'index', 'sourceFileRef', 'defaultArrayLength', 'dataProcessingRef', 'count'}
        for key, value in spectrum.items():
            accession = None
            if not hasattr(key, 'accession'):
                # Guess if this looks like it could be a param tag or was added by the user
                if isinstance(value, dict) and ("name" in value or "accession" in value):
                    pass
                elif isinstance(value, list):
                    continue
                elif isinstance(value, (str, int, float, Number)) and key not in attrs_to_skip:
                    pass
                else:
                    continue
            else:
                accession = key.accession

            if accession == '' or accession is None:
                if isinstance(value, dict):
                    params.append(value)
                else:
                    params.append({key: value})
                if hasattr(value, 'unit_info'):
                    params[-1]['unit_name'] = value.unit_info
            else:
                term = self.psims_cv[accession]
                if term.is_of_type("spectrum representation"):
                    spec_data["centroided"] = term.id == "MS:1000127"
                elif term.is_of_type("spectrum property") or term.is_of_type("spectrum attribute"):
                    params.append({"name": term.id, "value": value})
                    if hasattr(value, 'unit_info'):
                        params[-1]['unit_name'] = value.unit_info

        # A missing or empty scan list is treated as a single empty scan.
        scans = spectrum.get("scanList", {}).get("scan") or []
        first_scan = scans[0] if scans else {}
        (spec_data["scan_start_time"],
         spec_data['scan_params'],
         spec_data["scan_window_list"]) = self.format_scan(first_scan)
        spec_data['params'] = params

        precursors = spectrum.get("precursorList", {}).get("precursor")
        if precursors:
            precursor_list = []
            for prec in precursors:
                precursor_information = {}
                precursor_information['scan_id'] = prec.get("spectrumRef")
                ion = prec['selectedIonList'].get("selectedIon")[0]
                # Recognized entries are lifted out of `ion`; whatever is left
                # is passed along as generic params.
                for key, value in list(ion.items()):
                    term = self.psims_cv[key]
                    if term.id == "MS:1000744":
                        precursor_information['mz'] = value
                        ion.pop(key)
                    elif term.id == "MS:1000042":
                        precursor_information['intensity'] = value
                        ion.pop(key)
                    elif term.id in ("MS:1000041", "MS:1000633"):
                        precursor_information['charge'] = value
                        ion.pop(key)
                precursor_information.setdefault("intensity", None)
                precursor_information.setdefault("charge", None)
                precursor_information['params'] = ion.items()
                precursor_information['activation'] = (prec.get('activation', {}).items())
                precursor_information['isolation_window_args'] = prec.get("isolationWindow", None)
                precursor_list.append(precursor_information)
        else:
            precursor_list = None
        spec_data['precursor_information'] = precursor_list

        # attempt to find the instrumentConfiguration id to reference
        if scans:
            spec_data['instrument_configuration_id'] = scans[0].get("instrumentConfigurationRef")
        return spec_data

    def iterspectrum(self):
        """Return an iterator over the spectra to be written.

        When :attr:`sort_by_scan_time` is set, a first pass collects the time of
        every spectrum and the spectra are then fetched by id in ascending time
        order; otherwise they are yielded in file order.
        """
        self.reader.reset()
        if self.sort_by_scan_time:
            time_map = dict()
            self.log("Building Scan Time Map")
            for spectrum in self.reader.iterfind("spectrum"):
                time = self.reader._get_time(spectrum)
                time_map[spectrum['id']] = time
            self.reader.reset()
            by_time = sorted(time_map.items(), key=lambda x: x[1])
            generate = (self.reader.get_by_id(spectrum_id) for spectrum_id, _ in by_time)
            return generate
        else:
            return self.reader.iterfind("spectrum")

    def write(self):
        '''Write out the transformed mzML file
        '''
        writer = self.writer
        with writer:
            writer.controlled_vocabularies()
            self.copy_metadata()
            with writer.run(id="transformation_run"):
                with writer.spectrum_list(len(self.reader._offset_index)):
                    self.reader.reset()
                    # Start at zero so the summary below is well-defined even
                    # when the input yields no spectra.
                    n_handled = 0
                    for n_handled, spectrum in enumerate(self.iterspectrum(), 1):
                        spectrum = self.transform(spectrum)
                        if spectrum is None:
                            continue
                        self.writer.write_spectrum(**self.format_spectrum(spectrum))
                        if n_handled % 1000 == 0:
                            self.log("Handled %d spectra" % (n_handled, ))
                    self.log("Handled %d spectra" % (n_handled, ))
class MzMLToMzMLb(MzMLTransformer):
    '''Convert an mzML document into an mzMLb file, with an optional transformation
    along the way.

    Parameters
    ----------
    input_stream : path or file-like
        A byte stream from an mzML format data buffer
    output_stream : path or file-like
        A writable binary stream to copy the contents of :attr:`input_stream` into
    transform : :class:`Callable`, optional
        A function to call on each spectrum, passed as a :class:`dict` object as
        read by :class:`pyteomics.mzml.MzML`.
    transform_description : :class:`str`
        A description of the transformation to include in the written metadata
    sort_by_scan_time : :class:`bool`
        Whether or not to sort spectra by scan time prior to writing
    h5_compression : :class:`str`, optional
        The name of the HDF5 compression method to use. Defaults to
        :const:`psims.mzmlb.writer.DEFAULT_COMPRESSOR`
    h5_compression_opts : :class:`tuple` or :class:`int`, optional
        The configuration options for the selected compressor. For "gzip", this a single integer
        setting the compression level, while Blosc takes a tuple of integers.
    h5_blocksize : :class:`int`, optional
        The size of the compression blocks used when building the HDF5 file. Smaller blocks improve
        random access speed at the expense of compression efficiency and space. Defaults to 2 ** 20, 1MB.
    '''

    def __init__(self, input_stream, output_stream, transform=None, transform_description=None,
                 sort_by_scan_time=False, **hdf5args):
        '''Store the configuration and build the reader and the mzMLb writer.

        Any extra keyword arguments in `hdf5args` are forwarded to
        :class:`MzMLbWriter`.
        '''
        self.input_stream = input_stream
        self.output_stream = output_stream
        # Fall back to passing spectra through untouched.
        self.transform = identity if transform is None else transform
        self.transform_description = transform_description
        self.sort_by_scan_time = sort_by_scan_time
        self.reader = MzMLParser(input_stream, iterative=True)
        self.writer = MzMLbWriter(output_stream, **hdf5args)
        vocabulary_handle = self.writer.get_vocabulary('PSI-MS')
        self.psims_cv = vocabulary_handle.vocabulary