rustdf/data/utility.rs

use byteorder::{ByteOrder, LittleEndian};
use mscore::timstof::frame::TimsFrame;
use rayon::iter::IntoParallelRefIterator;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::io;
use std::io::{Read, Write};

/// Decompresses a ZSTD compressed byte array
///
/// # Arguments
///
/// * `compressed_data` - A byte slice that holds the compressed data
///
/// # Returns
///
/// * `decompressed_data` - A vector of u8 that holds the decompressed data
///
pub fn zstd_decompress(compressed_data: &[u8]) -> io::Result<Vec<u8>> {
    let mut decoder = zstd::Decoder::new(compressed_data)?;
    let mut decompressed_data = Vec::new();
    decoder.read_to_end(&mut decompressed_data)?;
    Ok(decompressed_data)
}

/// Compresses a byte array using ZSTD
///
/// # Arguments
///
/// * `decompressed_data` - A byte slice that holds the decompressed data
/// * `compression_level` - The ZSTD compression level to use
///
/// # Returns
///
/// * `compressed_data` - A vector of u8 that holds the compressed data
///
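/// # Example
///
/// A minimal round-trip sketch (doctest marked `ignore`; assumes `zstd_compress` and
/// `zstd_decompress` from this module are in scope):
///
/// ```ignore
/// let raw = vec![1u8, 2, 3, 4, 5];
/// let compressed = zstd_compress(&raw, 1).unwrap();
/// assert_eq!(zstd_decompress(&compressed).unwrap(), raw);
/// ```
///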
pub fn zstd_compress(decompressed_data: &[u8], compression_level: i32) -> io::Result<Vec<u8>> {
    let mut encoder = zstd::Encoder::new(Vec::new(), compression_level)?;
    encoder.write_all(decompressed_data)?;
    let compressed_data = encoder.finish()?;
    Ok(compressed_data)
}
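
/// Reconstructs a compressed frame blob from peak-level data, the counterpart of
/// `parse_decompressed_bruker_binary_data`.
///
/// The scan, TOF and intensity vectors must have equal lengths, and TOF indices are
/// expected to be ascending within each scan. TOF values are delta-encoded per scan
/// (`modify_tofs`), interleaved with the intensities, byte-shuffled (`get_realdata`)
/// and ZSTD-compressed. An 8-byte header is prepended: the total blob length
/// (compressed payload + 8) and `total_scans`, both as little-endian u32.
///
/// # Arguments
///
/// * `scans` - Scan number of each peak
/// * `tofs` - TOF index of each peak
/// * `intensities` - Intensity of each peak
/// * `total_scans` - Total number of scans in the frame
/// * `compression_level` - The ZSTD compression level to use
///
/// # Returns
///
/// * `final_data` - A vector of u8 holding the header followed by the compressed payload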
pub fn reconstruct_compressed_data(
    scans: Vec<u32>,
    mut tofs: Vec<u32>,
    intensities: Vec<u32>,
    total_scans: u32,
    compression_level: i32,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Ensure all vectors have the same length
    assert_eq!(scans.len(), tofs.len());
    assert_eq!(scans.len(), intensities.len());

    // Modify TOFs based on scans
    modify_tofs(&mut tofs, &scans);

    // Get peak counts from total scans and scans
    let peak_cnts = get_peak_cnts(total_scans, &scans);

    // Interleave TOFs and intensities
    let mut interleaved = Vec::new();
    for (&tof, &intensity) in tofs.iter().zip(intensities.iter()) {
        interleaved.push(tof);
        interleaved.push(intensity);
    }

    // Byte-shuffle peak counts and interleaved values via get_realdata
    let real_data = get_realdata(&peak_cnts, &interleaved);

    // Compress real_data using zstd_compress
    let compressed_data = zstd_compress(&real_data, compression_level)?;

    // Final data preparation with compressed data
    let mut final_data = Vec::new();

    // Header, first 4 bytes: total blob length (compressed payload plus the 8-byte header)
    final_data.extend_from_slice(&(compressed_data.len() as u32 + 8).to_le_bytes());

    // Header, next 4 bytes: the total number of scans
    final_data.extend_from_slice(&total_scans.to_le_bytes());

    // Append the compressed payload itself
    final_data.extend_from_slice(&compressed_data);

    Ok(final_data)
}
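
/// Compresses a collection of `TimsFrame`s in parallel.
///
/// Each frame is converted to peak-level scan/TOF/intensity vectors and passed to
/// `reconstruct_compressed_data`. The work is distributed over a dedicated rayon
/// thread pool with `num_threads` threads.
///
/// # Arguments
///
/// * `frames` - The frames to compress
/// * `max_scan_count` - Total number of scans written into each frame header
/// * `compression_level` - The ZSTD compression level to use
/// * `num_threads` - Number of threads for the rayon thread pool
///
/// # Returns
///
/// * A vector with one compressed blob per input frame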
pub fn compress_collection(
    frames: Vec<TimsFrame>,
    max_scan_count: u32,
    compression_level: i32,
    num_threads: usize,
) -> Vec<Vec<u8>> {
    let pool = ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .unwrap();

    pool.install(|| {
        frames
            .par_iter()
            .map(|frame| {
                reconstruct_compressed_data(
                    frame.scan.iter().map(|&x| x as u32).collect(),
                    frame.tof.iter().map(|&x| x as u32).collect(),
                    frame
                        .ims_frame
                        .intensity
                        .iter()
                        .map(|&x| x as u32)
                        .collect(),
                    max_scan_count,
                    compression_level,
                )
                .unwrap()
            })
            .collect()
    })
}

/// Parses the decompressed Bruker binary data
///
/// # Arguments
///
/// * `decompressed_bytes` - A byte slice that holds the decompressed data
///
/// # Returns
///
/// * `scan_indices` - A vector of u32 that holds the scan indices
/// * `tof_indices` - A vector of u32 that holds the tof indices
/// * `intensities` - A vector of u32 that holds the intensities
///
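/// # Layout
///
/// The buffer is interpreted as byte-shuffled little-endian u32 values: the first
/// quarter of the bytes holds byte 0 of every u32, the second quarter byte 1, and so
/// on. After de-shuffling, the first u32 gives the number of scans, followed by the
/// doubled per-scan peak counts and the interleaved (delta-encoded TOF, intensity)
/// pairs. This is the layout produced by `reconstruct_compressed_data` after ZSTD
/// decompression of its payload.
///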
pub fn parse_decompressed_bruker_binary_data(
    decompressed_bytes: &[u8],
) -> Result<(Vec<u32>, Vec<u32>, Vec<u32>), Box<dyn std::error::Error>> {
    let mut buffer_u32 = Vec::new();

    for i in 0..(decompressed_bytes.len() / 4) {
        let value = LittleEndian::read_u32(&[
            decompressed_bytes[i],
            decompressed_bytes[i + (decompressed_bytes.len() / 4)],
            decompressed_bytes[i + (2 * decompressed_bytes.len() / 4)],
            decompressed_bytes[i + (3 * decompressed_bytes.len() / 4)],
        ]);
        buffer_u32.push(value);
    }

    // get the number of scans
    let scan_count = buffer_u32[0] as usize;

    // get the scan indices (stored doubled, so halve them)
    let mut scan_indices: Vec<u32> = buffer_u32[..scan_count].to_vec();
    for index in &mut scan_indices {
        *index /= 2;
    }

    // first scan index is always 0?
    scan_indices[0] = 0;

    // get the tof indices: every other value after the scan indices, starting at the first
    let mut tof_indices: Vec<u32> = buffer_u32
        .iter()
        .skip(scan_count)
        .step_by(2)
        .cloned()
        .collect();

    // get the intensities: every other value after the scan indices, starting at the second
    let intensities: Vec<u32> = buffer_u32
        .iter()
        .skip(scan_count + 1)
        .step_by(2)
        .cloned()
        .collect();

    // calculate the last scan before moving scan indices
    let last_scan = intensities.len() as u32 - scan_indices[1..].iter().sum::<u32>();

    // shift the scan indices to the left by one
    for i in 0..(scan_indices.len() - 1) {
        scan_indices[i] = scan_indices[i + 1];
    }

    // set the last scan index
    let len = scan_indices.len();
    scan_indices[len - 1] = last_scan;

    // convert the delta-encoded tof indices to cumulative sums per scan
    let mut index = 0;
    for &size in &scan_indices {
        let mut current_sum = 0;
        for _ in 0..size {
            current_sum += tof_indices[index];
            tof_indices[index] = current_sum;
            index += 1;
        }
    }

    // adjust the tof indices to be zero-indexed
    let adjusted_tof_indices: Vec<u32> = tof_indices.iter().map(|&val| val - 1).collect();
    Ok((scan_indices, adjusted_tof_indices, intensities))
}
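
/// Builds the per-scan peak count block written at the start of a frame blob.
///
/// The first element is `total_scans`; for every scan id from 1 to `total_scans - 1`
/// it stores twice the number of peaks in the preceding scan (doubled because each
/// peak later contributes a TOF/intensity pair). `scans` must be sorted in ascending
/// order.
///
/// For example, `get_peak_cnts(3, &[0, 0, 1, 1, 1])` returns `[3, 4, 6]`.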
pub fn get_peak_cnts(total_scans: u32, scans: &[u32]) -> Vec<u32> {
    let mut peak_cnts = vec![total_scans];
    let mut ii = 0;
    for scan_id in 1..total_scans {
        let mut counter = 0;
        while ii < scans.len() && scans[ii] < scan_id {
            ii += 1;
            counter += 1;
        }
        peak_cnts.push(counter * 2);
    }
    peak_cnts
}
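
/// Delta-encodes TOF indices in place, resetting at every scan change.
///
/// Within a scan, each TOF value is replaced by its difference to the previous TOF;
/// the first TOF of a scan is stored as `tof + 1` (the running value starts at -1).
/// TOF indices are therefore expected to be ascending within each scan.
///
/// For example, with `tofs = [10, 12, 3, 6, 15]` and `scans = [0, 0, 1, 1, 1]`,
/// `tofs` becomes `[11, 2, 4, 3, 9]`.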
pub fn modify_tofs(tofs: &mut [u32], scans: &[u32]) {
    let mut last_tof = -1i32; // Using i32 to allow -1
    let mut last_scan = 0;
    for ii in 0..tofs.len() {
        if last_scan != scans[ii] {
            last_tof = -1;
            last_scan = scans[ii];
        }
        let val = tofs[ii] as i32; // Cast to i32 for calculation
        tofs[ii] = (val - last_tof) as u32; // Cast back to u32
        last_tof = val;
    }
}
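
/// Serializes peak counts and interleaved TOF/intensity values into the byte-shuffled
/// on-disk layout.
///
/// Both slices are written as little-endian u32 bytes and then rearranged by
/// `get_realdata_loop` so that the least-significant byte of every value comes first,
/// followed by the second, third and fourth bytes.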
pub fn get_realdata(peak_cnts: &[u32], interleaved: &[u32]) -> Vec<u8> {
    let mut back_data = Vec::new();

    // Convert peak counts to bytes and add to back_data
    for &cnt in peak_cnts {
        back_data.extend_from_slice(&cnt.to_le_bytes());
    }

    // Convert interleaved data to bytes and add to back_data
    for &value in interleaved {
        back_data.extend_from_slice(&value.to_le_bytes());
    }

    // Call get_realdata_loop for data rearrangement
    get_realdata_loop(&back_data)
}
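
/// Byte-shuffles a buffer of little-endian u32 values.
///
/// Bytes are read with a stride of four, so the output contains byte 0 of every u32
/// first, then byte 1, byte 2 and byte 3. `parse_decompressed_bruker_binary_data`
/// applies the inverse transformation when reading the values back.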
pub fn get_realdata_loop(back_data: &[u8]) -> Vec<u8> {
    let mut real_data = vec![0u8; back_data.len()];
    let mut remainder = 0;
    let mut bd_idx = 0;
    for rd_idx in 0..back_data.len() {
        if bd_idx >= back_data.len() {
            remainder += 1;
            bd_idx = remainder;
        }
        real_data[rd_idx] = back_data[bd_idx];
        bd_idx += 4;
    }
    real_data
}
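
/// Builds the uncompressed byte payload for a single frame.
///
/// A copy of `tofs` is delta-encoded per scan, the per-scan peak counts are computed
/// via `get_peak_cnts`, and the delta-encoded TOFs are interleaved with the
/// intensities before the byte-shuffle in `get_realdata`. Unlike
/// `reconstruct_compressed_data`, no ZSTD compression or header is applied here.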
pub fn get_data_for_compression(
    tofs: &[u32],
    scans: &[u32],
    intensities: &[u32],
    max_scans: u32,
) -> Vec<u8> {
    let mut tof_copy = tofs.to_vec();
    modify_tofs(&mut tof_copy, scans);
    let peak_cnts = get_peak_cnts(max_scans, scans);
    // interleave the delta-encoded TOF copy (not the raw TOFs) with the intensities
    let interleaved: Vec<u32> = tof_copy
        .iter()
        .zip(intensities.iter())
        .flat_map(|(tof, intensity)| vec![*tof, *intensity])
        .collect();

    get_realdata(&peak_cnts, &interleaved)
}
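
/// Parallel version of `get_data_for_compression` over per-frame vectors.
///
/// The outer vectors are zipped element-wise, so they should have the same length;
/// the work is distributed over a dedicated rayon thread pool with `num_threads` threads.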
pub fn get_data_for_compression_par(
    tofs: Vec<Vec<u32>>,
    scans: Vec<Vec<u32>>,
    intensities: Vec<Vec<u32>>,
    max_scans: u32,
    num_threads: usize,
) -> Vec<Vec<u8>> {
    let pool = ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .unwrap();

    pool.install(|| {
        tofs.par_iter()
            .zip(scans.par_iter())
            .zip(intensities.par_iter())
            .map(|((tof, scan), intensity)| {
                get_data_for_compression(tof, scan, intensity, max_scans)
            })
            .collect()
    })
}
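
/// Expands per-scan peak counts into one scan number per peak.
///
/// Entry `i` of `scan` gives the number of peaks in scan `i`; the scan number is
/// repeated that many times. With `zero_indexed = false`, scan numbers start at 1.
///
/// For example, `flatten_scan_values(&[2, 0, 3], false)` returns `[1, 1, 3, 3, 3]`.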
pub fn flatten_scan_values(scan: &[u32], zero_indexed: bool) -> Vec<u32> {
    let add = if zero_indexed { 0 } else { 1 };
    scan.iter()
        .enumerate()
        .flat_map(|(index, &count)| vec![(index + add) as u32; count as usize].into_iter())
        .collect()
}
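
// A minimal test sketch of how these helpers fit together; the expected values below
// were worked out by hand from the functions above, and the frame data is made up for
// illustration only.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn zstd_round_trip_restores_input() {
        let raw = vec![1u8, 2, 3, 4, 5, 5, 5, 5];
        let compressed = zstd_compress(&raw, 1).unwrap();
        let decompressed = zstd_decompress(&compressed).unwrap();
        assert_eq!(decompressed, raw);
    }

    #[test]
    fn peak_counts_and_tof_deltas() {
        // two peaks in scan 0, three peaks in scan 1, three scans in total
        let scans = vec![0u32, 0, 1, 1, 1];
        let mut tofs = vec![10u32, 12, 3, 6, 15];

        assert_eq!(get_peak_cnts(3, &scans), vec![3, 4, 6]);

        modify_tofs(&mut tofs, &scans);
        assert_eq!(tofs, vec![11, 2, 4, 3, 9]);

        assert_eq!(flatten_scan_values(&vec![2, 3, 0], true), vec![0, 0, 1, 1, 1]);
    }

    #[test]
    fn reconstruct_then_parse_recovers_peaks() {
        // peak-level input: scan numbers, ascending TOF indices per scan, intensities
        let scans = vec![0u32, 0, 1, 1, 1];
        let tofs = vec![10u32, 12, 3, 6, 15];
        let intensities = vec![100u32, 200, 300, 400, 500];
        let total_scans = 3;

        let blob = reconstruct_compressed_data(
            scans.clone(),
            tofs.clone(),
            intensities.clone(),
            total_scans,
            1,
        )
        .unwrap();

        // skip the 8-byte header (total length + total_scans), decompress and parse
        let payload = zstd_decompress(&blob[8..]).unwrap();
        let (scan_counts, parsed_tofs, parsed_intensities) =
            parse_decompressed_bruker_binary_data(&payload).unwrap();

        assert_eq!(flatten_scan_values(&scan_counts, true), scans);
        assert_eq!(parsed_tofs, tofs);
        assert_eq!(parsed_intensities, intensities);
    }
}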