use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use common_telemetry::debug;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::FileId;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::error::{PruneFileSnafu, Result};
use crate::metrics::PRUNER_ACTIVE_BUILDERS;
use crate::read::range::{FileRangeBuilder, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::{FileScanMetrics, PartitionMetrics};
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;

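/// Maximum number of upcoming files to prefetch range builders for.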
const PREFETCH_COUNT: usize = 8;

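/// Per-partition wrapper around a shared [`Pruner`] that tracks scan progress
/// and prefetches range builders for upcoming files.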
pub struct PartitionPruner {
    /// Shared pruner that builds and caches file range builders.
    pruner: Arc<Pruner>,
    /// File indices referenced by this partition's ranges, in scan order.
    file_indices: Vec<usize>,
    /// Furthest position in `file_indices` the scan has reached so far.
    current_position: AtomicUsize,
}

impl PartitionPruner {
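    /// Creates a `PartitionPruner` for the given partition ranges, collecting the
    /// file indices they reference in scan order.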
    pub fn new(pruner: Arc<Pruner>, partition_ranges: &[PartitionRange]) -> Self {
        let mut file_indices = Vec::with_capacity(pruner.inner.stream_ctx.input.num_files());
        let mut dedup_set = HashSet::with_capacity(pruner.inner.stream_ctx.input.num_files());

        // Collects the unique file indices referenced by the partition ranges, in scan order.
        let num_memtables = pruner.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &pruner.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if row_group_index.index >= num_memtables {
                    let file_index = row_group_index.index - num_memtables;
                    if dedup_set.insert(file_index) {
                        file_indices.push(file_index);
                    }
                }
            }
        }

        Self {
            pruner,
            file_indices,
            current_position: AtomicUsize::new(0),
        }
    }

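    /// Builds [`FileRange`]s for the row group and triggers prefetching of
    /// builders for files that will be scanned next.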
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.pruner.inner.stream_ctx.input.num_memtables();

        let ranges = self
            .pruner
            .build_file_ranges(index, partition_metrics, reader_metrics)
            .await?;

        // Updates the furthest scan position and prefetches upcoming files when it advances.
        if let Some(pos) = self.file_indices.iter().position(|&idx| idx == file_index) {
            let prev_pos = self.current_position.fetch_max(pos, Ordering::Relaxed);
            if pos > prev_pos || prev_pos == 0 {
                self.prefetch_upcoming_files(pos, partition_metrics);
            }
        }

        Ok(ranges)
    }

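    /// Requests background pruning for up to `PREFETCH_COUNT` files after `current_pos`.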
    fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) {
        let start = current_pos + 1;
        let end = (start + PREFETCH_COUNT).min(self.file_indices.len());

        for i in start..end {
            self.pruner
                .get_file_builder_background(self.file_indices[i], Some(partition_metrics.clone()));
        }
    }
}

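/// Prunes files on a pool of background workers and caches the resulting
/// [`FileRangeBuilder`]s so each file only needs to be pruned once.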
pub struct Pruner {
    /// Senders of the per-worker request channels.
    worker_senders: Vec<mpsc::Sender<PruneRequest>>,
    /// State shared with the worker tasks.
    inner: Arc<PrunerInner>,
}

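/// State shared between the [`Pruner`] handle and its workers.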
struct PrunerInner {
    /// Number of background workers.
    num_workers: usize,
    /// Per-file pruning state, indexed by file index.
    file_entries: Vec<Mutex<FileBuilderEntry>>,
    /// Context of the scan.
    stream_ctx: Arc<StreamContext>,
}

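/// Cached pruning state for a single file.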
struct FileBuilderEntry {
    /// Cached builder, set once the file has been pruned.
    builder: Option<Arc<FileRangeBuilder>>,
    /// Number of ranges that still need this builder.
    remaining_ranges: usize,
    /// Waiters to notify once pruning finishes.
    waiters: Vec<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
}

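/// A request for a worker to prune one file.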
struct PruneRequest {
    /// Index of the file to prune.
    file_index: usize,
    /// Channel to send the result back, `None` for background prefetch requests.
    response_tx: Option<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
    /// Metrics to merge pruning costs into, if available.
    partition_metrics: Option<PartitionMetrics>,
}

impl Pruner {
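    /// Creates a `Pruner` with `num_workers` background workers for the given stream context.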
    pub fn new(stream_ctx: Arc<StreamContext>, num_workers: usize) -> Self {
        let num_files = stream_ctx.input.num_files();
        let file_entries: Vec<_> = (0..num_files)
            .map(|_| {
                Mutex::new(FileBuilderEntry {
                    builder: None,
                    remaining_ranges: 0,
                    waiters: Vec::new(),
                })
            })
            .collect();

        // Creates one bounded request channel per worker.
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut receivers = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let (tx, rx) = mpsc::channel::<PruneRequest>(64);
            worker_senders.push(tx);
            receivers.push(rx);
        }

        let inner = Arc::new(PrunerInner {
            num_workers,
            file_entries,
            stream_ctx,
        });

        // Spawns the worker tasks.
        for (worker_id, rx) in receivers.into_iter().enumerate() {
            let inner_clone = inner.clone();
            common_runtime::spawn_global(async move {
                Self::worker_loop(worker_id, rx, inner_clone).await;
            });
        }

        Self {
            worker_senders,
            inner,
        }
    }

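    /// Registers partition ranges by increasing the remaining range count of each
    /// file they reference; the count controls when a cached builder can be released.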
    pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) {
        let num_memtables = self.inner.stream_ctx.input.num_memtables();
        for part_range in partition_ranges {
            let range_meta = &self.inner.stream_ctx.ranges[part_range.identifier];
            for row_group_index in &range_meta.row_group_indices {
                if row_group_index.index >= num_memtables {
                    let file_index = row_group_index.index - num_memtables;
                    if file_index < self.inner.file_entries.len() {
                        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
                        entry.remaining_ranges += 1;
                    }
                }
            }
        }
    }

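    /// Builds the [`FileRange`]s for a row group, pruning the file on demand and
    /// releasing its cached builder once no registered ranges remain.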
    pub async fn build_file_ranges(
        &self,
        index: RowGroupIndex,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<SmallVec<[FileRange; 2]>> {
        let file_index = index.index - self.inner.stream_ctx.input.num_memtables();

        let builder = self
            .get_file_builder(file_index, partition_metrics, reader_metrics)
            .await?;

        let mut ranges = SmallVec::new();
        builder.build_ranges(index.row_group_index, &mut ranges);

        // Releases the cached builder if this was the last range that needed it.
        self.decrement_and_maybe_clear(file_index, reader_metrics);

        Ok(ranges)
    }

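    /// Returns the [`FileRangeBuilder`] for the file, using the cache when possible
    /// and otherwise dispatching a prune request to a worker.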
    async fn get_file_builder(
        &self,
        file_index: usize,
        partition_metrics: &PartitionMetrics,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        // Fast path: the file has already been pruned.
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if let Some(builder) = &entry.builder {
                reader_metrics.filter_metrics.pruner_cache_hit += 1;
                return Ok(builder.clone());
            }
        }

        reader_metrics.filter_metrics.pruner_cache_miss += 1;
        let prune_start = Instant::now();
        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let (response_tx, response_rx) = oneshot::channel();
        let request = PruneRequest {
            file_index,
            response_tx: Some(response_tx),
            partition_metrics: Some(partition_metrics.clone()),
        };

        // Dispatches the request to the worker that owns this file and falls back to
        // pruning on the current task if the worker channel is gone.
        let result = if self.worker_senders[worker_idx].send(request).await.is_err() {
            common_telemetry::warn!("Worker channel closed, falling back to direct pruning");
            self.prune_file_directly(file_index, reader_metrics).await
        } else {
            match response_rx.await {
                Ok(result) => result,
                Err(_) => {
                    common_telemetry::warn!(
                        "Response channel closed, falling back to direct pruning"
                    );
                    self.prune_file_directly(file_index, reader_metrics).await
                }
            }
        };
        reader_metrics.filter_metrics.pruner_prune_cost += prune_start.elapsed();
        result
    }

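    /// Asks a worker to prune the file in the background without waiting for the result.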
    pub fn get_file_builder_background(
        &self,
        file_index: usize,
        partition_metrics: Option<PartitionMetrics>,
    ) {
        // Skips files that already have a cached builder.
        {
            let entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_some() {
                return;
            }
        }

        let file = &self.inner.stream_ctx.input.files[file_index];
        let file_id = file.file_id().file_id();
        let worker_idx = self.get_worker_idx(file_id);

        let request = PruneRequest {
            file_index,
            response_tx: None,
            partition_metrics,
        };

        // Best effort: drops the request if the worker queue is full or closed.
        let _ = self.worker_senders[worker_idx].try_send(request);
    }

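    /// Maps a file id to a worker index so requests for the same file always go to the same worker.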
    fn get_worker_idx(&self, file_id: FileId) -> usize {
        let file_id_hash = Uuid::from(file_id).as_u128() as usize;
        file_id_hash % self.inner.num_workers
    }

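    /// Prunes the file on the current task; used as a fallback when worker channels are closed.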
    async fn prune_file_directly(
        &self,
        file_index: usize,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<Arc<FileRangeBuilder>> {
        let file = &self.inner.stream_ctx.input.files[file_index];
        let builder = self
            .inner
            .stream_ctx
            .input
            .prune_file(file, reader_metrics)
            .await?;

        let arc_builder = Arc::new(builder);

        // Caches the builder unless a worker has stored one in the meantime.
        {
            let mut entry = self.inner.file_entries[file_index].lock().unwrap();
            if entry.builder.is_none() {
                reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
                reader_metrics.num_range_builders += 1;
                entry.builder = Some(arc_builder.clone());
                PRUNER_ACTIVE_BUILDERS.inc();
            }
        }

        Ok(arc_builder)
    }

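    /// Decrements the file's remaining range count and drops the cached builder once no ranges are left.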
    fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
        let mut entry = self.inner.file_entries[file_index].lock().unwrap();
        entry.remaining_ranges = entry.remaining_ranges.saturating_sub(1);

        if entry.remaining_ranges == 0
            && let Some(builder) = entry.builder.take()
        {
            PRUNER_ACTIVE_BUILDERS.dec();
            reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
            reader_metrics.num_range_builders -= 1;
        }
    }

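    /// Worker loop that serves prune requests: reuses cached builders, prunes files
    /// otherwise, notifies waiters, and merges metrics into the partition metrics.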
    async fn worker_loop(
        worker_id: usize,
        mut rx: mpsc::Receiver<PruneRequest>,
        inner: Arc<PrunerInner>,
    ) {
        let mut worker_cache_hit = 0;
        let mut worker_cache_miss = 0;
        let mut pruned_files = Vec::new();

        while let Some(request) = rx.recv().await {
            let PruneRequest {
                file_index,
                response_tx,
                partition_metrics,
            } = request;

            // Serves from the cache if an earlier request already pruned this file.
            {
                let entry = inner.file_entries[file_index].lock().unwrap();

                if let Some(builder) = &entry.builder {
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(builder.clone()));
                    }
                    worker_cache_hit += 1;
                    continue;
                }
            }
            worker_cache_miss += 1;

            let file = &inner.stream_ctx.input.files[file_index];
            pruned_files.push(file.file_id().file_id());
            let mut metrics = ReaderMetrics::default();
            let result = inner.stream_ctx.input.prune_file(file, &mut metrics).await;

            // Stores the result and notifies any waiters.
            let mut entry = inner.file_entries[file_index].lock().unwrap();
            match result {
                Ok(builder) => {
                    let arc_builder = Arc::new(builder);
                    entry.builder = Some(arc_builder.clone());
                    PRUNER_ACTIVE_BUILDERS.inc();

                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Ok(arc_builder.clone()));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Ok(arc_builder));
                    }

                    debug!(
                        "Pruner worker {} pruned file_index: {}, file: {:?}, metrics: {:?}",
                        worker_id,
                        file_index,
                        file.file_id(),
                        metrics
                    );

                    // Merges pruning metrics into the partition metrics, with per-file
                    // details when verbose explain is enabled.
                    if let Some(part_metrics) = &partition_metrics {
                        let per_file_metrics = if part_metrics.explain_verbose() {
                            let file_id = file.file_id();
                            let mut map = HashMap::new();
                            map.insert(
                                file_id,
                                FileScanMetrics {
                                    build_part_cost: metrics.build_cost,
                                    ..Default::default()
                                },
                            );
                            Some(map)
                        } else {
                            None
                        };
                        part_metrics.merge_reader_metrics(&metrics, per_file_metrics.as_ref());
                    }
                }
                Err(e) => {
                    let arc_error = Arc::new(e);
                    for waiter in entry.waiters.drain(..) {
                        let _ = waiter.send(Err(arc_error.clone()).context(PruneFileSnafu));
                    }
                    if let Some(response_tx) = response_tx {
                        let _ = response_tx.send(Err(arc_error).context(PruneFileSnafu));
                    }
                }
            }
        }

        common_telemetry::debug!(
            "Pruner worker {} finished, cache_hit: {}, cache_miss: {}, files: {:?}",
            worker_id,
            worker_cache_hit,
            worker_cache_miss,
            pruned_files,
        );
    }
}