1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::pruner::PartitionPruner;
44use crate::read::range::{RangeMeta, RowGroupIndex};
45use crate::read::scan_region::StreamContext;
46use crate::read::{BoxedRecordBatchStream, ScannerMetrics};
47use crate::sst::file::{FileTimeRange, RegionFileId};
48use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
49use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
50use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
51use crate::sst::parquet::file_range::FileRange;
52use crate::sst::parquet::flat_format::time_index_column_index;
53use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
54use crate::sst::parquet::row_group::ParquetFetchMetrics;
55use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
56
/// Scan metrics collected for a single SST file.
///
/// Aggregated per file and reported as `top_file_metrics` in verbose
/// explain output.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges scanned in this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Duration spent building scan parts for this file.
    pub build_part_cost: Duration,
    /// Duration spent building readers for this file.
    pub build_reader_cost: Duration,
    /// Duration spent scanning this file.
    pub scan_cost: Duration,
}
71
72impl fmt::Debug for FileScanMetrics {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
75
76 if self.num_ranges > 0 {
77 write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
78 }
79 if self.num_rows > 0 {
80 write!(f, ", \"num_rows\":{}", self.num_rows)?;
81 }
82 if !self.build_reader_cost.is_zero() {
83 write!(
84 f,
85 ", \"build_reader_cost\":\"{:?}\"",
86 self.build_reader_cost
87 )?;
88 }
89 if !self.scan_cost.is_zero() {
90 write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
91 }
92
93 write!(f, "}}")
94 }
95}
96
97impl FileScanMetrics {
98 pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
100 self.num_ranges += other.num_ranges;
101 self.num_rows += other.num_rows;
102 self.build_part_cost += other.build_part_cost;
103 self.build_reader_cost += other.build_reader_cost;
104 self.scan_cost += other.scan_cost;
105 }
106}
107
/// Full set of metrics collected while scanning one partition, covering
/// memtable and SST sources, index filtering, and distributor stages.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration elapsed before the scan starts polling.
    prepare_scan_cost: Duration,
    /// Duration spent building readers.
    build_reader_cost: Duration,
    /// Duration of the scan stage.
    scan_cost: Duration,
    /// Duration spent yielding batches to the caller.
    yield_cost: Duration,
    /// DataFusion timer for batch conversion; rendered as a `Duration` in
    /// `Debug` when present.
    convert_cost: Option<Time>,
    /// Total wall time of the query for this partition.
    total_cost: Duration,
    /// Rows returned by this partition.
    num_rows: usize,
    /// Batches returned by this partition.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable scan metrics.
    mem_scan_cost: Duration,
    mem_rows: usize,
    mem_batches: usize,
    mem_series: usize,
    mem_prefilter_cost: Duration,
    mem_prefilter_rows_filtered: usize,

    // SST scan metrics: part building, row-group/row filtering counters
    // (per index type), index cache hit/miss counters and pruning cost.
    build_parts_cost: Duration,
    sst_scan_cost: Duration,
    rg_total: usize,
    rg_fulltext_filtered: usize,
    rg_inverted_filtered: usize,
    rg_minmax_filtered: usize,
    rg_bloom_filtered: usize,
    rg_vector_filtered: usize,
    rows_before_filter: usize,
    rows_fulltext_filtered: usize,
    rows_inverted_filtered: usize,
    rows_bloom_filtered: usize,
    rows_vector_filtered: usize,
    rows_vector_selected: usize,
    rows_precise_filtered: usize,
    fulltext_index_cache_hit: usize,
    fulltext_index_cache_miss: usize,
    inverted_index_cache_hit: usize,
    inverted_index_cache_miss: usize,
    bloom_filter_cache_hit: usize,
    bloom_filter_cache_miss: usize,
    minmax_cache_hit: usize,
    minmax_cache_miss: usize,
    pruner_cache_hit: usize,
    pruner_cache_miss: usize,
    pruner_prune_cost: Duration,
    num_sst_record_batches: usize,
    num_sst_batches: usize,
    num_sst_rows: usize,

    /// Elapsed time until the stream is polled for the first time.
    first_poll: Duration,

    // Series distributor metrics (see `SeriesDistributorMetrics`).
    num_series_send_timeout: usize,
    num_series_send_full: usize,
    num_distributor_rows: usize,
    num_distributor_batches: usize,
    distributor_scan_cost: Duration,
    distributor_yield_cost: Duration,
    distributor_divider_cost: Duration,

    /// Metrics reported by the merge reader.
    merge_metrics: MergeMetrics,
    /// Metrics reported by the dedup reader.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF (as opposed to being dropped early).
    stream_eof: bool,

    // Optional detail metrics, populated lazily when the corresponding
    // source reports them.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    fetch_metrics: Option<ParquetFetchMetrics>,
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    // Range-builder memory accounting: current and peak values; only the
    // peaks are printed by the `Debug` impl.
    build_ranges_mem_size: isize,
    build_ranges_peak_mem_size: isize,
    num_range_builders: isize,
    num_peak_range_builders: isize,
    range_cache_size: usize,
    range_cache_hit: usize,
    range_cache_miss: usize,
}
261
/// Heap entry ordering file metrics by total cost in *reverse*, so a
/// max-`BinaryHeap` of these behaves as a min-heap keyed on cost. Used to
/// keep only the top-N most expensive files.
struct CompareCostReverse<'a> {
    /// Sum of build part, build reader and scan costs for the file.
    total_cost: Duration,
    /// Id of the file the metrics belong to.
    file_id: RegionFileId,
    /// Borrowed metrics of the file.
    metrics: &'a FileScanMetrics,
}
269
270impl Ord for CompareCostReverse<'_> {
271 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
272 other.total_cost.cmp(&self.total_cost)
274 }
275}
276
impl PartialOrd for CompareCostReverse<'_> {
    // Delegates to `Ord` so the two orderings can never diverge
    // (the clippy-recommended pattern).
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
282
// Equality considers only `total_cost`; `file_id` and `metrics` are
// intentionally ignored because the heap orders solely by cost.
impl Eq for CompareCostReverse<'_> {}

impl PartialEq for CompareCostReverse<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}
290
impl fmt::Debug for ScanMetricsSet {
    /// Renders the metric set as a JSON-style object. Always-present
    /// metrics come first; optional sections are written only when
    /// non-zero/non-empty to keep verbose explain output compact.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructure: a new field forces this impl to be
        // revisited. Fields bound to `_` are intentionally not printed
        // (only their peak counterparts are).
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            minmax_cache_hit,
            minmax_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            mem_prefilter_cost,
            mem_prefilter_rows_filtered,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
            range_cache_size,
            range_cache_hit,
            range_cache_miss,
        } = self;

        // Unconditional core metrics.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // `Time` accumulates nanoseconds; render it as a `Duration`.
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Index filtering counters, printed only when they fired.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *minmax_cache_hit > 0 {
            write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
        }
        if *minmax_cache_miss > 0 {
            write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Series distributor metrics.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Memtable scan metrics.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }
        if !mem_prefilter_cost.is_zero() {
            write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?;
        }
        if *mem_prefilter_rows_filtered > 0 {
            write!(
                f,
                ", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}"
            )?;
        }

        // Optional index/fetch/cache detail metrics (only when populated
        // and non-empty).
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Report only the 10 most expensive files to bound output size.
        // `CompareCostReverse` makes the `BinaryHeap` a min-heap so the
        // cheapest kept entry is evicted first.
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // Skip files that contributed nothing.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // Reversed ordering means the sorted vec is descending by cost.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        if *range_cache_size > 0 {
            write!(f, ", \"range_cache_size\":{range_cache_size}")?;
        }
        if *range_cache_hit > 0 {
            write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
        }
        if *range_cache_miss > 0 {
            write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
        }

        // Tail: peak memory accounting and EOF flag close the object.
        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
            \"num_peak_range_builders\":{num_peak_range_builders}, \
            \"stream_eof\":{stream_eof}}}"
        )
    }
}
impl ScanMetricsSet {
    /// Adds the elapsed prepare-scan time (builder style).
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Attaches the DataFusion `Time` that tracks convert cost
    /// (builder style).
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }

    /// Merges the scanner metrics into the aggregated set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Destructure so a new `ScannerMetrics` field cannot be missed.
        let ScannerMetrics {
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
        } = other;

        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
    }

    /// Merges SST reader metrics (filter counters, cache counters, costs
    /// and optional detail metrics) into the aggregated set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Exhaustive (nested) destructure for compile-time completeness.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    minmax_cache_hit,
                    minmax_cache_miss,
                    pruner_cache_hit,
                    pruner_cache_miss,
                    pruner_prune_cost,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
        self.minmax_cache_hit += *minmax_cache_hit;
        self.minmax_cache_miss += *minmax_cache_miss;
        self.pruner_cache_hit += *pruner_cache_hit;
        self.pruner_cache_miss += *pruner_cache_miss;
        self.pruner_prune_cost += *pruner_prune_cost;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Optional detail metrics are created lazily on first report.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track current and peak memory used while building ranges.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track current and peak number of concurrent range builders.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }

    /// Merges per-file metrics into the aggregated map, keyed by file id.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Accumulates metrics reported by the series distributor.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
            divider_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
        self.distributor_divider_cost += *divider_cost;
    }

    /// Exports the collected metrics to the global Prometheus collectors.
    /// Called once per partition when the metrics are dropped.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        if let Some(time) = &self.convert_cost {
            READ_STAGE_ELAPSED
                .with_label_values(&["convert"])
                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
        }
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        // Vector-index counters are only exported when the feature is on;
        // the attribute gates the whole statement.
        #[cfg(feature = "vector_index")]
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }
}
881
/// Shared inner state of [PartitionMetrics].
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition being scanned.
    partition: usize,
    /// Label identifying the scanner type in metrics and logs.
    scanner_type: &'static str,
    /// Time when the query started; used to compute elapsed costs.
    query_start: Instant,
    /// Whether to log final metrics at info (verbose explain) or debug level.
    explain_verbose: bool,
    /// Aggregated scan metrics, locked for access from multiple streams.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented in `new`, decremented on drop.
    in_progress_scan: IntGauge,

    // DataFusion per-partition `Time` metrics registered in
    // `PartitionMetrics::new` (see `MetricBuilder` usage there).
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
    elapsed_compute: Time,
}
910
911impl PartitionMetricsInner {
912 fn on_finish(&self, stream_eof: bool) {
913 let mut metrics = self.metrics.lock().unwrap();
914 if metrics.total_cost.is_zero() {
915 metrics.total_cost = self.query_start.elapsed();
916 }
917 if !metrics.stream_eof {
918 metrics.stream_eof = stream_eof;
919 }
920 }
921}
922
923impl MergeMetricsReport for PartitionMetricsInner {
924 fn report(&self, metrics: &mut MergeMetrics) {
925 let mut scan_metrics = self.metrics.lock().unwrap();
926 scan_metrics.merge_metrics.merge(metrics);
928
929 *metrics = MergeMetrics::default();
931 }
932}
933
934impl DedupMetricsReport for PartitionMetricsInner {
935 fn report(&self, metrics: &mut DedupMetrics) {
936 let mut scan_metrics = self.metrics.lock().unwrap();
937 scan_metrics.dedup_metrics.merge(metrics);
939
940 *metrics = DedupMetrics::default();
942 }
943}
944
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        // Finalize totals even if the stream was dropped before EOF
        // (`false` keeps an earlier `on_finish(true)` result intact).
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        // Export to Prometheus exactly once, at end of the partition scan.
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        // Same message at different levels; the macro calls cannot be
        // unified without changing log behavior.
        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        }
    }
}
971
/// List of [PartitionMetrics] indexed by partition number; slots for
/// partitions that never reported stay `None`.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
975
976impl PartitionMetricsList {
977 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
979 let mut list = self.0.lock().unwrap();
980 if list.len() <= partition {
981 list.resize(partition + 1, None);
982 }
983 list[partition] = Some(metrics);
984 }
985
986 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
988 let list = self.0.lock().unwrap();
989 write!(f, ", \"metrics_per_partition\": ")?;
990 f.debug_list()
991 .entries(list.iter().filter_map(|p| p.as_ref()))
992 .finish()?;
993 write!(f, "}}")
994 }
995}
996
/// Cheaply cloneable handle to the metrics of one scan partition,
/// shared across streams via `Arc`.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
1000
impl PartitionMetrics {
    /// Creates metrics for one partition, registering DataFusion timers in
    /// `metrics_set` and incrementing the in-progress scan gauge.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
        // Time between query start and metrics creation counts as prepare.
        let metrics = ScanMetricsSet::default()
            .with_prepare_scan_cost(query_start.elapsed())
            .with_convert_cost(convert_cost.clone());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost,
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the time until the stream was polled for the first time.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the count of scanned memtable ranges.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the count of scanned file ranges.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to DataFusion's elapsed-compute timer; zero
    /// durations are skipped to avoid touching the metric.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Adds `cost` to both the DataFusion timer and the internal counter.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Records batch conversion time (also counted as compute time).
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Merges memtable scan metrics reported by a memtable iterator.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
        metrics.mem_prefilter_cost += data.prefilter_cost;
        metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered;
    }

    /// Merges scanner metrics into DataFusion timers and the internal set.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges SST reader metrics, optionally with per-file breakdowns.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Marks the stream as finished with EOF reached.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Merges metrics reported by the series distributor.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain output was requested.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a reporter handle for merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a reporter handle for dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }

    /// Adds `size` to the range cache size counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_size(&self, size: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_size += size;
    }

    /// Counts one range cache hit.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_hit(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_hit += 1;
    }

    /// Counts one range cache miss.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_miss(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_miss += 1;
    }
}
1158
1159impl fmt::Debug for PartitionMetrics {
1160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1161 let metrics = self.0.metrics.lock().unwrap();
1162 write!(
1163 f,
1164 r#"{{"partition":{}, "metrics":{:?}}}"#,
1165 self.0.partition, metrics
1166 )
1167 }
1168}
1169
/// Metrics reported by the series distributor and merged into
/// [ScanMetricsSet] via `set_distributor_metrics`.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of series sends that timed out.
    pub(crate) num_series_send_timeout: usize,
    /// Number of series sends that found the channel full.
    pub(crate) num_series_send_full: usize,
    /// Rows processed by the distributor.
    pub(crate) num_rows: usize,
    /// Batches processed by the distributor.
    pub(crate) num_batches: usize,
    /// Duration of the distributor scan stage.
    pub(crate) scan_cost: Duration,
    /// Duration spent yielding from the distributor.
    pub(crate) yield_cost: Duration,
    /// Duration spent dividing series.
    pub(crate) divider_cost: Duration,
}
1188
/// Scans the memtable ranges at `index` and yields flat `RecordBatch`es,
/// restricted to `time_range`. Reader build time and memtable scan metrics
/// are reported through `part_metrics`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = "mem_flat"
    )
)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the memtable metrics once the range is fully drained.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
1225
/// Files with fewer rows than this are not considered for batch splitting.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Series-count bound below which splitting is always allowed.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows per series required to split when the series count is large.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1233
1234pub(crate) fn should_split_flat_batches_for_merge(
1237 stream_ctx: &Arc<StreamContext>,
1238 range_meta: &RangeMeta,
1239) -> Option<usize> {
1240 let mut num_files_to_split = 0;
1242 let mut num_mem_rows = 0;
1243 let mut num_mem_series = 0;
1244 let mut total_rows: u64 = 0;
1246 let mut total_series: u64 = 0;
1247 for index in &range_meta.row_group_indices {
1251 if stream_ctx.is_mem_range_index(*index) {
1252 let memtable = &stream_ctx.input.memtables[index.index];
1253 let stats = memtable.stats();
1255 num_mem_rows += stats.num_rows();
1256 num_mem_series += stats.series_count();
1257 } else if stream_ctx.is_file_range_index(*index) {
1258 let file_index = index.index - stream_ctx.input.num_memtables();
1260 let file = &stream_ctx.input.files[file_index];
1261 let file_meta = file.meta_ref();
1262 if file_meta.level == 0 {
1263 num_files_to_split += 1;
1265 continue;
1266 } else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 {
1267 continue;
1269 }
1270 debug_assert!(file_meta.num_rows > 0);
1271 if !can_split_series(file_meta.num_rows, file_meta.num_series) {
1272 common_telemetry::trace!(
1274 "Can't split series for file {}, level: {}, num_rows: {}, num_series: {}",
1275 file_meta.file_id,
1276 file_meta.level,
1277 file_meta.num_rows,
1278 file_meta.num_series,
1279 );
1280 return None;
1281 } else {
1282 num_files_to_split += 1;
1283 total_rows += file.meta_ref().num_rows;
1284 total_series += file.meta_ref().num_series;
1285 }
1286 }
1287 }
1289
1290 let should_split = if num_files_to_split > 0 {
1291 true
1293 } else if num_mem_series > 0
1294 && num_mem_rows > 0
1295 && can_split_series(num_mem_rows as u64, num_mem_series as u64)
1296 {
1297 total_rows += num_mem_rows as u64;
1298 total_series += num_mem_series as u64;
1299 true
1300 } else {
1301 false
1302 };
1303
1304 if !should_split {
1305 return None;
1306 }
1307
1308 let estimated_batch_size = if total_series > 0 && total_rows > 0 {
1310 ((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
1311 } else {
1312 DEFAULT_READ_BATCH_SIZE / 4
1314 };
1315 Some(estimated_batch_size)
1316}
1317
1318pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
1321 let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
1322 size.clamp(2, 64)
1323}
1324
1325pub(crate) fn compute_average_batch_size(
1327 estimated_rows_per_batch: impl IntoIterator<Item = usize>,
1328) -> usize {
1329 let mut total = 0usize;
1330 let mut count = 0usize;
1331 for size in estimated_rows_per_batch {
1332 total += size;
1333 count += 1;
1334 }
1335
1336 if count == 0 {
1337 return DEFAULT_READ_BATCH_SIZE;
1338 }
1339
1340 (total / count).clamp(1, DEFAULT_READ_BATCH_SIZE)
1341}
1342
1343fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1344 if num_rows == 0 || num_series == 0 {
1345 return false;
1346 }
1347
1348 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1350}
1351
/// Tests for the batch-splitting decision logic.
#[cfg(test)]
mod split_tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use smallvec::smallvec;
    use store_api::storage::FileId;

    use super::*;
    use crate::read::projection::ProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{ScanInput, StreamContext};
    use crate::sst::file::FileHandle;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    // Builds a stream context over the given files with a fixed test schema.
    async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files);

        StreamContext {
            input,
            ranges: vec![],
            scan_fingerprint: None,
            query_start: std::time::Instant::now(),
        }
    }

    // Returns a range meta covering a single source with one row group.
    fn single_file_range_meta() -> RangeMeta {
        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 1024,
        }
    }

    // Level-0 files must remain splittable even when num_series is 0.
    #[tokio::test]
    async fn should_split_level_zero_file_even_when_series_stats_are_missing() {
        let mut file = sst_file_handle_with_file_id(FileId::random(), 0, 1000)
            .meta_ref()
            .clone();
        file.level = 0;
        file.num_rows = DEFAULT_ROW_GROUP_SIZE as u64;
        file.num_row_groups = 1;
        file.num_series = 0;

        let file = FileHandle::new(file, crate::test_util::new_noop_file_purger());
        let stream_ctx = Arc::new(new_stream_context_with_files(vec![file]).await);

        assert!(
            should_split_flat_batches_for_merge(&stream_ctx, &single_file_range_meta()).is_some()
        );
    }

    // Zero rows or zero series can never be split.
    #[test]
    fn can_split_series_returns_false_for_zero_inputs() {
        assert!(!can_split_series(0, 1));
        assert!(!can_split_series(1, 0));
        assert!(!can_split_series(0, 0));
    }
}
1426
1427fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1430 if explain_verbose {
1431 ReaderFilterMetrics {
1432 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1433 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1434 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1435 ..Default::default()
1436 }
1437 } else {
1438 ReaderFilterMetrics::default()
1439 }
1440}
1441
1442#[tracing::instrument(
1444 skip_all,
1445 fields(
1446 region_id = %stream_ctx.input.region_metadata().region_id,
1447 row_group_index = %index.index,
1448 source = read_type
1449 )
1450)]
1451pub(crate) async fn scan_flat_file_ranges(
1452 stream_ctx: Arc<StreamContext>,
1453 part_metrics: PartitionMetrics,
1454 index: RowGroupIndex,
1455 read_type: &'static str,
1456 partition_pruner: Arc<PartitionPruner>,
1457) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1458 let mut reader_metrics = ReaderMetrics {
1459 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1460 ..Default::default()
1461 };
1462 let ranges = partition_pruner
1463 .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1464 .await?;
1465 part_metrics.inc_num_file_ranges(ranges.len());
1466 part_metrics.merge_reader_metrics(&reader_metrics, None);
1467
1468 let init_per_file_metrics = if part_metrics.explain_verbose() {
1470 let file = stream_ctx.input.file_from_index(index);
1471 let file_id = file.file_id();
1472
1473 let mut map = HashMap::new();
1474 map.insert(
1475 file_id,
1476 FileScanMetrics {
1477 build_part_cost: reader_metrics.build_cost,
1478 ..Default::default()
1479 },
1480 );
1481 Some(map)
1482 } else {
1483 None
1484 };
1485
1486 Ok(build_flat_file_range_scan_stream(
1487 stream_ctx,
1488 part_metrics,
1489 read_type,
1490 ranges,
1491 init_per_file_metrics,
1492 ))
1493}
1494
1495#[tracing::instrument(
1497 skip_all,
1498 fields(read_type = read_type, range_count = ranges.len())
1499)]
1500pub fn build_flat_file_range_scan_stream(
1501 _stream_ctx: Arc<StreamContext>,
1502 part_metrics: PartitionMetrics,
1503 read_type: &'static str,
1504 ranges: SmallVec<[FileRange; 2]>,
1505 mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1506) -> impl Stream<Item = Result<RecordBatch>> {
1507 try_stream! {
1508 let fetch_metrics = if part_metrics.explain_verbose() {
1509 Some(Arc::new(ParquetFetchMetrics::default()))
1510 } else {
1511 None
1512 };
1513 let reader_metrics = &mut ReaderMetrics {
1514 fetch_metrics: fetch_metrics.clone(),
1515 ..Default::default()
1516 };
1517 for range in ranges {
1518 let build_reader_start = Instant::now();
1519 let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else{continue};
1520 let build_cost = build_reader_start.elapsed();
1521 part_metrics.inc_build_reader_cost(build_cost);
1522
1523 let may_compat = range
1524 .compat_batch()
1525 .map(|compat| {
1526 compat.as_flat().context(UnexpectedSnafu {
1527 reason: "Invalid compat for flat format",
1528 })
1529 })
1530 .transpose()?;
1531
1532 let mapper = range.compaction_projection_mapper();
1533 while let Some(record_batch) = reader.next_batch().await? {
1534 let record_batch = if let Some(mapper) = mapper {
1535 let batch = mapper.project(record_batch)?;
1536 batch
1537 } else {
1538 record_batch
1539 };
1540
1541 if let Some(flat_compat) = may_compat {
1542 let batch = flat_compat.compat(record_batch)?;
1543 yield batch;
1544 } else {
1545 yield record_batch;
1546 }
1547 }
1548
1549 let prune_metrics = reader.metrics();
1550
1551 if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1553 let file_id = range.file_handle().file_id();
1554 let file_metrics = file_metrics_map
1555 .entry(file_id)
1556 .or_insert_with(FileScanMetrics::default);
1557
1558 file_metrics.num_ranges += 1;
1559 file_metrics.num_rows += prune_metrics.num_rows;
1560 file_metrics.build_reader_cost += build_cost;
1561 file_metrics.scan_cost += prune_metrics.scan_cost;
1562 }
1563
1564 reader_metrics.merge_from(&prune_metrics);
1565 }
1566
1567 reader_metrics.observe_rows(read_type);
1569 reader_metrics.filter_metrics.observe();
1570 part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1571 }
1572}
1573
/// Scans an extension range in flat format and returns its record batch
/// stream (enterprise builds only).
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let ext_range = context.input.extension_range(index.index);
    let flat_reader = ext_range.flat_reader(context.as_ref());
    flat_reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1591
1592pub(crate) async fn maybe_scan_flat_other_ranges(
1593 context: &Arc<StreamContext>,
1594 index: RowGroupIndex,
1595 metrics: &PartitionMetrics,
1596) -> Result<BoxedRecordBatchStream> {
1597 #[cfg(feature = "enterprise")]
1598 {
1599 scan_flat_extension_range(context.clone(), index, metrics.clone()).await
1600 }
1601
1602 #[cfg(not(feature = "enterprise"))]
1603 {
1604 let _ = context;
1605 let _ = index;
1606 let _ = metrics;
1607
1608 crate::error::UnexpectedSnafu {
1609 reason: "no other ranges scannable in flat format",
1610 }
1611 .fail()
1612 }
1613}
1614
/// A stream adapter that splits each record batch from the inner stream into
/// sub-batches wherever the time index value decreases.
pub(crate) struct SplitRecordBatchStream<S> {
    // The wrapped input stream.
    inner: S,
    // Buffered sub-batches waiting to be yielded.
    batches: VecDeque<RecordBatch>,
}
1622
1623impl<S> SplitRecordBatchStream<S> {
1624 pub(crate) fn new(inner: S) -> Self {
1626 Self {
1627 inner,
1628 batches: VecDeque::new(),
1629 }
1630 }
1631}
1632
impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    /// Yields buffered sub-batches first; when the buffer is empty, pulls the
    /// next batch from the inner stream, splits it, and retries.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Drain previously split batches before polling the inner stream.
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            };

            // Splitting may buffer zero batches (empty input), so loop again.
            split_record_batch(record_batch, &mut self.batches);
        }
    }
}
1659
1660pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1665 let batch_rows = record_batch.num_rows();
1666 if batch_rows == 0 {
1667 return;
1668 }
1669 if batch_rows < 2 {
1670 batches.push_back(record_batch);
1671 return;
1672 }
1673
1674 let time_index_pos = time_index_column_index(record_batch.num_columns());
1675 let timestamps = record_batch.column(time_index_pos);
1676 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1677 let mut offsets = Vec::with_capacity(16);
1678 offsets.push(0);
1679 let values = ts_values.values();
1680 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1681 if value > values[i + 1] {
1682 offsets.push(i + 1);
1683 }
1684 }
1685 offsets.push(values.len());
1686
1687 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1689 let end = offsets[i + 1];
1690 let rows_in_batch = end - start;
1691 batches.push_back(record_batch.slice(start, rows_in_batch));
1692 }
1693}
1694
/// Tests for split heuristics and channel/batch-size computations.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use smallvec::{SmallVec, smallvec};
    use store_api::storage::RegionId;

    use super::*;
    use crate::cache::CacheStrategy;
    use crate::memtable::{
        BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
        MemtableRangeContext, MemtableStats,
    };
    use crate::read::projection::ProjectionMapper;
    use crate::read::range::{MemRangeBuilder, SourceIndex};
    use crate::read::scan_region::ScanInput;
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    // An iterator builder that always yields empty iterators.
    struct EmptyIterBuilder;

    impl IterBuilder for EmptyIterBuilder {
        fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }

        fn is_record_batch(&self) -> bool {
            true
        }

        fn build_record_batch(
            &self,
            _time_range: Option<(Timestamp, Timestamp)>,
            _metrics: Option<MemScanMetrics>,
        ) -> Result<BoxedRecordBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }
    }

    // Builds a stream context with the given files and memtable ranges.
    async fn new_test_stream_ctx(
        files: Vec<FileHandle>,
        memtables: Vec<MemRangeBuilder>,
    ) -> Arc<StreamContext> {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_cache(CacheStrategy::Disabled)
            .with_memtables(memtables)
            .with_files(files);

        Arc::new(StreamContext {
            input,
            ranges: Vec::new(),
            scan_fingerprint: None,
            query_start: Instant::now(),
        })
    }

    // Creates a level-1 file handle with the given row and series counts.
    fn new_test_file(num_rows: u64, num_series: u64) -> FileHandle {
        let meta = FileMeta {
            region_id: RegionId::new(123, 456),
            file_id: Default::default(),
            level: 1,
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            num_rows,
            num_series,
            ..Default::default()
        };
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    // Creates a memtable range builder with the given stats.
    fn new_test_memtable(num_rows: usize, series_count: usize) -> MemRangeBuilder {
        let context = Arc::new(MemtableRangeContext::new(
            0,
            Box::new(EmptyIterBuilder),
            Default::default(),
        ));
        let stats = MemtableStats {
            time_range: Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            )),
            num_rows,
            num_ranges: 1,
            series_count,
            ..Default::default()
        };
        let range = MemtableRange::new(context, stats.clone());
        MemRangeBuilder::new(range, stats)
    }

    // Builds a range meta whose source indices mirror the row group indices.
    fn new_test_range_meta(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
        let indices = row_group_indices
            .iter()
            .map(|row_group_index| SourceIndex {
                index: row_group_index.index,
                num_row_groups: 1,
            })
            .collect();

        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices,
            row_group_indices,
            num_rows: 0,
        }
    }

    // The estimate for a splittable file is rows / series.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
        let num_rows = SPLIT_ROW_THRESHOLD * 2;
        let num_series = (num_rows / 100).max(1);
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(num_rows, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some((num_rows / num_series) as usize),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // Files below the row threshold or without series stats don't trigger a split.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
        let stream_ctx = new_test_stream_ctx(
            vec![
                new_test_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
                new_test_file(SPLIT_ROW_THRESHOLD * 2, 0),
            ],
            vec![],
        )
        .await;
        let range_meta = new_test_range_meta(smallvec![
            RowGroupIndex {
                index: 0,
                row_group_index: 0,
            },
            RowGroupIndex {
                index: 1,
                row_group_index: 0,
            }
        ]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // A file with too many series and too few rows per series blocks splitting.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
        let num_series =
            (SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // With no splittable files, memtable stats alone decide the split.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
        let stream_ctx = new_test_stream_ctx(vec![], vec![new_test_memtable(5_000, 100)]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(50),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // The estimate never exceeds DEFAULT_READ_BATCH_SIZE.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(DEFAULT_READ_BATCH_SIZE),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
        assert_eq!(64, compute_parallel_channel_size(0));
        assert_eq!(64, compute_parallel_channel_size(1));
    }

    #[test]
    fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
        assert_eq!(
            4,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE / 2)
        );
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
        assert_eq!(2, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE));
        assert_eq!(
            2,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
        );
    }

    #[test]
    fn test_compute_average_batch_size_uses_arithmetic_mean() {
        assert_eq!(24, compute_average_batch_size([16, 24, 32]));
    }

    #[test]
    fn test_compute_average_batch_size_clamps_values() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size([DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2])
        );
        assert_eq!(1, compute_average_batch_size([0, 1]));
    }

    #[test]
    fn test_compute_average_batch_size_falls_back_when_empty() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size(std::iter::empty())
        );
    }
}