1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::pruner::PartitionPruner;
44use crate::read::range::{RangeMeta, RowGroupIndex};
45use crate::read::scan_region::StreamContext;
46use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
47use crate::sst::file::{FileTimeRange, RegionFileId};
48use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
49use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
50use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
51use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
52use crate::sst::parquet::file_range::FileRange;
53use crate::sst::parquet::flat_format::time_index_column_index;
54use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
55use crate::sst::parquet::row_group::ParquetFetchMetrics;
56
/// Per-file metrics collected while scanning a single SST file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges scanned in this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building the file's parts (ranges).
    pub build_part_cost: Duration,
    /// Time spent building readers for the file.
    pub build_reader_cost: Duration,
    /// Time spent scanning the file.
    pub scan_cost: Duration,
}
71
72impl fmt::Debug for FileScanMetrics {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
75
76 if self.num_ranges > 0 {
77 write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
78 }
79 if self.num_rows > 0 {
80 write!(f, ", \"num_rows\":{}", self.num_rows)?;
81 }
82 if !self.build_reader_cost.is_zero() {
83 write!(
84 f,
85 ", \"build_reader_cost\":\"{:?}\"",
86 self.build_reader_cost
87 )?;
88 }
89 if !self.scan_cost.is_zero() {
90 write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
91 }
92
93 write!(f, "}}")
94 }
95}
96
97impl FileScanMetrics {
98 pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
100 self.num_ranges += other.num_ranges;
101 self.num_rows += other.num_rows;
102 self.build_part_cost += other.build_part_cost;
103 self.build_reader_cost += other.build_reader_cost;
104 self.scan_cost += other.scan_cost;
105 }
106}
107
/// Aggregated metrics for a partition's scan, merged from all sources
/// (memtables, SST readers, merge/dedup streams, series distributor).
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of batch conversion; the `Time` value is read as nanoseconds.
    convert_cost: Option<Time>,
    /// Duration of the whole scan; set once when the stream finishes.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Duration spent scanning memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    /// Duration to build file ranges (parts).
    build_parts_cost: Duration,
    /// Duration spent scanning SSTs.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of row groups filtered by the vector index.
    rg_vector_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by the vector index.
    rows_vector_filtered: usize,
    /// Number of rows selected by the vector index.
    rows_vector_selected: usize,
    /// Number of rows filtered by precise (exact predicate) filtering.
    rows_precise_filtered: usize,
    /// Fulltext index cache hit count.
    fulltext_index_cache_hit: usize,
    /// Fulltext index cache miss count.
    fulltext_index_cache_miss: usize,
    /// Inverted index cache hit count.
    inverted_index_cache_hit: usize,
    /// Inverted index cache miss count.
    inverted_index_cache_miss: usize,
    /// Bloom filter cache hit count.
    bloom_filter_cache_hit: usize,
    /// Bloom filter cache miss count.
    bloom_filter_cache_miss: usize,
    /// Min-max statistics cache hit count.
    minmax_cache_hit: usize,
    /// Min-max statistics cache miss count.
    minmax_cache_miss: usize,
    /// Pruner cache hit count.
    pruner_cache_hit: usize,
    /// Pruner cache miss count.
    pruner_cache_miss: usize,
    /// Duration spent pruning.
    pruner_prune_cost: Duration,
    /// Number of record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SSTs.
    num_sst_batches: usize,
    /// Number of rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time from query start to the first stream poll.
    first_poll: Duration,

    /// Number of send timeouts in the series distributor.
    num_series_send_timeout: usize,
    /// Number of full-channel events in the series distributor.
    num_series_send_full: usize,
    /// Number of rows processed by the series distributor.
    num_distributor_rows: usize,
    /// Number of batches processed by the series distributor.
    num_distributor_batches: usize,
    /// Scan duration inside the series distributor.
    distributor_scan_cost: Duration,
    /// Yield duration inside the series distributor.
    distributor_yield_cost: Duration,
    /// Divider duration inside the series distributor.
    distributor_divider_cost: Duration,

    /// Metrics merged from merge readers.
    merge_metrics: MergeMetrics,
    /// Metrics merged from dedup readers.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF (latched to `true` once set).
    stream_eof: bool,

    /// Inverted index apply metrics, if any were reported.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics, if any were reported.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics, if any were reported.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics, if any were reported.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Parquet metadata cache metrics, if any were reported.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per SST file scan metrics, if collected.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory used while building ranges (signed: deltas may be negative).
    build_ranges_mem_size: isize,
    /// Peak memory observed while building ranges.
    build_ranges_peak_mem_size: isize,
    /// Current number of range builders (signed: deltas may be negative).
    num_range_builders: isize,
    /// Peak number of range builders observed.
    num_peak_range_builders: isize,
    /// Total size accounted to the range cache.
    range_cache_size: usize,
    /// Range cache hit count.
    range_cache_hit: usize,
    /// Range cache miss count.
    range_cache_miss: usize,
}
257
/// Heap entry ordered by `total_cost` in reverse, so a `BinaryHeap` of these
/// behaves as a min-heap and can keep the top-N most expensive files.
struct CompareCostReverse<'a> {
    /// Sum of build part, build reader and scan costs for the file.
    total_cost: Duration,
    /// Id of the scanned file.
    file_id: RegionFileId,
    /// Borrowed metrics of the file.
    metrics: &'a FileScanMetrics,
}
265
266impl Ord for CompareCostReverse<'_> {
267 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
268 other.total_cost.cmp(&self.total_cost)
270 }
271}
272
impl PartialOrd for CompareCostReverse<'_> {
    // Delegates to `Ord` so the partial and total orders stay consistent.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
278
279impl Eq for CompareCostReverse<'_> {}
280
impl PartialEq for CompareCostReverse<'_> {
    // Equality only considers `total_cost`, matching the `Ord` impl above.
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}
286
impl fmt::Debug for ScanMetricsSet {
    /// Formats all metrics as a single JSON-like object for explain/log output.
    ///
    /// Always-present fields are written first; optional counters are emitted
    /// only when non-zero to keep the output compact. Per-file metrics are
    /// reduced to the (up to) 10 most expensive files.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructure so a newly added field must be handled here.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            minmax_cache_hit,
            minmax_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            // Current values are transient; only their peaks are printed.
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
            range_cache_size,
            range_cache_hit,
            range_cache_miss,
        } = self;

        // Fields that are always printed.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // `Time` stores nanoseconds; convert for human-readable output.
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Filter counters: only emitted when non-zero.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        // Index/pruner cache counters: only emitted when non-zero.
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *minmax_cache_hit > 0 {
            write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
        }
        if *minmax_cache_miss > 0 {
            write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Series distributor metrics: only emitted when non-zero.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Memtable scan metrics: only emitted when non-zero.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Optional sub-metrics: skipped when absent or empty.
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Keep only the 10 most expensive files. `CompareCostReverse`
            // reverses the ordering, so the heap acts as a min-heap and
            // `peek()` yields the cheapest of the current top-10.
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // Skip files that contributed nothing.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // Replace the cheapest tracked file if this one costs more.
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // Sorted ascending by the reversed order == most expensive first.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        if *range_cache_size > 0 {
            write!(f, ", \"range_cache_size\":{range_cache_size}")?;
        }
        if *range_cache_hit > 0 {
            write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
        }
        if *range_cache_miss > 0 {
            write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
        }

        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
            \"num_peak_range_builders\":{num_peak_range_builders}, \
            \"stream_eof\":{stream_eof}}}"
        )
    }
}
impl ScanMetricsSet {
    /// Adds `cost` to the prepare-scan duration (builder-style).
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Attaches the shared convert-cost `Time` metric (builder-style).
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }

    /// Merges `ScannerMetrics` from a scan stream into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Destructure so new fields must be merged explicitly.
        let ScannerMetrics {
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
        } = other;

        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
    }

    /// Merges `ReaderMetrics` collected by an SST reader into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Exhaustive destructure (including nested filter metrics) so any new
        // reader metric must be accounted for here.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    minmax_cache_hit,
                    minmax_cache_miss,
                    pruner_cache_hit,
                    pruner_cache_miss,
                    pruner_prune_cost,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        // Row-group level filter counters.
        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        // Row level filter counters.
        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        // Index/pruner cache counters.
        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
        self.minmax_cache_hit += *minmax_cache_hit;
        self.minmax_cache_miss += *minmax_cache_miss;
        self.pruner_cache_hit += *pruner_cache_hit;
        self.pruner_cache_miss += *pruner_cache_miss;
        self.pruner_prune_cost += *pruner_prune_cost;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Optional sub-metrics are lazily initialized on first merge.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track the peak memory used while building ranges; the delta may be
        // negative, so the current value is signed.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Same peak tracking for concurrent range builders.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }

    /// Merges per-file metrics into this set, summing entries with the same file id.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Accumulates metrics reported by the series distributor.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        // Destructure so new fields must be merged explicitly.
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
            divider_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
        self.distributor_divider_cost += *divider_cost;
    }

    /// Exports the collected values to the global Prometheus metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        if let Some(time) = &self.convert_cost {
            // The `Time` metric stores nanoseconds.
            READ_STAGE_ELAPSED
                .with_label_values(&["convert"])
                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
        }
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        // Vector index metrics are only exported when the feature is enabled.
        #[cfg(feature = "vector_index")]
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }
}
866
/// Shared state behind [`PartitionMetrics`]; finalized and observed on drop.
struct PartitionMetricsInner {
    /// Id of the region being scanned.
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label of the scanner type (used for metrics and logs).
    scanner_type: &'static str,
    /// Time the query started; used for total/first-poll durations.
    query_start: Instant,
    /// Whether to log finished metrics at info level (explain verbose).
    explain_verbose: bool,
    /// Aggregated scan metrics, shared across streams via the mutex.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; decremented on drop.
    in_progress_scan: IntGauge,

    /// DataFusion `Time` to build parts.
    build_parts_cost: Time,
    /// DataFusion `Time` to build the reader.
    build_reader_cost: Time,
    /// DataFusion `Time` to scan.
    scan_cost: Time,
    /// DataFusion `Time` of the yield cost.
    yield_cost: Time,
    /// DataFusion `Time` of batch conversion.
    convert_cost: Time,
    /// DataFusion elapsed-compute time for this partition.
    elapsed_compute: Time,
}
895
896impl PartitionMetricsInner {
897 fn on_finish(&self, stream_eof: bool) {
898 let mut metrics = self.metrics.lock().unwrap();
899 if metrics.total_cost.is_zero() {
900 metrics.total_cost = self.query_start.elapsed();
901 }
902 if !metrics.stream_eof {
903 metrics.stream_eof = stream_eof;
904 }
905 }
906}
907
908impl MergeMetricsReport for PartitionMetricsInner {
909 fn report(&self, metrics: &mut MergeMetrics) {
910 let mut scan_metrics = self.metrics.lock().unwrap();
911 scan_metrics.merge_metrics.merge(metrics);
913
914 *metrics = MergeMetrics::default();
916 }
917}
918
919impl DedupMetricsReport for PartitionMetricsInner {
920 fn report(&self, metrics: &mut DedupMetrics) {
921 let mut scan_metrics = self.metrics.lock().unwrap();
922 scan_metrics.dedup_metrics.merge(metrics);
924
925 *metrics = DedupMetrics::default();
927 }
928}
929
impl Drop for PartitionMetricsInner {
    /// Finalizes and exports the metrics when the last stream handle goes away.
    fn drop(&mut self) {
        // `on_finish(false)` fills in `total_cost` if no explicit finish happened
        // but does not overwrite an already-latched `stream_eof == true`.
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        // Log at info level only for explain-verbose queries; both branches are
        // kept as separate macro calls so log formatting stays lazy per level.
        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        }
    }
}
956
/// Sparse list of per-partition metrics, indexed by partition number.
/// Entries are `None` until the corresponding partition registers its metrics.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
960
961impl PartitionMetricsList {
962 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
964 let mut list = self.0.lock().unwrap();
965 if list.len() <= partition {
966 list.resize(partition + 1, None);
967 }
968 list[partition] = Some(metrics);
969 }
970
971 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
973 let list = self.0.lock().unwrap();
974 write!(f, ", \"metrics_per_partition\": ")?;
975 f.debug_list()
976 .entries(list.iter().filter_map(|p| p.as_ref()))
977 .finish()?;
978 write!(f, "}}")
979 }
980}
981
/// Cheaply-clonable handle to the metrics of one scan partition.
/// All clones share the same inner state; metrics are exported when the last
/// clone is dropped.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
985
impl PartitionMetrics {
    /// Creates the metrics handle for `partition` of `region_id`.
    ///
    /// Increments the in-progress scan gauge (decremented again on drop) and
    /// registers the per-partition DataFusion timers on `metrics_set`. The time
    /// elapsed since `query_start` is recorded as the prepare-scan cost.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
        let metrics = ScanMetricsSet::default()
            .with_prepare_scan_cost(query_start.elapsed())
            .with_convert_cost(convert_cost.clone());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost,
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the elapsed time from query start to the first stream poll.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the count of scanned mem ranges.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the count of scanned file ranges.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to the DataFusion elapsed-compute timer, skipping zeros.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Adds `cost` to both the DataFusion timer and the aggregated metrics.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Records the cost of converting a batch (also counted as compute time).
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Accumulates metrics reported by a memtable scan.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges `ScannerMetrics` into the set and the DataFusion timers.
    /// Scan and yield costs are also counted as compute time.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges SST reader metrics and, if provided, per-file scan metrics.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Marks the stream as finished with EOF and records the total cost.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Accumulates metrics reported by the series distributor.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether this scan runs under explain-verbose.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a reporter for merge metrics backed by this partition's state.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a reporter for dedup metrics backed by this partition's state.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }

    /// Adds `size` to the range cache accounting.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_size(&self, size: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_size += size;
    }

    /// Increments the range cache hit counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_hit(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_hit += 1;
    }

    /// Increments the range cache miss counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_miss(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_miss += 1;
    }
}
1141
1142impl fmt::Debug for PartitionMetrics {
1143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1144 let metrics = self.0.metrics.lock().unwrap();
1145 write!(
1146 f,
1147 r#"{{"partition":{}, "metrics":{:?}}}"#,
1148 self.0.partition, metrics
1149 )
1150 }
1151}
1152
/// Metrics collected by the series distributor while splitting batches
/// into per-series streams.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of timeouts while sending series to receivers.
    pub(crate) num_series_send_timeout: usize,
    /// Number of times the send channel was full.
    pub(crate) num_series_send_full: usize,
    /// Number of rows distributed.
    pub(crate) num_rows: usize,
    /// Number of batches distributed.
    pub(crate) num_batches: usize,
    /// Duration spent scanning input.
    pub(crate) scan_cost: Duration,
    /// Duration spent yielding output.
    pub(crate) yield_cost: Duration,
    /// Duration spent dividing batches into series.
    pub(crate) divider_cost: Duration,
}
1171
/// Scans the memtable ranges at `index`, yielding [`Batch`]es within
/// `time_range` and reporting reader-build and memtable-scan metrics to
/// `part_metrics`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        file_or_mem_index = %index.index,
        row_group_index = %index.row_group_index,
        source = "mem"
    )
)]
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            // Time only the iterator construction as build-reader cost.
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report after the range is exhausted so the numbers are final.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
1210
1211#[tracing::instrument(
1213 skip_all,
1214 fields(
1215 region_id = %stream_ctx.input.region_metadata().region_id,
1216 row_group_index = %index.index,
1217 source = "mem_flat"
1218 )
1219)]
1220pub(crate) fn scan_flat_mem_ranges(
1221 stream_ctx: Arc<StreamContext>,
1222 part_metrics: PartitionMetrics,
1223 index: RowGroupIndex,
1224 time_range: FileTimeRange,
1225) -> impl Stream<Item = Result<RecordBatch>> {
1226 try_stream! {
1227 let ranges = stream_ctx.input.build_mem_ranges(index);
1228 part_metrics.inc_num_mem_ranges(ranges.len());
1229 for range in ranges {
1230 let build_reader_start = Instant::now();
1231 let mem_scan_metrics = Some(MemScanMetrics::default());
1232 let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
1233 part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1234
1235 while let Some(record_batch) = iter.next().transpose()? {
1236 yield record_batch;
1237 }
1238
1239 if let Some(ref metrics) = mem_scan_metrics {
1241 let data = metrics.data();
1242 part_metrics.report_mem_scan_metrics(&data);
1243 }
1244 }
1245 }
1246}
1247
/// Minimum number of rows a file must have before it is considered for
/// splitting; one default parquet row group worth of rows.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Series-count bound used by `can_split_series`: below this many series,
/// splitting is always considered worthwhile.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows-per-series for splitting to be considered worthwhile
/// when the series count is at or above `NUM_SERIES_THRESHOLD`.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1255
1256pub(crate) fn should_split_flat_batches_for_merge(
1258 stream_ctx: &Arc<StreamContext>,
1259 range_meta: &RangeMeta,
1260) -> bool {
1261 let mut num_files_to_split = 0;
1263 let mut num_mem_rows = 0;
1264 let mut num_mem_series = 0;
1265 for index in &range_meta.row_group_indices {
1269 if stream_ctx.is_mem_range_index(*index) {
1270 let memtable = &stream_ctx.input.memtables[index.index];
1271 let stats = memtable.stats();
1273 num_mem_rows += stats.num_rows();
1274 num_mem_series += stats.series_count();
1275 } else if stream_ctx.is_file_range_index(*index) {
1276 let file_index = index.index - stream_ctx.input.num_memtables();
1278 let file = &stream_ctx.input.files[file_index];
1279 if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
1280 continue;
1282 }
1283 debug_assert!(file.meta_ref().num_rows > 0);
1284 if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
1285 return false;
1287 } else {
1288 num_files_to_split += 1;
1289 }
1290 }
1291 }
1293
1294 if num_files_to_split > 0 {
1295 true
1297 } else if num_mem_series > 0 && num_mem_rows > 0 {
1298 can_split_series(num_mem_rows as u64, num_mem_series as u64)
1300 } else {
1301 false
1302 }
1303}
1304
1305fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1306 assert!(num_series > 0);
1307 assert!(num_rows > 0);
1308
1309 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1311}
1312
1313fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1316 if explain_verbose {
1317 ReaderFilterMetrics {
1318 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1319 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1320 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1321 ..Default::default()
1322 }
1323 } else {
1324 ReaderFilterMetrics::default()
1325 }
1326}
1327
1328#[tracing::instrument(
1330 skip_all,
1331 fields(
1332 region_id = %stream_ctx.input.region_metadata().region_id,
1333 row_group_index = %index.index,
1334 source = read_type
1335 )
1336)]
1337pub(crate) async fn scan_file_ranges(
1338 stream_ctx: Arc<StreamContext>,
1339 part_metrics: PartitionMetrics,
1340 index: RowGroupIndex,
1341 read_type: &'static str,
1342 partition_pruner: Arc<PartitionPruner>,
1343) -> Result<impl Stream<Item = Result<Batch>>> {
1344 let mut reader_metrics = ReaderMetrics {
1345 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1346 ..Default::default()
1347 };
1348 let ranges = partition_pruner
1349 .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1350 .await?;
1351 part_metrics.inc_num_file_ranges(ranges.len());
1352 part_metrics.merge_reader_metrics(&reader_metrics, None);
1353
1354 let init_per_file_metrics = if part_metrics.explain_verbose() {
1356 let file = stream_ctx.input.file_from_index(index);
1357 let file_id = file.file_id();
1358
1359 let mut map = HashMap::new();
1360 map.insert(
1361 file_id,
1362 FileScanMetrics {
1363 build_part_cost: reader_metrics.build_cost,
1364 ..Default::default()
1365 },
1366 );
1367 Some(map)
1368 } else {
1369 None
1370 };
1371
1372 Ok(build_file_range_scan_stream(
1373 stream_ctx,
1374 part_metrics,
1375 read_type,
1376 ranges,
1377 init_per_file_metrics,
1378 ))
1379}
1380
1381#[tracing::instrument(
1383 skip_all,
1384 fields(
1385 region_id = %stream_ctx.input.region_metadata().region_id,
1386 row_group_index = %index.index,
1387 source = read_type
1388 )
1389)]
1390pub(crate) async fn scan_flat_file_ranges(
1391 stream_ctx: Arc<StreamContext>,
1392 part_metrics: PartitionMetrics,
1393 index: RowGroupIndex,
1394 read_type: &'static str,
1395 partition_pruner: Arc<PartitionPruner>,
1396) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1397 let mut reader_metrics = ReaderMetrics {
1398 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1399 ..Default::default()
1400 };
1401 let ranges = partition_pruner
1402 .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1403 .await?;
1404 part_metrics.inc_num_file_ranges(ranges.len());
1405 part_metrics.merge_reader_metrics(&reader_metrics, None);
1406
1407 let init_per_file_metrics = if part_metrics.explain_verbose() {
1409 let file = stream_ctx.input.file_from_index(index);
1410 let file_id = file.file_id();
1411
1412 let mut map = HashMap::new();
1413 map.insert(
1414 file_id,
1415 FileScanMetrics {
1416 build_part_cost: reader_metrics.build_cost,
1417 ..Default::default()
1418 },
1419 );
1420 Some(map)
1421 } else {
1422 None
1423 };
1424
1425 Ok(build_flat_file_range_scan_stream(
1426 stream_ctx,
1427 part_metrics,
1428 read_type,
1429 ranges,
1430 init_per_file_metrics,
1431 ))
1432}
1433
/// Builds a stream that scans [`Batch`]es from `ranges`, applying primary-key
/// compat where needed, and merges reader metrics (optionally per file) into
/// `part_metrics` when all ranges are consumed.
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        // Only track parquet fetch details when verbose explain is enabled.
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            // A range may prune everything, in which case there is no reader.
            let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
                continue;
            };
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            // Move the reader into a `Source` for the scan loop; it is
            // destructured back out below to read its final metrics.
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            // Recover the reader to collect its prune metrics.
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();

                // Accumulate per-file numbers when verbose explain asked for them.
                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                    let file_id = range.file_handle().file_id();
                    let file_metrics = file_metrics_map
                        .entry(file_id)
                        .or_insert_with(FileScanMetrics::default);

                    file_metrics.num_ranges += 1;
                    file_metrics.num_rows += prune_metrics.num_rows;
                    file_metrics.build_reader_cost += build_cost;
                    file_metrics.scan_cost += prune_metrics.scan_cost;
                }

                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Publish the aggregated metrics once the whole stream is drained.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
1497
1498#[tracing::instrument(
1500 skip_all,
1501 fields(read_type = read_type, range_count = ranges.len())
1502)]
1503pub fn build_flat_file_range_scan_stream(
1504 _stream_ctx: Arc<StreamContext>,
1505 part_metrics: PartitionMetrics,
1506 read_type: &'static str,
1507 ranges: SmallVec<[FileRange; 2]>,
1508 mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1509) -> impl Stream<Item = Result<RecordBatch>> {
1510 try_stream! {
1511 let fetch_metrics = if part_metrics.explain_verbose() {
1512 Some(Arc::new(ParquetFetchMetrics::default()))
1513 } else {
1514 None
1515 };
1516 let reader_metrics = &mut ReaderMetrics {
1517 fetch_metrics: fetch_metrics.clone(),
1518 ..Default::default()
1519 };
1520 for range in ranges {
1521 let build_reader_start = Instant::now();
1522 let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else{continue};
1523 let build_cost = build_reader_start.elapsed();
1524 part_metrics.inc_build_reader_cost(build_cost);
1525
1526 let may_compat = range
1527 .compat_batch()
1528 .map(|compat| {
1529 compat.as_flat().context(UnexpectedSnafu {
1530 reason: "Invalid compat for flat format",
1531 })
1532 })
1533 .transpose()?;
1534
1535 let mapper = range.compaction_projection_mapper();
1536 while let Some(record_batch) = reader.next_batch().await? {
1537 let record_batch = if let Some(mapper) = mapper {
1538 let batch = mapper.project(record_batch)?;
1539 batch
1540 } else {
1541 record_batch
1542 };
1543
1544 if let Some(flat_compat) = may_compat {
1545 let batch = flat_compat.compat(record_batch)?;
1546 yield batch;
1547 } else {
1548 yield record_batch;
1549 }
1550 }
1551
1552 let prune_metrics = reader.metrics();
1553
1554 if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1556 let file_id = range.file_handle().file_id();
1557 let file_metrics = file_metrics_map
1558 .entry(file_id)
1559 .or_insert_with(FileScanMetrics::default);
1560
1561 file_metrics.num_ranges += 1;
1562 file_metrics.num_rows += prune_metrics.num_rows;
1563 file_metrics.build_reader_cost += build_cost;
1564 file_metrics.scan_cost += prune_metrics.scan_cost;
1565 }
1566
1567 reader_metrics.merge_from(&prune_metrics);
1568 }
1569
1570 reader_metrics.observe_rows(read_type);
1572 reader_metrics.filter_metrics.observe();
1573 part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1574 }
1575}
1576
#[cfg(feature = "enterprise")]
/// Scans an extension range by delegating to its reader implementation,
/// wrapping any failure in `ScanExternalRangeSnafu`.
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1594
1595pub(crate) async fn maybe_scan_other_ranges(
1596 context: &Arc<StreamContext>,
1597 index: RowGroupIndex,
1598 metrics: &PartitionMetrics,
1599) -> Result<BoxedBatchStream> {
1600 #[cfg(feature = "enterprise")]
1601 {
1602 scan_extension_range(context.clone(), index, metrics.clone()).await
1603 }
1604
1605 #[cfg(not(feature = "enterprise"))]
1606 {
1607 let _ = context;
1608 let _ = index;
1609 let _ = metrics;
1610
1611 crate::error::UnexpectedSnafu {
1612 reason: "no other ranges scannable",
1613 }
1614 .fail()
1615 }
1616}
1617
#[cfg(feature = "enterprise")]
/// Scans an extension range in the flat format by delegating to its flat
/// reader, wrapping any failure in `ScanExternalRangeSnafu`.
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.flat_reader(context.as_ref());
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1635
1636pub(crate) async fn maybe_scan_flat_other_ranges(
1637 context: &Arc<StreamContext>,
1638 index: RowGroupIndex,
1639 metrics: &PartitionMetrics,
1640) -> Result<BoxedRecordBatchStream> {
1641 #[cfg(feature = "enterprise")]
1642 {
1643 scan_flat_extension_range(context.clone(), index, metrics.clone()).await
1644 }
1645
1646 #[cfg(not(feature = "enterprise"))]
1647 {
1648 let _ = context;
1649 let _ = index;
1650 let _ = metrics;
1651
1652 crate::error::UnexpectedSnafu {
1653 reason: "no other ranges scannable in flat format",
1654 }
1655 .fail()
1656 }
1657}
1658
/// Wraps a `RecordBatch` stream and splits every incoming batch into runs that
/// are sorted on the time index (see `split_record_batch`).
pub(crate) struct SplitRecordBatchStream<S> {
    /// The underlying record batch stream.
    inner: S,
    /// Split batches waiting to be handed to the consumer.
    batches: VecDeque<RecordBatch>,
}
1666
1667impl<S> SplitRecordBatchStream<S> {
1668 pub(crate) fn new(inner: S) -> Self {
1670 Self {
1671 inner,
1672 batches: VecDeque::new(),
1673 }
1674 }
1675}
1676
impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Drain previously split batches before polling the inner stream.
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            // Queue is empty: pull the next batch, propagating Pending,
            // errors, and end-of-stream as-is.
            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            };

            // Split the batch into sorted runs and loop back to emit the first
            // one. An empty input batch enqueues nothing, hence the loop.
            split_record_batch(record_batch, &mut self.batches);
        }
    }
}
1703
1704pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1709 let batch_rows = record_batch.num_rows();
1710 if batch_rows == 0 {
1711 return;
1712 }
1713 if batch_rows < 2 {
1714 batches.push_back(record_batch);
1715 return;
1716 }
1717
1718 let time_index_pos = time_index_column_index(record_batch.num_columns());
1719 let timestamps = record_batch.column(time_index_pos);
1720 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1721 let mut offsets = Vec::with_capacity(16);
1722 offsets.push(0);
1723 let values = ts_values.values();
1724 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1725 if value > values[i + 1] {
1726 offsets.push(i + 1);
1727 }
1728 }
1729 offsets.push(values.len());
1730
1731 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1733 let end = offsets[i + 1];
1734 let rows_in_batch = end - start;
1735 batches.push_back(record_batch.slice(start, rows_in_batch));
1736 }
1737}