mito2/read/pruner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Pruner for parallel file pruning across scanner partitions.
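//!
//! A [Pruner] owns a pool of worker tasks that prune files and cache the
//! resulting [FileRangeBuilder]s. Each scan partition drives it through a
//! [PartitionPruner], which remembers its own scan order and prefetches
//! upcoming files in the background.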

use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use common_telemetry::debug;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::FileId;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::error::{PruneFileSnafu, Result};
use crate::metrics::PRUNER_ACTIVE_BUILDERS;
use crate::read::range::{FileRangeBuilder, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::{FileScanMetrics, PartitionMetrics};
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;

/// Number of files to pre-fetch ahead of the current position.
const PREFETCH_COUNT: usize = 8;

/// Local pruner in a partition that supports prefetching files to prune.
pub struct PartitionPruner {
    pruner: Arc<Pruner>,
    /// Files to prune, in the order to scan.
    file_indices: Vec<usize>,
    /// Current position for tracking pre-fetch progress.
    current_position: AtomicUsize,
}

impl PartitionPruner {
    /// Creates a new `PartitionPruner` for the given partition ranges.
    pub fn new(pruner: Arc<Pruner>, partition_ranges: &[PartitionRange]) -> Self {
        let mut file_indices = Vec::with_capacity(pruner.inner.stream_ctx.input.num_files());
        let mut dedup_set = HashSet::with_capacity(pruner.inner.stream_ctx.input.num_files());

        let num_memtables = pruner.inner.stream_ctx.input.num_memtables();
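        // Row group indices cover memtables first and then files, so a file's
        // position in the input file list is its range index minus the number
        // of memtables.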
        for part_range in partition_ranges {
            let range_meta = &pruner.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if pruner
                    .inner
                    .stream_ctx
                    .is_file_range_index(*row_group_index)
                {
                    let file_index = row_group_index.index - num_memtables;
                    // `insert` returns true only for the first occurrence.
                    if dedup_set.insert(file_index) {
                        file_indices.push(file_index);
                    }
                }
            }
        }

        Self {
            pruner,
            file_indices,
            current_position: AtomicUsize::new(0),
        }
    }

    /// Builds the [FileRange]s to scan for the given row group index.
    ///
    /// This method also triggers pre-fetching of upcoming files in the background
    /// to improve performance by overlapping I/O with computation.
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.pruner.inner.stream_ctx.input.num_memtables();

        // Delegate to the underlying Pruner.
        let ranges = self
            .pruner
            .build_file_ranges(index, partition_metrics, reader_metrics)
            .await?;

        // Find the position of this file and trigger pre-fetch for upcoming files.
        if let Some(pos) = self.file_indices.iter().position(|&idx| idx == file_index) {
            let prev_pos = self.current_position.fetch_max(pos, Ordering::Relaxed);
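            // Prefetch only when the high-water mark advances; `prev_pos == 0`
            // also covers the very first call, before any position is recorded.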
            if pos > prev_pos || prev_pos == 0 {
                self.prefetch_upcoming_files(pos, partition_metrics);
            }
        }

        Ok(ranges)
    }

    /// Pre-fetches upcoming files starting from the given position.
    fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) {
        let start = current_pos + 1;
        let end = (start + PREFETCH_COUNT).min(self.file_indices.len());

        for i in start..end {
            self.pruner
                .get_file_builder_background(self.file_indices[i], Some(partition_metrics.clone()));
        }
    }
}

/// A pruner that prunes files for all partitions of a scanner.
pub struct Pruner {
    /// Channels to send requests to workers.
    worker_senders: Vec<mpsc::Sender<PruneRequest>>,
    inner: Arc<PrunerInner>,
}

struct PrunerInner {
    /// Number of worker tasks.
    num_workers: usize,
    /// Per-file state (indexed by file_index).
    file_entries: Vec<Mutex<FileBuilderEntry>>,
    /// StreamContext containing all context needed for pruning.
    stream_ctx: Arc<StreamContext>,
}

/// Per-file state tracking.
struct FileBuilderEntry {
    /// Cached builder after pruning. None if not yet built or already cleared.
    builder: Option<Arc<FileRangeBuilder>>,
    /// Number of remaining ranges to scan for this file.
    /// When this reaches 0, the builder is dropped for memory cleanup.
    remaining_ranges: usize,
    /// Waiters when pruning is in-progress.
    waiters: Vec<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
}

/// Request to prune a file.
struct PruneRequest {
    /// Index of the file in ScanInput.files.
    file_index: usize,
    /// Oneshot channel to send back the result.
    response_tx: Option<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
    /// Partition metrics for merging reader metrics.
    partition_metrics: Option<PartitionMetrics>,
}

impl Pruner {
    /// Creates a new Pruner with N worker tasks.
    ///
    /// Initially all file_entries have `remaining_ranges = 0`.
    /// Call `add_partition_ranges()` to initialize ref counts.
    pub fn new(stream_ctx: Arc<StreamContext>, num_workers: usize) -> Self {
        let num_files = stream_ctx.input.num_files();
        let file_entries: Vec<_> = (0..num_files)
            .map(|_| {
                Mutex::new(FileBuilderEntry {
                    builder: None,
                    remaining_ranges: 0,
                    waiters: Vec::new(),
                })
            })
            .collect();

        // Create channels and collect senders.
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut receivers = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
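            // Bounded queue: foreground requests use `send().await`, while
            // background prefetch uses `try_send` and is simply dropped when
            // the queue is full.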
            let (tx, rx) = mpsc::channel::<PruneRequest>(64);
            worker_senders.push(tx);
            receivers.push(rx);
        }

        let inner = Arc::new(PrunerInner {
            num_workers,
            file_entries,
            stream_ctx,
        });

        // Spawn worker tasks with their receivers.
        for (worker_id, rx) in receivers.into_iter().enumerate() {
            let inner_clone = inner.clone();
            common_runtime::spawn_global(async move {
                Self::worker_loop(worker_id, rx, inner_clone).await;
            });
        }

        Self {
            worker_senders,
            inner,
        }
    }

    /// Adds reference counts for all partitions' ranges.
    pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) {
        // Add reference counts for each partition range.
        let num_memtables = self.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &self.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if self.inner.stream_ctx.is_file_range_index(*row_group_index) {
                    let file_index = row_group_index.index - num_memtables;
                    if file_index < self.inner.file_entries.len() {
                        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
                        entry.remaining_ranges += 1;
                    }
                }
            }
        }
    }

    /// Gets or creates the FileRangeBuilder for a file, builds ranges,
    /// and decrements the ref count (cleaning up when it reaches zero).
    ///
    /// Callers should invoke [add_partition_ranges()](Pruner::add_partition_ranges())
    /// before this method to initialize the file entries and ref counts.
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.inner.stream_ctx.input.num_memtables();

        // Get the builder (from the cache or by pruning).
        let builder = self
            .get_file_builder(file_index, partition_metrics, reader_metrics)
            .await?;

        // Build the ranges.
        let mut ranges = SmallVec::new();
        builder.build_ranges(index.row_group_index, &mut ranges);

        // Decrement the ref count and clean up if needed.
        self.decrement_and_maybe_clear(file_index, reader_metrics);

        Ok(ranges)
    }

    /// Gets or creates the FileRangeBuilder for a file.
    async fn get_file_builder(
        &self,
        file_index: usize,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        // Fast path: checks the cache.
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if let Some(builder) = &entry.builder {
                reader_metrics.filter_metrics.pruner_cache_hit += 1;
                return Ok(builder.clone());
            }
        }

        reader_metrics.filter_metrics.pruner_cache_miss += 1;
        let prune_start = Instant::now();
        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let (response_tx, response_rx) = oneshot::channel();
        let request = PruneRequest {
            file_index,
            response_tx: Some(response_tx),
            partition_metrics: Some(partition_metrics.clone()),
        };

        let result = if self.worker_senders[worker_idx].send(request).await.is_err() {
            // Worker channel closed, falls back to direct pruning.
            common_telemetry::warn!("Worker channel closed, falling back to direct pruning");
            self.prune_file_directly(file_index, reader_metrics).await
        } else {
            // Waits for the response.
            match response_rx.await {
                Ok(result) => result,
                Err(_) => {
                    // Response channel closed, falls back to direct pruning.
                    common_telemetry::warn!(
                        "Response channel closed, falling back to direct pruning"
                    );
                    self.prune_file_directly(file_index, reader_metrics).await
                }
            }
        };
        reader_metrics.filter_metrics.pruner_prune_cost += prune_start.elapsed();
        result
    }

    /// Requests the FileRangeBuilder for a file to be built in the background,
    /// without waiting for the result.
    pub fn get_file_builder_background(
        &self,
        file_index: usize,
        partition_metrics: Option<PartitionMetrics>,
    ) {
        // Fast path: checks the cache.
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_some() {
                return;
            }
        }

        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let request = PruneRequest {
            file_index,
            response_tx: None,
            partition_metrics,
        };

        // Sends the request to the worker. Prefetch is best-effort: if the
        // worker queue is full, the request is silently dropped.
        let _ = self.worker_senders[worker_idx].try_send(request);
    }

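    /// Maps a file to a worker by hashing its id, so all requests for the same
    /// file land on the same worker and deduplicate against its cached builder.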
    fn get_worker_idx(&self, file_id: FileId) -> usize {
        let file_id_hash = Uuid::from(file_id).as_u128() as usize;
        file_id_hash % self.inner.num_workers
    }

    /// Prunes a file directly without going through a worker.
    /// Used as a fallback when worker channels are closed.
    async fn prune_file_directly(
        &self,
        file_index: usize,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        let file = &self.inner.stream_ctx.input.files[file_index];
        let builder = self
            .inner
            .stream_ctx
            .input
            .prune_file(file, reader_metrics)
            .await?;

        let arc_builder = Arc::new(builder);

        // Caches the builder.
        {
            let mut entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_none() {
                reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
                reader_metrics.num_range_builders += 1;
                entry.builder = Some(arc_builder.clone());
                PRUNER_ACTIVE_BUILDERS.inc();
            }
        }

        Ok(arc_builder)
    }

    /// Decrements the ref count and clears the builder if it is no longer needed.
    fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
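        // Saturating so that a file whose ranges were never registered stays
        // at zero instead of underflowing.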
        entry.remaining_ranges = entry.remaining_ranges.saturating_sub(1);

        if entry.remaining_ranges == 0
            && let Some(builder) = entry.builder.take()
        {
            PRUNER_ACTIVE_BUILDERS.dec();
            reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
            reader_metrics.num_range_builders -= 1;
        }
    }

    /// Worker loop that processes prune requests.
    async fn worker_loop(
        worker_id: usize,
        mut rx: mpsc::Receiver<PruneRequest>,
        inner: Arc<PrunerInner>,
    ) {
        let mut worker_cache_hit = 0;
        let mut worker_cache_miss = 0;
        let mut pruned_files = Vec::new();

        while let Some(request) = rx.recv().await {
            let PruneRequest {
                file_index,
                response_tx,
                partition_metrics,
            } = request;

            // Check if already cached. Requests for the same file hash to the
            // same worker and are processed serially, so duplicates hit this
            // cache instead of pruning again.
            {
                let entry = inner.file_entries[file_index].lock().unwrap();

                if let Some(builder) = &entry.builder {
                    // Cache hit - send immediately.
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(builder.clone()));
                    }
                    worker_cache_hit += 1;
                    continue;
                }
            }
            worker_cache_miss += 1;

            // Do the actual pruning (outside the lock).
            let file = &inner.stream_ctx.input.files[file_index];
            pruned_files.push(file.file_id().file_id());
            let mut metrics = ReaderMetrics::default();
            let result = inner.stream_ctx.input.prune_file(file, &mut metrics).await;

            // Update the state and notify waiters.
            let mut entry = inner.file_entries[file_index].lock().unwrap();
            match result {
                Ok(builder) => {
                    let arc_builder = Arc::new(builder);
                    entry.builder = Some(arc_builder.clone());
                    PRUNER_ACTIVE_BUILDERS.inc();

                    // Notify all waiters.
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Ok(arc_builder.clone()));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(arc_builder));
                    }

                    debug!(
                        "Pruner worker {} pruned file_index: {}, file: {:?}, metrics: {:?}",
                        worker_id,
                        file_index,
                        file.file_id(),
                        metrics
                    );

                    // Merge metrics into the partition if provided.
                    if let Some(part_metrics) = &partition_metrics {
                        let per_file_metrics = if part_metrics.explain_verbose() {
                            let file_id = file.file_id();
                            let mut map = HashMap::new();
                            map.insert(
                                file_id,
                                FileScanMetrics {
                                    build_part_cost: metrics.build_cost,
                                    ..Default::default()
                                },
                            );
                            Some(map)
                        } else {
                            None
                        };
                        part_metrics.merge_reader_metrics(&metrics, per_file_metrics.as_ref());
                    }
                }
                Err(e) => {
                    let arc_error = Arc::new(e);
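                    // Leave `entry.builder` as None so a later request retries
                    // the prune instead of caching the failure.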
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Err(arc_error.clone()).context(PruneFileSnafu));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Err(arc_error).context(PruneFileSnafu));
                    }
                }
            }
        }

        common_telemetry::debug!(
            "Pruner worker {} finished, cache_hit: {}, cache_miss: {}, files: {:?}",
            worker_id,
            worker_cache_hit,
            worker_cache_miss,
            pruned_files,
        );
    }
}