// mito2/read/pruner.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Pruner for parallel file pruning across scanner partitions.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::{Arc, Mutex};
20use std::time::Instant;
21
22use common_telemetry::debug;
23use smallvec::SmallVec;
24use snafu::ResultExt;
25use store_api::region_engine::PartitionRange;
26use store_api::storage::FileId;
27use tokio::sync::{mpsc, oneshot};
28use uuid::Uuid;
29
30use crate::error::{PruneFileSnafu, Result};
31use crate::metrics::PRUNER_ACTIVE_BUILDERS;
32use crate::read::range::{FileRangeBuilder, RowGroupIndex};
33use crate::read::scan_region::StreamContext;
34use crate::read::scan_util::{FileScanMetrics, PartitionMetrics};
35use crate::sst::parquet::file_range::{FileRange, PreFilterMode};
36use crate::sst::parquet::reader::ReaderMetrics;
37
/// Number of files to pre-fetch ahead of the current position.
/// Used by `PartitionPruner::prefetch_upcoming_files` to bound how many
/// background prune requests are issued per advance.
const PREFETCH_COUNT: usize = 8;
40
/// Local pruner in a partition that supports prefetching files to prune.
pub struct PartitionPruner {
    /// Shared pruner doing the actual pruning work for all partitions.
    pruner: Arc<Pruner>,
    /// Files to prune, in the order to scan.
    file_indices: Vec<usize>,
    /// Per-file pre-filter mode lookup indexed by file_index.
    pre_filter_modes: Vec<PreFilterMode>,
    /// Current position for tracking pre-fetch progress.
    /// Monotonically advanced via `fetch_max` in `build_file_ranges`.
    current_position: AtomicUsize,
}
51
52impl PartitionPruner {
53    /// Creates a new `PartitionPruner` for the given partition ranges.
54    pub fn new(pruner: Arc<Pruner>, partition_ranges: &[PartitionRange]) -> Self {
55        let num_files = pruner.inner.stream_ctx.input.num_files();
56        let mut file_indices = Vec::with_capacity(num_files);
57        let mut pre_filter_modes = vec![PreFilterMode::SkipFields; num_files];
58        let mut dedup_set = HashSet::with_capacity(pruner.inner.stream_ctx.input.num_files());
59
60        let num_memtables = pruner.inner.stream_ctx.input.num_memtables();
61        for part_range in partition_ranges {
62            let range_meta = &pruner.inner.stream_ctx.ranges[part_range.identifier];
63            let pre_filter_mode = pruner.inner.stream_ctx.range_pre_filter_mode(part_range);
64            for row_group_index in &range_meta.row_group_indices {
65                if pruner
66                    .inner
67                    .stream_ctx
68                    .is_file_range_index(*row_group_index)
69                {
70                    let file_index = row_group_index.index - num_memtables;
71                    if dedup_set.contains(&file_index) {
72                        continue;
73                    } else {
74                        file_indices.push(file_index);
75                        pre_filter_modes[file_index] = pre_filter_mode;
76                        dedup_set.insert(file_index);
77                    }
78                }
79            }
80        }
81
82        Self {
83            pruner,
84            file_indices,
85            pre_filter_modes,
86            current_position: AtomicUsize::new(0),
87        }
88    }
89
90    /// Gets or creates the FileRangeBuilder for a file.
91    ///
92    /// This method also triggers pre-fetching of upcoming files in the background
93    /// to improve performance by overlapping I/O with computation.
94    pub async fn build_file_ranges(
95        &self,
96        index: RowGroupIndex,
97        partition_metrics: &PartitionMetrics,
98        reader_metrics: &mut ReaderMetrics,
99    ) -> Result<SmallVec<[FileRange; 2]>> {
100        let file_index = index.index - self.pruner.inner.stream_ctx.input.num_memtables();
101        let pre_filter_mode = self.pre_filter_mode(file_index);
102
103        // Delegate to underlying Pruner
104        let ranges = self
105            .pruner
106            .build_file_ranges(index, pre_filter_mode, partition_metrics, reader_metrics)
107            .await?;
108
109        // Find position and trigger pre-fetch for upcoming files
110        if let Some(pos) = self.file_indices.iter().position(|&idx| idx == file_index) {
111            let prev_pos = self.current_position.fetch_max(pos, Ordering::Relaxed);
112            if pos > prev_pos || prev_pos == 0 {
113                self.prefetch_upcoming_files(pos, partition_metrics);
114            }
115        }
116
117        Ok(ranges)
118    }
119
120    /// Pre-fetches upcoming files starting from the given position.
121    fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) {
122        let start = current_pos + 1;
123        let end = (start + PREFETCH_COUNT).min(self.file_indices.len());
124
125        for i in start..end {
126            let file_index = self.file_indices[i];
127            let pre_filter_mode = self.pre_filter_mode(file_index);
128            self.pruner.get_file_builder_background(
129                file_index,
130                pre_filter_mode,
131                Some(partition_metrics.clone()),
132            );
133        }
134    }
135
136    fn pre_filter_mode(&self, file_index: usize) -> PreFilterMode {
137        self.pre_filter_modes
138            .get(file_index)
139            .copied()
140            .unwrap_or(PreFilterMode::SkipFields)
141    }
142}
143
/// A pruner that prunes files for all partitions of a scanner.
pub struct Pruner {
    /// Channels to send requests to workers.
    /// Requests are routed to a worker by hashing the file id.
    worker_senders: Vec<mpsc::Sender<PruneRequest>>,
    /// Shared state, also held by every spawned worker task.
    inner: Arc<PrunerInner>,
}
150
/// State shared between the [Pruner] handle and its worker tasks.
struct PrunerInner {
    /// Number of worker tasks.
    num_workers: usize,
    /// Per-file state (indexed by file_index).
    file_entries: Vec<Mutex<FileBuilderEntry>>,
    /// StreamContext containing all context needed for pruning.
    stream_ctx: Arc<StreamContext>,
}
159
/// Per-file state tracking.
///
/// Guarded by the per-file `Mutex` in `PrunerInner::file_entries`.
struct FileBuilderEntry {
    /// Cached builder after pruning. None if not yet built or already cleared.
    builder: Option<Arc<FileRangeBuilder>>,
    /// Number of remaining ranges to scan for this file.
    /// When this reaches 0, the builder is dropped for memory cleanup.
    remaining_ranges: usize,
    /// Waiters when pruning is in-progress.
    // NOTE(review): waiters are drained in `worker_loop` but nothing in this
    // file pushes into this vec — confirm it is populated elsewhere or is
    // leftover state.
    waiters: Vec<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
}
170
/// Request to prune a file.
struct PruneRequest {
    /// Index of the file in ScanInput.files.
    file_index: usize,
    /// Pre-filter mode to use for the file.
    pre_filter_mode: PreFilterMode,
    /// Oneshot channel to send back the result.
    /// `None` for fire-and-forget background prefetch requests.
    response_tx: Option<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
    /// Partition metrics for merging reader metrics.
    partition_metrics: Option<PartitionMetrics>,
}
182
impl Pruner {
    /// Creates a new Pruner with N worker tasks.
    ///
    /// Initially all file_entries have `remaining_ranges = 0`.
    /// Call `add_partition_ranges()` to initialize ref counts.
    pub fn new(stream_ctx: Arc<StreamContext>, num_workers: usize) -> Self {
        let num_files = stream_ctx.input.num_files();
        // One entry per file; builders are filled in lazily by workers.
        let file_entries: Vec<_> = (0..num_files)
            .map(|_| {
                Mutex::new(FileBuilderEntry {
                    builder: None,
                    remaining_ranges: 0,
                    waiters: Vec::new(),
                })
            })
            .collect();
        // Create channels and collect senders
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut receivers = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            // Bounded queue: background prefetch uses `try_send` and may drop
            // requests when a worker is saturated.
            let (tx, rx) = mpsc::channel::<PruneRequest>(64);
            worker_senders.push(tx);
            receivers.push(rx);
        }

        let inner = Arc::new(PrunerInner {
            num_workers,
            file_entries,
            stream_ctx,
        });

        // Spawn worker tasks with their receivers.
        // NOTE(review): `get_worker_idx` computes `hash % num_workers`, which
        // panics when `num_workers == 0` — confirm callers always pass > 0.
        for (worker_id, rx) in receivers.into_iter().enumerate() {
            let inner_clone = inner.clone();
            common_runtime::spawn_global(async move {
                Self::worker_loop(worker_id, rx, inner_clone).await;
            });
        }

        Self {
            worker_senders,
            inner,
        }
    }

    /// Adds reference counts for all partitions' ranges.
    ///
    /// Each file-backed row group index bumps `remaining_ranges` of its file
    /// so the cached builder survives until every range has been built.
    pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) {
        // Add reference counts for each partition range
        let num_memtables = self.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &self.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if self.inner.stream_ctx.is_file_range_index(*row_group_index) {
                    // File indices follow memtable indices in the row group
                    // index space.
                    let file_index = row_group_index.index - num_memtables;
                    if file_index < self.inner.file_entries.len() {
                        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
                        entry.remaining_ranges += 1;
                    }
                }
            }
        }
    }

    /// Gets or creates the FileRangeBuilder for a file, builds ranges,
    /// and decrements ref count (cleans up if zero).
    ///
    /// Callers should invoke [add_partition_ranges()](Pruner::add_partition_ranges()) to initialize the
    /// file entries and ref counts.
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        pre_filter_mode: PreFilterMode,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.inner.stream_ctx.input.num_memtables();

        // Get builder (from cache or by pruning)
        let builder = self
            .get_file_builder(
                file_index,
                pre_filter_mode,
                partition_metrics,
                reader_metrics,
            )
            .await?;

        // Build ranges
        let mut ranges = SmallVec::new();
        builder.build_ranges(index.row_group_index, &mut ranges);

        // Decrement ref count and cleanup if needed
        self.decrement_and_maybe_clear(file_index, reader_metrics);

        Ok(ranges)
    }

    /// Gets or creates the FileRangeBuilder for a file.
    ///
    /// Cache misses are dispatched to the worker owning this file; if the
    /// worker channel is unavailable, falls back to pruning inline.
    async fn get_file_builder(
        &self,
        file_index: usize,
        pre_filter_mode: PreFilterMode,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        // Fast path: checks cache.
        // The std Mutex guard is confined to this block so it is never held
        // across an await point.
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if let Some(builder) = &entry.builder {
                reader_metrics.filter_metrics.pruner_cache_hit += 1;
                return Ok(builder.clone());
            }
        }

        reader_metrics.filter_metrics.pruner_cache_miss += 1;
        let prune_start = Instant::now();
        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let (response_tx, response_rx) = oneshot::channel();
        let request = PruneRequest {
            file_index,
            pre_filter_mode,
            response_tx: Some(response_tx),
            partition_metrics: Some(partition_metrics.clone()),
        };

        let result = if self.worker_senders[worker_idx].send(request).await.is_err() {
            common_telemetry::warn!("Worker channel closed, falling back to direct pruning");
            // Worker channel closed, falls back to direct pruning
            self.prune_file_directly(file_index, pre_filter_mode, reader_metrics)
                .await
        } else {
            // Waits for response
            match response_rx.await {
                Ok(result) => result,
                Err(_) => {
                    common_telemetry::warn!(
                        "Response channel closed, falling back to direct pruning"
                    );
                    // Channel closed, falls back to direct pruning
                    self.prune_file_directly(file_index, pre_filter_mode, reader_metrics)
                        .await
                }
            }
        };
        // Accounts the whole dispatch + wait (or fallback) as prune cost.
        reader_metrics.filter_metrics.pruner_prune_cost += prune_start.elapsed();
        result
    }

    /// Gets or creates the FileRangeBuilder for a file in the background.
    ///
    /// Fire-and-forget: no response channel is attached and a full worker
    /// queue silently drops the request (`try_send`).
    pub fn get_file_builder_background(
        &self,
        file_index: usize,
        pre_filter_mode: PreFilterMode,
        partition_metrics: Option<PartitionMetrics>,
    ) {
        // Fast path: checks cache
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_some() {
                return;
            }
        }

        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let request = PruneRequest {
            file_index,
            pre_filter_mode,
            response_tx: None,
            partition_metrics,
        };

        // Sends request to worker
        let _ = self.worker_senders[worker_idx].try_send(request);
    }

    /// Maps a file id to its owning worker.
    ///
    /// Routing by file id means concurrent requests for the same file land on
    /// the same worker and are serialized there.
    fn get_worker_idx(&self, file_id: FileId) -> usize {
        let file_id_hash = Uuid::from(file_id).as_u128() as usize;
        // Assumes `num_workers > 0`; division by zero panics otherwise.
        file_id_hash % self.inner.num_workers
    }

    /// Prunes a file directly without going through a worker.
    /// Used as fallback when worker channels are closed.
    async fn prune_file_directly(
        &self,
        file_index: usize,
        pre_filter_mode: PreFilterMode,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        let file = &self.inner.stream_ctx.input.files[file_index];
        let builder = self
            .inner
            .stream_ctx
            .input
            .prune_file(file, pre_filter_mode, reader_metrics)
            .await?;

        let arc_builder = Arc::new(builder);

        // Caches the builder.
        // A worker may have cached one concurrently; only install ours (and
        // account the metrics) when the slot is still empty.
        {
            let mut entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_none() {
                reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
                reader_metrics.num_range_builders += 1;
                entry.builder = Some(arc_builder.clone());
                PRUNER_ACTIVE_BUILDERS.inc();
            }
        }

        Ok(arc_builder)
    }

    /// Decrements ref count and clears builder if no longer needed.
    fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
        // saturating_sub guards against decrements past zero (e.g. when
        // `add_partition_ranges` was never called for this file).
        entry.remaining_ranges = entry.remaining_ranges.saturating_sub(1);

        if entry.remaining_ranges == 0
            && let Some(builder) = entry.builder.take()
        {
            // Last range for this file: drop the builder and roll back its
            // memory/metric accounting.
            PRUNER_ACTIVE_BUILDERS.dec();
            reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
            reader_metrics.num_range_builders -= 1;
        }
    }

    /// Worker loop that processes prune requests.
    ///
    /// Runs until every sender for this worker's channel is dropped.
    async fn worker_loop(
        worker_id: usize,
        mut rx: mpsc::Receiver<PruneRequest>,
        inner: Arc<PrunerInner>,
    ) {
        // Per-worker counters, only reported in the final debug log.
        let mut worker_cache_hit = 0;
        let mut worker_cache_miss = 0;
        let mut pruned_files = Vec::new();

        while let Some(request) = rx.recv().await {
            let PruneRequest {
                file_index,
                pre_filter_mode,
                response_tx,
                partition_metrics,
            } = request;

            // Check if already cached or in-progress
            {
                let entry = inner.file_entries[file_index].lock().unwrap();
                if let Some(builder) = &entry.builder {
                    // Cache hit - send immediately
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(builder.clone()));
                    }
                    worker_cache_hit += 1;
                    continue;
                }
            }
            worker_cache_miss += 1;

            // Do the actual pruning (outside lock)
            let file = &inner.stream_ctx.input.files[file_index];
            pruned_files.push(file.file_id().file_id());
            let mut metrics = ReaderMetrics::default();
            let result = inner
                .stream_ctx
                .input
                .prune_file(file, pre_filter_mode, &mut metrics)
                .await;

            // Update state and notify waiters
            let mut entry = inner.file_entries[file_index].lock().unwrap();
            match result {
                Ok(builder) => {
                    let arc_builder = Arc::new(builder);
                    entry.builder = Some(arc_builder.clone());
                    PRUNER_ACTIVE_BUILDERS.inc();

                    // Notify all waiters
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Ok(arc_builder.clone()));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(arc_builder));
                    }

                    debug!(
                        "Pruner worker {} pruned file_index: {}, file: {:?}, metrics: {:?}",
                        worker_id,
                        file_index,
                        file.file_id(),
                        metrics
                    );

                    // Merge metrics to partition if provided
                    if let Some(part_metrics) = &partition_metrics {
                        // Per-file breakdown is only collected for verbose
                        // EXPLAIN output.
                        let per_file_metrics = if part_metrics.explain_verbose() {
                            let file_id = file.file_id();
                            let mut map = HashMap::new();
                            map.insert(
                                file_id,
                                FileScanMetrics {
                                    build_part_cost: metrics.build_cost,
                                    ..Default::default()
                                },
                            );
                            Some(map)
                        } else {
                            None
                        };
                        part_metrics.merge_reader_metrics(&metrics, per_file_metrics.as_ref());
                    }
                }
                Err(e) => {
                    // Errors are shared via Arc so every waiter gets a copy;
                    // each send wraps it in PruneFileSnafu for context.
                    let arc_error = Arc::new(e);
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Err(arc_error.clone()).context(PruneFileSnafu));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Err(arc_error).context(PruneFileSnafu));
                    }
                    // NOTE(review): errors are not cached, so later requests
                    // will retry the prune — confirm this is intended.
                }
            }
        }

        common_telemetry::debug!(
            "Pruner worker {} finished, cache_hit: {}, cache_miss: {}, files: {:?}",
            worker_id,
            worker_cache_hit,
            worker_cache_miss,
            pruned_files,
        );
    }
}