use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use common_telemetry::debug;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::FileId;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::error::{PruneFileSnafu, Result};
use crate::metrics::PRUNER_ACTIVE_BUILDERS;
use crate::read::range::{FileRangeBuilder, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::{FileScanMetrics, PartitionMetrics};
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;

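/// Number of upcoming files to request background builders for ahead of the scan.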
const PREFETCH_COUNT: usize = 8;

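/// Per-partition front end of a shared [`Pruner`] that tracks scan progress
/// and prefetches range builders for upcoming files.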
pub struct PartitionPruner {
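    /// Shared pruner that caches file range builders across partitions.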
    pruner: Arc<Pruner>,
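    /// File indices this partition will read, in scan order and deduplicated.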
    file_indices: Vec<usize>,
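    /// Highest position in `file_indices` seen so far; drives prefetching.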
    current_position: AtomicUsize,
}

impl PartitionPruner {
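    /// Creates a partition pruner over `partition_ranges`, collecting the file
    /// indices they reference in scan order, deduplicated.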
    pub fn new(pruner: Arc<Pruner>, partition_ranges: &[PartitionRange]) -> Self {
        let mut file_indices = Vec::with_capacity(pruner.inner.stream_ctx.input.num_files());
        let mut dedup_set = HashSet::with_capacity(pruner.inner.stream_ctx.input.num_files());

        let num_memtables = pruner.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &pruner.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if pruner.inner.stream_ctx.is_file_range_index(*row_group_index) {
                    let file_index = row_group_index.index - num_memtables;
                    if dedup_set.insert(file_index) {
                        file_indices.push(file_index);
                    }
                }
            }
        }

        Self {
            pruner,
            file_indices,
            current_position: AtomicUsize::new(0),
        }
    }

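    /// Builds the [`FileRange`]s for the row group at `index` via the shared
    /// pruner, then schedules background pruning for upcoming files in this
    /// partition's scan order.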
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.pruner.inner.stream_ctx.input.num_memtables();

        let ranges = self
            .pruner
            .build_file_ranges(index, partition_metrics, reader_metrics)
            .await?;

        if let Some(pos) = self.file_indices.iter().position(|&idx| idx == file_index) {
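            // Advance the high-water mark; prefetch when the scan moved forward
            // (the `prev_pos == 0` check also covers the very first file).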
            let prev_pos = self.current_position.fetch_max(pos, Ordering::Relaxed);
            if pos > prev_pos || prev_pos == 0 {
                self.prefetch_upcoming_files(pos, partition_metrics);
            }
        }

        Ok(ranges)
    }

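    /// Sends best-effort background prune requests for up to
    /// [`PREFETCH_COUNT`] files following `current_pos` in scan order.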
    fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) {
        let start = current_pos + 1;
        let end = (start + PREFETCH_COUNT).min(self.file_indices.len());

        for i in start..end {
            self.pruner
                .get_file_builder_background(self.file_indices[i], Some(partition_metrics.clone()));
        }
    }
}

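/// Prunes SST files into [`FileRangeBuilder`]s on a pool of background
/// workers, caching each builder until every range that needs it is built.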
pub struct Pruner {
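    /// One request channel per background prune worker.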
    worker_senders: Vec<mpsc::Sender<PruneRequest>>,
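    /// State shared with the worker tasks.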
    inner: Arc<PrunerInner>,
}

struct PrunerInner {
    num_workers: usize,
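    /// Builder cache and bookkeeping, one entry per input file.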
    file_entries: Vec<Mutex<FileBuilderEntry>>,
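    /// Context of the scan this pruner serves.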
    stream_ctx: Arc<StreamContext>,
}

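/// Cache slot tracking the prune state of a single file.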
struct FileBuilderEntry {
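    /// Cached builder, if the file has already been pruned.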
    builder: Option<Arc<FileRangeBuilder>>,
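    /// Number of pending ranges that still need this builder; the builder is
    /// released once this reaches 0.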
    remaining_ranges: usize,
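    /// Senders waiting for the builder while a worker is pruning the file.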
    waiters: Vec<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
}

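/// A prune task sent to a background worker.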
struct PruneRequest {
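    /// Index of the file to prune.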
    file_index: usize,
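    /// Where to send the resulting builder; `None` for fire-and-forget prefetches.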
    response_tx: Option<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
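    /// Metrics of the requesting partition, if prune metrics should be merged into it.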
    partition_metrics: Option<PartitionMetrics>,
}

impl Pruner {
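    /// Creates a pruner backed by `num_workers` background worker tasks.
    ///
    /// A sketch of the intended flow, assuming `stream_ctx`, `num_workers`,
    /// and `partition_ranges` are in scope (the exact call sites live in the
    /// scan setup code):
    ///
    /// ```ignore
    /// let pruner = Arc::new(Pruner::new(stream_ctx, num_workers));
    /// // Register ranges so cached builders live until every range is built.
    /// pruner.add_partition_ranges(&partition_ranges);
    /// // Each partition drives its own scan order and prefetching.
    /// let partition_pruner = PartitionPruner::new(pruner.clone(), &partition_ranges);
    /// ```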
    pub fn new(stream_ctx: Arc<StreamContext>, num_workers: usize) -> Self {
        let num_files = stream_ctx.input.num_files();
        let file_entries: Vec<_> = (0..num_files)
            .map(|_| {
                Mutex::new(FileBuilderEntry {
                    builder: None,
                    remaining_ranges: 0,
                    waiters: Vec::new(),
                })
            })
            .collect();

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut receivers = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let (tx, rx) = mpsc::channel::<PruneRequest>(64);
            worker_senders.push(tx);
            receivers.push(rx);
        }

        let inner = Arc::new(PrunerInner {
            num_workers,
            file_entries,
            stream_ctx,
        });

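        // Spawn one background worker task per request channel.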
        for (worker_id, rx) in receivers.into_iter().enumerate() {
            let inner_clone = inner.clone();
            common_runtime::spawn_global(async move {
                Self::worker_loop(worker_id, rx, inner_clone).await;
            });
        }

        Self {
            worker_senders,
            inner,
        }
    }

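    /// Registers the ranges of the given partitions, incrementing the
    /// remaining-range count of every file they reference so cached builders
    /// stay alive until all of those ranges are built.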
    pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) {
        let num_memtables = self.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &self.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if self.inner.stream_ctx.is_file_range_index(*row_group_index) {
                    let file_index = row_group_index.index - num_memtables;
                    if file_index < self.inner.file_entries.len() {
                        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
                        entry.remaining_ranges += 1;
                    }
                }
            }
        }
    }

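    /// Builds the [`FileRange`]s to read for the row group at `index`,
    /// pruning the file first if no cached builder exists.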
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.inner.stream_ctx.input.num_memtables();

        let builder = self
            .get_file_builder(file_index, partition_metrics, reader_metrics)
            .await?;

        let mut ranges = SmallVec::new();
        builder.build_ranges(index.row_group_index, &mut ranges);

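        // Release the cached builder if this was the last range that needed it.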
        self.decrement_and_maybe_clear(file_index, reader_metrics);

        Ok(ranges)
    }

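    /// Returns the builder for `file_index`, either from the cache or by
    /// dispatching a prune request to the worker that owns the file.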
    async fn get_file_builder(
        &self,
        file_index: usize,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
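        // Fast path: another request already pruned this file.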
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if let Some(builder) = &entry.builder {
                reader_metrics.filter_metrics.pruner_cache_hit += 1;
                return Ok(builder.clone());
            }
        }

        reader_metrics.filter_metrics.pruner_cache_miss += 1;
        let prune_start = Instant::now();
        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let (response_tx, response_rx) = oneshot::channel();
        let request = PruneRequest {
            file_index,
            response_tx: Some(response_tx),
            partition_metrics: Some(partition_metrics.clone()),
        };

        let result = if self.worker_senders[worker_idx].send(request).await.is_err() {
            common_telemetry::warn!("Worker channel closed, falling back to direct pruning");
            self.prune_file_directly(file_index, reader_metrics).await
        } else {
            match response_rx.await {
                Ok(result) => result,
                Err(_) => {
                    common_telemetry::warn!(
                        "Response channel closed, falling back to direct pruning"
                    );
                    self.prune_file_directly(file_index, reader_metrics).await
                }
            }
        };
        reader_metrics.filter_metrics.pruner_prune_cost += prune_start.elapsed();
        result
    }

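    /// Schedules a best-effort background prune of `file_index`; does nothing
    /// if a builder is already cached.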
    pub fn get_file_builder_background(
        &self,
        file_index: usize,
        partition_metrics: Option<PartitionMetrics>,
    ) {
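        // Already pruned: nothing to prefetch.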
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_some() {
                return;
            }
        }

        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let request = PruneRequest {
            file_index,
            response_tx: None,
            partition_metrics,
        };

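        // Best-effort: if the worker's queue is full, silently skip the prefetch.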
        let _ = self.worker_senders[worker_idx].try_send(request);
    }

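    /// Maps a file to a worker by hashing its id, so all requests for the same
    /// file land on the same worker and are served serially.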
    fn get_worker_idx(&self, file_id: FileId) -> usize {
        let file_id_hash = Uuid::from(file_id).as_u128() as usize;
        file_id_hash % self.inner.num_workers
    }

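    /// Fallback used when the worker channels are unavailable: prunes the file
    /// on the current task and caches the builder.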
    async fn prune_file_directly(
        &self,
        file_index: usize,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        let file = &self.inner.stream_ctx.input.files[file_index];
        let builder = self
            .inner
            .stream_ctx
            .input
            .prune_file(file, reader_metrics)
            .await?;

        let arc_builder = Arc::new(builder);

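        // Cache the builder unless a worker has stored one in the meantime.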
        {
            let mut entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_none() {
                reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
                reader_metrics.num_range_builders += 1;
                entry.builder = Some(arc_builder.clone());
                PRUNER_ACTIVE_BUILDERS.inc();
            }
        }

        Ok(arc_builder)
    }

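    /// Decrements the file's remaining-range count and releases the cached
    /// builder once no pending range needs it.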
    fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
        entry.remaining_ranges = entry.remaining_ranges.saturating_sub(1);

        if entry.remaining_ranges == 0
            && let Some(builder) = entry.builder.take()
        {
            PRUNER_ACTIVE_BUILDERS.dec();
            reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
            reader_metrics.num_range_builders -= 1;
        }
    }

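    /// Worker task: serves prune requests from `rx` until all senders are
    /// dropped, caching builders and notifying waiters.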
    async fn worker_loop(
        worker_id: usize,
        mut rx: mpsc::Receiver<PruneRequest>,
        inner: Arc<PrunerInner>,
    ) {
        let mut worker_cache_hit = 0;
        let mut worker_cache_miss = 0;
        let mut pruned_files = Vec::new();

        while let Some(request) = rx.recv().await {
            let PruneRequest {
                file_index,
                response_tx,
                partition_metrics,
            } = request;

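            // Serve from the cache if another request already pruned this file.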
            {
                let entry = inner.file_entries[file_index].lock().unwrap();

                if let Some(builder) = &entry.builder {
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(builder.clone()));
                    }
                    worker_cache_hit += 1;
                    continue;
                }
            }
            worker_cache_miss += 1;

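            // Not cached: prune the file without holding the entry lock.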
            let file = &inner.stream_ctx.input.files[file_index];
            pruned_files.push(file.file_id().file_id());
            let mut metrics = ReaderMetrics::default();
            let result = inner.stream_ctx.input.prune_file(file, &mut metrics).await;

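            // Re-take the lock to publish the result to the cache and waiters.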
            let mut entry = inner.file_entries[file_index].lock().unwrap();
            match result {
                Ok(builder) => {
                    let arc_builder = Arc::new(builder);
                    entry.builder = Some(arc_builder.clone());
                    PRUNER_ACTIVE_BUILDERS.inc();

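                    // Wake every waiter queued while the file was being pruned.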
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Ok(arc_builder.clone()));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(arc_builder));
                    }

                    debug!(
                        "Pruner worker {} pruned file_index: {}, file: {:?}, metrics: {:?}",
                        worker_id,
                        file_index,
                        file.file_id(),
                        metrics
                    );

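                    // Merge prune metrics into the requesting partition's metrics,
                    // with per-file details when verbose explain is enabled.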
                    if let Some(part_metrics) = &partition_metrics {
                        let per_file_metrics = if part_metrics.explain_verbose() {
                            let file_id = file.file_id();
                            let mut map = HashMap::new();
                            map.insert(
                                file_id,
                                FileScanMetrics {
                                    build_part_cost: metrics.build_cost,
                                    ..Default::default()
                                },
                            );
                            Some(map)
                        } else {
                            None
                        };
                        part_metrics.merge_reader_metrics(&metrics, per_file_metrics.as_ref());
                    }
                }
                Err(e) => {
                    let arc_error = Arc::new(e);
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Err(arc_error.clone()).context(PruneFileSnafu));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Err(arc_error).context(PruneFileSnafu));
                    }
                }
            }
        }

        debug!(
            "Pruner worker {} finished, cache_hit: {}, cache_miss: {}, files: {:?}",
            worker_id,
            worker_cache_hit,
            worker_cache_miss,
            pruned_files,
        );
    }
}