mito2/read/pruner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Pruner for parallel file pruning across scanner partitions.
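//!
//! A [Pruner] is shared by all partitions of a scanner. It spawns a fixed
//! number of worker tasks and routes each file to a worker by hashing its
//! file id, so requests for the same file are handled by a single worker and
//! the file is pruned only once. Each partition owns a [PartitionPruner] that
//! records the order in which the partition scans files and asks the shared
//! [Pruner] to prune a few upcoming files in the background. Pruned
//! [FileRangeBuilder]s are cached per file and dropped once all ranges that
//! reference the file have been built.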

use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use common_telemetry::debug;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::FileId;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::error::{PruneFileSnafu, Result};
use crate::metrics::PRUNER_ACTIVE_BUILDERS;
use crate::read::range::{FileRangeBuilder, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::{FileScanMetrics, PartitionMetrics};
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;

/// Number of files to pre-fetch ahead of the current position.
const PREFETCH_COUNT: usize = 8;

/// Local pruner in a partition that supports prefetching files to prune.
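///
/// A minimal usage sketch (assuming a shared `pruner: Arc<Pruner>`, the
/// `partition_ranges` assigned to this partition, and the partition's
/// `partition_metrics`/`reader_metrics` handles; marked `ignore` because these
/// require a full scan setup):
///
/// ```ignore
/// let partition_pruner = PartitionPruner::new(pruner.clone(), &partition_ranges);
/// // For each row group index assigned to this partition:
/// let ranges = partition_pruner
///     .build_file_ranges(row_group_index, &partition_metrics, &mut reader_metrics)
///     .await?;
/// ```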
pub struct PartitionPruner {
    pruner: Arc<Pruner>,
    /// Files to prune, in the order to scan.
    file_indices: Vec<usize>,
    /// Current position for tracking pre-fetch progress.
    current_position: AtomicUsize,
}

impl PartitionPruner {
    /// Creates a new `PartitionPruner` for the given partition ranges.
    pub fn new(pruner: Arc<Pruner>, partition_ranges: &[PartitionRange]) -> Self {
        let mut file_indices = Vec::with_capacity(pruner.inner.stream_ctx.input.num_files());
        let mut dedup_set = HashSet::with_capacity(pruner.inner.stream_ctx.input.num_files());

        let num_memtables = pruner.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &pruner.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if row_group_index.index >= num_memtables {
                    let file_index = row_group_index.index - num_memtables;
                    // `insert` returns true only for the first occurrence of the file,
                    // so each file index is pushed at most once, in scan order.
                    if dedup_set.insert(file_index) {
                        file_indices.push(file_index);
                    }
                }
            }
        }

        Self {
            pruner,
            file_indices,
            current_position: AtomicUsize::new(0),
        }
    }

    /// Builds the file ranges to scan for the given row group index.
    ///
    /// This method also triggers pre-fetching of up to `PREFETCH_COUNT` upcoming files
    /// in the background to improve performance by overlapping I/O with computation.
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.pruner.inner.stream_ctx.input.num_memtables();

        // Delegates to the underlying Pruner.
        let ranges = self
            .pruner
            .build_file_ranges(index, partition_metrics, reader_metrics)
            .await?;

        // Finds the position of this file in the scan order and triggers pre-fetch
        // for upcoming files when the position advances (or on the first access,
        // where the previous position is still 0).
        if let Some(pos) = self.file_indices.iter().position(|&idx| idx == file_index) {
            let prev_pos = self.current_position.fetch_max(pos, Ordering::Relaxed);
            if pos > prev_pos || prev_pos == 0 {
                self.prefetch_upcoming_files(pos, partition_metrics);
            }
        }

        Ok(ranges)
    }

    /// Pre-fetches upcoming files starting from the given position.
    fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) {
        let start = current_pos + 1;
        let end = (start + PREFETCH_COUNT).min(self.file_indices.len());

        for i in start..end {
            self.pruner
                .get_file_builder_background(self.file_indices[i], Some(partition_metrics.clone()));
        }
    }
}

/// A pruner that prunes files for all partitions of a scanner.
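///
/// A minimal setup sketch (assuming a prepared `stream_ctx: Arc<StreamContext>`,
/// a `num_workers` count, and the partition ranges of every scan partition in
/// `all_partition_ranges`; marked `ignore` because constructing them requires a
/// full region setup):
///
/// ```ignore
/// let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
/// // Register the ranges of every partition up front so the per-file
/// // reference counts are complete before scanning starts.
/// for partition_ranges in &all_partition_ranges {
///     pruner.add_partition_ranges(partition_ranges);
/// }
/// ```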
pub struct Pruner {
    /// Channels to send requests to workers.
    worker_senders: Vec<mpsc::Sender<PruneRequest>>,
    inner: Arc<PrunerInner>,
}

struct PrunerInner {
    /// Number of worker tasks.
    num_workers: usize,
    /// Per-file state (indexed by file_index).
    file_entries: Vec<Mutex<FileBuilderEntry>>,
    /// StreamContext containing all context needed for pruning.
    stream_ctx: Arc<StreamContext>,
}

/// Per-file state tracking.
struct FileBuilderEntry {
    /// Cached builder after pruning. None if not yet built or already cleared.
    builder: Option<Arc<FileRangeBuilder>>,
    /// Number of remaining ranges to scan for this file.
    /// When this reaches 0, the builder is dropped for memory cleanup.
    remaining_ranges: usize,
    /// Waiters to notify once an in-progress prune for this file completes.
    waiters: Vec<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
}

/// Request to prune a file.
struct PruneRequest {
    /// Index of the file in ScanInput.files.
    file_index: usize,
    /// Oneshot channel to send back the result. `None` for background prefetch requests.
    response_tx: Option<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
    /// Partition metrics for merging reader metrics.
    partition_metrics: Option<PartitionMetrics>,
}

impl Pruner {
    /// Creates a new Pruner with `num_workers` worker tasks.
    ///
    /// Initially all file_entries have `remaining_ranges = 0`.
    /// Call `add_partition_ranges()` to initialize ref counts.
    pub fn new(stream_ctx: Arc<StreamContext>, num_workers: usize) -> Self {
        let num_files = stream_ctx.input.num_files();
        let file_entries: Vec<_> = (0..num_files)
            .map(|_| {
                Mutex::new(FileBuilderEntry {
                    builder: None,
                    remaining_ranges: 0,
                    waiters: Vec::new(),
                })
            })
            .collect();

        // Create channels and collect senders
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut receivers = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let (tx, rx) = mpsc::channel::<PruneRequest>(64);
            worker_senders.push(tx);
            receivers.push(rx);
        }

        let inner = Arc::new(PrunerInner {
            num_workers,
            file_entries,
            stream_ctx,
        });

        // Spawn worker tasks with their receivers
        for (worker_id, rx) in receivers.into_iter().enumerate() {
            let inner_clone = inner.clone();
            common_runtime::spawn_global(async move {
                Self::worker_loop(worker_id, rx, inner_clone).await;
            });
        }

        Self {
            worker_senders,
            inner,
        }
    }

    /// Adds reference counts for all partitions' ranges.
    pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) {
        // Add reference counts for each partition range
        let num_memtables = self.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &self.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if row_group_index.index >= num_memtables {
                    let file_index = row_group_index.index - num_memtables;
                    if file_index < self.inner.file_entries.len() {
                        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
                        entry.remaining_ranges += 1;
                    }
                }
            }
        }
    }

    /// Gets or creates the FileRangeBuilder for a file, builds ranges,
    /// and decrements ref count (cleans up if zero).
    ///
    /// Callers should invoke [add_partition_ranges()](Pruner::add_partition_ranges()) to initialize the
    /// file entries and ref counts.
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.inner.stream_ctx.input.num_memtables();

        // Get builder (from cache or by pruning)
        let builder = self
            .get_file_builder(file_index, partition_metrics, reader_metrics)
            .await?;

        // Build ranges
        let mut ranges = SmallVec::new();
        builder.build_ranges(index.row_group_index, &mut ranges);

        // Decrement ref count and cleanup if needed
        self.decrement_and_maybe_clear(file_index, reader_metrics);

        Ok(ranges)
    }

    /// Gets or creates the FileRangeBuilder for a file.
    async fn get_file_builder(
        &self,
        file_index: usize,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        // Fast path: checks cache
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if let Some(builder) = &entry.builder {
                reader_metrics.filter_metrics.pruner_cache_hit += 1;
                return Ok(builder.clone());
            }
        }

        reader_metrics.filter_metrics.pruner_cache_miss += 1;
        let prune_start = Instant::now();
        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let (response_tx, response_rx) = oneshot::channel();
        let request = PruneRequest {
            file_index,
            response_tx: Some(response_tx),
            partition_metrics: Some(partition_metrics.clone()),
        };

        let result = if self.worker_senders[worker_idx].send(request).await.is_err() {
            common_telemetry::warn!("Worker channel closed, falling back to direct pruning");
            // Worker channel closed, falls back to direct pruning
            self.prune_file_directly(file_index, reader_metrics).await
        } else {
            // Waits for response
            match response_rx.await {
                Ok(result) => result,
                Err(_) => {
                    common_telemetry::warn!(
                        "Response channel closed, falling back to direct pruning"
                    );
                    // Channel closed, falls back to direct pruning
                    self.prune_file_directly(file_index, reader_metrics).await
                }
            }
        };
        reader_metrics.filter_metrics.pruner_prune_cost += prune_start.elapsed();
        result
    }

    /// Requests pruning of a file in the background without waiting for the result.
    pub fn get_file_builder_background(
        &self,
        file_index: usize,
        partition_metrics: Option<PartitionMetrics>,
    ) {
        // Fast path: checks cache
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_some() {
                return;
            }
        }

        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let request = PruneRequest {
            file_index,
            response_tx: None,
            partition_metrics,
        };

        // Sends the request to the worker on a best-effort basis: if the worker's
        // queue is full, the prefetch hint is simply dropped.
        let _ = self.worker_senders[worker_idx].try_send(request);
    }

    /// Maps a file id to a worker index. The mapping is deterministic, so all
    /// requests for the same file go to the same worker and are processed
    /// sequentially, which avoids pruning the same file concurrently.
    fn get_worker_idx(&self, file_id: FileId) -> usize {
        let file_id_hash = Uuid::from(file_id).as_u128() as usize;
        file_id_hash % self.inner.num_workers
    }

    /// Prunes a file directly without going through a worker.
    /// Used as fallback when worker channels are closed.
    async fn prune_file_directly(
        &self,
        file_index: usize,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        let file = &self.inner.stream_ctx.input.files[file_index];
        let builder = self
            .inner
            .stream_ctx
            .input
            .prune_file(file, reader_metrics)
            .await?;

        let arc_builder = Arc::new(builder);

        // Caches the builder
        {
            let mut entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_none() {
                reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
                reader_metrics.num_range_builders += 1;
                entry.builder = Some(arc_builder.clone());
                PRUNER_ACTIVE_BUILDERS.inc();
            }
        }

        Ok(arc_builder)
    }

    /// Decrements ref count and clears builder if no longer needed.
    fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
        entry.remaining_ranges = entry.remaining_ranges.saturating_sub(1);

        if entry.remaining_ranges == 0
            && let Some(builder) = entry.builder.take()
        {
            PRUNER_ACTIVE_BUILDERS.dec();
            reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
            reader_metrics.num_range_builders -= 1;
        }
    }

    /// Worker loop that processes prune requests.
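    ///
    /// For each request the worker checks the per-file cache first, prunes the
    /// file outside the entry lock on a miss, caches the resulting builder,
    /// notifies any waiters as well as the requester, and merges the reader
    /// metrics into the partition metrics when they are provided.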
    async fn worker_loop(
        worker_id: usize,
        mut rx: mpsc::Receiver<PruneRequest>,
        inner: Arc<PrunerInner>,
    ) {
        let mut worker_cache_hit = 0;
        let mut worker_cache_miss = 0;
        let mut pruned_files = Vec::new();

        while let Some(request) = rx.recv().await {
            let PruneRequest {
                file_index,
                response_tx,
                partition_metrics,
            } = request;

            // Check if already cached or in-progress
            {
                let entry = inner.file_entries[file_index].lock().unwrap();

                if let Some(builder) = &entry.builder {
                    // Cache hit - send immediately
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(builder.clone()));
                    }
                    worker_cache_hit += 1;
                    continue;
                }
            }
            worker_cache_miss += 1;

            // Do the actual pruning (outside lock)
            let file = &inner.stream_ctx.input.files[file_index];
            pruned_files.push(file.file_id().file_id());
            let mut metrics = ReaderMetrics::default();
            let result = inner.stream_ctx.input.prune_file(file, &mut metrics).await;

            // Update state and notify waiters
            let mut entry = inner.file_entries[file_index].lock().unwrap();
            match result {
                Ok(builder) => {
                    let arc_builder = Arc::new(builder);
                    entry.builder = Some(arc_builder.clone());
                    PRUNER_ACTIVE_BUILDERS.inc();

                    // Notify all waiters
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Ok(arc_builder.clone()));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(arc_builder));
                    }

                    debug!(
                        "Pruner worker {} pruned file_index: {}, file: {:?}, metrics: {:?}",
                        worker_id,
                        file_index,
                        file.file_id(),
                        metrics
                    );

                    // Merge metrics to partition if provided
                    if let Some(part_metrics) = &partition_metrics {
                        let per_file_metrics = if part_metrics.explain_verbose() {
                            let file_id = file.file_id();
                            let mut map = HashMap::new();
                            map.insert(
                                file_id,
                                FileScanMetrics {
                                    build_part_cost: metrics.build_cost,
                                    ..Default::default()
                                },
                            );
                            Some(map)
                        } else {
                            None
                        };
                        part_metrics.merge_reader_metrics(&metrics, per_file_metrics.as_ref());
                    }
                }
                Err(e) => {
                    let arc_error = Arc::new(e);
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Err(arc_error.clone()).context(PruneFileSnafu));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Err(arc_error).context(PruneFileSnafu));
                    }
                }
            }
        }

        common_telemetry::debug!(
            "Pruner worker {} finished, cache_hit: {}, cache_miss: {}, files: {:?}",
            worker_id,
            worker_cache_hit,
            worker_cache_miss,
            pruned_files,
        );
    }
}