"""mzML is a standard rich XML-format for raw mass spectrometry data storage.
This module provides :class:`MzMLLoader`, a :class:`~.RandomAccessScanSource`
implementation.
The parser is based on :mod:`pyteomics.mzml`.
"""
import io
from typing import Any, Dict, List, Tuple
import warnings
import zlib
import re
import hashlib
from six import string_types as basestring
import numpy as np
from pyteomics import mzml
from pyteomics.xml import unitfloat
from .common import (
PrecursorInformation, ScanDataSource,
ChargeNotProvided, ActivationInformation,
ScanAcquisitionInformation, ScanEventInformation,
ScanWindow, IsolationWindow,
FileInformation, SourceFile, MultipleActivationInformation,
ScanFileMetadataBase)
from .metadata.activation import (
supplemental_energy, UnknownDissociation)
from .metadata.instrument_components import (
InstrumentInformation, ComponentGroup, component,
instrument_models)
from .metadata.software import Software
from .metadata import file_information
from .metadata import data_transformation
from .metadata.sample import Sample
from .metadata.scan_traits import FAIMS_compensation_voltage
from .xml_reader import (
XMLReaderBase, iterparse_until,
get_tag_attributes, _find_section, in_minutes)
from ms_deisotope.data_source.metadata import software
def _open_if_not_file(obj, mode='rt', encoding='utf8'):
if obj is None:
return obj
if hasattr(obj, 'read'):
return obj
return open(obj, mode, encoding=encoding)
class _MzMLParser(mzml.MzML):
    """:class:`pyteomics.mzml.MzML` subclass that indexes only spectra and can
    load a pre-built byte-offset index from an external file."""

    # we do not care about chromatograms
    _indexed_tags = {'spectrum', }

    def __init__(self, *args, **kwargs):
        # An optional pre-built byte offset index may be provided through the
        # ``index_file`` keyword as either a path or an open file object.
        self._index_file_obj = _open_if_not_file(kwargs.pop("index_file", None))
        super(_MzMLParser, self).__init__(*args, **kwargs)

    def _handle_param(self, element, **kwargs):
        # Some writers omit the "value" attribute on cvParam elements entirely;
        # normalize it to an empty string before the base class handles it.
        if "value" not in element.attrib:
            element.attrib['value'] = ''
        return super(_MzMLParser, self)._handle_param(element, **kwargs)

    def _determine_array_dtype(self, info):
        """Pop the binary-data type cvParam out of *info* and map it to a
        NumPy dtype, or return :const:`None` when no type term is present."""
        type_terms = {'32-bit float': np.float32, '64-bit float': np.float64,
                      '32-bit integer': np.int32, '64-bit integer': np.int64,
                      'null-terminated ASCII string': np.uint8}
        for term, code in type_terms.items():
            if term in info:
                del info[term]
                return code
        # sometimes the type term is filed under the 'name' key instead
        if 'name' in info:
            names = info['name']
            for term, code in type_terms.items():
                if term in names:
                    names.remove(term)
                    return code
        return None

    def _check_has_byte_offset_file(self):
        # An explicitly supplied index file takes precedence over the default
        # on-disk lookup performed by the base class.
        if self._index_file_obj is not None:
            return True
        return super(_MzMLParser, self)._check_has_byte_offset_file()

    def _read_byte_offsets(self):
        if self._index_file_obj is None:
            super(_MzMLParser, self)._read_byte_offsets()
            return
        index = self._index_class.load(self._index_file_obj)
        try:
            self._index_file_obj.close()
        except (AttributeError, OSError, IOError):
            pass
        self._offset_index = index
def _find_arrays(data_dict, decode=False):
arrays = dict()
for key, value in data_dict.items():
if " array" in key:
if decode:
if value.data:
arrays[key] = value.decode()
else:
arrays[key] = np.array([], dtype=value.dtype)
else:
arrays[key] = value
return arrays
# Prefer the C-accelerated implementation of ``_handle_param`` when the
# optional compiled extension is built; otherwise silently keep the pure
# Python override defined on :class:`_MzMLParser` above. This must run after
# the class definition since it monkey-patches the class attribute.
try:
    from ms_deisotope._c.units import _handle_param_fill_missing_value
    _MzMLParser._handle_param = _handle_param_fill_missing_value
except ImportError:
    pass
class MzMLDataInterface(ScanDataSource):
    """Provides implementations of all of the methods needed to implement the
    :class:`ScanDataSource` for mzML files. Not intended for direct instantiation.
    """

    def _stray_cvs(self, scan):
        # Some writers leave cvParams gathered under a bare "name" key instead
        # of as top-level entries; expose them so membership tests can consult
        # both locations.
        return scan.get("name", [])

    def _scan_arrays(self, scan):
        """Returns raw data arrays for m/z and intensity

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        mz: np.array
            An array of m/z values for this scan
        intensity: np.array
            An array of intensity values for this scan
        arrays: dict
            Any remaining auxiliary arrays (e.g. charge array) keyed by name
        """
        try:
            # If the parser eagerly decoded the binary arrays, there is
            # nothing left to decode here.
            decode = not self._decode_binary
        except AttributeError:
            decode = False
        try:
            arrays = _find_arrays(scan, decode=decode)
        except zlib.error as zerr:
            warnings.warn(
                f"An error occurred while decompressing the spectrum data arrays for scan {self._scan_id(scan)}: {zerr}"
            )
            # Fall back assuming the missing key error handling below will just work *tm*
            arrays = {}
        try:
            return arrays.pop('m/z array'), arrays.pop("intensity array"), arrays
        except KeyError:
            # Keep the same three-element shape as the success path so callers
            # can always unpack (mz, intensity, extra_arrays). Previously this
            # branch returned only two values.
            return np.array([]), np.array([]), arrays

    def _get_selected_ion(self, scan):
        # Navigate the nested precursorList structure to the first selected
        # ion of the first precursor, warning when a precursor exists but
        # carries no selected ion list.
        try:
            pinfo_dict = scan["precursorList"]['precursor'][0]["selectedIonList"]['selectedIon'][0]
        except KeyError:
            if "precursorList" in scan:
                if 'precursor' in scan['precursorList']:
                    precursor = scan['precursorList']['precursor'][0]
                    if 'selectedIonList' not in precursor:
                        warnings.warn("No selected ions were found for precursor")
                        return None
            return None
        return pinfo_dict

    def _precursor_information(self, scan):
        """Returns information about the precursor ion,
        if any, that this scan was derived form.

        Returns `None` if this scan has no precursor ion

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        PrecursorInformation
        """
        pinfo_dict = self._get_selected_ion(scan)
        if pinfo_dict is None:
            return None
        try:
            precursor_scan_id = scan["precursorList"]['precursor'][0]['spectrumRef']
        except KeyError:
            precursor_scan_id = None
        # only attempt to scan if there are supposed to be MS1 scans in the file
        # NOTE(review): this search runs even when spectrumRef was present and
        # may overwrite it with the nearest preceding lower-ms-level scan —
        # confirm the override is intentional.
        if self._has_ms1_scans() and self._use_index:
            last_index = self._scan_index(scan) - 1
            current_level = self._ms_level(scan)
            i = 0
            # Walk backwards (bounded to 100 scans) looking for the nearest
            # preceding scan of a lower MS level. ``>= 0`` so the very first
            # scan in the file (index 0) can also be found as a precursor.
            while last_index >= 0 and i < 100:
                prev_scan = self.get_scan_by_index(last_index)
                if prev_scan.ms_level >= current_level:
                    last_index -= 1
                else:
                    precursor_scan_id = self._scan_id(prev_scan._data)
                    break
                i += 1
        keys = set(pinfo_dict) - {"selected ion m/z", 'peak intensity', 'charge state'}
        precursor = PrecursorInformation(
            mz=pinfo_dict['selected ion m/z'],
            intensity=pinfo_dict.get('peak intensity', 0.0),
            charge=pinfo_dict.get('charge state', ChargeNotProvided),
            precursor_scan_id=precursor_scan_id,
            source=self,
            product_scan_id=self._scan_id(scan),
            annotations={
                k: pinfo_dict[k] for k in keys
            })
        return precursor

    def _scan_title(self, scan):
        """Returns a verbose name for this scan, if one
        were stored in the file. Usually includes both the
        scan's id string, as well as information about the
        original file and format.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        str
        """
        try:
            return scan["spectrum title"]
        except KeyError:
            return scan["id"]

    def _scan_id(self, scan):
        """Returns the scan's id string, a unique
        identifier for this scan in the context of
        the data file it is recordered in

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        str
        """
        return scan["id"]

    def _scan_index(self, scan):
        """Returns the base 0 offset from the start
        of the data file in number of scans to reach
        this scan.

        If the original format does not natively include
        an index value, this value may be computed from
        the byte offset index.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        int
        """
        return scan['index']

    def _ms_level(self, scan):
        """Returns the degree of exponential fragmentation
        used to produce this scan.

        1 refers to a survey scan of unfragmented ions, 2
        refers to a tandem scan derived from an ms level 1
        ion, and so on.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        int
        """
        return int(scan['ms level'])

    def _scan_time(self, scan):
        """Returns the time in minutes from the start of data
        acquisition to when this scan was acquired.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        float
        """
        try:
            time = scan['scanList']['scan'][0]['scan start time']
            time = in_minutes(time)
            return time
        except KeyError:
            return 0.0

    def _is_profile(self, scan):
        """Returns whether the scan contains profile data (`True`)
        or centroided data (`False`).

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        bool
        """
        return "profile spectrum" in scan or "profile spectrum" in self._stray_cvs(scan)

    def _polarity(self, scan):
        """Returns whether this scan was acquired in positive mode (+1)
        or negative mode (-1).

        Returns :const:`None` implicitly when neither polarity cvParam
        is present.

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        int
        """
        if "positive scan" in scan or "positive scan" in self._stray_cvs(scan):
            return 1
        elif "negative scan" in scan or "negative scan" in self._stray_cvs(scan):
            return -1

    def _activation(self, scan):
        """Returns information about the activation method used to
        produce this scan, if any.

        Returns `None` for MS1 scans

        Parameters
        ----------
        scan : Mapping
            The underlying scan information storage,
            usually a `dict`

        Returns
        -------
        ActivationInformation
        """
        try:
            struct = dict(scan['precursorList']['precursor'][0]['activation'])
            activation = None
            activation_methods = []
            # Partition the cvParams into recognized dissociation methods and
            # everything else (left behind in ``struct`` as annotations).
            for key in tuple(struct):
                if key in ActivationInformation.dissociation_methods:
                    activation_methods.append(key)
                    struct.pop(key)
            if activation_methods:
                activation = activation_methods[0]
                activation_methods = activation_methods[1:]
            else:
                activation = UnknownDissociation
            energy = struct.pop("collision energy", -1)
            if energy == -1:
                energy = struct.pop("activation energy", -1)
            supplemental_energy_ = struct.pop(supplemental_energy, -1)
            if supplemental_energy_ != -1:
                energies = [energy, supplemental_energy_]
            else:
                energies = [energy]
            if len(activation_methods) == 0:
                # NOTE(review): ``energies`` (including any supplemental
                # energy) is only reported in the multiple-activation branch;
                # the single-method case carries just the primary energy —
                # confirm this is intentional.
                return ActivationInformation(activation, energy, struct)
            else:
                return MultipleActivationInformation(
                    [activation] + activation_methods, energies, struct)
        except KeyError:
            return None

    def _isolation_window(self, scan):
        """Build an :class:`IsolationWindow` from the precursor's
        isolationWindow entry, handling both offset- and limit-style
        encodings. Returns :const:`None` when no window is recorded."""
        try:
            struct = dict(scan['precursorList']['precursor'][0]['isolationWindow'])
        except KeyError:
            return None
        lower = struct.get("isolation window lower offset")
        target = struct.get('isolation window target m/z')
        upper = struct.get("isolation window upper offset")
        if lower is upper is target is None:
            # Some files record absolute limits instead of offsets; convert
            # them to a target m/z plus offsets.
            upper = struct.get("isolation window upper limit")
            lower = struct.get("isolation window lower limit")
            if upper is lower is None:
                if target is not None:
                    return IsolationWindow.make_empty(target)
                else:
                    return None
            else:
                target = (upper - lower) / 2 + lower
                upper = upper - target
                lower = target - lower
        elif target is None:
            # Offsets without a target: anchor the window on the selected ion.
            target = self._precursor_information(scan).mz
        if lower is None:
            lower = 0.0
        if upper is None:
            upper = 0.0
        return IsolationWindow(lower, target, upper)

    def _acquisition_information(self, scan):
        """Assemble a :class:`ScanAcquisitionInformation` describing the scan
        event(s) and their combination mode for this spectrum."""
        scan_info = {}
        scan_list_struct = scan['scanList']
        combination = "unknown"
        if "no combination" in scan_list_struct:
            combination = "no combination"
        elif "sum of spectra" in scan_list_struct:
            combination = "sum of spectra"
        elif "median of spectra" in scan_list_struct:
            combination = "median of spectra"
        elif "mean of spectra" in scan_list_struct:
            combination = "mean of spectra"
        scan_info['combination'] = combination
        scan_info_scan_list = []
        # Some writers misplace the FAIMS compensation voltage at the spectrum
        # level rather than on the scan event.
        misplaced_FAIMS_value = scan.get(FAIMS_compensation_voltage.name, None)
        for i, scan_ in enumerate(scan_list_struct.get("scan", [])):
            scan_ = scan_.copy()
            if misplaced_FAIMS_value is not None and i == 0:
                # Attach the misplaced value to the first scan event's copy so
                # it ends up in that event's traits. (Previously this assigned
                # back onto ``scan`` itself, which was a no-op.)
                scan_[FAIMS_compensation_voltage.name] = misplaced_FAIMS_value
            struct = {}
            struct['start_time'] = scan_.pop('scan start time', unitfloat(0, 'minute'))
            struct['injection_time'] = scan_.pop("ion injection time", unitfloat(0, 'millisecond'))
            windows = []
            for window in scan_.pop("scanWindowList", {}).get("scanWindow", []):
                windows.append(ScanWindow(
                    window['scan window lower limit'],
                    window['scan window upper limit']))
            struct['window_list'] = windows
            scan_.pop("instrumentConfigurationRef", None)
            # Whatever remains on the scan event becomes free-form traits.
            struct['traits'] = scan_
            scan_info_scan_list.append(ScanEventInformation(**struct))
        scan_info['scan_list'] = scan_info_scan_list
        return ScanAcquisitionInformation(**scan_info)

    def _instrument_configuration(self, scan):
        """Resolve the :class:`InstrumentInformation` used to acquire this
        scan, falling back to the run's default configuration when no scan
        event names one explicitly. Returns :const:`None` on failure."""
        try:
            scan_list_struct = scan['scanList']
            reference = None
            # Use the last scan event's configuration reference, if any.
            # (Renamed the loop variable: it used to shadow ``scan``.)
            for scan_event in scan_list_struct.get("scan", []):
                reference = scan_event.get("instrumentConfigurationRef")
                if reference is None:
                    continue
            if reference is None:
                reference = self._run_information['defaultInstrumentConfigurationRef']
            config = self._instrument_config[reference]
            return config
        except KeyError:
            return None

    def _annotations(self, scan):
        """Collect spectrum summary statistics (base peak, TIC, observed m/z
        range, filter string) into a plain dict; absent values are None."""
        annot = dict()
        annot['base peak intensity'] = scan.get('base peak intensity')
        annot['base peak m/z'] = scan.get('base peak m/z')
        annot['lowest observed m/z'] = scan.get('lowest observed m/z')
        annot['highest observed m/z'] = scan.get('highest observed m/z')
        try:
            annot['filter string'] = scan['scanList']['scan'][0].get('filter string')
        except KeyError:
            annot['filter string'] = None
        annot['total ion current'] = scan.get('total ion current')
        return annot
class _MzMLMetadataLoader(ScanFileMetadataBase):
    """Implements :class:`ScanFileMetadataBase` by reading the metadata header
    sections of an mzML document."""

    def _collect_reference_groups(self):
        """Parse the ``<referenceableParamGroupList>`` section (stopping early
        if ``<run>`` is reached) and return a mapping of group id to its
        parameter dict. Rewinds the reader afterwards."""
        params = next(iterparse_until(self._source, "referenceableParamGroupList", "run"))
        self.reset()
        if params is None:
            return {}
        params = self._source._get_info_smart(params)
        param_groups = params.get("referenceableParamGroup", [])
        by_id = {
            g.pop('id'): g for g in param_groups
        }
        return by_id

    def file_description(self):
        """Read the file metadata and provenance from the ``<fileDescription>`` tag
        if it is present.

        Returns
        -------
        FileInformation
            The description of the file's contents and its sources
        """
        desc = _find_section(self._source, "fileDescription")
        contents = desc.get("fileContent", {})
        if not isinstance(contents, dict):
            contents = {k: '' for k in contents}
        fi = FileInformation(contents, [])
        for sf_data in desc.get('sourceFileList', {}).get("sourceFile", []):
            sf_data = sf_data.copy()
            sf_name = sf_data.pop('name', '')
            sf_location = sf_data.pop('location', '')
            sf_id = sf_data.pop('id', '')
            # incorrectly merged parameters may contaminate the "name" attribute
            if isinstance(sf_name, list):
                temp = sf_name
                sf_name = temp[0]
                for d in temp[1:]:
                    sf_data[d] = ''
            sf = SourceFile(
                sf_name, sf_location, sf_id,
                parameters=sf_data)
            fi.add_file(sf)
        return fi

    def _convert_instrument(self, configuration):
        """Translate a parsed ``<instrumentConfiguration>`` dict into an
        :class:`InstrumentInformation`, resolving component groups, the model
        name, serial number, and acquisition software reference."""
        group_collection = []
        if "componentList" in configuration:
            for category, groups in configuration['componentList'].items():
                if category == 'count':
                    continue
                for group in groups:
                    group = group.copy()
                    order = group.pop('order')
                    parts = [component(key) for key in group]
                    group_collection.append(ComponentGroup(category, parts, order))
        conf_id = configuration['id']
        serial_number = configuration.get("instrument serial number")
        # Any remaining cvParam that names a known instrument model wins.
        potential_models = [k for k in configuration if instrument_models.get(k) is not None]
        if potential_models:
            instrument_model = potential_models[0]
        else:
            instrument_model = None
        acq_software = None
        if 'softwareRef' in configuration:
            software_list = self.software_list()
            sw_id = configuration['softwareRef']['ref']
            for sw in software_list:
                if sw.id == sw_id:
                    acq_software = sw
                    break
        config = InstrumentInformation(
            conf_id, group_collection,
            serial_number=serial_number,
            model=instrument_model,
            software=acq_software
        )
        return config

    def instrument_configuration(self) -> List[InstrumentInformation]:
        """Read the instrument configurations settings from the
        ``<instrumentConfigurationList>``

        Returns
        -------
        list of InstrumentConfiguration
            A list of different instrument states that scans may be acquired under
        """
        instrument_info_list = _find_section(self._source, "instrumentConfigurationList").get(
            "instrumentConfiguration", [])
        out = []
        reference_params = None
        for instrument_info in instrument_info_list:
            if "referenceableParamGroupRef" in instrument_info:
                # Parse the referenceable parameter groups at most once; the
                # previous implementation re-scanned the file for every
                # configuration carrying a reference.
                if reference_params is None:
                    reference_params = self._collect_reference_groups()
                for group in instrument_info.pop('referenceableParamGroupRef', []):
                    param_group = reference_params[group['ref']]
                    instrument_info.update(param_group)
            out.append(self._convert_instrument(instrument_info))
        return out

    def software_list(self) -> List[software.Software]:
        """Read the ``<softwareList>`` section into :class:`Software` records."""
        softwares = _find_section(self._source, "softwareList")
        software_list: List[software.Software] = []
        # The loop variable is deliberately not named "software": that would
        # shadow the imported module of the same name.
        for sw in softwares.get('software', []):
            software_list.append(
                Software(**sw))
        return software_list

    def data_processing(self) -> List[data_transformation.DataProcessingInformation]:
        """Read the ``<dataProcessingList>`` section into
        :class:`~.DataProcessingInformation` records with ordered methods."""
        data_processing_list = _find_section(self._source, "dataProcessingList")
        processing_list = []
        for processing_group in data_processing_list.get("dataProcessing", []):
            dpinfo = data_transformation.DataProcessingInformation()
            dpinfo.id = processing_group['id']
            for method_group in processing_group['processingMethod']:
                order = int(method_group.pop("order", 0))
                software_id = method_group.pop("softwareRef", '')
                method = data_transformation.ProcessingMethod(order=order, software_id=software_id)
                for key, value in method_group.items():
                    # A bare cvParam may be parsed as {"name": term}; flip it
                    # into a valueless term entry.
                    if key == 'name':
                        key = value
                        value = ''
                    method.add(key, value)
                dpinfo.methods.append(method)
            processing_list.append(dpinfo)
        return processing_list

    def samples(self) -> List[Sample]:
        """Describe the sample(s) used to generate the mass spectrometry
        data contained in this file.

        Returns
        -------
        :class:`list` of :class:`~.Sample`
        """
        sample_list = _find_section(self._source, "sampleList")
        result = []
        for sample_ in sample_list.get("sample", []):
            # mzML stores the display name under "sampleName"; normalize it to
            # the "name" keyword expected by :class:`Sample`.
            name = sample_.pop("sampleName", None)
            sample_.setdefault("name", name)
            result.append(Sample(**sample_))
        return result

    def _get_run_attributes(self):
        # Attributes of the <run> element (e.g. default instrument config ref).
        return get_tag_attributes(self.source, "run")

    def _get_spectrum_list_attributes(self):
        # Attributes of the <spectrumList> element (e.g. spectrum count).
        return get_tag_attributes(self.source, "spectrumList")
def checksum_mzml_stream(stream: io.IOBase) -> Tuple[str, bytes]:
    """Calculate the SHA1 checksum of an indexed mzML file for the purposes
    of validating the checksum at the end of the file.

    Parameters
    ----------
    stream : file-like
        A binary file-like object supporting a `read` method.

    Returns
    -------
    calculated_checksum: str
        The hex-encoded checksum calculated from the file's contents up to the
        first occurrence of <fileChecksum>, inclusive of the tag itself, as
        the indexed mzML convention requires.
    observed_checksum: bytes or :const:`None`
        The checksum written in the file within the <fileChecksum> tag. Will be :const:`None`
        if the tag is not found and closed. Expected to match the calculated checksum.
    """
    hasher = hashlib.sha1()
    target = b"<fileChecksum>"
    extract_checksum = re.compile(br"<fileChecksum>\s*(\S+)\s*</fileChecksum>")
    block_size = int(2 ** 12)
    # Withhold the final len(target) - 1 bytes of each read from hashing so a
    # <fileChecksum> tag split across two read boundaries is still detected.
    overlap = len(target) - 1
    carry = b""
    chunk = stream.read(block_size)
    while chunk:
        buffer = carry + chunk
        position = buffer.find(target)
        if position != -1:
            # Hash up to and including the opening tag, then read a little
            # more so the closing tag and value are available for extraction.
            end = position + len(target)
            hasher.update(buffer[:end])
            rest = buffer[position:] + stream.read(5000)
            match = extract_checksum.search(rest)
            observed_checksum = match.group(1) if match is not None else None
            return hasher.hexdigest(), observed_checksum
        if len(buffer) > overlap:
            hasher.update(buffer[:-overlap])
            carry = buffer[-overlap:]
        else:
            carry = buffer
        chunk = stream.read(block_size)
    # Tag never found: flush the withheld tail so the digest covers the
    # entire stream.
    hasher.update(carry)
    return hasher.hexdigest(), None
def read_file_checksum(stream: io.IOBase) -> List[bytes]:
    """Read the stored file checksum of an indexedmzML file.

    Scans the final ~5000 bytes of *stream* for closed ``<fileChecksum>``
    tags.

    Parameters
    ----------
    stream : file-like
        A seekable binary file-like object.

    Returns
    -------
    list of bytes
        The checksum value(s) found near the end of the file; usually a
        single-element list, empty when no closed tag is present.
    """
    # Clamp the lookback to the file size so files smaller than 5000 bytes
    # do not raise on the negative seek.
    size = stream.seek(0, 2)
    offset = min(5000, size)
    stream.seek(-offset, 2)
    chunk = stream.read(offset + 1)
    target = re.compile(br"<fileChecksum>\s*(\S+)\s*</fileChecksum>")
    matches = target.findall(chunk)
    return matches
class MzMLLoader(MzMLDataInterface, XMLReaderBase, _MzMLMetadataLoader):
    """Reads scans from PSI-HUPO mzML XML files. Provides both iterative and
    random access.

    Attributes
    ----------
    source_file: str
        Path to file to read from.
    source: pyteomics.mzml.MzML
        Underlying scan data source
    """
    _parser_cls = _MzMLParser

    _run_information: Dict[str, Any]
    _spectrum_list_information: Dict[str, Any]
    _decode_binary: bool
    _use_index: bool
    _instrument_config: Dict[str, InstrumentInformation]
    _file_description: FileInformation

    def __init__(self, source_file, use_index=True, decode_binary=True, index_file=None, **kwargs):
        self.source_file = source_file
        self._source = self._parser_cls(source_file, read_schema=True, iterative=True,
                                        huge_tree=True, decode_binary=decode_binary,
                                        use_index=use_index, index_file=index_file)
        self.initialize_scan_cache()
        self._use_index = use_index
        self._decode_binary = decode_binary
        # Cache the header metadata up front; these reads rewind the parser.
        self._run_information = self._get_run_attributes()
        self._spectrum_list_information = self._get_spectrum_list_attributes()
        self._instrument_config = {
            k.id: k for k in self.instrument_configuration()
        }
        self._file_description = self.file_description()
        self.make_iterator()

    @property
    def decode_binary(self) -> bool:
        """Whether or not to eagerly decode binary data arrays"""
        return self._decode_binary

    @decode_binary.setter
    def decode_binary(self, value):
        self._decode_binary = value
        self._source.decode_binary = value

    def _has_msn_scans(self):
        has_msn_key = file_information.MS_MSn_Spectrum in self._file_description
        has_ms1_key = file_information.MS_MS1_Spectrum in self._file_description
        # we know for certain, return True
        if has_msn_key:
            return has_msn_key
        # if we didn't find the key in the file description, if we didn't find
        # the complementary key either, then the metadata may not have been populated
        # so return indeterminate :const:`None`
        if not has_ms1_key:
            return None
        # We know the metadata was populated, so the absence of the key can be trusted
        # to mean this scan type is absent
        return False

    def _has_ms1_scans(self):
        has_msn_key = file_information.MS_MSn_Spectrum in self._file_description
        has_ms1_key = file_information.MS_MS1_Spectrum in self._file_description
        # we know for certain, return True
        if has_ms1_key:
            return has_ms1_key
        # if we didn't find the key in the file description, if we didn't find
        # the complementary key either, then the metadata may not have been populated
        # so return indeterminate :const:`None`
        if not has_msn_key:
            return None
        # We know the metadata was populated, so the absence of the key can be trusted
        # to mean this scan type is absent
        return False

    def has_msn_scans(self):
        """Whether the file contains MSn spectra; :const:`None` when the
        metadata is insufficient to decide."""
        return self._has_msn_scans()

    def has_ms1_scans(self):
        """Whether the file contains MS1 spectra; :const:`None` when the
        metadata is insufficient to decide."""
        return self._has_ms1_scans()

    @property
    def index(self):
        # Byte offset index restricted to spectra (chromatograms not indexed).
        return self._source.index['spectrum']

    def _validate(self, scan):
        # A scan is usable only if its m/z array was actually present.
        return "m/z array" in scan._data

    def _yield_from_index(self, scan_source, start):
        """Iterate scans from the byte-offset index beginning at *start*,
        which may be a scan id string, an integer index, or :const:`None`
        (start from the beginning)."""
        offset_provider = scan_source._offset_index['spectrum']
        keys = list(offset_provider.keys())
        if start is not None:
            if isinstance(start, basestring):
                start = keys.index(start)
            elif isinstance(start, int):
                # start already refers to the correct value
                pass
            else:
                raise TypeError(f"Cannot start from object {start}")
        else:
            start = 0
        for key in keys[start:]:
            yield scan_source.get_by_id(key)

    def __reduce__(self):
        # Support pickling by re-opening the source file on reconstruction.
        return self.__class__, (self.source_file, self._use_index, self._decode_binary)

    def __len__(self):
        try:
            return super(MzMLLoader, self).__len__()
        except TypeError:
            # No byte-offset index available; fall back to the declared
            # spectrum count from the <spectrumList> attributes.
            return int(self._spectrum_list_information['count'])