Skip to main content

rustdf/data/
dda.rs

1use crate::data::acquisition::AcquisitionMode;
2use crate::data::handle::{IndexConverter, TimsData, TimsDataLoader};
3use crate::data::meta::{read_dda_precursor_meta, read_global_meta_sql, read_meta_data_sql, read_pasef_frame_ms_ms_info, DDAPrecursor, DDAPrecursorMeta, PasefMsMsMeta};
4use mscore::timstof::frame::{ImsFrame, RawTimsFrame, TimsFrame};
5use mscore::timstof::slice::TimsSlice;
6use mscore::timstof::spectrum_processing::{
7    PASEFFragmentData, PreprocessedSpectrum, SpectrumProcessingConfig,
8    process_pasef_fragments_batch,
9};
10use rayon::prelude::*;
11use rayon::ThreadPoolBuilder;
12use std::collections::BTreeMap;
13use rand::prelude::IteratorRandom;
14use mscore::data::spectrum::MsType;
15use std::collections::HashMap;
16
17#[derive(Clone)]
18pub struct PASEFDDAFragment {
19    pub frame_id: u32,
20    pub precursor_id: u32,
21    pub collision_energy: f64,
22    pub selected_fragment: TimsFrame,
23}
24
25/// Statistical moments of a 1D signal distribution
26#[derive(Clone, Debug, Default)]
27pub struct SignalMoments {
28    pub mean: f64,
29    pub variance: f64,
30    pub skewness: f64,
31    pub apex: f64,
32    pub fwhm: f64,
33    pub total_intensity: f64,
34}
35
36impl SignalMoments {
37    /// Calculate moments from coordinate and intensity arrays
38    pub fn from_signal(coords: &[f64], intensities: &[f64]) -> Self {
39        if coords.is_empty() || intensities.iter().sum::<f64>() == 0.0 {
40            return Self::default();
41        }
42
43        let total: f64 = intensities.iter().sum();
44
45        // First moment (weighted mean)
46        let mean: f64 = coords.iter()
47            .zip(intensities.iter())
48            .map(|(c, i)| c * i / total)
49            .sum();
50
51        // Second moment (weighted variance)
52        let variance: f64 = coords.iter()
53            .zip(intensities.iter())
54            .map(|(c, i)| i / total * (c - mean).powi(2))
55            .sum();
56
57        // Third moment (weighted skewness)
58        let std = variance.sqrt().max(1e-10);
59        let skewness: f64 = coords.iter()
60            .zip(intensities.iter())
61            .map(|(c, i)| i / total * ((c - mean) / std).powi(3))
62            .sum();
63
64        // Apex (position of maximum)
65        let apex_idx = intensities.iter()
66            .enumerate()
67            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
68            .map(|(i, _)| i)
69            .unwrap_or(0);
70        let apex = coords.get(apex_idx).copied().unwrap_or(0.0);
71
72        // FWHM estimation
73        let half_max = intensities.get(apex_idx).copied().unwrap_or(0.0) / 2.0;
74        let above_half: Vec<f64> = coords.iter()
75            .zip(intensities.iter())
76            .filter(|(_, i)| **i >= half_max)
77            .map(|(c, _)| *c)
78            .collect();
79        let fwhm = if above_half.len() >= 2 {
80            above_half.last().unwrap_or(&0.0) - above_half.first().unwrap_or(&0.0)
81        } else {
82            2.355 * std  // Gaussian approximation
83        };
84
85        SignalMoments {
86            mean,
87            variance,
88            skewness,
89            apex,
90            fwhm,
91            total_intensity: total,
92        }
93    }
94}
95
96/// MS1 precursor signal extracted from surrounding frames
97#[derive(Clone, Debug)]
98pub struct PrecursorMS1Signal {
99    pub precursor_id: u32,
100
101    // XIC (chromatographic profile) - 1D projection
102    pub rt_coords: Vec<f64>,           // RT in seconds
103    pub rt_intensities: Vec<f64>,
104    pub rt_moments: SignalMoments,
105
106    // Mobilogram (IM profile) - 1D projection
107    pub im_coords: Vec<f64>,           // 1/K0
108    pub im_intensities: Vec<f64>,
109    pub im_moments: SignalMoments,
110
111    // Isotope envelope - 1D projection
112    pub isotope_mz: Vec<f64>,
113    pub isotope_intensity: Vec<f64>,
114    pub mz_moments: SignalMoments,
115
116    // Raw 2D data (all peaks from filtered MS1 frames in RT window, merged)
117    pub raw_rt: Vec<f64>,              // RT per peak (seconds)
118    pub raw_mz: Vec<f64>,              // m/z per peak
119    pub raw_mobility: Vec<f64>,        // 1/K0 per peak
120    pub raw_intensity: Vec<f64>,       // intensity per peak
121}
122
123/// Input for MS1 extraction - precursor coordinates
124#[derive(Clone, Debug)]
125pub struct PrecursorCoord {
126    pub precursor_id: u32,
127    pub mz: f64,           // largest_peak_mz - used as fallback if mono_mz is 0
128    pub mono_mz: f64,      // Monoisotopic m/z for isotope envelope (M+0 starting point), 0 if unknown
129    pub rt_seconds: f64,
130    pub mobility: f64,     // Center mobility from fragment
131    pub im_start: f64,     // Fragment selection IM start (for plotting)
132    pub im_end: f64,       // Fragment selection IM end (for plotting)
133    pub charge: i32,
134}
135
136pub struct TimsDatasetDDA {
137    pub loader: TimsDataLoader,
138    pub pasef_meta: Vec<PasefMsMsMeta>,
139}
140
141impl TimsDatasetDDA {
142    pub fn new(
143        bruker_lib_path: &str,
144        data_path: &str,
145        in_memory: bool,
146        use_bruker_sdk: bool,
147    ) -> Self {
148        // TODO: error handling
149        let global_meta_data = read_global_meta_sql(data_path).unwrap();
150        let meta_data = read_meta_data_sql(data_path).unwrap();
151
152        let scan_max_index = meta_data.iter().map(|x| x.num_scans).max().unwrap() as u32;
153        let im_lower = global_meta_data.one_over_k0_range_lower;
154        let im_upper = global_meta_data.one_over_k0_range_upper;
155
156        let tof_max_index = global_meta_data.tof_max_index;
157        let mz_lower = global_meta_data.mz_acquisition_range_lower;
158        let mz_upper = global_meta_data.mz_acquisition_range_upper;
159
160        let loader = match in_memory {
161            true => TimsDataLoader::new_in_memory(
162                bruker_lib_path,
163                data_path,
164                use_bruker_sdk,
165                scan_max_index,
166                im_lower,
167                im_upper,
168                tof_max_index,
169                mz_lower,
170                mz_upper,
171            ),
172            false => TimsDataLoader::new_lazy(
173                bruker_lib_path,
174                data_path,
175                use_bruker_sdk,
176                scan_max_index,
177                im_lower,
178                im_upper,
179                tof_max_index,
180                mz_lower,
181                mz_upper,
182            ),
183        };
184        
185        let pasef_meta = read_pasef_frame_ms_ms_info(data_path).unwrap();
186
187        TimsDatasetDDA { loader, pasef_meta }
188    }
189
190    /// Create a new DDA dataset with pre-computed ion mobility calibration lookup table.
191    ///
192    /// This enables accurate ion mobility calibration with fast parallel extraction.
193    /// The im_lookup table should be pre-computed using the Bruker SDK.
194    ///
195    /// # Arguments
196    /// * `data_path` - Path to the .d folder
197    /// * `in_memory` - Whether to load all data into memory
198    /// * `bruker_lib_path` - Path to the Bruker SDK shared library; used to
199    ///   derive an accurate m/z calibration. Pass "NO_SDK" (or an empty
200    ///   string) to skip and use the 2-point boundary m/z model.
201    /// * `im_lookup` - Pre-computed scan→1/K0 lookup table from Bruker SDK
202    ///
203    /// # Returns
204    /// A new TimsDatasetDDA with LookupIndexConverter (thread-safe, accurate)
205    pub fn new_with_calibration(
206        data_path: &str,
207        in_memory: bool,
208        bruker_lib_path: &str,
209        im_lookup: Vec<f64>,
210    ) -> Self {
211        let global_meta_data = read_global_meta_sql(data_path).unwrap();
212
213        let tof_max_index = global_meta_data.tof_max_index;
214        let mz_lower = global_meta_data.mz_acquisition_range_lower;
215        let mz_upper = global_meta_data.mz_acquisition_range_upper;
216
217        let loader = match in_memory {
218            true => TimsDataLoader::new_in_memory_with_calibration(
219                data_path,
220                bruker_lib_path,
221                tof_max_index,
222                mz_lower,
223                mz_upper,
224                im_lookup,
225            ),
226            false => TimsDataLoader::new_lazy_with_calibration(
227                data_path,
228                bruker_lib_path,
229                tof_max_index,
230                mz_lower,
231                mz_upper,
232                im_lookup,
233            ),
234        };
235
236        let pasef_meta = read_pasef_frame_ms_ms_info(data_path).unwrap();
237
238        TimsDatasetDDA { loader, pasef_meta }
239    }
240
241    /// Create a DDA dataset with regression-derived m/z calibration.
242    ///
243    /// This method uses externally-provided m/z calibration coefficients (e.g., from
244    /// linear regression on precursor data) instead of the simple boundary model.
245    ///
246    /// # Arguments
247    /// * `data_path` - Path to the .d folder
248    /// * `in_memory` - Whether to load all data into memory
249    /// * `tof_intercept` - Intercept for sqrt(mz) = intercept + slope * tof
250    /// * `tof_slope` - Slope for sqrt(mz) = intercept + slope * tof
251    ///
252    /// # Returns
253    /// A new TimsDatasetDDA with CalibratedIndexConverter (thread-safe, accurate)
254    pub fn new_with_mz_calibration(
255        data_path: &str,
256        in_memory: bool,
257        tof_intercept: f64,
258        tof_slope: f64,
259    ) -> Self {
260        let global_meta_data = read_global_meta_sql(data_path).unwrap();
261        let frame_meta = read_meta_data_sql(data_path).unwrap();
262
263        let scan_max_index = frame_meta.iter().map(|x| x.num_scans).max().unwrap() as u32;
264        let im_lower = global_meta_data.one_over_k0_range_lower;
265        let im_upper = global_meta_data.one_over_k0_range_upper;
266
267        let loader = match in_memory {
268            true => TimsDataLoader::new_in_memory_with_mz_calibration(
269                data_path,
270                tof_intercept,
271                tof_slope,
272                im_lower,
273                im_upper,
274                scan_max_index,
275            ),
276            false => TimsDataLoader::new_lazy_with_mz_calibration(
277                data_path,
278                tof_intercept,
279                tof_slope,
280                im_lower,
281                im_upper,
282                scan_max_index,
283            ),
284        };
285
286        let pasef_meta = read_pasef_frame_ms_ms_info(data_path).unwrap();
287
288        TimsDatasetDDA { loader, pasef_meta }
289    }
290
291    /// Check if the Bruker SDK is being used for index conversion.
292    /// Returns false for both Simple and Lookup converters (which are thread-safe).
293    pub fn uses_bruker_sdk(&self) -> bool {
294        self.loader.uses_bruker_sdk()
295    }
296
297    pub fn get_selected_precursors(&self) -> Vec<DDAPrecursor> {
298        let precursor_meta = read_dda_precursor_meta(&self.loader.get_data_path()).unwrap();
299        let pasef_meta = &self.pasef_meta;
300
301        let precursor_id_to_pasef_meta: BTreeMap<i64, &PasefMsMsMeta> = pasef_meta
302            .iter()
303            .map(|x| (x.precursor_id as i64, x))
304            .collect();
305
306        // go over all precursors and get the precursor meta data
307        let result: Vec<_> = precursor_meta
308            .iter()
309            .map(|precursor| {
310                let pasef_meta = precursor_id_to_pasef_meta
311                    .get(&precursor.precursor_id)
312                    .unwrap();
313                DDAPrecursor {
314                    frame_id: precursor.precursor_frame_id,
315                    precursor_id: precursor.precursor_id,
316                    mono_mz: precursor.precursor_mz_monoisotopic,
317                    highest_intensity_mz: precursor.precursor_mz_highest_intensity,
318                    average_mz: precursor.precursor_mz_average,
319                    charge: precursor.precursor_charge,
320                    inverse_ion_mobility: self.scan_to_inverse_mobility(
321                        precursor.precursor_frame_id as u32,
322                        &vec![precursor.precursor_average_scan_number as u32],
323                    )[0],
324                    collision_energy: pasef_meta.collision_energy,
325                    precuror_total_intensity: precursor.precursor_total_intensity,
326                    isolation_mz: pasef_meta.isolation_mz,
327                    isolation_width: pasef_meta.isolation_width,
328                }
329            })
330            .collect();
331
332        result
333    }
334
335    pub fn get_precursor_frames(
336        &self,
337        min_intensity: f64,
338        max_num_peaks: usize,
339        num_threads: usize,
340    ) -> Vec<TimsFrame> {
341        // get all precursor frames
342        let meta_data = read_meta_data_sql(&self.loader.get_data_path()).unwrap();
343
344        // get the precursor frames
345        let precursor_frames = meta_data.iter().filter(|x| x.ms_ms_type == 0);
346
347        let tims_silce =
348            self.get_slice(precursor_frames.map(|x| x.id as u32).collect(), num_threads);
349
350        let result: Vec<_> = tims_silce
351            .frames
352            .par_iter()
353            .map(|frame| {
354                frame
355                    .filter_ranged(0.0, 2000.0, 0, 2000, 0.0, 5.0, min_intensity, 1e9, 0, i32::MAX)
356                    .top_n(max_num_peaks)
357            })
358            .collect();
359
360        result
361    }
362
363    /// Extract MS1 precursor signals for a batch of precursors in parallel.
364    ///
365    /// For each precursor, extracts:
366    /// - XIC (chromatographic profile) from MS1 frames in RT window
367    /// - Mobilogram (IM profile)
368    /// - Isotope envelope
369    /// - Statistical moments (mean, variance, skewness, apex, FWHM) for each dimension
370    ///
371    /// Uses batched processing to avoid loading all MS1 frames at once.
372    ///
373    /// # Arguments
374    /// * `precursor_coords` - Vector of precursor coordinates (id, mz, rt_sec, mobility, charge)
375    /// * `rt_window_sec` - RT window in seconds (total width)
376    /// * `mz_tol_ppm` - m/z tolerance in ppm
377    /// * `im_window` - IM window in 1/K0 units (total width)
378    /// * `n_isotopes` - Number of isotope peaks to extract
379    /// * `num_threads` - Number of threads for parallel processing
380    ///
381    /// # Returns
382    /// Vector of PrecursorMS1Signal, one per input precursor
383    pub fn extract_precursor_ms1_signals(
384        &self,
385        precursor_coords: Vec<PrecursorCoord>,
386        rt_window_sec: f64,
387        mz_tol_ppm: f64,
388        im_window: f64,
389        n_isotopes: usize,
390        num_threads: usize,
391    ) -> Vec<PrecursorMS1Signal> {
392        if precursor_coords.is_empty() {
393            return Vec::new();
394        }
395
396        // Get frame metadata
397        let meta_data = read_meta_data_sql(&self.loader.get_data_path()).unwrap();
398
399        // Get all MS1 frame info sorted by time
400        let mut ms1_frame_info: Vec<(u32, f64)> = meta_data
401            .iter()
402            .filter(|f| f.ms_ms_type == 0)
403            .map(|f| (f.id as u32, f.time))
404            .collect();
405        ms1_frame_info.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
406
407        let ms1_times: Vec<f64> = ms1_frame_info.iter().map(|(_, t)| *t).collect();
408
409        // Sort precursors by RT for batched processing
410        let mut sorted_coords: Vec<(usize, &PrecursorCoord)> = precursor_coords
411            .iter()
412            .enumerate()
413            .collect();
414        sorted_coords.sort_by(|a, b| a.1.rt_seconds.partial_cmp(&b.1.rt_seconds).unwrap());
415
416        // Process in RT batches (5 minute chunks = 300 sec)
417        let batch_size_sec = 300.0;
418        let mut results: Vec<(usize, PrecursorMS1Signal)> = Vec::with_capacity(precursor_coords.len());
419
420        let mut batch_start = 0;
421        while batch_start < sorted_coords.len() {
422            // Find batch end (all precursors within batch_size_sec of first)
423            let batch_rt_start = sorted_coords[batch_start].1.rt_seconds;
424            let batch_rt_end = batch_rt_start + batch_size_sec;
425
426            let mut batch_end = batch_start;
427            while batch_end < sorted_coords.len() && sorted_coords[batch_end].1.rt_seconds < batch_rt_end {
428                batch_end += 1;
429            }
430
431            // Determine MS1 frames needed for this batch (with RT window margin)
432            let frame_rt_min = batch_rt_start - rt_window_sec;
433            let frame_rt_max = batch_rt_end + rt_window_sec;
434
435            let frame_start_idx = ms1_times.partition_point(|t| *t < frame_rt_min);
436            let frame_end_idx = ms1_times.partition_point(|t| *t <= frame_rt_max);
437
438            // Load frames for this batch
439            let batch_frame_ids: Vec<u32> = ms1_frame_info[frame_start_idx..frame_end_idx]
440                .iter()
441                .map(|(id, _)| *id)
442                .collect();
443
444            let batch_frames = if !batch_frame_ids.is_empty() {
445                self.loader.get_slice(batch_frame_ids, num_threads)
446            } else {
447                TimsSlice { frames: Vec::new() }
448            };
449
450            let batch_times: Vec<f64> = ms1_times[frame_start_idx..frame_end_idx].to_vec();
451
452            // Process precursors in this batch in parallel
453            let batch_coords = &sorted_coords[batch_start..batch_end];
454
455            let pool = ThreadPoolBuilder::new()
456                .num_threads(num_threads)
457                .build()
458                .unwrap();
459
460            let batch_results: Vec<(usize, PrecursorMS1Signal)> = pool.install(|| {
461                batch_coords.par_iter().map(|(orig_idx, coord)| {
462                    let signal = Self::extract_single_precursor(
463                        coord,
464                        &batch_frames.frames,
465                        &batch_times,
466                        rt_window_sec,
467                        mz_tol_ppm,
468                        im_window,
469                        n_isotopes,
470                    );
471                    (*orig_idx, signal)
472                }).collect()
473            });
474
475            results.extend(batch_results);
476            batch_start = batch_end;
477        }
478
479        // Restore original order
480        results.sort_by_key(|(idx, _)| *idx);
481        results.into_iter().map(|(_, signal)| signal).collect()
482    }
483
484    /// Extract MS1 signal for a single precursor from pre-loaded frames
485    fn extract_single_precursor(
486        coord: &PrecursorCoord,
487        frames: &[TimsFrame],
488        frame_times: &[f64],
489        rt_window_sec: f64,
490        mz_tol_ppm: f64,
491        im_window: f64,
492        n_isotopes: usize,
493    ) -> PrecursorMS1Signal {
494        let rt_sec = coord.rt_seconds;
495
496        // Binary search to find RT window bounds within batch frames
497        let rt_min = rt_sec - rt_window_sec / 2.0;
498        let rt_max = rt_sec + rt_window_sec / 2.0;
499
500        let start_idx = frame_times.partition_point(|t| *t < rt_min);
501        let end_idx = frame_times.partition_point(|t| *t <= rt_max);
502
503        // Calculate isotope spacing
504        let isotope_spacing = 1.003355 / (coord.charge.max(1) as f64);
505        let n_isotopes_to_extract = 4.min(n_isotopes); // M+0 to M+3 only
506
507        // Determine base m/z for extraction:
508        // - If mono_mz > 0: use it as starting point for isotope range
509        // - Otherwise: fall back to largest_peak_mz (single peak extraction)
510        let has_mono_mz = coord.mono_mz > 0.0;
511        let base_mz = if has_mono_mz { coord.mono_mz } else { coord.mz };
512        let mz_tol = base_mz * mz_tol_ppm / 1e6;
513
514        // Calculate isotope m/z values (M+0 through M+3)
515        let isotope_mz_values: Vec<f64> = (0..n_isotopes_to_extract)
516            .map(|i| base_mz + (i as f64) * isotope_spacing)
517            .collect();
518
519        // m/z range for XIC/mobilogram:
520        // - If mono_mz available: use full isotope range (M+0 to M+3) for better S/N
521        // - Otherwise: use single m/z (largest_peak_mz)
522        let (xic_mz_min, xic_mz_max) = if has_mono_mz {
523            // Full isotope range: M+0 - tolerance to M+3 + tolerance
524            (base_mz - mz_tol, base_mz + ((n_isotopes_to_extract - 1) as f64) * isotope_spacing + mz_tol)
525        } else {
526            // Single peak: largest_peak_mz ± tolerance
527            (coord.mz - mz_tol, coord.mz + mz_tol)
528        };
529
530        // IM range (uses im_window parameter, doubled from default for better coverage)
531        let im_min = coord.mobility - im_window / 2.0;
532        let im_max = coord.mobility + im_window / 2.0;
533
534        // Accumulators
535        let n_frames = end_idx.saturating_sub(start_idx);
536        let mut rt_coords = Vec::with_capacity(n_frames);
537        let mut rt_intensities = Vec::with_capacity(n_frames);
538        let mut im_dict: HashMap<i64, f64> = HashMap::new();
539        let mut isotope_intensity = vec![0.0f64; n_isotopes];  // Keep original size for output
540
541        // Accumulators for raw 2D data (merged from all frames)
542        let mut raw_rt = Vec::new();
543        let mut raw_mz = Vec::new();
544        let mut raw_mobility = Vec::new();
545        let mut raw_intensity = Vec::new();
546
547        // Extract from each MS1 frame in the RT window
548        for idx in start_idx..end_idx.min(frames.len()) {
549            let frame_time = frame_times[idx];
550            let frame = &frames[idx];
551
552            // === XIC and Mobilogram: Extract from isotope range (or single m/z if no mono_mz) ===
553            // Scan range is unbounded; the inv_mobility window (im_min/im_max) does
554            // the mobility selection. A hardcoded scan cap (e.g. 0..1000) silently
555            // drops signal on methods configured with more than that many scans
556            // per frame (high mobility-resolution / wide IM range acquisitions).
557            let xic_filtered = frame.filter_ranged(
558                xic_mz_min, xic_mz_max,
559                0, i32::MAX,
560                im_min, im_max,
561                0.0, 1e9,
562                0, i32::MAX,
563            );
564
565            // XIC: sum intensity at this RT from all isotopes
566            let xic_intensity: f64 = xic_filtered.ims_frame.intensity.iter().sum();
567            rt_coords.push(frame_time);
568            rt_intensities.push(xic_intensity);
569
570            // Mobilogram: accumulate from all isotopes in m/z range
571            for (mob, inten) in xic_filtered.ims_frame.mobility.iter().zip(xic_filtered.ims_frame.intensity.iter()) {
572                let mob_bin = (*mob * 1000.0).round() as i64;
573                *im_dict.entry(mob_bin).or_insert(0.0) += *inten;
574            }
575
576            // === Isotope envelope: Extract M+0 through M+3 individually ===
577            // Use same filtered data (already in isotope range)
578            for (iso_idx, iso_mz) in isotope_mz_values.iter().enumerate() {
579                let iso_peak_min = iso_mz - mz_tol;
580                let iso_peak_max = iso_mz + mz_tol;
581                let iso_intensity_sum: f64 = xic_filtered.ims_frame.mz.iter()
582                    .zip(xic_filtered.ims_frame.intensity.iter())
583                    .filter(|(mz, _)| **mz >= iso_peak_min && **mz <= iso_peak_max)
584                    .map(|(_, i)| *i)
585                    .sum();
586                isotope_intensity[iso_idx] += iso_intensity_sum;
587            }
588
589            // Raw 2D data: store all peaks from filtered range
590            let n_peaks = xic_filtered.ims_frame.mz.len();
591            for i in 0..n_peaks {
592                raw_rt.push(frame_time);
593                raw_mz.push(xic_filtered.ims_frame.mz[i]);
594                raw_mobility.push(xic_filtered.ims_frame.mobility[i]);
595                raw_intensity.push(xic_filtered.ims_frame.intensity[i]);
596            }
597        }
598
599        // Convert mobilogram accumulator to sorted arrays
600        let mut im_entries: Vec<(i64, f64)> = im_dict.into_iter().collect();
601        im_entries.sort_by_key(|(k, _)| *k);
602        let im_coords: Vec<f64> = im_entries.iter().map(|(k, _)| *k as f64 / 1000.0).collect();
603        let im_intensities: Vec<f64> = im_entries.iter().map(|(_, v)| *v).collect();
604
605        // Calculate moments
606        let rt_moments = SignalMoments::from_signal(&rt_coords, &rt_intensities);
607        let im_moments = SignalMoments::from_signal(&im_coords, &im_intensities);
608        let mz_moments = SignalMoments::from_signal(&isotope_mz_values, &isotope_intensity);
609
610        PrecursorMS1Signal {
611            precursor_id: coord.precursor_id,
612            rt_coords,
613            rt_intensities,
614            rt_moments,
615            im_coords,
616            im_intensities,
617            im_moments,
618            isotope_mz: isotope_mz_values,
619            isotope_intensity,
620            mz_moments,
621            raw_rt,
622            raw_mz,
623            raw_mobility,
624            raw_intensity,
625        }
626    }
627
628    pub fn get_pasef_frame_ms_ms_info(&self) -> Vec<PasefMsMsMeta> {
629        read_pasef_frame_ms_ms_info(&self.loader.get_data_path()).unwrap()
630    }
631
632    /// Get the fragment spectra for all PASEF selected precursors
633    pub fn get_pasef_fragments(&self, num_threads: usize) -> Vec<PASEFDDAFragment> {
634        // Delegate to the filtered version with no filter (all precursors)
635        self.get_pasef_fragments_for_precursors(None, num_threads)
636    }
637
638    /// Get fragment spectra for specific precursor IDs only.
639    /// If precursor_ids is None, returns all fragments (same as get_pasef_fragments).
640    /// This is more memory-efficient for batched processing.
641    pub fn get_pasef_fragments_for_precursors(
642        &self,
643        precursor_ids: Option<&[u32]>,
644        num_threads: usize,
645    ) -> Vec<PASEFDDAFragment> {
646        // extract fragment spectra information
647        let pasef_info = self.get_pasef_frame_ms_ms_info();
648
649        // Filter to requested precursor IDs if specified
650        let filtered_pasef_info: Vec<&PasefMsMsMeta> = match precursor_ids {
651            Some(ids) => {
652                // Create a HashSet for O(1) lookup
653                let id_set: std::collections::HashSet<u32> = ids.iter().copied().collect();
654                pasef_info.iter()
655                    .filter(|info| id_set.contains(&(info.precursor_id as u32)))
656                    .collect()
657            }
658            None => pasef_info.iter().collect(),
659        };
660
661        // Note: The Bruker SDK is NOT thread-safe, so we must use sequential iteration
662        // when the SDK is being used for index conversion.
663        let uses_bruker_sdk = self.loader.uses_bruker_sdk();
664
665        // Helper closure to process a single PASEF fragment
666        let process_fragment = |pasef_info: &PasefMsMsMeta| -> PASEFDDAFragment {
667            // get the frame
668            let frame = self.loader.get_frame(pasef_info.frame_id as u32);
669
670            // get five percent of the scan range
671            let scan_margin = (pasef_info.scan_num_end - pasef_info.scan_num_begin) / 20;
672
673            // get the fragment spectrum by scan range
674            let filtered_frame = frame.filter_ranged(
675                0.0,
676                2000.0,
677                (pasef_info.scan_num_begin - scan_margin) as i32,
678                (pasef_info.scan_num_end + scan_margin) as i32,
679                0.0,
680                5.0,
681                0.0,
682                1e9,
683                0,
684                i32::MAX,
685            );
686
687            PASEFDDAFragment {
688                frame_id: pasef_info.frame_id as u32,
689                precursor_id: pasef_info.precursor_id as u32,
690                collision_energy: pasef_info.collision_energy,
691                // flatten the spectrum
692                selected_fragment: filtered_frame,
693            }
694        };
695
696        if uses_bruker_sdk {
697            // Sequential processing when using Bruker SDK (not thread-safe)
698            filtered_pasef_info.iter().map(|info| process_fragment(info)).collect()
699        } else {
700            // Parallel processing when using simple index converter (thread-safe)
701            let pool = ThreadPoolBuilder::new()
702                .num_threads(num_threads)
703                .build()
704                .unwrap();
705
706            pool.install(|| {
707                filtered_pasef_info.par_iter().map(|info| process_fragment(info)).collect()
708            })
709        }
710    }
711
712    /// Get preprocessed PASEF fragments ready for database search.
713    /// This method performs parallel processing of all fragment spectra, including:
714    /// - Flattening frames across ion mobility dimension
715    /// - Deisotoping (optional)
716    /// - Filtering to top N peaks
717    /// - Computing inverse mobility along scan marginal
718    ///
719    /// # Arguments
720    /// * `dataset_name` - Name of the dataset for generating spec_ids
721    /// * `config` - Spectrum processing configuration
722    /// * `num_threads` - Number of threads to use for parallel processing
723    ///
724    /// # Returns
725    /// Vector of preprocessed spectra ready for Sage search
726    pub fn get_preprocessed_pasef_fragments(
727        &self,
728        dataset_name: &str,
729        config: SpectrumProcessingConfig,
730        num_threads: usize,
731    ) -> Vec<PreprocessedSpectrum> {
732        // Step 1: Get raw PASEF fragments info
733        let pasef_info = self.get_pasef_frame_ms_ms_info();
734
735        // Step 2: Get precursor metadata
736        let precursor_meta = read_dda_precursor_meta(&self.loader.get_data_path()).unwrap_or_default();
737        let frame_meta = read_meta_data_sql(&self.loader.get_data_path()).unwrap_or_default();
738
739        // Create lookup maps
740        let precursor_map: BTreeMap<i64, &DDAPrecursorMeta> = precursor_meta
741            .iter()
742            .map(|p| (p.precursor_id, p))
743            .collect();
744
745        let frame_time_map: BTreeMap<i64, f64> = frame_meta
746            .iter()
747            .map(|f| (f.id, f.time / 60.0))  // Convert to minutes
748            .collect();
749
750        // Step 3: Group PASEF info by precursor_id to aggregate re-fragmented precursors
751        // This matches Python's groupby('precursor_id').agg({'raw_data': 'sum', ...})
752        let mut pasef_by_precursor: BTreeMap<i64, Vec<&PasefMsMsMeta>> = BTreeMap::new();
753        for info in &pasef_info {
754            pasef_by_precursor
755                .entry(info.precursor_id)
756                .or_insert_with(Vec::new)
757                .push(info);
758        }
759
760        // Step 4: Build fragment data with aggregation (merge frames for same precursor)
761        // Note: The Bruker SDK is NOT thread-safe, so we must use sequential iteration
762        // when the SDK is being used for index conversion.
763        let uses_bruker_sdk = self.loader.uses_bruker_sdk();
764
765        // Helper closure to process a single precursor
766        let process_precursor = |(precursor_id, pasef_infos): (&i64, &Vec<&PasefMsMsMeta>)| -> Option<PASEFFragmentData> {
767            // Get precursor metadata
768            let precursor = precursor_map.get(precursor_id)?;
769
770            // Use first PASEF info for metadata (matches Python's 'first')
771            let first_pasef = pasef_infos.first()?;
772
773            // Get retention time from first frame
774            let scan_start_time = frame_time_map.get(&first_pasef.frame_id).copied().unwrap_or(0.0);
775
776            // Collect and merge all frames for this precursor (matches Python's 'sum')
777            let mut combined_scan = Vec::new();
778            let mut combined_mobility = Vec::new();
779            let mut combined_tof = Vec::new();
780            let mut combined_mz = Vec::new();
781            let mut combined_intensity = Vec::new();
782
783            for pasef_info in pasef_infos {
784                // Get the frame and filter by scan range
785                let frame = self.loader.get_frame(pasef_info.frame_id as u32);
786
787                // Get five percent of the scan range for margin
788                let scan_margin = (pasef_info.scan_num_end - pasef_info.scan_num_begin) / 20;
789
790                // Filter frame by scan range
791                let filtered_frame = frame.filter_ranged(
792                    0.0,
793                    2000.0,
794                    (pasef_info.scan_num_begin - scan_margin) as i32,
795                    (pasef_info.scan_num_end + scan_margin) as i32,
796                    0.0,
797                    5.0,
798                    0.0,
799                    1e9,
800                    0,
801                    i32::MAX,
802                );
803
804                // Append data from this frame (merge/sum behavior)
805                combined_scan.extend(filtered_frame.scan.iter());
806                combined_mobility.extend(filtered_frame.ims_frame.mobility.iter());
807                combined_tof.extend(filtered_frame.tof.iter());
808                combined_mz.extend(filtered_frame.ims_frame.mz.iter());
809                combined_intensity.extend(filtered_frame.ims_frame.intensity.iter());
810            }
811
812            if combined_mz.is_empty() {
813                return None;
814            }
815
816            // Determine precursor m/z (prefer monoisotopic, fallback to highest intensity)
817            let precursor_mz = precursor.precursor_mz_monoisotopic
818                .unwrap_or(precursor.precursor_mz_highest_intensity);
819
820            Some(PASEFFragmentData {
821                frame_id: first_pasef.frame_id as u32,
822                precursor_id: *precursor_id as u32,
823                collision_energy: first_pasef.collision_energy,
824                scan_start_time,
825                scan: combined_scan,
826                mobility: combined_mobility,
827                tof: combined_tof,
828                mz: combined_mz,
829                intensity: combined_intensity,
830                precursor_mz,
831                precursor_charge: precursor.precursor_charge.map(|c| c as i32),
832                precursor_intensity: precursor.precursor_total_intensity,
833                isolation_mz: first_pasef.isolation_mz,
834                isolation_width: first_pasef.isolation_width,
835            })
836        };
837
838        let fragment_data: Vec<PASEFFragmentData> = if uses_bruker_sdk {
839            // Sequential processing when using Bruker SDK (not thread-safe)
840            pasef_by_precursor
841                .iter()
842                .filter_map(process_precursor)
843                .collect()
844        } else {
845            // Parallel processing when using simple index converter (thread-safe)
846            let pool = ThreadPoolBuilder::new()
847                .num_threads(num_threads)
848                .build()
849                .unwrap();
850
851            pool.install(|| {
852                pasef_by_precursor
853                    .par_iter()
854                    .filter_map(|item| process_precursor(item))
855                    .collect()
856            })
857        };
858
859        // Step 5: Process all fragments in parallel using the batch processor
860        process_pasef_fragments_batch(fragment_data, dataset_name, &config, num_threads)
861    }
862
863    pub fn sample_pasef_fragment_random(
864        &self,
865        target_scan_apex: i32,
866        experiment_max_scan: i32,
867    ) -> TimsFrame {
868        let pasef_meta = &self.pasef_meta;
869        let random_index = rand::random::<usize>() % pasef_meta.len();
870        let pasef_info = &pasef_meta[random_index];
871        
872        // get the frame
873        let frame = self.loader.get_frame(pasef_info.frame_id as u32);
874        
875        // get five percent of the scan range
876        let scan_margin = (pasef_info.scan_num_end - pasef_info.scan_num_begin) / 20;
877        
878        // get the fragment spectrum by scan range
879        let mut filtered = frame.filter_ranged(
880            0.0,
881            2000.0,
882            (pasef_info.scan_num_begin - scan_margin) as i32,
883            (pasef_info.scan_num_end + scan_margin) as i32,
884            0.0,
885            5.0,
886            0.0,
887            1e9,
888            0,
889            i32::MAX,
890        );
891
892        // Safety check
893        if filtered.scan.is_empty() {
894            return filtered;
895        }
896
897        // Compute median scan
898        let mut scan_copy = filtered.scan.clone();
899        scan_copy.sort_unstable();
900        let median_scan = scan_copy[scan_copy.len() / 2];
901
902        // Compute shift
903        let scan_shift = target_scan_apex - median_scan;
904
905        // Apply shift to scan values
906        for s in filtered.scan.iter_mut() {
907            *s += scan_shift;
908        }
909
910        // Refilter to clip shifted scans that fall outside valid bounds
911        let re_filtered = filtered.filter_ranged(
912            0.0,
913            2000.0,
914            0,
915            experiment_max_scan,
916            0.0,
917            5.0,
918            0.0,
919            1e9,
920            0,
921            i32::MAX,
922        );
923
924        re_filtered
925    }
926    
927    pub fn sample_pasef_fragments_random(
928        &self,
929        target_scan_apex_values: Vec<i32>,
930        experiment_max_scan: i32,
931    ) -> TimsFrame {
932
933        // return empty frame is target_scan_apex_values is empty
934        if target_scan_apex_values.is_empty() {
935            return TimsFrame {
936                frame_id: 0, // Replace with a suitable default value
937                ms_type: MsType::FragmentDda,
938                scan: Vec::new(),
939                tof: Vec::new(),
940                ims_frame: ImsFrame::default(), // Uses the default implementation for `ImsFrame`
941            }
942        }
943        
944        let mut pasef_frames = Vec::new();
945        
946        for target_scan_apex in target_scan_apex_values {
947            let pasef_frame = self.sample_pasef_fragment_random(target_scan_apex, experiment_max_scan);
948            pasef_frames.push(pasef_frame);
949        }
950        
951        // create combined frame by summing the frame structures, they override add
952        let mut combined_frame = pasef_frames[0].clone();
953        
954        for frame in pasef_frames.iter().skip(1) {
955            combined_frame = combined_frame + frame.clone();
956        }
957
958        // re-calculate ion mobility
959        let im_values = self.scan_to_inverse_mobility(
960            combined_frame.frame_id as u32,
961            &combined_frame.scan.iter().map(|x| *x as u32).collect(),
962        );
963
964        // Update the inverse mobility values
965        combined_frame.ims_frame.mobility = std::sync::Arc::new(im_values);
966        
967        combined_frame
968    }
969
970    pub fn sample_precursor_signal(
971        &self,
972        num_frames: usize,
973        max_intensity: f64,
974        take_probability: f64,
975    ) -> TimsFrame {
976        // get all precursor frames
977        let meta_data = read_meta_data_sql(&self.loader.get_data_path()).unwrap();
978        let precursor_frames = meta_data.iter().filter(|x| x.ms_ms_type == 0);
979
980        // randomly sample num_frames
981        let mut rng = rand::thread_rng();
982        let mut sampled_frames: Vec<TimsFrame> = Vec::new();
983
984        // go through each frame and sample the data
985        for frame in precursor_frames.choose_multiple(&mut rng, num_frames) {
986            let frame_id = frame.id;
987            let frame_data = self
988                .loader
989                .get_frame(frame_id as u32)
990                .filter_ranged(0.0, 2000.0, 0, 1000, 0.0, 5.0, 1.0, max_intensity, 0, i32::MAX)
991                .generate_random_sample(take_probability);
992            sampled_frames.push(frame_data);
993        }
994
995        // get the first frame
996        let mut sampled_frame = sampled_frames.remove(0);
997
998        // sum all the other frames to the first frame
999        for frame in sampled_frames {
1000            sampled_frame = sampled_frame + frame;
1001        }
1002
1003        sampled_frame
1004    }
1005}
1006
1007impl TimsData for TimsDatasetDDA {
1008    fn get_frame(&self, frame_id: u32) -> TimsFrame {
1009        self.loader.get_frame(frame_id)
1010    }
1011
1012    fn get_raw_frame(&self, frame_id: u32) -> RawTimsFrame {
1013        self.loader.get_raw_frame(frame_id)
1014    }
1015
1016    fn get_slice(&self, frame_ids: Vec<u32>, num_threads: usize) -> TimsSlice {
1017        self.loader.get_slice(frame_ids, num_threads)
1018    }
1019
1020    fn get_acquisition_mode(&self) -> AcquisitionMode {
1021        self.loader.get_acquisition_mode().clone()
1022    }
1023
1024    fn get_frame_count(&self) -> i32 {
1025        self.loader.get_frame_count()
1026    }
1027
1028    fn get_data_path(&self) -> &str {
1029        &self.loader.get_data_path()
1030    }
1031}
1032
1033impl IndexConverter for TimsDatasetDDA {
1034    fn tof_to_mz(&self, frame_id: u32, tof_values: &Vec<u32>) -> Vec<f64> {
1035        self.loader
1036            .get_index_converter()
1037            .tof_to_mz(frame_id, tof_values)
1038    }
1039
1040    fn mz_to_tof(&self, frame_id: u32, mz_values: &Vec<f64>) -> Vec<u32> {
1041        self.loader
1042            .get_index_converter()
1043            .mz_to_tof(frame_id, mz_values)
1044    }
1045
1046    fn scan_to_inverse_mobility(&self, frame_id: u32, scan_values: &Vec<u32>) -> Vec<f64> {
1047        self.loader
1048            .get_index_converter()
1049            .scan_to_inverse_mobility(frame_id, scan_values)
1050    }
1051
1052    fn inverse_mobility_to_scan(
1053        &self,
1054        frame_id: u32,
1055        inverse_mobility_values: &Vec<f64>,
1056    ) -> Vec<u32> {
1057        self.loader
1058            .get_index_converter()
1059            .inverse_mobility_to_scan(frame_id, inverse_mobility_values)
1060    }
1061}