1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use store_api::storage::RegionId;
33
34use crate::error::Result;
35use crate::memtable::MemScanMetrics;
36use crate::metrics::{
37 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
38 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
39};
40use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
41use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport};
42use crate::read::pruner::PartitionPruner;
43use crate::read::range::{RangeMeta, RowGroupIndex};
44use crate::read::scan_region::StreamContext;
45use crate::read::{BoxedRecordBatchStream, ScannerMetrics};
46use crate::sst::file::{FileTimeRange, RegionFileId};
47use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
48use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
49use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
50use crate::sst::parquet::file_range::{FileRange, PreFilterMode};
51use crate::sst::parquet::flat_format::time_index_column_index;
52use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
53use crate::sst::parquet::row_group::ParquetFetchMetrics;
54use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
55
/// Scan metrics accumulated for a single SST file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of file ranges scanned in this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building (pruning) parts of this file.
    pub build_part_cost: Duration,
    /// Time spent building readers for this file.
    pub build_reader_cost: Duration,
    /// Time spent scanning this file.
    pub scan_cost: Duration,
}
70
71impl fmt::Debug for FileScanMetrics {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
74
75 if self.num_ranges > 0 {
76 write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
77 }
78 if self.num_rows > 0 {
79 write!(f, ", \"num_rows\":{}", self.num_rows)?;
80 }
81 if !self.build_reader_cost.is_zero() {
82 write!(
83 f,
84 ", \"build_reader_cost\":\"{:?}\"",
85 self.build_reader_cost
86 )?;
87 }
88 if !self.scan_cost.is_zero() {
89 write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
90 }
91
92 write!(f, "}}")
93 }
94}
95
96impl FileScanMetrics {
97 pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
99 self.num_ranges += other.num_ranges;
100 self.num_rows += other.num_rows;
101 self.build_part_cost += other.build_part_cost;
102 self.build_reader_cost += other.build_reader_cost;
103 self.scan_cost += other.scan_cost;
104 }
105}
106
/// The complete set of metrics collected while scanning one partition.
///
/// Counters are accumulated under a lock (see `PartitionMetricsInner`) and
/// reported to Prometheus / logged when the partition finishes.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Time to prepare the scan before the first range is read.
    prepare_scan_cost: Duration,
    /// Time to build readers.
    build_reader_cost: Duration,
    /// Time spent scanning batches.
    scan_cost: Duration,
    /// Time spent yielding batches downstream.
    yield_cost: Duration,
    /// DataFusion `Time` metric for batch conversion, if attached.
    convert_cost: Option<Time>,
    /// Total elapsed time of the scan; set once on finish.
    total_cost: Duration,
    /// Rows returned by the scan.
    num_rows: usize,
    /// Batches returned by the scan.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable scan metrics.
    /// Time spent scanning memtables.
    mem_scan_cost: Duration,
    /// Rows read from memtables.
    mem_rows: usize,
    /// Batches read from memtables.
    mem_batches: usize,
    /// Series visited in memtables.
    mem_series: usize,
    /// Time spent pre-filtering memtable rows.
    mem_prefilter_cost: Duration,
    /// Rows filtered out by the memtable pre-filter.
    mem_prefilter_rows_filtered: usize,

    // SST scan metrics.
    /// Time to build SST parts.
    build_parts_cost: Duration,
    /// Time spent scanning SST files.
    sst_scan_cost: Duration,
    /// Row groups before filtering.
    rg_total: usize,
    /// Row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Row groups filtered by the vector index.
    rg_vector_filtered: usize,
    /// Rows before any filtering.
    rows_before_filter: usize,
    /// Rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows filtered by the vector index.
    rows_vector_filtered: usize,
    /// Rows selected by the vector index.
    rows_vector_selected: usize,
    /// Rows filtered by precise (exact) filtering.
    rows_precise_filtered: usize,
    /// Fulltext index cache hits / misses.
    fulltext_index_cache_hit: usize,
    fulltext_index_cache_miss: usize,
    /// Inverted index cache hits / misses.
    inverted_index_cache_hit: usize,
    inverted_index_cache_miss: usize,
    /// Bloom filter cache hits / misses.
    bloom_filter_cache_hit: usize,
    bloom_filter_cache_miss: usize,
    /// Min-max statistics cache hits / misses.
    minmax_cache_hit: usize,
    minmax_cache_miss: usize,
    /// Partition pruner cache hits / misses.
    pruner_cache_hit: usize,
    pruner_cache_miss: usize,
    /// Time spent in the partition pruner.
    pruner_prune_cost: Duration,
    /// Arrow record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Batches read from SSTs.
    num_sst_batches: usize,
    /// Rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time at the first poll of the stream.
    first_poll: Duration,

    // Series distributor metrics.
    /// Sends that timed out in the series distributor.
    num_series_send_timeout: usize,
    /// Sends rejected because the channel was full.
    num_series_send_full: usize,
    /// Rows forwarded by the distributor.
    num_distributor_rows: usize,
    /// Batches forwarded by the distributor.
    num_distributor_batches: usize,
    /// Distributor scan / yield / divider costs.
    distributor_scan_cost: Duration,
    distributor_yield_cost: Duration,
    distributor_divider_cost: Duration,

    /// Metrics of the merge reader.
    merge_metrics: MergeMetrics,
    /// Metrics of the dedup reader.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF (as opposed to being dropped early).
    stream_eof: bool,

    // Optional detailed metrics; `None` until first reported.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    fetch_metrics: Option<ParquetFetchMetrics>,
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file metrics, keyed by file id; `None` until first reported.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    // Range builder memory accounting (signed: deltas may be negative).
    build_ranges_mem_size: isize,
    build_ranges_peak_mem_size: isize,
    num_range_builders: isize,
    num_peak_range_builders: isize,
    /// Range cache size and hit/miss counters.
    range_cache_size: usize,
    range_cache_hit: usize,
    range_cache_miss: usize,
}
260
/// Heap entry that orders per-file metrics by **descending** total cost.
///
/// The reversed ordering turns `BinaryHeap` (a max-heap) into a min-heap on
/// cost, so the cheapest tracked file sits at the top and can be evicted
/// while collecting the top-N most expensive files.
struct CompareCostReverse<'a> {
    /// Sum of build-part, build-reader and scan costs for one file.
    total_cost: Duration,
    file_id: RegionFileId,
    metrics: &'a FileScanMetrics,
}
268
269impl Ord for CompareCostReverse<'_> {
270 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
271 other.total_cost.cmp(&self.total_cost)
273 }
274}
275
impl PartialOrd for CompareCostReverse<'_> {
    // Delegates to the total `Ord` implementation above.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
281
// `PartialEq` below compares `Duration`s, a total equivalence, so `Eq` holds.
impl Eq for CompareCostReverse<'_> {}
283
284impl PartialEq for CompareCostReverse<'_> {
285 fn eq(&self, other: &Self) -> bool {
286 self.total_cost == other.total_cost
287 }
288}
289
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a JSON-style object for explain/log output.
    ///
    /// Frequently-relevant fields are always printed; optional or rarely
    /// non-zero counters are emitted only when they carry information, to
    /// keep the output compact.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure exhaustively so that adding a field without updating
        // this formatter becomes a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            minmax_cache_hit,
            minmax_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            mem_prefilter_cost,
            mem_prefilter_rows_filtered,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
            range_cache_size,
            range_cache_hit,
            range_cache_miss,
        } = self;

        // Always-printed core metrics.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // The convert cost lives in a DataFusion `Time` (nanoseconds).
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Filter counters: printed only when non-zero.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *minmax_cache_hit > 0 {
            write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
        }
        if *minmax_cache_miss > 0 {
            write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Series distributor metrics: printed only when active.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Memtable scan metrics: printed only when memtables were scanned.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }
        if !mem_prefilter_cost.is_zero() {
            write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?;
        }
        if *mem_prefilter_rows_filtered > 0 {
            write!(
                f,
                ", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}"
            )?;
        }

        // Optional detail metrics: printed only when present and non-empty.
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Print only the 10 most expensive files, selected with a
        // reverse-ordered (min-on-top) heap so eviction is O(log 10).
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // Skip files that contributed nothing.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // Replace the cheapest tracked file when a costlier
                    // one shows up.
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // `into_sorted_vec` sorts ascending by the reversed ordering,
            // i.e. files come out most-expensive first.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        if *range_cache_size > 0 {
            write!(f, ", \"range_cache_size\":{range_cache_size}")?;
        }
        if *range_cache_hit > 0 {
            write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
        }
        if *range_cache_miss > 0 {
            write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
        }

        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
            \"num_peak_range_builders\":{num_peak_range_builders}, \
            \"stream_eof\":{stream_eof}}}"
        )
    }
}
impl ScanMetricsSet {
    /// Builder-style setter: adds `cost` to the prepare-scan cost.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Builder-style setter: attaches the DataFusion convert-cost `Time`.
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }

    /// Accumulates scanner-level metrics into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Exhaustive destructure: adding a field to `ScannerMetrics`
        // without merging it here becomes a compile error.
        let ScannerMetrics {
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
        } = other;

        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
    }

    /// Accumulates SST reader metrics (filtering, caching, fetching)
    /// into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Exhaustive destructure, same rationale as above.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    minmax_cache_hit,
                    minmax_cache_miss,
                    pruner_cache_hit,
                    pruner_cache_miss,
                    pruner_prune_cost,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
        self.minmax_cache_hit += *minmax_cache_hit;
        self.minmax_cache_miss += *minmax_cache_miss;
        self.pruner_cache_hit += *pruner_cache_hit;
        self.pruner_cache_miss += *pruner_cache_miss;
        self.pruner_prune_cost += *pruner_prune_cost;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Optional detail metrics: lazily initialize on first report.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track the running and peak memory used while building ranges.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track the running and peak number of live range builders.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }

    /// Merges per-file metrics from `other`, creating entries on demand.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Accumulates series distributor metrics into this set.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        // Exhaustive destructure, same rationale as the merges above.
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
            divider_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
        self.distributor_divider_cost += *divider_cost;
    }

    /// Reports the accumulated metrics to the global Prometheus collectors.
    fn observe_metrics(&self) {
        // Stage elapsed times.
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        if let Some(time) = &self.convert_cost {
            READ_STAGE_ELAPSED
                .with_label_values(&["convert"])
                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
        }
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        // Row-group level filter counters.
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        // Row level filter counters.
        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }
}
880
/// Lock-protected metrics state shared by all handles of one scan partition.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of this partition within the scan.
    partition: usize,
    /// Static label identifying the scanner that produced these metrics.
    scanner_type: &'static str,
    /// Instant the query started; basis for total/first-poll durations.
    query_start: Instant,
    /// When true, metrics are logged at info level on drop (verbose explain).
    explain_verbose: bool,
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of scans in progress; incremented on creation, decremented on drop.
    in_progress_scan: IntGauge,

    // DataFusion metric handles exposed through the plan's metrics set.
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
    elapsed_compute: Time,
}
909
910impl PartitionMetricsInner {
911 fn on_finish(&self, stream_eof: bool) {
912 let mut metrics = self.metrics.lock().unwrap();
913 if metrics.total_cost.is_zero() {
914 metrics.total_cost = self.query_start.elapsed();
915 }
916 if !metrics.stream_eof {
917 metrics.stream_eof = stream_eof;
918 }
919 }
920}
921
922impl MergeMetricsReport for PartitionMetricsInner {
923 fn report(&self, metrics: &mut MergeMetrics) {
924 let mut scan_metrics = self.metrics.lock().unwrap();
925 scan_metrics.merge_metrics.merge(metrics);
927
928 *metrics = MergeMetrics::default();
930 }
931}
932
933impl DedupMetricsReport for PartitionMetricsInner {
934 fn report(&self, metrics: &mut DedupMetrics) {
935 let mut scan_metrics = self.metrics.lock().unwrap();
936 scan_metrics.dedup_metrics.merge(metrics);
938
939 *metrics = DedupMetrics::default();
941 }
942}
943
944impl Drop for PartitionMetricsInner {
945 fn drop(&mut self) {
946 self.on_finish(false);
947 let metrics = self.metrics.lock().unwrap();
948 metrics.observe_metrics();
949 self.in_progress_scan.dec();
950
951 if self.explain_verbose {
952 common_telemetry::info!(
953 "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
954 self.scanner_type,
955 self.region_id,
956 self.partition,
957 metrics,
958 );
959 } else {
960 common_telemetry::debug!(
961 "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
962 self.scanner_type,
963 self.region_id,
964 self.partition,
965 metrics,
966 );
967 }
968 }
969}
970
/// Collects the [`PartitionMetrics`] of every partition, indexed by
/// partition number, for verbose explain output.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
974
975impl PartitionMetricsList {
976 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
978 let mut list = self.0.lock().unwrap();
979 if list.len() <= partition {
980 list.resize(partition + 1, None);
981 }
982 list[partition] = Some(metrics);
983 }
984
985 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
987 let list = self.0.lock().unwrap();
988 write!(f, ", \"metrics_per_partition\": ")?;
989 f.debug_list()
990 .entries(list.iter().filter_map(|p| p.as_ref()))
991 .finish()?;
992 write!(f, "}}")
993 }
994}
995
/// Cheaply cloneable handle to the shared metrics of one scan partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
999
impl PartitionMetrics {
    /// Creates the metrics handle for one partition, registering the
    /// in-progress gauge and the DataFusion metric handles.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
        // Time elapsed before this constructor runs counts as preparation.
        let metrics = ScanMetricsSet::default()
            .with_prepare_scan_cost(query_start.elapsed())
            .with_convert_cost(convert_cost.clone());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost,
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the time from query start until the first poll of the stream.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the memtable-range counter.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the file-range counter.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to DataFusion's elapsed-compute metric, skipping
    /// zero durations to avoid needless metric updates.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Adds `cost` to both the DataFusion and internal build-reader metrics.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Adds `cost` to the convert-cost metric and elapsed compute.
    /// Note: the internal set shares the same `Time` handle, so no lock
    /// is needed here.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Accumulates memtable scan metrics reported by a memtable iterator.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
        metrics.mem_prefilter_cost += data.prefilter_cost;
        metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered;
    }

    /// Merges scanner metrics into both the DataFusion handles and the
    /// internal metrics set.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges SST reader metrics and, when provided, per-file metrics.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Marks the stream as finished with EOF reached.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Accumulates series distributor metrics into the internal set.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Whether this scan runs under EXPLAIN VERBOSE.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a reporter that merge readers can push metrics into.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a reporter that dedup readers can push metrics into.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }

    /// Adds `size` to the range cache size counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_size(&self, size: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_size += size;
    }

    /// Increments the range cache hit counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_hit(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_hit += 1;
    }

    /// Increments the range cache miss counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_miss(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_miss += 1;
    }
}
1157
1158impl fmt::Debug for PartitionMetrics {
1159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1160 let metrics = self.0.metrics.lock().unwrap();
1161 write!(
1162 f,
1163 r#"{{"partition":{}, "metrics":{:?}}}"#,
1164 self.0.partition, metrics
1165 )
1166 }
1167}
1168
/// Metrics collected by the series distributor that fans batches out to
/// per-series consumers.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of sends that timed out.
    pub(crate) num_series_send_timeout: usize,
    /// Number of sends rejected because the channel was full.
    pub(crate) num_series_send_full: usize,
    /// Rows forwarded by the distributor.
    pub(crate) num_rows: usize,
    /// Batches forwarded by the distributor.
    pub(crate) num_batches: usize,
    /// Time spent scanning upstream batches.
    pub(crate) scan_cost: Duration,
    /// Time spent yielding batches downstream.
    pub(crate) yield_cost: Duration,
    /// Time spent dividing batches by series.
    pub(crate) divider_cost: Duration,
}
1187
/// Builds a stream that scans the memtable ranges for `index` and yields
/// their record batches, recording build-reader cost and memtable scan
/// metrics into `part_metrics`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = "mem_flat"
    )
)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            // Building the iterator counts as build-reader time.
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the per-range memtable metrics after the range is drained.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
1224
/// Minimum file row count for a non level-0 file to be considered for splitting.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Series count below which splitting is always allowed (see [`can_split_series`]).
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows per series required to split when there are many series.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1232
1233pub(crate) fn should_split_flat_batches_for_merge(
1236 stream_ctx: &Arc<StreamContext>,
1237 range_meta: &RangeMeta,
1238) -> Option<usize> {
1239 let mut num_files_to_split = 0;
1241 let mut num_mem_rows = 0;
1242 let mut num_mem_series = 0;
1243 let mut total_rows: u64 = 0;
1245 let mut total_series: u64 = 0;
1246 for index in &range_meta.row_group_indices {
1250 if stream_ctx.is_mem_range_index(*index) {
1251 let memtable = &stream_ctx.input.memtables[index.index];
1252 let stats = memtable.stats();
1254 num_mem_rows += stats.num_rows();
1255 num_mem_series += stats.series_count();
1256 } else if stream_ctx.is_file_range_index(*index) {
1257 let file_index = index.index - stream_ctx.input.num_memtables();
1259 let file = &stream_ctx.input.files[file_index];
1260 let file_meta = file.meta_ref();
1261 if file_meta.level == 0 {
1262 num_files_to_split += 1;
1264 continue;
1265 } else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 {
1266 continue;
1268 }
1269 debug_assert!(file_meta.num_rows > 0);
1270 if !can_split_series(file_meta.num_rows, file_meta.num_series) {
1271 common_telemetry::trace!(
1273 "Can't split series for file {}, level: {}, num_rows: {}, num_series: {}",
1274 file_meta.file_id,
1275 file_meta.level,
1276 file_meta.num_rows,
1277 file_meta.num_series,
1278 );
1279 return None;
1280 } else {
1281 num_files_to_split += 1;
1282 total_rows += file.meta_ref().num_rows;
1283 total_series += file.meta_ref().num_series;
1284 }
1285 }
1286 }
1288
1289 let should_split = if num_files_to_split > 0 {
1290 true
1292 } else if num_mem_series > 0
1293 && num_mem_rows > 0
1294 && can_split_series(num_mem_rows as u64, num_mem_series as u64)
1295 {
1296 total_rows += num_mem_rows as u64;
1297 total_series += num_mem_series as u64;
1298 true
1299 } else {
1300 false
1301 };
1302
1303 if !should_split {
1304 return None;
1305 }
1306
1307 let estimated_batch_size = if total_series > 0 && total_rows > 0 {
1309 ((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
1310 } else {
1311 DEFAULT_READ_BATCH_SIZE / 4
1313 };
1314 Some(estimated_batch_size)
1315}
1316
1317pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
1320 let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
1321 size.clamp(2, 64)
1322}
1323
1324pub(crate) fn compute_average_batch_size(
1326 estimated_rows_per_batch: impl IntoIterator<Item = usize>,
1327) -> usize {
1328 let mut total = 0usize;
1329 let mut count = 0usize;
1330 for size in estimated_rows_per_batch {
1331 total += size;
1332 count += 1;
1333 }
1334
1335 if count == 0 {
1336 return DEFAULT_READ_BATCH_SIZE;
1337 }
1338
1339 (total / count).clamp(1, DEFAULT_READ_BATCH_SIZE)
1340}
1341
1342fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1343 if num_rows == 0 || num_series == 0 {
1344 return false;
1345 }
1346
1347 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1349}
1350
/// Tests for split decisions that need real file handles and stream contexts.
#[cfg(test)]
mod split_tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use smallvec::smallvec;
    use store_api::storage::FileId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{ScanInput, StreamContext};
    use crate::sst::file::FileHandle;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a stream context whose input contains only `files`.
    async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files);

        StreamContext {
            input,
            ranges: vec![],
            scan_fingerprint: None,
            query_start: std::time::Instant::now(),
        }
    }

    /// Returns a range meta referencing a single source with one row group.
    fn single_file_range_meta() -> RangeMeta {
        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 1024,
        }
    }

    // Level-0 files must be split even when `num_series` is unknown (0).
    #[tokio::test]
    async fn should_split_level_zero_file_even_when_series_stats_are_missing() {
        let mut file = sst_file_handle_with_file_id(FileId::random(), 0, 1000)
            .meta_ref()
            .clone();
        file.level = 0;
        file.num_rows = DEFAULT_ROW_GROUP_SIZE as u64;
        file.num_row_groups = 1;
        file.num_series = 0;

        let file = FileHandle::new(file, crate::test_util::new_noop_file_purger());
        let stream_ctx = Arc::new(new_stream_context_with_files(vec![file]).await);

        assert!(
            should_split_flat_batches_for_merge(&stream_ctx, &single_file_range_meta()).is_some()
        );
    }

    // Zero rows or zero series can never be split.
    #[test]
    fn can_split_series_returns_false_for_zero_inputs() {
        assert!(!can_split_series(0, 1));
        assert!(!can_split_series(1, 0));
        assert!(!can_split_series(0, 0));
    }
}
1425
1426fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1429 if explain_verbose {
1430 ReaderFilterMetrics {
1431 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1432 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1433 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1434 ..Default::default()
1435 }
1436 } else {
1437 ReaderFilterMetrics::default()
1438 }
1439}
1440
/// Scans file ranges identified by `index` in the flat format.
///
/// Builds the file ranges via `partition_pruner`, records pruning metrics into
/// `part_metrics`, and returns a stream over the matched ranges.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    partition_pruner: Arc<PartitionPruner>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    // Verbose explains collect detailed per-index filter metrics.
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = partition_pruner
        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // For verbose explains, seed per-file metrics with the range build cost so
    // the scan stream can attribute later costs to this file.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}
1493
/// Builds a stream that reads [`RecordBatch`]es from the given file `ranges`.
///
/// Reader build and scan costs are accumulated into `part_metrics`; per-file
/// metrics are tracked when `per_file_metrics` is provided (verbose explain).
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_flat_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        // Detailed parquet fetch metrics are only collected for verbose explains.
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            // `flat_reader` may return `None` when the range yields nothing.
            let Some(mut reader) = range
                .flat_reader(
                    stream_ctx.input.series_row_selector,
                    fetch_metrics.as_deref(),
                )
                .await?
            else {
                continue;
            };
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            let may_compat = range.compat_batch();
            // Optional projection applied to batches (compaction scans).
            let mapper = range.compaction_projection_mapper();
            while let Some(record_batch) = reader.next_batch().await? {
                let record_batch = if let Some(mapper) = mapper {
                    let batch = mapper.project(record_batch)?;
                    batch
                } else {
                    record_batch
                };

                // Adapt batches to the expected schema when a compat adapter exists.
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();

            // Attribute this range's costs to its file for verbose explain output.
            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                let file_id = range.file_handle().file_id();
                let file_metrics = file_metrics_map
                    .entry(file_id)
                    .or_insert_with(FileScanMetrics::default);

                file_metrics.num_ranges += 1;
                file_metrics.num_rows += prune_metrics.num_rows;
                file_metrics.build_reader_cost += build_cost;
                file_metrics.scan_cost += prune_metrics.scan_cost;
            }

            reader_metrics.merge_from(&prune_metrics);
        }

        // Publish accumulated metrics once all ranges are exhausted.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
1573
/// Scans an extension range in the flat format (enterprise builds only).
///
/// Delegates to the extension range's flat reader and wraps read failures in
/// `ScanExternalRange` errors.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
    options: crate::extension::ExtensionRangeReadOptions,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.flat_reader(context.as_ref(), options);
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}
1592
/// Scans ranges that are neither memtables nor files in the flat format.
///
/// With the `enterprise` feature this delegates to the extension range
/// scanner; otherwise such ranges are unexpected and an error is returned.
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
    pre_filter_mode: PreFilterMode,
) -> Result<BoxedRecordBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        let options = crate::extension::ExtensionRangeReadOptions { pre_filter_mode };
        scan_flat_extension_range(context.clone(), index, metrics.clone(), options).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        // Silence unused-parameter warnings in non-enterprise builds.
        let _ = context;
        let _ = index;
        let _ = metrics;
        let _ = pre_filter_mode;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable in flat format",
        }
        .fail()
    }
}
1618
/// Stream adapter that splits each [`RecordBatch`] from `inner` into
/// sub-batches at time-index decreases (see [`split_record_batch`]).
pub(crate) struct SplitRecordBatchStream<S> {
    // The underlying record batch stream.
    inner: S,
    // Buffered sub-batches, drained before polling `inner` again.
    batches: VecDeque<RecordBatch>,
}
1626
1627impl<S> SplitRecordBatchStream<S> {
1628 pub(crate) fn new(inner: S) -> Self {
1630 Self {
1631 inner,
1632 batches: VecDeque::new(),
1633 }
1634 }
1635}
1636
impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Drain previously split batches before pulling from the inner stream.
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            // Forward errors and end-of-stream from the inner stream as-is.
            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            };

            // Split the batch into the buffer and loop; an empty batch adds
            // nothing, so we poll the inner stream again.
            split_record_batch(record_batch, &mut self.batches);
        }
    }
}
1663
1664pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1669 let batch_rows = record_batch.num_rows();
1670 if batch_rows == 0 {
1671 return;
1672 }
1673 if batch_rows < 2 {
1674 batches.push_back(record_batch);
1675 return;
1676 }
1677
1678 let time_index_pos = time_index_column_index(record_batch.num_columns());
1679 let timestamps = record_batch.column(time_index_pos);
1680 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1681 let mut offsets = Vec::with_capacity(16);
1682 offsets.push(0);
1683 let values = ts_values.values();
1684 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1685 if value > values[i + 1] {
1686 offsets.push(i + 1);
1687 }
1688 }
1689 offsets.push(values.len());
1690
1691 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1693 let end = offsets[i + 1];
1694 let rows_in_batch = end - start;
1695 batches.push_back(record_batch.slice(start, rows_in_batch));
1696 }
1697}
1698
/// Tests for split decisions, channel sizing, and average batch size helpers.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use smallvec::{SmallVec, smallvec};
    use store_api::storage::RegionId;

    use super::*;
    use crate::cache::CacheStrategy;
    use crate::memtable::{
        BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
        MemtableRangeContext, MemtableStats,
    };
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{MemRangeBuilder, SourceIndex};
    use crate::read::scan_region::ScanInput;
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// An [`IterBuilder`] that always produces empty iterators.
    struct EmptyIterBuilder;

    impl IterBuilder for EmptyIterBuilder {
        fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }

        fn is_record_batch(&self) -> bool {
            true
        }

        fn build_record_batch(
            &self,
            _time_range: Option<(Timestamp, Timestamp)>,
            _metrics: Option<MemScanMetrics>,
        ) -> Result<BoxedRecordBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }
    }

    /// Builds a stream context from the given files and memtable range builders.
    async fn new_test_stream_ctx(
        files: Vec<FileHandle>,
        memtables: Vec<MemRangeBuilder>,
    ) -> Arc<StreamContext> {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_cache(CacheStrategy::Disabled)
            .with_memtables(memtables)
            .with_files(files);

        Arc::new(StreamContext {
            input,
            ranges: Vec::new(),
            scan_fingerprint: None,
            query_start: Instant::now(),
        })
    }

    /// Creates a level-1 file handle with the given row and series counts.
    fn new_test_file(num_rows: u64, num_series: u64) -> FileHandle {
        let meta = FileMeta {
            region_id: RegionId::new(123, 456),
            file_id: Default::default(),
            level: 1,
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            num_rows,
            num_series,
            ..Default::default()
        };
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Creates a memtable range builder reporting the given stats.
    fn new_test_memtable(num_rows: usize, series_count: usize) -> MemRangeBuilder {
        let context = Arc::new(MemtableRangeContext::new(
            0,
            Box::new(EmptyIterBuilder),
            Default::default(),
        ));
        let stats = MemtableStats {
            time_range: Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            )),
            num_rows,
            num_ranges: 1,
            series_count,
            ..Default::default()
        };
        let range = MemtableRange::new(context, stats.clone());
        MemRangeBuilder::new(range, stats)
    }

    /// Creates a range meta with one source per row group index.
    fn new_test_range_meta(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
        let indices = row_group_indices
            .iter()
            .map(|row_group_index| SourceIndex {
                index: row_group_index.index,
                num_row_groups: 1,
            })
            .collect();

        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices,
            row_group_indices,
            num_rows: 0,
        }
    }

    // The estimate should come from the splittable file's rows-per-series ratio.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
        let num_rows = SPLIT_ROW_THRESHOLD * 2;
        let num_series = (num_rows / 100).max(1);
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(num_rows, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some((num_rows / num_series) as usize),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // Files below the row threshold or without series stats are skipped entirely.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
        let stream_ctx = new_test_stream_ctx(
            vec![
                new_test_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
                new_test_file(SPLIT_ROW_THRESHOLD * 2, 0),
            ],
            vec![],
        )
        .await;
        let range_meta = new_test_range_meta(smallvec![
            RowGroupIndex {
                index: 0,
                row_group_index: 0,
            },
            RowGroupIndex {
                index: 1,
                row_group_index: 0,
            }
        ]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // A single file with too many sparse series vetoes splitting.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
        let num_series =
            (SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // Without splittable files, memtable stats drive the decision and estimate.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
        let stream_ctx = new_test_stream_ctx(vec![], vec![new_test_memtable(5_000, 100)]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(50),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // Estimates above DEFAULT_READ_BATCH_SIZE are clamped down to it.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(DEFAULT_READ_BATCH_SIZE),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
        assert_eq!(64, compute_parallel_channel_size(0));
        assert_eq!(64, compute_parallel_channel_size(1));
    }

    #[test]
    fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
        assert_eq!(
            4,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE / 2)
        );
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
        assert_eq!(2, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE));
        assert_eq!(
            2,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
        );
    }

    #[test]
    fn test_compute_average_batch_size_uses_arithmetic_mean() {
        assert_eq!(24, compute_average_batch_size([16, 24, 32]));
    }

    #[test]
    fn test_compute_average_batch_size_clamps_values() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size([DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2])
        );
        assert_eq!(1, compute_average_batch_size([0, 1]));
    }

    #[test]
    fn test_compute_average_batch_size_falls_back_when_empty() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size(std::iter::empty())
        );
    }
}