# Source code for MSIGen.mzml

"""
This module contains a subclass of the base MSIGen class for handling files with the .mzml file extension.
This can handle files with or without ion mobility data, and with or without MS2 data.
This has been tested on the following file formats converted using MSConvert.
    Thermo .raw files that contain MS1 or MS2 data and do not contain ion mobility data.
    Agilent .d files containing MS1 or MS2 data with or without ion mobility data.
    Bruker .d files of .tsf format containing MS1 or MS2 data.
    Bruker .d files of .baf format containing MS1 data.
    Bruker .d files of .tdf format containing ion mobility data and MS1 or MS2 data.
.mzml files from other sources may or may not be processed as expected.
"""

from MSIGen.base_class import MSIGen_base

# mzML access
import pymzml

import numpy as np
from tqdm import tqdm
from scipy.interpolate import interpn
from time import time

class MSIGen_mzml(MSIGen_base):
    """
    Subclass of MSIGen_base that extracts ion image data from .mzml line-scan files.

    ``load_files`` selects one of four workflows (MS1 or MS2, each with or without
    ion mobility) based on ``self.is_MS2`` and ``self.is_mobility``.
    """

    # Names of the binary arrays that can hold ion mobility values in a spectrum.
    _MEAN_MOBILITY_ARRAY = "mean inverse reduced ion mobility array"
    _RAW_MOBILITY_ARRAY = "raw ion mobility array"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def load_files(self, *args, **kwargs):
        """Processes the data files based on the MS level and whether ion mobility data are present."""
        # Dispatch to the workflow methods defined in this class.
        if self.is_MS2:
            if self.is_mobility:
                return self.mzml_ms2_mob(*args, **kwargs)
            return self.mzml_ms2_no_mob(*args, **kwargs)
        if self.is_mobility:
            return self.mzml_ms1_mob(*args, **kwargs)
        return self.ms1_no_mob(*args, **kwargs)

    # ======================================================================
    # Private helpers shared by the workflows
    # ======================================================================
    def _apply_overrides(self, **overrides):
        """Stores each override whose value is not None as an attribute of the same name."""
        for name, value in overrides.items():
            if value is not None:
                setattr(self, name, value)

    @staticmethod
    def _count_spectra(reader, file_dir):
        """Returns the number of spectra in an open reader, raising AssertionError if there are none."""
        num_spe = reader.get_spectrum_count()
        # Raised explicitly instead of using ``assert`` so the check is not removed by ``python -O``.
        if not num_spe > 0:
            raise AssertionError('Data from file {} is corrupt, not present, or not loading properly'.format(file_dir))
        return num_spe

    def _get_mobility_array(self, spectrum):
        """
        Returns the ion mobility array of a spectrum, or None if it cannot be read.

        The mean inverse reduced ion mobility array is used when the spectrum lists it.
        Otherwise the raw ion mobility array is tried with printing suppressed.
        """
        if self._MEAN_MOBILITY_ARRAY in spectrum.get_all_arrays_in_spec():
            return spectrum.get_array(self._MEAN_MOBILITY_ARRAY)
        try:
            with self.HiddenPrints():
                return spectrum.get_array(self._RAW_MOBILITY_ARRAY)
        except Exception:
            # Best effort: a spectrum without a readable mobility array is reported as None.
            # KeyboardInterrupt and SystemExit are deliberately not caught.
            return None

    def _record_average_times(self, times_per_line):
        """Saves the mean first and mean last scan time over all lines to self.metadata."""
        self.metadata['average_start_time'] = np.mean([line[0] for line in times_per_line])
        self.metadata['average_end_time'] = np.mean([line[-1] for line in times_per_line])

    @staticmethod
    def _group_index_per_scan(num_filters, consolidated_idx_list, filter_inverse):
        """
        Returns an array that gives the filter group number of every scan (1d scan index).

        Args:
            num_filters (int): Number of unique filters.
            consolidated_idx_list (list): For each filter group, the indices of the filters it contains.
            filter_inverse: For each scan, the index of the filter it uses.
        """
        grp_from_filter_idx = np.empty(num_filters, dtype=int)
        for grp, filter_idxs in enumerate(consolidated_idx_list):
            for filter_idx in filter_idxs:
                grp_from_filter_idx[filter_idx] = grp
        return grp_from_filter_idx[filter_inverse]

    @staticmethod
    def _count_scans_per_group(grp_from_scan_idx, acq_times, num_filter_groups):
        """
        Counts, for every line, how many scans belong to each filter group.

        Returns:
            np.ndarray: Integer array of shape (number of lines, num_filter_groups).
        """
        counts = np.zeros((len(acq_times), num_filter_groups), dtype=int)
        start = 0
        for line_idx, line_times in enumerate(acq_times):
            stop = start + len(line_times)
            # minlength keeps a zero entry for groups that do not occur in this line.
            counts[line_idx, :] = np.bincount(grp_from_scan_idx[start:stop], minlength=num_filter_groups)
            start = stop
        return counts

    # ======================================================================
    # MS1 without mobility
    # ======================================================================
    def ms1_no_mob(self, metadata=None, in_jupyter = None, testing = None, gui=None, pixels_per_line = None, tkinter_widgets = None, **kwargs):
        """
        Data processing for .mzml files with only MS1 data.

        Args:
            metadata (dict): Metadata dictionary to store instrument information. Overwrites self.metadata if provided.
            in_jupyter (bool): Flag indicating if the code is running in a Jupyter notebook. Overwrites self.in_jupyter if provided.
            testing (bool): Flag for testing mode. Overwrites self.testing if provided.
            gui (bool): Flag for GUI mode. Overwrites self.gui if provided.
            pixels_per_line (int): Number of pixels per line for the output image. Overwrites self.pixels_per_line if provided.
            tkinter_widgets: Tkinter widgets for GUI progress bar. Overwrites self.tkinter_widgets if provided.

        Returns:
            metadata (dict): Updated metadata dictionary with instrument information.
            pixels_aligned (np.ndarray): 3D array of intensity data of shape (m/z+1, lines, pixels_per_line).

        Raises:
            AssertionError: If a file reports no spectra.
        """
        # unpack variables. Any other kwargs are ignored.
        self._apply_overrides(in_jupyter=in_jupyter, testing=testing, gui=gui, pixels_per_line=pixels_per_line,
                              tkinter_widgets=tkinter_widgets, metadata=metadata)

        # monitor progress on gui
        self.progressbar_start_preprocessing()

        # get mass windows
        MS1_list = self.mass_list[0]
        lb = self.lower_lims[0]
        ub = self.upper_lims[0]

        # monitor progress on gui
        self.progressbar_start_extraction()

        # accumulators: one array per line
        pixels = []
        rts = []

        for i, file_dir in tqdm(enumerate(self.line_list), total = len(self.line_list), desc='Progress through lines', disable = (self.testing or self.gui)):
            with pymzml.run.Reader(file_dir, obo_version = '4.1.9') as reader:
                # if i == 0:
                #     self.metadata = get_basic_instrument_metadata_mzml_no_mob(line_list, data, self.metadata)

                num_spe = self._count_spectra(reader, file_dir)

                line_rts = np.zeros(num_spe)
                # column 0 holds the TIC, the remaining columns hold one mass window each
                line_pixels = np.zeros((num_spe, len(lb)+1))

                for j, spectrum in enumerate(reader):
                    # Update gui variables
                    self.progressbar_update_progress(num_spe, i, j)

                    # save scan time and TICs
                    line_rts[j] = spectrum.scan_time[0]
                    line_pixels[j,0] = spectrum.TIC

                    # get mz and intensity values
                    mz = spectrum.mz
                    # NOTE(review): the trailing 0 presumably gives extract_masses_no_mob a
                    # zero-intensity slot for out-of-range indices - confirm in the base class.
                    intensity_points = np.append(spectrum.i,0)

                    line_pixels[j,1:] = self.extract_masses_no_mob(mz, lb, ub, intensity_points)

            pixels.append(line_pixels)
            rts.append(line_rts)

        self._record_average_times(rts)

        self.rts = rts
        pixels_aligned = self.ms1_interp(pixels, mass_list = MS1_list)

        return self.metadata, pixels_aligned

    # ======================================================================
    # MS1 with mobility
    # ======================================================================
    # MAKE SURE MOBILITY DATA ARE COMBINED WHEN USING MSCONVERT
    def mzml_ms1_mob(self, metadata=None, in_jupyter = None, testing = None, gui=None, pixels_per_line = None, tkinter_widgets = None, **kwargs):
        """
        Data processing from .mzml files with only MS1 data and ion mobility data.
        When using MSConvert to create this .mzml file, the option "combine ion mobility scans"
        must be checked for MSIGen to read the data properly.

        Args:
            metadata (dict): Metadata dictionary to store instrument information. Overwrites self.metadata if provided.
            in_jupyter (bool): Flag indicating if the code is running in a Jupyter notebook. Overwrites self.in_jupyter if provided.
            testing (bool): Flag for testing mode. Overwrites self.testing if provided.
            gui (bool): Flag for GUI mode. Overwrites self.gui if provided.
            pixels_per_line (int): Number of pixels per line for the output image. Overwrites self.pixels_per_line if provided.
            tkinter_widgets: Tkinter widgets for GUI progress bar. Overwrites self.tkinter_widgets if provided.

        Returns:
            metadata (dict): Updated metadata dictionary with instrument information.
            pixels_aligned (np.ndarray): 3D array of intensity data of shape (m/z+1, lines, pixels_per_line).

        Raises:
            AssertionError: If a file reports no spectra.
        """
        # unpack variables. Any other kwargs are ignored.
        self._apply_overrides(in_jupyter=in_jupyter, testing=testing, gui=gui, pixels_per_line=pixels_per_line,
                              tkinter_widgets=tkinter_widgets, metadata=metadata)

        # monitor progress on gui
        self.progressbar_start_preprocessing()

        MS1_list = self.mass_list[0]

        # monitor progress on gui
        self.progressbar_start_extraction()

        pixels_meta = []
        rts = []

        # Get the mass ranges and determine the minimum and max mass and mobility values that may be used.
        mz_lb, mz_ub = self.lower_lims[0], self.upper_lims[0]
        mob_lb, mob_ub = self.lower_lims[1], self.upper_lims[1]
        mz_min, mz_max = np.min(mz_lb), np.max(mz_ub)
        mob_min, mob_max = np.min(mob_lb), np.max(mob_ub)

        total_lines = len(self.line_list)
        for i, file_dir in enumerate(self.line_list):
            with pymzml.run.Reader(file_dir, obo_version = '4.1.9') as reader:
                # Initialize data collector for the line
                num_spe = self._count_spectra(reader, file_dir)

                line_pixels = np.zeros((num_spe, len(MS1_list)+1))
                line_acq_times = np.zeros((num_spe))

                for j, spec in tqdm(enumerate(reader),
                                    desc = "progress thru line {linenum} of {total_lines}".format(linenum = i+1, total_lines = total_lines),
                                    total = num_spe, delay = 0.05, disable = (self.testing or self.gui)):
                    # Update gui variables
                    self.progressbar_update_progress(num_spe, i, j)

                    # Get TIC and retention time
                    line_acq_times[j] = spec.scan_time[0]
                    line_pixels[j,0] = spec.TIC

                    # get spectrum
                    intensity_points = spec.i
                    mzs = spec.mz

                    # Get mobility values; spectra without a readable mobility array get all zeros
                    mobs = self._get_mobility_array(spec)
                    if mobs is None:
                        mobs = np.zeros(mzs.shape)

                    # remove peaks with 0 intensity for faster slicing
                    nonzero = intensity_points != 0
                    intensity_points = intensity_points[nonzero]
                    mzs = mzs[nonzero]
                    mobs = mobs[nonzero]

                    # remove peaks outside of potential mass and mobility ranges for faster slicing
                    in_range = (mzs>mz_min)&(mzs<mz_max)&(mobs>mob_min)&(mobs<mob_max)
                    intensity_points = intensity_points[in_range]
                    mzs = mzs[in_range]
                    mobs = mobs[in_range]

                    # Select each peak based on corresponding mass/mobility window (bounds are exclusive)
                    for k in range(len(mz_lb)):
                        in_window = (mzs>mz_lb[k])&(mzs<mz_ub[k])&(mobs>mob_lb[k])&(mobs<mob_ub[k])
                        line_pixels[j,k+1] = np.sum(intensity_points[in_window])

            pixels_meta.append(line_pixels)
            rts.append(line_acq_times)

        self._record_average_times(rts)

        self.rts = rts
        pixels_aligned = self.ms1_interp(pixels_meta, mass_list = MS1_list)

        return self.metadata, pixels_aligned

    # ======================================================================
    # MS2
    # ======================================================================
    def check_dim(self, ShowNumLineSpe=False):
        """
        Gets the acquisition times and other information about each scan to decide
        what mass list entries can be obtained from each scan.

        Args:
            ShowNumLineSpe (bool): If True, prints a summary of the number of spectra and the
                start/end times over all lines.

        Returns:
            acq_times (list): A list of acquisition times for each line.
            filter_list (list): A list of information that would be included in Thermo-style filter strings for each line.
                Each scan is described by [precursor m/z, collision energy, level, polarity, mass range start,
                mass range end], followed by the mobility range start and end when self.is_mobility is set.

        Raises:
            AssertionError: If a file reports no spectra.
            Warning: If a scan contains mobility arrays while self.is_mobility is False, or an MS2 scan
                is found while self.is_MS2 is False.
        """
        mobility_array_names = (self._MEAN_MOBILITY_ARRAY, self._RAW_MOBILITY_ARRAY)

        acq_times = []
        filter_list = []

        for file_dir in self.line_list:
            with pymzml.run.Reader(file_dir, obo_version = '4.1.9') as reader:
                self._count_spectra(reader, file_dir)

                # Get Start times, end times, number of spectra per line, and list of unique filters.
                line_acq_times = []
                line_filter_list = []

                for spectrum in reader:
                    # Match formatting of other workflows: every level other than 1 is labelled 'MS2'
                    if spectrum.ms_level == 1:
                        level = 'MS1'
                        mz = 0.0
                        energy = 0.0
                    else:
                        # only check for precursors and collision energy if MS2 scan
                        level = 'MS2'
                        mz = spectrum.selected_precursors[0]['mz']
                        energy = spectrum['collision energy']
                        if energy is None:
                            energy = 0.0

                    # Check polarity
                    if spectrum['positive scan'] == True:
                        polarity = '+'
                    elif spectrum['negative scan'] == True:
                        polarity = '-'
                    else:
                        polarity = ''

                    # get retention time information
                    line_acq_times.append(spectrum.scan_time[0])

                    # Get mass range; fall back to the observed m/z extremes if either limit is missing or zero
                    mass_range_start, mass_range_end = spectrum['scan window lower limit'], spectrum['scan window upper limit']
                    if not (mass_range_start and mass_range_end):
                        mass_range_start, mass_range_end = spectrum.extremeValues('mz')

                    # save filter information
                    scan_filter = [mz, energy, level, polarity, mass_range_start, mass_range_end]
                    if self.is_mobility:
                        scan_filter.extend(self.get_mobility_range_from_mzml_spectrum(spectrum))
                    line_filter_list.append(scan_filter)

                    # Report the incorrect is_MS2 and is_mobility values as soon as a scan reveals them.
                    # Checking the current scan avoids rescanning all filters collected so far.
                    if not self.is_mobility and any(name in mobility_array_names for name in spectrum.get_all_arrays_in_spec()):
                        raise Warning("The data file used contains mobility data but is_mobility is set to False")
                    if level == 'MS2' and not self.is_MS2:
                        raise Warning("The data file used contains MS2 data but is_MS2 is set to False")

            filter_list.append(line_filter_list)
            acq_times.append(line_acq_times)

        # show results
        if ShowNumLineSpe:
            num_spe_per_line = [len(line) for line in acq_times]
            print('\nline scan spectra summary\n# of lines is: {}\nmean # of spectra is: {}\nmin # of spectra is: {}\nmean start time is {}\nmean end time is: {}'.format(
                len(num_spe_per_line), int(np.mean(num_spe_per_line)), int(np.min(num_spe_per_line)),
                np.mean([line[0] for line in acq_times]), np.mean([line[-1] for line in acq_times])))

        return acq_times, filter_list

    def get_ScansPerFilter(self, filters_info, all_filters_list, filter_inverse, display_tqdm = False):
        """
        Determines the number of scans that use a specific filter group.

        Args:
            filters_info: Sequence whose first entry is the list of unique filters.
            all_filters_list (list): For each line, the list of filters of its scans.
            filter_inverse: For each scan (all lines concatenated), the index of its filter in the unique filter list.
            display_tqdm (bool): If True, shows a progress bar.

        Returns:
            scans_per_filter (np.ndarray): Integer array of shape (number of lines, number of unique filters).
        """
        # unpack filters_info
        filter_list = filters_info[0]

        # accumulator
        scans_per_filter = np.zeros((len(all_filters_list), len(filter_list)), dtype = int)

        # start of the current line within the concatenated filter_inverse
        counter = 0
        for i in tqdm(range(len(all_filters_list)), disable = not display_tqdm):
            scans_in_line = len(all_filters_list[i])
            # Get each filter
            for j in filter_inverse[counter: counter + scans_in_line]:
                # count on
                scans_per_filter[i,j]+=1
            # move on to the scans of the next line
            counter += scans_in_line

        return scans_per_filter

    # ======================================================================
    # MS2 without mobility
    # ======================================================================
    def mzml_ms2_no_mob(self, metadata=None, normalize_img_sizes=None, in_jupyter=None, testing=None, gui=None, pixels_per_line=None, tkinter_widgets=None, **kwargs):
        """
        Data processing for .mzml files that contain MS2 data.

        Args:
            metadata (dict): Metadata dictionary to store instrument information. Overwrites self.metadata if provided.
            normalize_img_sizes (bool): Flag indicating if image sizes should be normalized. Overwrites self.normalize_img_sizes if provided.
            in_jupyter (bool): Flag indicating if the code is running in a Jupyter notebook. Overwrites self.in_jupyter if provided.
            testing (bool): Flag for testing mode. Overwrites self.testing if provided.
            gui (bool): Flag for GUI mode. Overwrites self.gui if provided.
            pixels_per_line (int): Number of pixels per line for the output image. Overwrites self.pixels_per_line if provided.
            tkinter_widgets: Tkinter widgets for GUI progress bar. Overwrites self.tkinter_widgets if provided.

        Returns:
            metadata (dict): Updated metadata dictionary with instrument information.
            pixels_aligned (np.ndarray): 3D array of intensity data of shape (m/z+1, lines, pixels_per_line)
                or list of ion image arrays of shape (height, width).
        """
        # unpack variables. Any other kwargs are ignored.
        self._apply_overrides(in_jupyter=in_jupyter, testing=testing, gui=gui, pixels_per_line=pixels_per_line,
                              tkinter_widgets=tkinter_widgets, normalize_img_sizes=normalize_img_sizes, metadata=metadata)

        # monitor progress on gui
        self.progressbar_start_preprocessing()
        # The resolved attributes are used from here on so that defaults stored on the
        # instance are honoured when the keyword arguments are left as None.
        if not self.gui:
            print("preprocessing data...")
        t_i = time()

        # only the mass list ordering is needed from the mass list in this workflow
        *_, mass_list_idxs = self.mass_list

        acq_times, all_filters_list = self.check_dim(ShowNumLineSpe=self.in_jupyter)
        self._record_average_times(acq_times)

        filters_info, filter_inverse = self.get_filters_info(all_filters_list)
        mzsPerFilter, mzsPerFilter_lb, mzsPerFilter_ub, mzIndicesPerFilter = self.get_CountsPerFilter(filters_info)

        # finds the number of scans that use a specific filter
        scans_per_filter = self.get_ScansPerFilter(filters_info, all_filters_list, filter_inverse)
        consolidated_filter_list, mzs_per_filter_grp, mzs_per_filter_grp_lb, mzs_per_filter_grp_ub, mz_idxs_per_filter_grp, \
            _, peak_counts_per_filter_grp, consolidated_idx_list \
            = self.consolidate_filter_list(filters_info, mzsPerFilter, scans_per_filter, mzsPerFilter_lb, mzsPerFilter_ub, mzIndicesPerFilter)
        num_filter_groups = len(consolidated_filter_list)

        # get an array that gives the scan group number from the index of any scan (1d index)
        grp_from_scan_idx = self._group_index_per_scan(len(filters_info[0]), consolidated_idx_list, filter_inverse)

        # The per-line scan counts are taken directly from the per-scan group indices, because these
        # are the same indices used below to place every scan into its accumulator.
        scans_per_filter_grp = self._count_scans_per_group(grp_from_scan_idx, acq_times, num_filter_groups)

        # monitor progress on gui
        self.progressbar_start_extraction()
        if not self.gui:
            print("finished data preprocessing after {tot_time:.2f} s".format(tot_time = time()-t_i))

        all_TimeStamps = []
        pixels_metas = []

        # holds index of current scan
        scan_idx = 0

        for i, Name in tqdm(enumerate(self.line_list), desc = 'Progress through lines', total = len(self.line_list), disable = (self.testing or self.gui)):
            # accumulators for all filters, for line before interpolation: scan/acq_time per filter group
            TimeStamps = [np.zeros((scans_per_filter_grp[i][grp])) for grp in range(num_filter_groups)]
            # a list of 2d matrix, matrix: scans x (mzs +1) , 1 -> tic
            pixels_meta = [np.zeros((scans_per_filter_grp[i][grp], peak_counts_per_filter_grp[grp] + 1)) for grp in range(num_filter_groups)]
            # row of the most recently filled scan in each group; starts at -1 and is incremented before use
            counter = np.full(num_filter_groups, -1, dtype = int)

            with pymzml.run.Reader(Name, obo_version = '4.1.9') as reader:
                for j, spectrum in enumerate(reader):
                    # Update gui variables
                    self.progressbar_update_progress(len(acq_times[i]), i, j)

                    # determine which group is going to be used
                    grp = grp_from_scan_idx[scan_idx]
                    counter[grp]+=1

                    # handle info
                    TimeStamps[grp][counter[grp]] = acq_times[i][j]
                    pixels_meta[grp][counter[grp], 0] = spectrum.TIC

                    # skip filters with no masses in the mass list
                    if peak_counts_per_filter_grp[grp]:
                        # get mz and intensity values
                        mz = spectrum.mz
                        intensity_points = np.append(spectrum.i,0)

                        lb, ub = np.array(mzs_per_filter_grp_lb[grp]), np.array(mzs_per_filter_grp_ub[grp])

                        pixels_meta[grp][counter[grp],1:] = self.extract_masses_no_mob(mz, lb, ub, intensity_points)

                    # keep count of the 1d scan index
                    scan_idx += 1

            all_TimeStamps.append(TimeStamps)
            pixels_metas.append(pixels_meta)

        self.rts = acq_times
        pixels, all_TimeStamps_aligned = self.ms2_interp(pixels_metas, all_TimeStamps, acq_times, scans_per_filter_grp, mzs_per_filter_grp)

        # Order the pixels in the way the mass list csv/excel file was ordered
        pixels = self.reorder_pixels(pixels, consolidated_filter_list, mz_idxs_per_filter_grp, mass_list_idxs)
        if self.normalize_img_sizes:
            pixels = self.pixels_list_to_array(pixels, all_TimeStamps_aligned)

        return self.metadata, pixels

    ## Currently unused so commented out until sure it can be deleted
    # def get_filter_idx(self, Filter,acq_types,acq_polars,mz_ranges,precursors):
    #     precursor, energy, acq_type, acq_polar, mass_range_start, mass_range_end = Filter
    #     if acq_polar == '+':
    #         polarity_numeric = 1.0
    #     elif acq_polar == '-':
    #         polarity_numeric = -1.0
    #     if acq_type == 'MS1':    # since filter name varies for ms, we just hard code this situation.
    #         precursor = 0.0
    #         mz_range = [100.0, 950.0]
    #     elif acq_type == 'MS2':
    #         mz_range = [float(mass_range_start),float(mass_range_end)]
    #     mz_range_judge = np.array(mz_range).reshape(1, 2) == np.array(mz_ranges).astype(float)
    #     # to match look-up table: acq_types, acq_polars, precursors
    #     if acq_type == 'MS1':
    #         idx = (polarity_numeric == acq_polars)&(acq_type == acq_types)&(mz_range_judge[:,0])&(mz_range_judge[:,1])
    #     if acq_type == 'MS2':
    #         idx = (polarity_numeric == acq_polars)&(acq_type == acq_types)&(mz_range_judge[:,0])&(mz_range_judge[:,1])&(precursor == precursors)
    #     idx = np.where(idx)[0]
    #     return idx

    # ======================================================================
    # MS2 with mobility
    # ======================================================================
    def mzml_ms2_mob(self, metadata=None, normalize_img_sizes=None, in_jupyter=None, testing=None, gui=None, pixels_per_line=None, tkinter_widgets=None, **kwargs):
        """
        Data processing from .mzml files that contain MS2 data and ion mobility data.

        Args:
            metadata (dict): Metadata dictionary to store instrument information. Overwrites self.metadata if provided.
            normalize_img_sizes (bool): Flag indicating if image sizes should be normalized. Overwrites self.normalize_img_sizes if provided.
            in_jupyter (bool): Flag indicating if the code is running in a Jupyter notebook. Overwrites self.in_jupyter if provided.
            testing (bool): Flag for testing mode. Overwrites self.testing if provided.
            gui (bool): Flag for GUI mode. Overwrites self.gui if provided.
            pixels_per_line (int): Number of pixels per line for the output image. Overwrites self.pixels_per_line if provided.
            tkinter_widgets: Tkinter widgets for GUI progress bar. Overwrites self.tkinter_widgets if provided.

        Returns:
            metadata (dict): Updated metadata dictionary with instrument information.
            pixels_aligned (np.ndarray): 3D array of intensity data of shape (m/z+1, lines, pixels_per_line)
                or list of ion image arrays of shape (height, width).
        """
        # unpack variables. Any other kwargs are ignored.
        self._apply_overrides(in_jupyter=in_jupyter, testing=testing, gui=gui, pixels_per_line=pixels_per_line,
                              tkinter_widgets=tkinter_widgets, normalize_img_sizes=normalize_img_sizes, metadata=metadata)

        # monitor progress on gui
        self.progressbar_start_preprocessing()
        # The resolved attributes are used from here on so that defaults stored on the
        # instance are honoured when the keyword arguments are left as None.
        if not self.gui:
            print("preprocessing data...")
        t_i = time()

        # only the mass list ordering is needed from the mass list in this workflow
        *_, mass_list_idxs = self.mass_list

        acq_times, all_filters_list = self.check_dim(ShowNumLineSpe=self.in_jupyter)
        self._record_average_times(acq_times)

        filters_info, filter_inverse = self.get_filters_info(all_filters_list)
        mzsPerFilter, mzsPerFilter_lb, mzsPerFilter_ub, mobsPerFilter_lb, mobsPerFilter_ub, mzIndicesPerFilter \
            = self.get_CountsPerFilter(filters_info)

        # finds the number of scans that use a specific filter
        scans_per_filter = self.get_ScansPerFilter(filters_info, all_filters_list, filter_inverse)
        consolidated_filter_list, mzs_per_filter_grp, mzs_per_filter_grp_lb, mzs_per_filter_grp_ub, mz_idxs_per_filter_grp, \
            _, peak_counts_per_filter_grp, consolidated_idx_list \
            = self.consolidate_filter_list(filters_info, mzsPerFilter, scans_per_filter, mzsPerFilter_lb, mzsPerFilter_ub, mzIndicesPerFilter)
        num_filter_groups = len(consolidated_filter_list)

        # get an array that gives the scan group number from the index of any scan (1d index)
        grp_from_scan_idx = self._group_index_per_scan(len(filters_info[0]), consolidated_idx_list, filter_inverse)

        # The per-line scan counts are taken directly from the per-scan group indices, because these
        # are the same indices used below to place every scan into its accumulator.
        scans_per_filter_grp = self._count_scans_per_group(grp_from_scan_idx, acq_times, num_filter_groups)

        # monitor progress on gui
        self.progressbar_start_extraction()
        if not self.gui:
            print("finished data preprocessing after {tot_time:.2f} s".format(tot_time = time()-t_i))

        all_TimeStamps = []
        pixels_metas = []

        # holds index of current scan/spectrum
        scan_idx = 0

        for i, file_dir in tqdm(enumerate(self.line_list), desc = 'Progress through lines', total = len(self.line_list), disable = (self.testing or self.gui)):
            # accumulators for all filters, for line before interpolation: scan/acq_time per filter group
            TimeStamps = [np.zeros((scans_per_filter_grp[i][grp])) for grp in range(num_filter_groups)]
            # a list of 2d matrix, matrix: scans x (mzs +1) , 1 -> tic
            pixels_meta = [np.zeros((scans_per_filter_grp[i][grp], peak_counts_per_filter_grp[grp] + 1)) for grp in range(num_filter_groups)]
            # row of the most recently filled scan in each group; starts at -1 and is incremented before use
            counter = np.full(num_filter_groups, -1, dtype = int)

            with pymzml.run.Reader(file_dir, obo_version = '4.1.9') as reader:
                for j, spectrum in enumerate(reader):
                    # Update gui variables
                    self.progressbar_update_progress(len(acq_times[i]), i, j)

                    # determine which group is going to be used
                    grp = grp_from_scan_idx[scan_idx]
                    counter[grp]+=1

                    # handle info
                    TimeStamps[grp][counter[grp]] = acq_times[i][j]
                    pixels_meta[grp][counter[grp], 0] = spectrum.TIC

                    # skip filters with no masses in the mass list
                    if peak_counts_per_filter_grp[grp]:
                        mz = spectrum.mz
                        intensity_points = np.append(spectrum.i,0)

                        # Get mobility values
                        mob = self._get_mobility_array(spectrum)
                        if mob is None:
                            mob = [0.0]

                        # get all m/z and mobility values that bound the selection windows
                        lbs = np.array(mzs_per_filter_grp_lb[grp])
                        ubs = np.array(mzs_per_filter_grp_ub[grp])
                        # mobility windows are looked up through the first filter of the group
                        first_filter_idx = consolidated_idx_list[grp][0]
                        mob_lbs = np.array(mobsPerFilter_lb[first_filter_idx])
                        mob_ubs = np.array(mobsPerFilter_ub[first_filter_idx])

                        # simultaneously slice by mz and mobility
                        idxs_to_sum = self.vectorized_unsorted_slice_mob(mz,mob,lbs,ubs,mob_lbs,mob_ubs)
                        pixels_meta[grp][counter[grp],1:] = np.sum(np.take(intensity_points, np.array(idxs_to_sum)), axis = 1)

                    # keep count of the 1d scan index
                    scan_idx += 1

            all_TimeStamps.append(TimeStamps)
            pixels_metas.append(pixels_meta)

        self.rts = acq_times
        pixels, all_TimeStamps_aligned = self.ms2_interp(pixels_metas, all_TimeStamps, acq_times, scans_per_filter_grp, mzs_per_filter_grp)

        # Order the pixels in the way the mass list csv/excel file was ordered
        pixels = self.reorder_pixels(pixels, consolidated_filter_list, mz_idxs_per_filter_grp, mass_list_idxs)
        if self.normalize_img_sizes:
            pixels = self.pixels_list_to_array(pixels, all_TimeStamps_aligned)

        return self.metadata, pixels

    def get_mobility_range_from_mzml_spectrum(self, spectrum):
        """
        Determines the lower and upper bounds of the mobility range from the spectrum object.

        Returns:
            list: [mob_range_start, mob_range_end]
        """
        # get mobility range from keyword parameters if possible
        mob_range_start = self.getUserParam(spectrum, 'ion mobility lower limit')
        mob_range_end = self.getUserParam(spectrum, 'ion mobility upper limit')

        # otherwise extract the mobility array and get the max & min values.
        if (mob_range_start is None) or (mob_range_end is None):
            mob = self._get_mobility_array(spectrum)
            if mob is None:
                # no readable mobility array: the range collapses to 0.0
                mob = [0.0]
            mob_range_start, mob_range_end = np.min(mob), np.max(mob)

        return [mob_range_start, mob_range_end]

    def getUserParam(self, spectrum, param_name):
        """
        Obtains the value of a parameter based on its parameter name from the spectrum object.

        Returns:
            None if no element has the given name, the value if exactly one element matches,
            or a list of values if several match. Values are converted to float when possible.
            A single match with an empty or missing value is returned as True.
        """
        search_string = './/*[@name="{0}"]'.format(param_name)

        elements = []
        for x in spectrum.element.iterfind(search_string):
            val = x.attrib.get("value", "")
            try:
                val = float(val)
            except (TypeError, ValueError):
                # non-numeric values are kept as they are
                pass
            elements.append(val)

        if len(elements) == 0:
            return_val = None
        elif len(elements) == 1:
            return_val = elements[0]
        else:
            return_val = elements

        # a parameter that is present without a value is reported as True
        if return_val == "":
            return_val = True

        return return_val