rustdf/data/
dda.rs

1use crate::data::acquisition::AcquisitionMode;
2use crate::data::handle::{IndexConverter, TimsData, TimsDataLoader};
3use crate::data::meta::{read_dda_precursor_meta, read_global_meta_sql, read_meta_data_sql, read_pasef_frame_ms_ms_info, DDAPrecursor, DDAPrecursorMeta, PasefMsMsMeta};
4use mscore::timstof::frame::{ImsFrame, RawTimsFrame, TimsFrame};
5use mscore::timstof::slice::TimsSlice;
6use mscore::timstof::spectrum_processing::{
7    PASEFFragmentData, PreprocessedSpectrum, SpectrumProcessingConfig,
8    process_pasef_fragments_batch,
9};
10use rayon::prelude::*;
11use rayon::ThreadPoolBuilder;
12use std::collections::BTreeMap;
13use rand::prelude::IteratorRandom;
14use mscore::data::spectrum::MsType;
15use std::collections::HashMap;
16
17#[derive(Clone)]
18pub struct PASEFDDAFragment {
19    pub frame_id: u32,
20    pub precursor_id: u32,
21    pub collision_energy: f64,
22    pub selected_fragment: TimsFrame,
23}
24
25/// Statistical moments of a 1D signal distribution
26#[derive(Clone, Debug, Default)]
27pub struct SignalMoments {
28    pub mean: f64,
29    pub variance: f64,
30    pub skewness: f64,
31    pub apex: f64,
32    pub fwhm: f64,
33    pub total_intensity: f64,
34}
35
36impl SignalMoments {
37    /// Calculate moments from coordinate and intensity arrays
38    pub fn from_signal(coords: &[f64], intensities: &[f64]) -> Self {
39        if coords.is_empty() || intensities.iter().sum::<f64>() == 0.0 {
40            return Self::default();
41        }
42
43        let total: f64 = intensities.iter().sum();
44
45        // First moment (weighted mean)
46        let mean: f64 = coords.iter()
47            .zip(intensities.iter())
48            .map(|(c, i)| c * i / total)
49            .sum();
50
51        // Second moment (weighted variance)
52        let variance: f64 = coords.iter()
53            .zip(intensities.iter())
54            .map(|(c, i)| i / total * (c - mean).powi(2))
55            .sum();
56
57        // Third moment (weighted skewness)
58        let std = variance.sqrt().max(1e-10);
59        let skewness: f64 = coords.iter()
60            .zip(intensities.iter())
61            .map(|(c, i)| i / total * ((c - mean) / std).powi(3))
62            .sum();
63
64        // Apex (position of maximum)
65        let apex_idx = intensities.iter()
66            .enumerate()
67            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
68            .map(|(i, _)| i)
69            .unwrap_or(0);
70        let apex = coords.get(apex_idx).copied().unwrap_or(0.0);
71
72        // FWHM estimation
73        let half_max = intensities.get(apex_idx).copied().unwrap_or(0.0) / 2.0;
74        let above_half: Vec<f64> = coords.iter()
75            .zip(intensities.iter())
76            .filter(|(_, i)| **i >= half_max)
77            .map(|(c, _)| *c)
78            .collect();
79        let fwhm = if above_half.len() >= 2 {
80            above_half.last().unwrap_or(&0.0) - above_half.first().unwrap_or(&0.0)
81        } else {
82            2.355 * std  // Gaussian approximation
83        };
84
85        SignalMoments {
86            mean,
87            variance,
88            skewness,
89            apex,
90            fwhm,
91            total_intensity: total,
92        }
93    }
94}
95
96/// MS1 precursor signal extracted from surrounding frames
97#[derive(Clone, Debug)]
98pub struct PrecursorMS1Signal {
99    pub precursor_id: u32,
100
101    // XIC (chromatographic profile) - 1D projection
102    pub rt_coords: Vec<f64>,           // RT in seconds
103    pub rt_intensities: Vec<f64>,
104    pub rt_moments: SignalMoments,
105
106    // Mobilogram (IM profile) - 1D projection
107    pub im_coords: Vec<f64>,           // 1/K0
108    pub im_intensities: Vec<f64>,
109    pub im_moments: SignalMoments,
110
111    // Isotope envelope - 1D projection
112    pub isotope_mz: Vec<f64>,
113    pub isotope_intensity: Vec<f64>,
114    pub mz_moments: SignalMoments,
115
116    // Raw 2D data (all peaks from filtered MS1 frames in RT window, merged)
117    pub raw_rt: Vec<f64>,              // RT per peak (seconds)
118    pub raw_mz: Vec<f64>,              // m/z per peak
119    pub raw_mobility: Vec<f64>,        // 1/K0 per peak
120    pub raw_intensity: Vec<f64>,       // intensity per peak
121}
122
123/// Input for MS1 extraction - precursor coordinates
124#[derive(Clone, Debug)]
125pub struct PrecursorCoord {
126    pub precursor_id: u32,
127    pub mz: f64,           // largest_peak_mz - used as fallback if mono_mz is 0
128    pub mono_mz: f64,      // Monoisotopic m/z for isotope envelope (M+0 starting point), 0 if unknown
129    pub rt_seconds: f64,
130    pub mobility: f64,     // Center mobility from fragment
131    pub im_start: f64,     // Fragment selection IM start (for plotting)
132    pub im_end: f64,       // Fragment selection IM end (for plotting)
133    pub charge: i32,
134}
135
136pub struct TimsDatasetDDA {
137    pub loader: TimsDataLoader,
138    pub pasef_meta: Vec<PasefMsMsMeta>,
139}
140
141impl TimsDatasetDDA {
142    pub fn new(
143        bruker_lib_path: &str,
144        data_path: &str,
145        in_memory: bool,
146        use_bruker_sdk: bool,
147    ) -> Self {
148        // TODO: error handling
149        let global_meta_data = read_global_meta_sql(data_path).unwrap();
150        let meta_data = read_meta_data_sql(data_path).unwrap();
151
152        let scan_max_index = meta_data.iter().map(|x| x.num_scans).max().unwrap() as u32;
153        let im_lower = global_meta_data.one_over_k0_range_lower;
154        let im_upper = global_meta_data.one_over_k0_range_upper;
155
156        let tof_max_index = global_meta_data.tof_max_index;
157        let mz_lower = global_meta_data.mz_acquisition_range_lower;
158        let mz_upper = global_meta_data.mz_acquisition_range_upper;
159
160        let loader = match in_memory {
161            true => TimsDataLoader::new_in_memory(
162                bruker_lib_path,
163                data_path,
164                use_bruker_sdk,
165                scan_max_index,
166                im_lower,
167                im_upper,
168                tof_max_index,
169                mz_lower,
170                mz_upper,
171            ),
172            false => TimsDataLoader::new_lazy(
173                bruker_lib_path,
174                data_path,
175                use_bruker_sdk,
176                scan_max_index,
177                im_lower,
178                im_upper,
179                tof_max_index,
180                mz_lower,
181                mz_upper,
182            ),
183        };
184        
185        let pasef_meta = read_pasef_frame_ms_ms_info(data_path).unwrap();
186
187        TimsDatasetDDA { loader, pasef_meta }
188    }
189
190    /// Create a new DDA dataset with pre-computed ion mobility calibration lookup table.
191    ///
192    /// This enables accurate ion mobility calibration with fast parallel extraction.
193    /// The im_lookup table should be pre-computed using the Bruker SDK.
194    ///
195    /// # Arguments
196    /// * `data_path` - Path to the .d folder
197    /// * `in_memory` - Whether to load all data into memory
198    /// * `im_lookup` - Pre-computed scan→1/K0 lookup table from Bruker SDK
199    ///
200    /// # Returns
201    /// A new TimsDatasetDDA with LookupIndexConverter (thread-safe, accurate)
202    pub fn new_with_calibration(
203        data_path: &str,
204        in_memory: bool,
205        im_lookup: Vec<f64>,
206    ) -> Self {
207        let global_meta_data = read_global_meta_sql(data_path).unwrap();
208
209        let tof_max_index = global_meta_data.tof_max_index;
210        let mz_lower = global_meta_data.mz_acquisition_range_lower;
211        let mz_upper = global_meta_data.mz_acquisition_range_upper;
212
213        let loader = match in_memory {
214            true => TimsDataLoader::new_in_memory_with_calibration(
215                data_path,
216                tof_max_index,
217                mz_lower,
218                mz_upper,
219                im_lookup,
220            ),
221            false => TimsDataLoader::new_lazy_with_calibration(
222                data_path,
223                tof_max_index,
224                mz_lower,
225                mz_upper,
226                im_lookup,
227            ),
228        };
229
230        let pasef_meta = read_pasef_frame_ms_ms_info(data_path).unwrap();
231
232        TimsDatasetDDA { loader, pasef_meta }
233    }
234
235    /// Create a DDA dataset with regression-derived m/z calibration.
236    ///
237    /// This method uses externally-provided m/z calibration coefficients (e.g., from
238    /// linear regression on precursor data) instead of the simple boundary model.
239    ///
240    /// # Arguments
241    /// * `data_path` - Path to the .d folder
242    /// * `in_memory` - Whether to load all data into memory
243    /// * `tof_intercept` - Intercept for sqrt(mz) = intercept + slope * tof
244    /// * `tof_slope` - Slope for sqrt(mz) = intercept + slope * tof
245    ///
246    /// # Returns
247    /// A new TimsDatasetDDA with CalibratedIndexConverter (thread-safe, accurate)
248    pub fn new_with_mz_calibration(
249        data_path: &str,
250        in_memory: bool,
251        tof_intercept: f64,
252        tof_slope: f64,
253    ) -> Self {
254        let global_meta_data = read_global_meta_sql(data_path).unwrap();
255        let frame_meta = read_meta_data_sql(data_path).unwrap();
256
257        let scan_max_index = frame_meta.iter().map(|x| x.num_scans).max().unwrap() as u32;
258        let im_lower = global_meta_data.one_over_k0_range_lower;
259        let im_upper = global_meta_data.one_over_k0_range_upper;
260
261        let loader = match in_memory {
262            true => TimsDataLoader::new_in_memory_with_mz_calibration(
263                data_path,
264                tof_intercept,
265                tof_slope,
266                im_lower,
267                im_upper,
268                scan_max_index,
269            ),
270            false => TimsDataLoader::new_lazy_with_mz_calibration(
271                data_path,
272                tof_intercept,
273                tof_slope,
274                im_lower,
275                im_upper,
276                scan_max_index,
277            ),
278        };
279
280        let pasef_meta = read_pasef_frame_ms_ms_info(data_path).unwrap();
281
282        TimsDatasetDDA { loader, pasef_meta }
283    }
284
285    /// Check if the Bruker SDK is being used for index conversion.
286    /// Returns false for both Simple and Lookup converters (which are thread-safe).
287    pub fn uses_bruker_sdk(&self) -> bool {
288        self.loader.uses_bruker_sdk()
289    }
290
291    pub fn get_selected_precursors(&self) -> Vec<DDAPrecursor> {
292        let precursor_meta = read_dda_precursor_meta(&self.loader.get_data_path()).unwrap();
293        let pasef_meta = &self.pasef_meta;
294
295        let precursor_id_to_pasef_meta: BTreeMap<i64, &PasefMsMsMeta> = pasef_meta
296            .iter()
297            .map(|x| (x.precursor_id as i64, x))
298            .collect();
299
300        // go over all precursors and get the precursor meta data
301        let result: Vec<_> = precursor_meta
302            .iter()
303            .map(|precursor| {
304                let pasef_meta = precursor_id_to_pasef_meta
305                    .get(&precursor.precursor_id)
306                    .unwrap();
307                DDAPrecursor {
308                    frame_id: precursor.precursor_frame_id,
309                    precursor_id: precursor.precursor_id,
310                    mono_mz: precursor.precursor_mz_monoisotopic,
311                    highest_intensity_mz: precursor.precursor_mz_highest_intensity,
312                    average_mz: precursor.precursor_mz_average,
313                    charge: precursor.precursor_charge,
314                    inverse_ion_mobility: self.scan_to_inverse_mobility(
315                        precursor.precursor_frame_id as u32,
316                        &vec![precursor.precursor_average_scan_number as u32],
317                    )[0],
318                    collision_energy: pasef_meta.collision_energy,
319                    precuror_total_intensity: precursor.precursor_total_intensity,
320                    isolation_mz: pasef_meta.isolation_mz,
321                    isolation_width: pasef_meta.isolation_width,
322                }
323            })
324            .collect();
325
326        result
327    }
328
329    pub fn get_precursor_frames(
330        &self,
331        min_intensity: f64,
332        max_num_peaks: usize,
333        num_threads: usize,
334    ) -> Vec<TimsFrame> {
335        // get all precursor frames
336        let meta_data = read_meta_data_sql(&self.loader.get_data_path()).unwrap();
337
338        // get the precursor frames
339        let precursor_frames = meta_data.iter().filter(|x| x.ms_ms_type == 0);
340
341        let tims_silce =
342            self.get_slice(precursor_frames.map(|x| x.id as u32).collect(), num_threads);
343
344        let result: Vec<_> = tims_silce
345            .frames
346            .par_iter()
347            .map(|frame| {
348                frame
349                    .filter_ranged(0.0, 2000.0, 0, 2000, 0.0, 5.0, min_intensity, 1e9, 0, i32::MAX)
350                    .top_n(max_num_peaks)
351            })
352            .collect();
353
354        result
355    }
356
357    /// Extract MS1 precursor signals for a batch of precursors in parallel.
358    ///
359    /// For each precursor, extracts:
360    /// - XIC (chromatographic profile) from MS1 frames in RT window
361    /// - Mobilogram (IM profile)
362    /// - Isotope envelope
363    /// - Statistical moments (mean, variance, skewness, apex, FWHM) for each dimension
364    ///
365    /// Uses batched processing to avoid loading all MS1 frames at once.
366    ///
367    /// # Arguments
368    /// * `precursor_coords` - Vector of precursor coordinates (id, mz, rt_sec, mobility, charge)
369    /// * `rt_window_sec` - RT window in seconds (total width)
370    /// * `mz_tol_ppm` - m/z tolerance in ppm
371    /// * `im_window` - IM window in 1/K0 units (total width)
372    /// * `n_isotopes` - Number of isotope peaks to extract
373    /// * `num_threads` - Number of threads for parallel processing
374    ///
375    /// # Returns
376    /// Vector of PrecursorMS1Signal, one per input precursor
377    pub fn extract_precursor_ms1_signals(
378        &self,
379        precursor_coords: Vec<PrecursorCoord>,
380        rt_window_sec: f64,
381        mz_tol_ppm: f64,
382        im_window: f64,
383        n_isotopes: usize,
384        num_threads: usize,
385    ) -> Vec<PrecursorMS1Signal> {
386        if precursor_coords.is_empty() {
387            return Vec::new();
388        }
389
390        // Get frame metadata
391        let meta_data = read_meta_data_sql(&self.loader.get_data_path()).unwrap();
392
393        // Get all MS1 frame info sorted by time
394        let mut ms1_frame_info: Vec<(u32, f64)> = meta_data
395            .iter()
396            .filter(|f| f.ms_ms_type == 0)
397            .map(|f| (f.id as u32, f.time))
398            .collect();
399        ms1_frame_info.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
400
401        let ms1_times: Vec<f64> = ms1_frame_info.iter().map(|(_, t)| *t).collect();
402
403        // Sort precursors by RT for batched processing
404        let mut sorted_coords: Vec<(usize, &PrecursorCoord)> = precursor_coords
405            .iter()
406            .enumerate()
407            .collect();
408        sorted_coords.sort_by(|a, b| a.1.rt_seconds.partial_cmp(&b.1.rt_seconds).unwrap());
409
410        // Process in RT batches (5 minute chunks = 300 sec)
411        let batch_size_sec = 300.0;
412        let mut results: Vec<(usize, PrecursorMS1Signal)> = Vec::with_capacity(precursor_coords.len());
413
414        let mut batch_start = 0;
415        while batch_start < sorted_coords.len() {
416            // Find batch end (all precursors within batch_size_sec of first)
417            let batch_rt_start = sorted_coords[batch_start].1.rt_seconds;
418            let batch_rt_end = batch_rt_start + batch_size_sec;
419
420            let mut batch_end = batch_start;
421            while batch_end < sorted_coords.len() && sorted_coords[batch_end].1.rt_seconds < batch_rt_end {
422                batch_end += 1;
423            }
424
425            // Determine MS1 frames needed for this batch (with RT window margin)
426            let frame_rt_min = batch_rt_start - rt_window_sec;
427            let frame_rt_max = batch_rt_end + rt_window_sec;
428
429            let frame_start_idx = ms1_times.partition_point(|t| *t < frame_rt_min);
430            let frame_end_idx = ms1_times.partition_point(|t| *t <= frame_rt_max);
431
432            // Load frames for this batch
433            let batch_frame_ids: Vec<u32> = ms1_frame_info[frame_start_idx..frame_end_idx]
434                .iter()
435                .map(|(id, _)| *id)
436                .collect();
437
438            let batch_frames = if !batch_frame_ids.is_empty() {
439                self.loader.get_slice(batch_frame_ids, num_threads)
440            } else {
441                TimsSlice { frames: Vec::new() }
442            };
443
444            let batch_times: Vec<f64> = ms1_times[frame_start_idx..frame_end_idx].to_vec();
445
446            // Process precursors in this batch in parallel
447            let batch_coords = &sorted_coords[batch_start..batch_end];
448
449            let pool = ThreadPoolBuilder::new()
450                .num_threads(num_threads)
451                .build()
452                .unwrap();
453
454            let batch_results: Vec<(usize, PrecursorMS1Signal)> = pool.install(|| {
455                batch_coords.par_iter().map(|(orig_idx, coord)| {
456                    let signal = Self::extract_single_precursor(
457                        coord,
458                        &batch_frames.frames,
459                        &batch_times,
460                        rt_window_sec,
461                        mz_tol_ppm,
462                        im_window,
463                        n_isotopes,
464                    );
465                    (*orig_idx, signal)
466                }).collect()
467            });
468
469            results.extend(batch_results);
470            batch_start = batch_end;
471        }
472
473        // Restore original order
474        results.sort_by_key(|(idx, _)| *idx);
475        results.into_iter().map(|(_, signal)| signal).collect()
476    }
477
478    /// Extract MS1 signal for a single precursor from pre-loaded frames
479    fn extract_single_precursor(
480        coord: &PrecursorCoord,
481        frames: &[TimsFrame],
482        frame_times: &[f64],
483        rt_window_sec: f64,
484        mz_tol_ppm: f64,
485        im_window: f64,
486        n_isotopes: usize,
487    ) -> PrecursorMS1Signal {
488        let rt_sec = coord.rt_seconds;
489
490        // Binary search to find RT window bounds within batch frames
491        let rt_min = rt_sec - rt_window_sec / 2.0;
492        let rt_max = rt_sec + rt_window_sec / 2.0;
493
494        let start_idx = frame_times.partition_point(|t| *t < rt_min);
495        let end_idx = frame_times.partition_point(|t| *t <= rt_max);
496
497        // Calculate isotope spacing
498        let isotope_spacing = 1.003355 / (coord.charge.max(1) as f64);
499        let n_isotopes_to_extract = 4.min(n_isotopes); // M+0 to M+3 only
500
501        // Determine base m/z for extraction:
502        // - If mono_mz > 0: use it as starting point for isotope range
503        // - Otherwise: fall back to largest_peak_mz (single peak extraction)
504        let has_mono_mz = coord.mono_mz > 0.0;
505        let base_mz = if has_mono_mz { coord.mono_mz } else { coord.mz };
506        let mz_tol = base_mz * mz_tol_ppm / 1e6;
507
508        // Calculate isotope m/z values (M+0 through M+3)
509        let isotope_mz_values: Vec<f64> = (0..n_isotopes_to_extract)
510            .map(|i| base_mz + (i as f64) * isotope_spacing)
511            .collect();
512
513        // m/z range for XIC/mobilogram:
514        // - If mono_mz available: use full isotope range (M+0 to M+3) for better S/N
515        // - Otherwise: use single m/z (largest_peak_mz)
516        let (xic_mz_min, xic_mz_max) = if has_mono_mz {
517            // Full isotope range: M+0 - tolerance to M+3 + tolerance
518            (base_mz - mz_tol, base_mz + ((n_isotopes_to_extract - 1) as f64) * isotope_spacing + mz_tol)
519        } else {
520            // Single peak: largest_peak_mz ± tolerance
521            (coord.mz - mz_tol, coord.mz + mz_tol)
522        };
523
524        // IM range (uses im_window parameter, doubled from default for better coverage)
525        let im_min = coord.mobility - im_window / 2.0;
526        let im_max = coord.mobility + im_window / 2.0;
527
528        // Accumulators
529        let n_frames = end_idx.saturating_sub(start_idx);
530        let mut rt_coords = Vec::with_capacity(n_frames);
531        let mut rt_intensities = Vec::with_capacity(n_frames);
532        let mut im_dict: HashMap<i64, f64> = HashMap::new();
533        let mut isotope_intensity = vec![0.0f64; n_isotopes];  // Keep original size for output
534
535        // Accumulators for raw 2D data (merged from all frames)
536        let mut raw_rt = Vec::new();
537        let mut raw_mz = Vec::new();
538        let mut raw_mobility = Vec::new();
539        let mut raw_intensity = Vec::new();
540
541        // Extract from each MS1 frame in the RT window
542        for idx in start_idx..end_idx.min(frames.len()) {
543            let frame_time = frame_times[idx];
544            let frame = &frames[idx];
545
546            // === XIC and Mobilogram: Extract from isotope range (or single m/z if no mono_mz) ===
547            let xic_filtered = frame.filter_ranged(
548                xic_mz_min, xic_mz_max,
549                0, 1000,
550                im_min, im_max,
551                0.0, 1e9,
552                0, i32::MAX,
553            );
554
555            // XIC: sum intensity at this RT from all isotopes
556            let xic_intensity: f64 = xic_filtered.ims_frame.intensity.iter().sum();
557            rt_coords.push(frame_time);
558            rt_intensities.push(xic_intensity);
559
560            // Mobilogram: accumulate from all isotopes in m/z range
561            for (mob, inten) in xic_filtered.ims_frame.mobility.iter().zip(xic_filtered.ims_frame.intensity.iter()) {
562                let mob_bin = (*mob * 1000.0).round() as i64;
563                *im_dict.entry(mob_bin).or_insert(0.0) += *inten;
564            }
565
566            // === Isotope envelope: Extract M+0 through M+3 individually ===
567            // Use same filtered data (already in isotope range)
568            for (iso_idx, iso_mz) in isotope_mz_values.iter().enumerate() {
569                let iso_peak_min = iso_mz - mz_tol;
570                let iso_peak_max = iso_mz + mz_tol;
571                let iso_intensity_sum: f64 = xic_filtered.ims_frame.mz.iter()
572                    .zip(xic_filtered.ims_frame.intensity.iter())
573                    .filter(|(mz, _)| **mz >= iso_peak_min && **mz <= iso_peak_max)
574                    .map(|(_, i)| *i)
575                    .sum();
576                isotope_intensity[iso_idx] += iso_intensity_sum;
577            }
578
579            // Raw 2D data: store all peaks from filtered range
580            let n_peaks = xic_filtered.ims_frame.mz.len();
581            for i in 0..n_peaks {
582                raw_rt.push(frame_time);
583                raw_mz.push(xic_filtered.ims_frame.mz[i]);
584                raw_mobility.push(xic_filtered.ims_frame.mobility[i]);
585                raw_intensity.push(xic_filtered.ims_frame.intensity[i]);
586            }
587        }
588
589        // Convert mobilogram accumulator to sorted arrays
590        let mut im_entries: Vec<(i64, f64)> = im_dict.into_iter().collect();
591        im_entries.sort_by_key(|(k, _)| *k);
592        let im_coords: Vec<f64> = im_entries.iter().map(|(k, _)| *k as f64 / 1000.0).collect();
593        let im_intensities: Vec<f64> = im_entries.iter().map(|(_, v)| *v).collect();
594
595        // Calculate moments
596        let rt_moments = SignalMoments::from_signal(&rt_coords, &rt_intensities);
597        let im_moments = SignalMoments::from_signal(&im_coords, &im_intensities);
598        let mz_moments = SignalMoments::from_signal(&isotope_mz_values, &isotope_intensity);
599
600        PrecursorMS1Signal {
601            precursor_id: coord.precursor_id,
602            rt_coords,
603            rt_intensities,
604            rt_moments,
605            im_coords,
606            im_intensities,
607            im_moments,
608            isotope_mz: isotope_mz_values,
609            isotope_intensity,
610            mz_moments,
611            raw_rt,
612            raw_mz,
613            raw_mobility,
614            raw_intensity,
615        }
616    }
617
618    pub fn get_pasef_frame_ms_ms_info(&self) -> Vec<PasefMsMsMeta> {
619        read_pasef_frame_ms_ms_info(&self.loader.get_data_path()).unwrap()
620    }
621
622    /// Get the fragment spectra for all PASEF selected precursors
623    pub fn get_pasef_fragments(&self, num_threads: usize) -> Vec<PASEFDDAFragment> {
624        // Delegate to the filtered version with no filter (all precursors)
625        self.get_pasef_fragments_for_precursors(None, num_threads)
626    }
627
628    /// Get fragment spectra for specific precursor IDs only.
629    /// If precursor_ids is None, returns all fragments (same as get_pasef_fragments).
630    /// This is more memory-efficient for batched processing.
631    pub fn get_pasef_fragments_for_precursors(
632        &self,
633        precursor_ids: Option<&[u32]>,
634        num_threads: usize,
635    ) -> Vec<PASEFDDAFragment> {
636        // extract fragment spectra information
637        let pasef_info = self.get_pasef_frame_ms_ms_info();
638
639        // Filter to requested precursor IDs if specified
640        let filtered_pasef_info: Vec<&PasefMsMsMeta> = match precursor_ids {
641            Some(ids) => {
642                // Create a HashSet for O(1) lookup
643                let id_set: std::collections::HashSet<u32> = ids.iter().copied().collect();
644                pasef_info.iter()
645                    .filter(|info| id_set.contains(&(info.precursor_id as u32)))
646                    .collect()
647            }
648            None => pasef_info.iter().collect(),
649        };
650
651        // Note: The Bruker SDK is NOT thread-safe, so we must use sequential iteration
652        // when the SDK is being used for index conversion.
653        let uses_bruker_sdk = self.loader.uses_bruker_sdk();
654
655        // Helper closure to process a single PASEF fragment
656        let process_fragment = |pasef_info: &PasefMsMsMeta| -> PASEFDDAFragment {
657            // get the frame
658            let frame = self.loader.get_frame(pasef_info.frame_id as u32);
659
660            // get five percent of the scan range
661            let scan_margin = (pasef_info.scan_num_end - pasef_info.scan_num_begin) / 20;
662
663            // get the fragment spectrum by scan range
664            let filtered_frame = frame.filter_ranged(
665                0.0,
666                2000.0,
667                (pasef_info.scan_num_begin - scan_margin) as i32,
668                (pasef_info.scan_num_end + scan_margin) as i32,
669                0.0,
670                5.0,
671                0.0,
672                1e9,
673                0,
674                i32::MAX,
675            );
676
677            PASEFDDAFragment {
678                frame_id: pasef_info.frame_id as u32,
679                precursor_id: pasef_info.precursor_id as u32,
680                collision_energy: pasef_info.collision_energy,
681                // flatten the spectrum
682                selected_fragment: filtered_frame,
683            }
684        };
685
686        if uses_bruker_sdk {
687            // Sequential processing when using Bruker SDK (not thread-safe)
688            filtered_pasef_info.iter().map(|info| process_fragment(info)).collect()
689        } else {
690            // Parallel processing when using simple index converter (thread-safe)
691            let pool = ThreadPoolBuilder::new()
692                .num_threads(num_threads)
693                .build()
694                .unwrap();
695
696            pool.install(|| {
697                filtered_pasef_info.par_iter().map(|info| process_fragment(info)).collect()
698            })
699        }
700    }
701
702    /// Get preprocessed PASEF fragments ready for database search.
703    /// This method performs parallel processing of all fragment spectra, including:
704    /// - Flattening frames across ion mobility dimension
705    /// - Deisotoping (optional)
706    /// - Filtering to top N peaks
707    /// - Computing inverse mobility along scan marginal
708    ///
709    /// # Arguments
710    /// * `dataset_name` - Name of the dataset for generating spec_ids
711    /// * `config` - Spectrum processing configuration
712    /// * `num_threads` - Number of threads to use for parallel processing
713    ///
714    /// # Returns
715    /// Vector of preprocessed spectra ready for Sage search
716    pub fn get_preprocessed_pasef_fragments(
717        &self,
718        dataset_name: &str,
719        config: SpectrumProcessingConfig,
720        num_threads: usize,
721    ) -> Vec<PreprocessedSpectrum> {
722        // Step 1: Get raw PASEF fragments info
723        let pasef_info = self.get_pasef_frame_ms_ms_info();
724
725        // Step 2: Get precursor metadata
726        let precursor_meta = read_dda_precursor_meta(&self.loader.get_data_path()).unwrap_or_default();
727        let frame_meta = read_meta_data_sql(&self.loader.get_data_path()).unwrap_or_default();
728
729        // Create lookup maps
730        let precursor_map: BTreeMap<i64, &DDAPrecursorMeta> = precursor_meta
731            .iter()
732            .map(|p| (p.precursor_id, p))
733            .collect();
734
735        let frame_time_map: BTreeMap<i64, f64> = frame_meta
736            .iter()
737            .map(|f| (f.id, f.time / 60.0))  // Convert to minutes
738            .collect();
739
740        // Step 3: Group PASEF info by precursor_id to aggregate re-fragmented precursors
741        // This matches Python's groupby('precursor_id').agg({'raw_data': 'sum', ...})
742        let mut pasef_by_precursor: BTreeMap<i64, Vec<&PasefMsMsMeta>> = BTreeMap::new();
743        for info in &pasef_info {
744            pasef_by_precursor
745                .entry(info.precursor_id)
746                .or_insert_with(Vec::new)
747                .push(info);
748        }
749
750        // Step 4: Build fragment data with aggregation (merge frames for same precursor)
751        // Note: The Bruker SDK is NOT thread-safe, so we must use sequential iteration
752        // when the SDK is being used for index conversion.
753        let uses_bruker_sdk = self.loader.uses_bruker_sdk();
754
755        // Helper closure to process a single precursor
756        let process_precursor = |(precursor_id, pasef_infos): (&i64, &Vec<&PasefMsMsMeta>)| -> Option<PASEFFragmentData> {
757            // Get precursor metadata
758            let precursor = precursor_map.get(precursor_id)?;
759
760            // Use first PASEF info for metadata (matches Python's 'first')
761            let first_pasef = pasef_infos.first()?;
762
763            // Get retention time from first frame
764            let scan_start_time = frame_time_map.get(&first_pasef.frame_id).copied().unwrap_or(0.0);
765
766            // Collect and merge all frames for this precursor (matches Python's 'sum')
767            let mut combined_scan = Vec::new();
768            let mut combined_mobility = Vec::new();
769            let mut combined_tof = Vec::new();
770            let mut combined_mz = Vec::new();
771            let mut combined_intensity = Vec::new();
772
773            for pasef_info in pasef_infos {
774                // Get the frame and filter by scan range
775                let frame = self.loader.get_frame(pasef_info.frame_id as u32);
776
777                // Get five percent of the scan range for margin
778                let scan_margin = (pasef_info.scan_num_end - pasef_info.scan_num_begin) / 20;
779
780                // Filter frame by scan range
781                let filtered_frame = frame.filter_ranged(
782                    0.0,
783                    2000.0,
784                    (pasef_info.scan_num_begin - scan_margin) as i32,
785                    (pasef_info.scan_num_end + scan_margin) as i32,
786                    0.0,
787                    5.0,
788                    0.0,
789                    1e9,
790                    0,
791                    i32::MAX,
792                );
793
794                // Append data from this frame (merge/sum behavior)
795                combined_scan.extend(filtered_frame.scan.iter());
796                combined_mobility.extend(filtered_frame.ims_frame.mobility.iter());
797                combined_tof.extend(filtered_frame.tof.iter());
798                combined_mz.extend(filtered_frame.ims_frame.mz.iter());
799                combined_intensity.extend(filtered_frame.ims_frame.intensity.iter());
800            }
801
802            if combined_mz.is_empty() {
803                return None;
804            }
805
806            // Determine precursor m/z (prefer monoisotopic, fallback to highest intensity)
807            let precursor_mz = precursor.precursor_mz_monoisotopic
808                .unwrap_or(precursor.precursor_mz_highest_intensity);
809
810            Some(PASEFFragmentData {
811                frame_id: first_pasef.frame_id as u32,
812                precursor_id: *precursor_id as u32,
813                collision_energy: first_pasef.collision_energy,
814                scan_start_time,
815                scan: combined_scan,
816                mobility: combined_mobility,
817                tof: combined_tof,
818                mz: combined_mz,
819                intensity: combined_intensity,
820                precursor_mz,
821                precursor_charge: precursor.precursor_charge.map(|c| c as i32),
822                precursor_intensity: precursor.precursor_total_intensity,
823                isolation_mz: first_pasef.isolation_mz,
824                isolation_width: first_pasef.isolation_width,
825            })
826        };
827
828        let fragment_data: Vec<PASEFFragmentData> = if uses_bruker_sdk {
829            // Sequential processing when using Bruker SDK (not thread-safe)
830            pasef_by_precursor
831                .iter()
832                .filter_map(process_precursor)
833                .collect()
834        } else {
835            // Parallel processing when using simple index converter (thread-safe)
836            let pool = ThreadPoolBuilder::new()
837                .num_threads(num_threads)
838                .build()
839                .unwrap();
840
841            pool.install(|| {
842                pasef_by_precursor
843                    .par_iter()
844                    .filter_map(|item| process_precursor(item))
845                    .collect()
846            })
847        };
848
849        // Step 5: Process all fragments in parallel using the batch processor
850        process_pasef_fragments_batch(fragment_data, dataset_name, &config, num_threads)
851    }
852
853    pub fn sample_pasef_fragment_random(
854        &self,
855        target_scan_apex: i32,
856        experiment_max_scan: i32,
857    ) -> TimsFrame {
858        let pasef_meta = &self.pasef_meta;
859        let random_index = rand::random::<usize>() % pasef_meta.len();
860        let pasef_info = &pasef_meta[random_index];
861        
862        // get the frame
863        let frame = self.loader.get_frame(pasef_info.frame_id as u32);
864        
865        // get five percent of the scan range
866        let scan_margin = (pasef_info.scan_num_end - pasef_info.scan_num_begin) / 20;
867        
868        // get the fragment spectrum by scan range
869        let mut filtered = frame.filter_ranged(
870            0.0,
871            2000.0,
872            (pasef_info.scan_num_begin - scan_margin) as i32,
873            (pasef_info.scan_num_end + scan_margin) as i32,
874            0.0,
875            5.0,
876            0.0,
877            1e9,
878            0,
879            i32::MAX,
880        );
881
882        // Safety check
883        if filtered.scan.is_empty() {
884            return filtered;
885        }
886
887        // Compute median scan
888        let mut scan_copy = filtered.scan.clone();
889        scan_copy.sort_unstable();
890        let median_scan = scan_copy[scan_copy.len() / 2];
891
892        // Compute shift
893        let scan_shift = target_scan_apex - median_scan;
894
895        // Apply shift to scan values
896        for s in filtered.scan.iter_mut() {
897            *s += scan_shift;
898        }
899
900        // Refilter to clip shifted scans that fall outside valid bounds
901        let re_filtered = filtered.filter_ranged(
902            0.0,
903            2000.0,
904            0,
905            experiment_max_scan,
906            0.0,
907            5.0,
908            0.0,
909            1e9,
910            0,
911            i32::MAX,
912        );
913
914        re_filtered
915    }
916    
917    pub fn sample_pasef_fragments_random(
918        &self,
919        target_scan_apex_values: Vec<i32>,
920        experiment_max_scan: i32,
921    ) -> TimsFrame {
922
923        // return empty frame is target_scan_apex_values is empty
924        if target_scan_apex_values.is_empty() {
925            return TimsFrame {
926                frame_id: 0, // Replace with a suitable default value
927                ms_type: MsType::FragmentDda,
928                scan: Vec::new(),
929                tof: Vec::new(),
930                ims_frame: ImsFrame::default(), // Uses the default implementation for `ImsFrame`
931            }
932        }
933        
934        let mut pasef_frames = Vec::new();
935        
936        for target_scan_apex in target_scan_apex_values {
937            let pasef_frame = self.sample_pasef_fragment_random(target_scan_apex, experiment_max_scan);
938            pasef_frames.push(pasef_frame);
939        }
940        
941        // create combined frame by summing the frame structures, they override add
942        let mut combined_frame = pasef_frames[0].clone();
943        
944        for frame in pasef_frames.iter().skip(1) {
945            combined_frame = combined_frame + frame.clone();
946        }
947
948        // re-calculate ion mobility
949        let im_values = self.scan_to_inverse_mobility(
950            combined_frame.frame_id as u32,
951            &combined_frame.scan.iter().map(|x| *x as u32).collect(),
952        );
953
954        // Update the inverse mobility values
955        combined_frame.ims_frame.mobility = std::sync::Arc::new(im_values);
956        
957        combined_frame
958    }
959
960    pub fn sample_precursor_signal(
961        &self,
962        num_frames: usize,
963        max_intensity: f64,
964        take_probability: f64,
965    ) -> TimsFrame {
966        // get all precursor frames
967        let meta_data = read_meta_data_sql(&self.loader.get_data_path()).unwrap();
968        let precursor_frames = meta_data.iter().filter(|x| x.ms_ms_type == 0);
969
970        // randomly sample num_frames
971        let mut rng = rand::thread_rng();
972        let mut sampled_frames: Vec<TimsFrame> = Vec::new();
973
974        // go through each frame and sample the data
975        for frame in precursor_frames.choose_multiple(&mut rng, num_frames) {
976            let frame_id = frame.id;
977            let frame_data = self
978                .loader
979                .get_frame(frame_id as u32)
980                .filter_ranged(0.0, 2000.0, 0, 1000, 0.0, 5.0, 1.0, max_intensity, 0, i32::MAX)
981                .generate_random_sample(take_probability);
982            sampled_frames.push(frame_data);
983        }
984
985        // get the first frame
986        let mut sampled_frame = sampled_frames.remove(0);
987
988        // sum all the other frames to the first frame
989        for frame in sampled_frames {
990            sampled_frame = sampled_frame + frame;
991        }
992
993        sampled_frame
994    }
995}
996
997impl TimsData for TimsDatasetDDA {
998    fn get_frame(&self, frame_id: u32) -> TimsFrame {
999        self.loader.get_frame(frame_id)
1000    }
1001
1002    fn get_raw_frame(&self, frame_id: u32) -> RawTimsFrame {
1003        self.loader.get_raw_frame(frame_id)
1004    }
1005
1006    fn get_slice(&self, frame_ids: Vec<u32>, num_threads: usize) -> TimsSlice {
1007        self.loader.get_slice(frame_ids, num_threads)
1008    }
1009
1010    fn get_acquisition_mode(&self) -> AcquisitionMode {
1011        self.loader.get_acquisition_mode().clone()
1012    }
1013
1014    fn get_frame_count(&self) -> i32 {
1015        self.loader.get_frame_count()
1016    }
1017
1018    fn get_data_path(&self) -> &str {
1019        &self.loader.get_data_path()
1020    }
1021}
1022
1023impl IndexConverter for TimsDatasetDDA {
1024    fn tof_to_mz(&self, frame_id: u32, tof_values: &Vec<u32>) -> Vec<f64> {
1025        self.loader
1026            .get_index_converter()
1027            .tof_to_mz(frame_id, tof_values)
1028    }
1029
1030    fn mz_to_tof(&self, frame_id: u32, mz_values: &Vec<f64>) -> Vec<u32> {
1031        self.loader
1032            .get_index_converter()
1033            .mz_to_tof(frame_id, mz_values)
1034    }
1035
1036    fn scan_to_inverse_mobility(&self, frame_id: u32, scan_values: &Vec<u32>) -> Vec<f64> {
1037        self.loader
1038            .get_index_converter()
1039            .scan_to_inverse_mobility(frame_id, scan_values)
1040    }
1041
1042    fn inverse_mobility_to_scan(
1043        &self,
1044        frame_id: u32,
1045        inverse_mobility_values: &Vec<f64>,
1046    ) -> Vec<u32> {
1047        self.loader
1048            .get_index_converter()
1049            .inverse_mobility_to_scan(frame_id, inverse_mobility_values)
1050    }
1051}