1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::pruner::PartitionPruner;
44use crate::read::range::{RangeMeta, RowGroupIndex};
45use crate::read::scan_region::StreamContext;
46use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
47use crate::sst::file::{FileTimeRange, RegionFileId};
48use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
49use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
50use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
51use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
52use crate::sst::parquet::file_range::FileRange;
53use crate::sst::parquet::flat_format::time_index_column_index;
54use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
55use crate::sst::parquet::row_group::ParquetFetchMetrics;
56
/// Per-file metrics collected while scanning a single SST file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges scanned in this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building parts for this file.
    pub build_part_cost: Duration,
    /// Time spent building the reader for this file.
    pub build_reader_cost: Duration,
    /// Time spent scanning this file.
    pub scan_cost: Duration,
}
71
72impl fmt::Debug for FileScanMetrics {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
75
76 if self.num_ranges > 0 {
77 write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
78 }
79 if self.num_rows > 0 {
80 write!(f, ", \"num_rows\":{}", self.num_rows)?;
81 }
82 if !self.build_reader_cost.is_zero() {
83 write!(
84 f,
85 ", \"build_reader_cost\":\"{:?}\"",
86 self.build_reader_cost
87 )?;
88 }
89 if !self.scan_cost.is_zero() {
90 write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
91 }
92
93 write!(f, "}}")
94 }
95}
96
97impl FileScanMetrics {
98 pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
100 self.num_ranges += other.num_ranges;
101 self.num_rows += other.num_rows;
102 self.build_part_cost += other.build_part_cost;
103 self.build_reader_cost += other.build_reader_cost;
104 self.scan_cost += other.scan_cost;
105 }
106}
107
/// Accumulated metrics for one partition of a scan, merged from all sources
/// (memtables, SST files, the series distributor and the merge/dedup stages).
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Time spent preparing the scan before the first poll.
    prepare_scan_cost: Duration,
    /// Time spent building readers.
    build_reader_cost: Duration,
    /// Time spent scanning data.
    scan_cost: Duration,
    /// Time spent yielding batches to the caller.
    yield_cost: Duration,
    /// Optional DataFusion timer for batch conversion (nanoseconds; shared
    /// with the plan's metrics set).
    convert_cost: Option<Time>,
    /// Total elapsed time of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Time spent scanning memtables.
    mem_scan_cost: Duration,
    /// Rows read from memtables.
    mem_rows: usize,
    /// Batches read from memtables.
    mem_batches: usize,
    /// Series read from memtables.
    mem_series: usize,

    /// Time spent building SST parts.
    build_parts_cost: Duration,
    /// Time spent scanning SSTs.
    sst_scan_cost: Duration,
    /// Row groups seen before filtering.
    rg_total: usize,
    /// Row groups filtered out by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Row groups filtered out by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups filtered out by min-max pruning.
    rg_minmax_filtered: usize,
    /// Row groups filtered out by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Row groups filtered out by the vector index.
    rg_vector_filtered: usize,
    /// Rows seen before filtering.
    rows_before_filter: usize,
    /// Rows filtered out by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Rows filtered out by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows filtered out by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows filtered out by the vector index.
    rows_vector_filtered: usize,
    /// Rows selected by the vector index.
    rows_vector_selected: usize,
    /// Rows filtered out by precise (row-level) filtering.
    rows_precise_filtered: usize,
    /// Fulltext index cache hits.
    fulltext_index_cache_hit: usize,
    /// Fulltext index cache misses.
    fulltext_index_cache_miss: usize,
    /// Inverted index cache hits.
    inverted_index_cache_hit: usize,
    /// Inverted index cache misses.
    inverted_index_cache_miss: usize,
    /// Bloom filter cache hits.
    bloom_filter_cache_hit: usize,
    /// Bloom filter cache misses.
    bloom_filter_cache_miss: usize,
    /// Partition pruner cache hits.
    pruner_cache_hit: usize,
    /// Partition pruner cache misses.
    pruner_cache_miss: usize,
    /// Time spent in partition pruning.
    pruner_prune_cost: Duration,
    /// Arrow record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Batches read from SSTs.
    num_sst_batches: usize,
    /// Rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time from query start until the first poll of the stream.
    first_poll: Duration,

    /// Times the series distributor timed out sending a series.
    num_series_send_timeout: usize,
    /// Times the series distributor hit a full channel.
    num_series_send_full: usize,
    /// Rows processed by the series distributor.
    num_distributor_rows: usize,
    /// Batches processed by the series distributor.
    num_distributor_batches: usize,
    /// Time the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Time the series distributor spent yielding.
    distributor_yield_cost: Duration,
    /// Time the series distributor spent dividing series.
    distributor_divider_cost: Duration,

    /// Metrics reported by the merge reader.
    merge_metrics: MergeMetrics,
    /// Metrics reported by the dedup reader.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF (as opposed to being dropped early).
    stream_eof: bool,

    /// Detailed inverted index apply metrics (verbose explain only).
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Detailed bloom filter apply metrics (verbose explain only).
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Detailed fulltext index apply metrics (verbose explain only).
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics, if collected.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Parquet metadata cache metrics, if collected.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, if collected.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory used while building ranges (can go up and down).
    build_ranges_mem_size: isize,
    /// Peak of `build_ranges_mem_size`.
    build_ranges_peak_mem_size: isize,
    /// Current number of live range builders.
    num_range_builders: isize,
    /// Peak of `num_range_builders`.
    num_peak_range_builders: isize,
}
247
/// Heap entry whose ordering compares `total_cost` in reverse, so that a
/// `BinaryHeap` (a max-heap) behaves as a min-heap for top-N selection of
/// the most expensive files.
struct CompareCostReverse<'a> {
    /// Sum of build part, build reader and scan costs for the file.
    total_cost: Duration,
    /// Identifier of the file the metrics belong to.
    file_id: RegionFileId,
    /// Borrowed per-file metrics to print.
    metrics: &'a FileScanMetrics,
}
255
256impl Ord for CompareCostReverse<'_> {
257 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
258 other.total_cost.cmp(&self.total_cost)
260 }
261}
262
263impl PartialOrd for CompareCostReverse<'_> {
264 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
265 Some(self.cmp(other))
266 }
267}
268
// Equality is based solely on `total_cost` (matching `PartialEq`), which is
// sufficient for the heap-based top-N selection.
impl Eq for CompareCostReverse<'_> {}
270
271impl PartialEq for CompareCostReverse<'_> {
272 fn eq(&self, other: &Self) -> bool {
273 self.total_cost == other.total_cost
274 }
275}
276
impl fmt::Debug for ScanMetricsSet {
    /// Renders the whole metrics set as a JSON-like object for explain/log
    /// output. Always-present fields are printed first; most other fields
    /// are omitted when zero/empty, and only the top 10 most expensive
    /// files are printed from the per-file metrics.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructure so a newly added field must be handled
        // (or explicitly ignored) here.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            // Only the peak values are printed; the running counters are
            // intentionally ignored.
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
        } = self;

        // Unconditional fields.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // The DataFusion timer stores nanoseconds; convert for readability.
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Filtering counters, printed only when non-zero.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Series distributor metrics, printed only when non-zero.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Memtable scan metrics, printed only when non-zero.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Optional detailed metrics, printed only when collected and non-empty.
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Print only the top 10 files by total cost, selected with a
        // size-bounded min-heap (`CompareCostReverse` reverses the ordering).
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // Skip files with nothing to report.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // Replace the cheapest retained file when a costlier one
                    // shows up.
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // Ascending in reversed order == descending by total cost.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
            \"num_peak_range_builders\":{num_peak_range_builders}, \
            \"stream_eof\":{stream_eof}}}"
        )
    }
}
impl ScanMetricsSet {
    /// Adds `cost` to the prepare-scan cost (builder style).
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Attaches the shared DataFusion convert-cost timer (builder style).
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }

    /// Merges per-stream scanner metrics into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Exhaustive destructure so new fields can't be silently dropped.
        let ScannerMetrics {
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
        } = other;

        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
    }

    /// Merges SST reader metrics (filtering counters, cache stats, optional
    /// detailed index metrics) into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Exhaustive destructure so new fields can't be silently dropped.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    pruner_cache_hit,
                    pruner_cache_miss,
                    pruner_prune_cost,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
        self.pruner_cache_hit += *pruner_cache_hit;
        self.pruner_cache_miss += *pruner_cache_miss;
        self.pruner_prune_cost += *pruner_prune_cost;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Detailed metrics are only present in verbose mode; lazily create
        // the destination before merging.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track the running memory usage and update the peak.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track the running builder count and update the peak.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }

    /// Merges per-file scan metrics into this set, keyed by file id.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Accumulates the series distributor metrics into this set.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        // Exhaustive destructure so new fields can't be silently dropped.
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
            divider_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
        self.distributor_divider_cost += *divider_cost;
    }

    /// Reports the accumulated values to the global Prometheus metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        if let Some(time) = &self.convert_cost {
            READ_STAGE_ELAPSED
                .with_label_values(&["convert"])
                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
        }
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        // Vector index metrics are only reported when the feature is enabled.
        #[cfg(feature = "vector_index")]
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }
}
831
/// Shared state behind a [`PartitionMetrics`] handle: the accumulated scan
/// metrics plus the DataFusion timers for one partition.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition within the scan.
    partition: usize,
    /// Label of the scanner type (e.g. used in logs and gauge labels).
    scanner_type: &'static str,
    /// Instant when the query started; used to derive elapsed costs.
    query_start: Instant,
    /// Whether verbose explain output was requested.
    explain_verbose: bool,
    /// Accumulated metrics, shared across streams via the mutex.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge tracking in-progress scans; decremented on drop.
    in_progress_scan: IntGauge,

    /// DataFusion timer: building SST parts.
    build_parts_cost: Time,
    /// DataFusion timer: building readers.
    build_reader_cost: Time,
    /// DataFusion timer: scanning.
    scan_cost: Time,
    /// DataFusion timer: yielding.
    yield_cost: Time,
    /// DataFusion timer: batch conversion (also stored in the metrics set).
    convert_cost: Time,
    /// DataFusion elapsed-compute timer for this partition.
    elapsed_compute: Time,
}
860
861impl PartitionMetricsInner {
862 fn on_finish(&self, stream_eof: bool) {
863 let mut metrics = self.metrics.lock().unwrap();
864 if metrics.total_cost.is_zero() {
865 metrics.total_cost = self.query_start.elapsed();
866 }
867 if !metrics.stream_eof {
868 metrics.stream_eof = stream_eof;
869 }
870 }
871}
872
873impl MergeMetricsReport for PartitionMetricsInner {
874 fn report(&self, metrics: &mut MergeMetrics) {
875 let mut scan_metrics = self.metrics.lock().unwrap();
876 scan_metrics.merge_metrics.merge(metrics);
878
879 *metrics = MergeMetrics::default();
881 }
882}
883
884impl DedupMetricsReport for PartitionMetricsInner {
885 fn report(&self, metrics: &mut DedupMetrics) {
886 let mut scan_metrics = self.metrics.lock().unwrap();
887 scan_metrics.dedup_metrics.merge(metrics);
889
890 *metrics = DedupMetrics::default();
892 }
893}
894
impl Drop for PartitionMetricsInner {
    /// Final accounting when the last handle is dropped: finalize costs,
    /// export to Prometheus, decrement the in-progress gauge and log the
    /// metrics (info level for verbose explain, debug otherwise).
    fn drop(&mut self) {
        // `false`: a drop is not proof the stream reached EOF; `on_finish`
        // keeps any EOF flag that was already latched.
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        }
    }
}
921
/// List of per-partition metrics indexed by partition number; a slot is
/// `None` until that partition registers its metrics.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
925
926impl PartitionMetricsList {
927 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
929 let mut list = self.0.lock().unwrap();
930 if list.len() <= partition {
931 list.resize(partition + 1, None);
932 }
933 list[partition] = Some(metrics);
934 }
935
936 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
938 let list = self.0.lock().unwrap();
939 write!(f, ", \"metrics_per_partition\": ")?;
940 f.debug_list()
941 .entries(list.iter().filter_map(|p| p.as_ref()))
942 .finish()?;
943 write!(f, "}}")
944 }
945}
946
/// Cheaply cloneable handle to the metrics of one scan partition; the final
/// reporting happens when the last clone drops the inner state.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
950
impl PartitionMetrics {
    /// Creates metrics for one partition, registers the DataFusion timers in
    /// `metrics_set`, bumps the in-progress gauge, and records the elapsed
    /// time so far as the prepare-scan cost.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        // The convert-cost timer is shared between the DataFusion metrics set
        // and the scan metrics set (clone shares the underlying counter).
        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
        let metrics = ScanMetricsSet::default()
            .with_prepare_scan_cost(query_start.elapsed())
            .with_convert_cost(convert_cost.clone());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost,
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the time from query start to the first poll of the stream.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the count of memtable ranges scanned.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the count of file ranges scanned.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to the DataFusion elapsed-compute timer, skipping
    /// zero durations.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Records reader-build time in both the DataFusion timer and the
    /// scan metrics set.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Records batch-conversion time; also counted as elapsed compute.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Accumulates memtable scan metrics reported by a memtable iterator.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges per-stream scanner metrics into the DataFusion timers and the
    /// scan metrics set; scan and yield times count as elapsed compute.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges SST reader metrics and, when provided, per-file metrics.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Marks the stream as finished with EOF reached.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Accumulates the series distributor metrics into the metrics set.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain output was requested.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a reporter the merge reader can push its metrics to.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a reporter the dedup reader can push its metrics to.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }
}
1085
1086impl fmt::Debug for PartitionMetrics {
1087 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1088 let metrics = self.0.metrics.lock().unwrap();
1089 write!(
1090 f,
1091 r#"{{"partition":{}, "metrics":{:?}}}"#,
1092 self.0.partition, metrics
1093 )
1094 }
1095}
1096
/// Metrics collected by the series distributor while splitting input into
/// per-series streams.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Times a series send timed out.
    pub(crate) num_series_send_timeout: usize,
    /// Times a series send found the channel full.
    pub(crate) num_series_send_full: usize,
    /// Rows processed by the distributor.
    pub(crate) num_rows: usize,
    /// Batches processed by the distributor.
    pub(crate) num_batches: usize,
    /// Time the distributor spent scanning input.
    pub(crate) scan_cost: Duration,
    /// Time the distributor spent yielding output.
    pub(crate) yield_cost: Duration,
    /// Time the distributor spent dividing series.
    pub(crate) divider_cost: Duration,
}
1115
/// Scans the memtable ranges identified by `index`, yielding [`Batch`]es.
///
/// Reader-build time and per-range memtable scan metrics are reported to
/// `part_metrics`; `time_range` prunes rows while iterating.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        file_or_mem_index = %index.index,
        row_group_index = %index.row_group_index,
        source = "mem"
    )
)]
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report the metrics collected by the iterator after the range
            // is exhausted.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
1154
/// Scans the memtable ranges identified by `index` in the flat format,
/// yielding Arrow [`RecordBatch`]es.
///
/// Reader-build time and per-range memtable scan metrics are reported to
/// `part_metrics`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = "mem_flat"
    )
)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the metrics collected by the iterator after the range
            // is exhausted.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
1190
/// Minimum number of rows (one default row group) a file must have before
/// splitting it for merge is considered.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Below this series count, splitting is always allowed (see `can_split_series`).
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows per series required to split when the series count
/// is at or above `NUM_SERIES_THRESHOLD`.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1198
1199pub(crate) fn should_split_flat_batches_for_merge(
1201 stream_ctx: &Arc<StreamContext>,
1202 range_meta: &RangeMeta,
1203) -> bool {
1204 let mut num_files_to_split = 0;
1206 let mut num_mem_rows = 0;
1207 let mut num_mem_series = 0;
1208 for index in &range_meta.row_group_indices {
1212 if stream_ctx.is_mem_range_index(*index) {
1213 let memtable = &stream_ctx.input.memtables[index.index];
1214 let stats = memtable.stats();
1216 num_mem_rows += stats.num_rows();
1217 num_mem_series += stats.series_count();
1218 } else if stream_ctx.is_file_range_index(*index) {
1219 let file_index = index.index - stream_ctx.input.num_memtables();
1221 let file = &stream_ctx.input.files[file_index];
1222 if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
1223 continue;
1225 }
1226 debug_assert!(file.meta_ref().num_rows > 0);
1227 if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
1228 return false;
1230 } else {
1231 num_files_to_split += 1;
1232 }
1233 }
1234 }
1236
1237 if num_files_to_split > 0 {
1238 true
1240 } else if num_mem_series > 0 && num_mem_rows > 0 {
1241 can_split_series(num_mem_rows as u64, num_mem_series as u64)
1243 } else {
1244 false
1245 }
1246}
1247
1248fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1249 assert!(num_series > 0);
1250 assert!(num_rows > 0);
1251
1252 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1254}
1255
1256fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1259 if explain_verbose {
1260 ReaderFilterMetrics {
1261 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1262 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1263 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1264 ..Default::default()
1265 }
1266 } else {
1267 ReaderFilterMetrics::default()
1268 }
1269}
1270
1271#[tracing::instrument(
1273 skip_all,
1274 fields(
1275 region_id = %stream_ctx.input.region_metadata().region_id,
1276 row_group_index = %index.index,
1277 source = read_type
1278 )
1279)]
1280pub(crate) async fn scan_file_ranges(
1281 stream_ctx: Arc<StreamContext>,
1282 part_metrics: PartitionMetrics,
1283 index: RowGroupIndex,
1284 read_type: &'static str,
1285 partition_pruner: Arc<PartitionPruner>,
1286) -> Result<impl Stream<Item = Result<Batch>>> {
1287 let mut reader_metrics = ReaderMetrics {
1288 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1289 ..Default::default()
1290 };
1291 let ranges = partition_pruner
1292 .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1293 .await?;
1294 part_metrics.inc_num_file_ranges(ranges.len());
1295 part_metrics.merge_reader_metrics(&reader_metrics, None);
1296
1297 let init_per_file_metrics = if part_metrics.explain_verbose() {
1299 let file = stream_ctx.input.file_from_index(index);
1300 let file_id = file.file_id();
1301
1302 let mut map = HashMap::new();
1303 map.insert(
1304 file_id,
1305 FileScanMetrics {
1306 build_part_cost: reader_metrics.build_cost,
1307 ..Default::default()
1308 },
1309 );
1310 Some(map)
1311 } else {
1312 None
1313 };
1314
1315 Ok(build_file_range_scan_stream(
1316 stream_ctx,
1317 part_metrics,
1318 read_type,
1319 ranges,
1320 init_per_file_metrics,
1321 ))
1322}
1323
1324#[tracing::instrument(
1326 skip_all,
1327 fields(
1328 region_id = %stream_ctx.input.region_metadata().region_id,
1329 row_group_index = %index.index,
1330 source = read_type
1331 )
1332)]
1333pub(crate) async fn scan_flat_file_ranges(
1334 stream_ctx: Arc<StreamContext>,
1335 part_metrics: PartitionMetrics,
1336 index: RowGroupIndex,
1337 read_type: &'static str,
1338 partition_pruner: Arc<PartitionPruner>,
1339) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1340 let mut reader_metrics = ReaderMetrics {
1341 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1342 ..Default::default()
1343 };
1344 let ranges = partition_pruner
1345 .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1346 .await?;
1347 part_metrics.inc_num_file_ranges(ranges.len());
1348 part_metrics.merge_reader_metrics(&reader_metrics, None);
1349
1350 let init_per_file_metrics = if part_metrics.explain_verbose() {
1352 let file = stream_ctx.input.file_from_index(index);
1353 let file_id = file.file_id();
1354
1355 let mut map = HashMap::new();
1356 map.insert(
1357 file_id,
1358 FileScanMetrics {
1359 build_part_cost: reader_metrics.build_cost,
1360 ..Default::default()
1361 },
1362 );
1363 Some(map)
1364 } else {
1365 None
1366 };
1367
1368 Ok(build_flat_file_range_scan_stream(
1369 stream_ctx,
1370 part_metrics,
1371 read_type,
1372 ranges,
1373 init_per_file_metrics,
1374 ))
1375}
1376
1377#[tracing::instrument(
1379 skip_all,
1380 fields(read_type = read_type, range_count = ranges.len())
1381)]
1382pub fn build_file_range_scan_stream(
1383 stream_ctx: Arc<StreamContext>,
1384 part_metrics: PartitionMetrics,
1385 read_type: &'static str,
1386 ranges: SmallVec<[FileRange; 2]>,
1387 mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1388) -> impl Stream<Item = Result<Batch>> {
1389 try_stream! {
1390 let fetch_metrics = if part_metrics.explain_verbose() {
1391 Some(Arc::new(ParquetFetchMetrics::default()))
1392 } else {
1393 None
1394 };
1395 let reader_metrics = &mut ReaderMetrics {
1396 fetch_metrics: fetch_metrics.clone(),
1397 ..Default::default()
1398 };
1399 for range in ranges {
1400 let build_reader_start = Instant::now();
1401 let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
1402 continue;
1403 };
1404 let build_cost = build_reader_start.elapsed();
1405 part_metrics.inc_build_reader_cost(build_cost);
1406 let compat_batch = range.compat_batch();
1407 let mut source = Source::PruneReader(reader);
1408 while let Some(mut batch) = source.next_batch().await? {
1409 if let Some(compact_batch) = compat_batch {
1410 batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
1411 }
1412 yield batch;
1413 }
1414 if let Source::PruneReader(reader) = source {
1415 let prune_metrics = reader.metrics();
1416
1417 if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1419 let file_id = range.file_handle().file_id();
1420 let file_metrics = file_metrics_map
1421 .entry(file_id)
1422 .or_insert_with(FileScanMetrics::default);
1423
1424 file_metrics.num_ranges += 1;
1425 file_metrics.num_rows += prune_metrics.num_rows;
1426 file_metrics.build_reader_cost += build_cost;
1427 file_metrics.scan_cost += prune_metrics.scan_cost;
1428 }
1429
1430 reader_metrics.merge_from(&prune_metrics);
1431 }
1432 }
1433
1434 reader_metrics.observe_rows(read_type);
1436 reader_metrics.filter_metrics.observe();
1437 part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1438 }
1439}
1440
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
/// Builds a stream that scans the given [`FileRange`]s in the flat format and
/// yields [`RecordBatch`]es.
///
/// Each batch is optionally re-projected by the compaction projection mapper
/// and adapted by the flat compat before being yielded. Reader metrics (and
/// optional per-file metrics) are merged into `part_metrics` once all ranges
/// are exhausted.
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        // Detailed fetch metrics are only collected in verbose explain mode.
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            // The reader is `None` when the whole range is filtered out.
            let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else{continue};
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            // Resolve the flat compat once per range; a non-flat compat here
            // is an internal error.
            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;

            let mapper = range.compaction_projection_mapper();
            while let Some(record_batch) = reader.next_batch()? {
                // Apply the compaction projection first, if any.
                let record_batch = if let Some(mapper) = mapper {
                    let batch = mapper.project(record_batch)?;
                    batch
                } else {
                    record_batch
                };

                // Then adapt the batch through the compat, if any.
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();

            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                let file_id = range.file_handle().file_id();
                let file_metrics = file_metrics_map
                    .entry(file_id)
                    .or_insert_with(FileScanMetrics::default);

                file_metrics.num_ranges += 1;
                file_metrics.num_rows += prune_metrics.num_rows;
                file_metrics.build_reader_cost += build_cost;
                file_metrics.scan_cost += prune_metrics.scan_cost;
            }

            reader_metrics.merge_from(&prune_metrics);
        }

        // Publish aggregated metrics once all ranges are exhausted.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
1519
#[cfg(feature = "enterprise")]
/// Scans the extension range at `index.index` and returns its batch stream.
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let reader = context
        .input
        .extension_range(index.index)
        .reader(context.as_ref());
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1537
1538pub(crate) async fn maybe_scan_other_ranges(
1539 context: &Arc<StreamContext>,
1540 index: RowGroupIndex,
1541 metrics: &PartitionMetrics,
1542) -> Result<BoxedBatchStream> {
1543 #[cfg(feature = "enterprise")]
1544 {
1545 scan_extension_range(context.clone(), index, metrics.clone()).await
1546 }
1547
1548 #[cfg(not(feature = "enterprise"))]
1549 {
1550 let _ = context;
1551 let _ = index;
1552 let _ = metrics;
1553
1554 crate::error::UnexpectedSnafu {
1555 reason: "no other ranges scannable",
1556 }
1557 .fail()
1558 }
1559}
1560
#[cfg(feature = "enterprise")]
/// Scans the extension range at `index.index` in the flat format and returns
/// its record batch stream.
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let reader = context
        .input
        .extension_range(index.index)
        .flat_reader(context.as_ref());
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1578
1579pub(crate) async fn maybe_scan_flat_other_ranges(
1580 context: &Arc<StreamContext>,
1581 index: RowGroupIndex,
1582 metrics: &PartitionMetrics,
1583) -> Result<BoxedRecordBatchStream> {
1584 #[cfg(feature = "enterprise")]
1585 {
1586 scan_flat_extension_range(context.clone(), index, metrics.clone()).await
1587 }
1588
1589 #[cfg(not(feature = "enterprise"))]
1590 {
1591 let _ = context;
1592 let _ = index;
1593 let _ = metrics;
1594
1595 crate::error::UnexpectedSnafu {
1596 reason: "no other ranges scannable in flat format",
1597 }
1598 .fail()
1599 }
1600}
1601
/// A stream adapter that splits each [`RecordBatch`] from the inner stream
/// into sub-batches wherever the time index value decreases
/// (via [`split_record_batch`]).
pub(crate) struct SplitRecordBatchStream<S> {
    /// The underlying record batch stream.
    inner: S,
    /// Sub-batches produced from the last inner batch, drained front-first.
    batches: VecDeque<RecordBatch>,
}

impl<S> SplitRecordBatchStream<S> {
    /// Wraps `inner` with an empty split buffer.
    pub(crate) fn new(inner: S) -> Self {
        Self {
            inner,
            batches: VecDeque::new(),
        }
    }
}
1619
1620impl<S> Stream for SplitRecordBatchStream<S>
1621where
1622 S: Stream<Item = Result<RecordBatch>> + Unpin,
1623{
1624 type Item = Result<RecordBatch>;
1625
1626 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1627 loop {
1628 if let Some(batch) = self.batches.pop_front() {
1630 return Poll::Ready(Some(Ok(batch)));
1631 }
1632
1633 let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1635 Some(Ok(batch)) => batch,
1636 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1637 None => return Poll::Ready(None),
1638 };
1639
1640 split_record_batch(record_batch, &mut self.batches);
1642 }
1644 }
1645}
1646
1647pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1652 let batch_rows = record_batch.num_rows();
1653 if batch_rows == 0 {
1654 return;
1655 }
1656 if batch_rows < 2 {
1657 batches.push_back(record_batch);
1658 return;
1659 }
1660
1661 let time_index_pos = time_index_column_index(record_batch.num_columns());
1662 let timestamps = record_batch.column(time_index_pos);
1663 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1664 let mut offsets = Vec::with_capacity(16);
1665 offsets.push(0);
1666 let values = ts_values.values();
1667 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1668 if value > values[i + 1] {
1669 offsets.push(i + 1);
1670 }
1671 }
1672 offsets.push(values.len());
1673
1674 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1676 let end = offsets[i + 1];
1677 let rows_in_batch = end - start;
1678 batches.push_back(record_batch.slice(start, rows_in_batch));
1679 }
1680}