1use std::collections::{HashMap, HashSet};
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::{Arc, Mutex};
20use std::time::Instant;
21
22use common_telemetry::debug;
23use smallvec::SmallVec;
24use snafu::ResultExt;
25use store_api::region_engine::PartitionRange;
26use store_api::storage::FileId;
27use tokio::sync::{mpsc, oneshot};
28use uuid::Uuid;
29
30use crate::error::{PruneFileSnafu, Result};
31use crate::metrics::PRUNER_ACTIVE_BUILDERS;
32use crate::read::range::{FileRangeBuilder, RowGroupIndex};
33use crate::read::scan_region::StreamContext;
34use crate::read::scan_util::{FileScanMetrics, PartitionMetrics};
35use crate::sst::parquet::file_range::{FileRange, PreFilterMode};
36use crate::sst::parquet::reader::ReaderMetrics;
37
/// Maximum number of upcoming files scheduled for background pruning after
/// each file-range build.
const PREFETCH_COUNT: usize = 8;
40
/// A per-partition view over a shared [`Pruner`] that tracks scan progress
/// and prefetches upcoming files.
pub struct PartitionPruner {
    /// Shared pruner holding the builder cache and the worker pool.
    pruner: Arc<Pruner>,
    /// File indices referenced by this partition, in first-seen order.
    file_indices: Vec<usize>,
    /// Pre-filter mode per file, indexed by file index (not by position in
    /// `file_indices`).
    pre_filter_modes: Vec<PreFilterMode>,
    /// High-water mark of the position (into `file_indices`) built so far;
    /// drives prefetching of upcoming files.
    current_position: AtomicUsize,
}
51
52impl PartitionPruner {
53 pub fn new(pruner: Arc<Pruner>, partition_ranges: &[PartitionRange]) -> Self {
55 let num_files = pruner.inner.stream_ctx.input.num_files();
56 let mut file_indices = Vec::with_capacity(num_files);
57 let mut pre_filter_modes = vec![PreFilterMode::SkipFields; num_files];
58 let mut dedup_set = HashSet::with_capacity(pruner.inner.stream_ctx.input.num_files());
59
60 let num_memtables = pruner.inner.stream_ctx.input.num_memtables();
61 for part_range in partition_ranges {
62 let range_meta = &pruner.inner.stream_ctx.ranges[part_range.identifier];
63 let pre_filter_mode = pruner.inner.stream_ctx.range_pre_filter_mode(part_range);
64 for row_group_index in &range_meta.row_group_indices {
65 if pruner
66 .inner
67 .stream_ctx
68 .is_file_range_index(*row_group_index)
69 {
70 let file_index = row_group_index.index - num_memtables;
71 if dedup_set.contains(&file_index) {
72 continue;
73 } else {
74 file_indices.push(file_index);
75 pre_filter_modes[file_index] = pre_filter_mode;
76 dedup_set.insert(file_index);
77 }
78 }
79 }
80 }
81
82 Self {
83 pruner,
84 file_indices,
85 pre_filter_modes,
86 current_position: AtomicUsize::new(0),
87 }
88 }
89
    /// Builds the file ranges for `index` via the shared pruner, then
    /// schedules background pruning of upcoming files in this partition.
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        // File ranges are indexed after all memtable ranges.
        let file_index = index.index - self.pruner.inner.stream_ctx.input.num_memtables();
        let pre_filter_mode = self.pre_filter_mode(file_index);

        let ranges = self
            .pruner
            .build_file_ranges(index, pre_filter_mode, partition_metrics, reader_metrics)
            .await?;

        // Locate this file in the partition's scan order (linear scan; the
        // per-partition file list is expected to be small).
        if let Some(pos) = self.file_indices.iter().position(|&idx| idx == file_index) {
            // `fetch_max` keeps the high-water mark monotonic under concurrent
            // callers and returns the previous value.
            let prev_pos = self.current_position.fetch_max(pos, Ordering::Relaxed);
            // Prefetch only on forward progress; `prev_pos == 0` additionally
            // covers the very first file, since position 0 can never satisfy
            // `pos > prev_pos`.
            if pos > prev_pos || prev_pos == 0 {
                self.prefetch_upcoming_files(pos, partition_metrics);
            }
        }

        Ok(ranges)
    }
119
120 fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) {
122 let start = current_pos + 1;
123 let end = (start + PREFETCH_COUNT).min(self.file_indices.len());
124
125 for i in start..end {
126 let file_index = self.file_indices[i];
127 let pre_filter_mode = self.pre_filter_mode(file_index);
128 self.pruner.get_file_builder_background(
129 file_index,
130 pre_filter_mode,
131 Some(partition_metrics.clone()),
132 );
133 }
134 }
135
136 fn pre_filter_mode(&self, file_index: usize) -> PreFilterMode {
137 self.pre_filter_modes
138 .get(file_index)
139 .copied()
140 .unwrap_or(PreFilterMode::SkipFields)
141 }
142}
143
/// Prunes input files into cached [`FileRangeBuilder`]s using a pool of
/// background worker tasks.
pub struct Pruner {
    /// One request channel per worker; a file is routed to a fixed worker by
    /// hashing its file id.
    worker_senders: Vec<mpsc::Sender<PruneRequest>>,
    /// State shared with the worker tasks.
    inner: Arc<PrunerInner>,
}
150
/// State shared between the [`Pruner`] handle and its worker tasks.
struct PrunerInner {
    /// Number of workers; also the modulus used for worker routing.
    num_workers: usize,
    /// Per-file builder cache, one entry per input file.
    file_entries: Vec<Mutex<FileBuilderEntry>>,
    /// Context of the scan that owns the input files.
    stream_ctx: Arc<StreamContext>,
}
159
/// Cache entry for a single file's range builder.
struct FileBuilderEntry {
    /// Cached builder; `None` until the file is pruned (or after eviction).
    builder: Option<Arc<FileRangeBuilder>>,
    /// Number of ranges still expected to be built from this file; the
    /// builder is evicted when it reaches zero.
    remaining_ranges: usize,
    /// Senders notified when pruning completes.
    /// NOTE(review): nothing in this file pushes to `waiters` — confirm it is
    /// populated elsewhere, otherwise the field is dead.
    waiters: Vec<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
}
170
/// A prune request dispatched to a worker task.
struct PruneRequest {
    /// Index of the file to prune.
    file_index: usize,
    /// Pre-filter mode to prune with.
    pre_filter_mode: PreFilterMode,
    /// Channel for the result; `None` for fire-and-forget prefetch requests.
    response_tx: Option<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
    /// Metrics to merge pruning costs into, when available.
    partition_metrics: Option<PartitionMetrics>,
}
182
183impl Pruner {
184 pub fn new(stream_ctx: Arc<StreamContext>, num_workers: usize) -> Self {
189 let num_files = stream_ctx.input.num_files();
190 let file_entries: Vec<_> = (0..num_files)
191 .map(|_| {
192 Mutex::new(FileBuilderEntry {
193 builder: None,
194 remaining_ranges: 0,
195 waiters: Vec::new(),
196 })
197 })
198 .collect();
199 let mut worker_senders = Vec::with_capacity(num_workers);
201 let mut receivers = Vec::with_capacity(num_workers);
202 for _ in 0..num_workers {
203 let (tx, rx) = mpsc::channel::<PruneRequest>(64);
204 worker_senders.push(tx);
205 receivers.push(rx);
206 }
207
208 let inner = Arc::new(PrunerInner {
209 num_workers,
210 file_entries,
211 stream_ctx,
212 });
213
214 for (worker_id, rx) in receivers.into_iter().enumerate() {
216 let inner_clone = inner.clone();
217 common_runtime::spawn_global(async move {
218 Self::worker_loop(worker_id, rx, inner_clone).await;
219 });
220 }
221
222 Self {
223 worker_senders,
224 inner,
225 }
226 }
227
228 pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) {
230 let num_memtables = self.inner.stream_ctx.input.num_memtables();
232 for part_range in partition_ranges {
233 let range_meta = &self.inner.stream_ctx.ranges[part_range.identifier];
234 for row_group_index in &range_meta.row_group_indices {
235 if self.inner.stream_ctx.is_file_range_index(*row_group_index) {
236 let file_index = row_group_index.index - num_memtables;
237 if file_index < self.inner.file_entries.len() {
238 let mut entry = self.inner.file_entries[file_index].lock().unwrap();
239 entry.remaining_ranges += 1;
240 }
241 }
242 }
243 }
244 }
245
246 pub async fn build_file_ranges(
252 &self,
253 index: RowGroupIndex,
254 pre_filter_mode: PreFilterMode,
255 partition_metrics: &PartitionMetrics,
256 reader_metrics: &mut ReaderMetrics,
257 ) -> Result<SmallVec<[FileRange; 2]>> {
258 let file_index = index.index - self.inner.stream_ctx.input.num_memtables();
259
260 let builder = self
262 .get_file_builder(
263 file_index,
264 pre_filter_mode,
265 partition_metrics,
266 reader_metrics,
267 )
268 .await?;
269
270 let mut ranges = SmallVec::new();
272 builder.build_ranges(index.row_group_index, &mut ranges);
273
274 self.decrement_and_maybe_clear(file_index, reader_metrics);
276
277 Ok(ranges)
278 }
279
    /// Returns the range builder for `file_index`, from the cache when
    /// present, otherwise by dispatching a prune request to the file's
    /// dedicated worker.
    ///
    /// Falls back to pruning on the calling task when the worker channel or
    /// the response channel is closed, so a dead worker never wedges a scan.
    async fn get_file_builder(
        &self,
        file_index: usize,
        pre_filter_mode: PreFilterMode,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        // Fast path: cache hit under a short-lived lock.
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if let Some(builder) = &entry.builder {
                reader_metrics.filter_metrics.pruner_cache_hit += 1;
                return Ok(builder.clone());
            }
        }

        reader_metrics.filter_metrics.pruner_cache_miss += 1;
        // The whole acquisition (queueing + pruning + response) is accounted
        // as prune cost below.
        let prune_start = Instant::now();
        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let (response_tx, response_rx) = oneshot::channel();
        let request = PruneRequest {
            file_index,
            pre_filter_mode,
            response_tx: Some(response_tx),
            partition_metrics: Some(partition_metrics.clone()),
        };

        let result = if self.worker_senders[worker_idx].send(request).await.is_err() {
            common_telemetry::warn!("Worker channel closed, falling back to direct pruning");
            self.prune_file_directly(file_index, pre_filter_mode, reader_metrics)
                .await
        } else {
            match response_rx.await {
                Ok(result) => result,
                // The worker dropped our response sender without answering.
                Err(_) => {
                    common_telemetry::warn!(
                        "Response channel closed, falling back to direct pruning"
                    );
                    self.prune_file_directly(file_index, pre_filter_mode, reader_metrics)
                        .await
                }
            }
        };
        reader_metrics.filter_metrics.pruner_prune_cost += prune_start.elapsed();
        result
    }
333
334 pub fn get_file_builder_background(
336 &self,
337 file_index: usize,
338 pre_filter_mode: PreFilterMode,
339 partition_metrics: Option<PartitionMetrics>,
340 ) {
341 {
343 let entry = self.inner.file_entries[file_index].lock().unwrap();
344 if entry.builder.is_some() {
345 return;
346 }
347 }
348
349 let file = &self.inner.stream_ctx.input.files[file_index];
350 let file_id = file.file_id().file_id();
351 let worker_idx = self.get_worker_idx(file_id);
352
353 let request = PruneRequest {
354 file_index,
355 pre_filter_mode,
356 response_tx: None,
357 partition_metrics,
358 };
359
360 let _ = self.worker_senders[worker_idx].try_send(request);
362 }
363
364 fn get_worker_idx(&self, file_id: FileId) -> usize {
365 let file_id_hash = Uuid::from(file_id).as_u128() as usize;
366 file_id_hash % self.inner.num_workers
367 }
368
369 async fn prune_file_directly(
372 &self,
373 file_index: usize,
374 pre_filter_mode: PreFilterMode,
375 reader_metrics: &mut ReaderMetrics,
376 ) -> Result<Arc<FileRangeBuilder>> {
377 let file = &self.inner.stream_ctx.input.files[file_index];
378 let builder = self
379 .inner
380 .stream_ctx
381 .input
382 .prune_file(file, pre_filter_mode, reader_metrics)
383 .await?;
384
385 let arc_builder = Arc::new(builder);
386
387 {
389 let mut entry = self.inner.file_entries[file_index].lock().unwrap();
390 if entry.builder.is_none() {
391 reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
392 reader_metrics.num_range_builders += 1;
393 entry.builder = Some(arc_builder.clone());
394 PRUNER_ACTIVE_BUILDERS.inc();
395 }
396 }
397
398 Ok(arc_builder)
399 }
400
401 fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
403 let mut entry = self.inner.file_entries[file_index].lock().unwrap();
404 entry.remaining_ranges = entry.remaining_ranges.saturating_sub(1);
405
406 if entry.remaining_ranges == 0
407 && let Some(builder) = entry.builder.take()
408 {
409 PRUNER_ACTIVE_BUILDERS.dec();
410 reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
411 reader_metrics.num_range_builders -= 1;
412 }
413 }
414
415 async fn worker_loop(
417 worker_id: usize,
418 mut rx: mpsc::Receiver<PruneRequest>,
419 inner: Arc<PrunerInner>,
420 ) {
421 let mut worker_cache_hit = 0;
422 let mut worker_cache_miss = 0;
423 let mut pruned_files = Vec::new();
424
425 while let Some(request) = rx.recv().await {
426 let PruneRequest {
427 file_index,
428 pre_filter_mode,
429 response_tx,
430 partition_metrics,
431 } = request;
432
433 {
435 let entry = inner.file_entries[file_index].lock().unwrap();
436 if let Some(builder) = &entry.builder {
437 if let Some(response_tx) = response_tx {
439 let _ = response_tx.send(Ok(builder.clone()));
440 }
441 worker_cache_hit += 1;
442 continue;
443 }
444 }
445 worker_cache_miss += 1;
446
447 let file = &inner.stream_ctx.input.files[file_index];
449 pruned_files.push(file.file_id().file_id());
450 let mut metrics = ReaderMetrics::default();
451 let result = inner
452 .stream_ctx
453 .input
454 .prune_file(file, pre_filter_mode, &mut metrics)
455 .await;
456
457 let mut entry = inner.file_entries[file_index].lock().unwrap();
459 match result {
460 Ok(builder) => {
461 let arc_builder = Arc::new(builder);
462 entry.builder = Some(arc_builder.clone());
463 PRUNER_ACTIVE_BUILDERS.inc();
464
465 for waiter in entry.waiters.drain(..) {
467 let _ = waiter.send(Ok(arc_builder.clone()));
468 }
469 if let Some(response_tx) = response_tx {
470 let _ = response_tx.send(Ok(arc_builder));
471 }
472
473 debug!(
474 "Pruner worker {} pruned file_index: {}, file: {:?}, metrics: {:?}",
475 worker_id,
476 file_index,
477 file.file_id(),
478 metrics
479 );
480
481 if let Some(part_metrics) = &partition_metrics {
483 let per_file_metrics = if part_metrics.explain_verbose() {
484 let file_id = file.file_id();
485 let mut map = HashMap::new();
486 map.insert(
487 file_id,
488 FileScanMetrics {
489 build_part_cost: metrics.build_cost,
490 ..Default::default()
491 },
492 );
493 Some(map)
494 } else {
495 None
496 };
497 part_metrics.merge_reader_metrics(&metrics, per_file_metrics.as_ref());
498 }
499 }
500 Err(e) => {
501 let arc_error = Arc::new(e);
502 for waiter in entry.waiters.drain(..) {
503 let _ = waiter.send(Err(arc_error.clone()).context(PruneFileSnafu));
504 }
505 if let Some(response_tx) = response_tx {
506 let _ = response_tx.send(Err(arc_error).context(PruneFileSnafu));
507 }
508 }
509 }
510 }
511
512 common_telemetry::debug!(
513 "Pruner worker {} finished, cache_hit: {}, cache_miss: {}, files: {:?}",
514 worker_id,
515 worker_cache_hit,
516 worker_cache_miss,
517 pruned_files,
518 );
519 }
520}