1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use store_api::storage::RegionId;
33
34use crate::error::Result;
35use crate::memtable::MemScanMetrics;
36use crate::metrics::{
37 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
38 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
39};
40use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
41use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport};
42use crate::read::pruner::PartitionPruner;
43use crate::read::range::{RangeMeta, RowGroupIndex};
44use crate::read::scan_region::StreamContext;
45use crate::read::{BoxedRecordBatchStream, ScannerMetrics};
46use crate::sst::file::{FileTimeRange, RegionFileId};
47use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
48use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
49use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
50use crate::sst::parquet::file_range::{FileRange, PreFilterMode};
51use crate::sst::parquet::flat_format::time_index_column_index;
52use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
53use crate::sst::parquet::row_group::ParquetFetchMetrics;
54use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
55
/// Scan metrics accumulated for a single SST file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges read from the file.
    pub num_ranges: usize,
    /// Number of rows read from the file.
    pub num_rows: usize,
    /// Time spent building the file's parts.
    pub build_part_cost: Duration,
    /// Time spent building readers for the file.
    pub build_reader_cost: Duration,
    /// Time spent scanning the file.
    pub scan_cost: Duration,
}

impl fmt::Debug for FileScanMetrics {
    /// Renders the metrics as a compact JSON-like object.
    ///
    /// `build_part_cost` is always written; every other field is written only when it is
    /// non-zero, which keeps the output short for files that were barely touched.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let FileScanMetrics {
            num_ranges,
            num_rows,
            build_part_cost,
            build_reader_cost,
            scan_cost,
        } = self;

        f.write_str("{")?;
        write!(f, "\"build_part_cost\":\"{build_part_cost:?}\"")?;
        if *num_ranges != 0 {
            write!(f, ", \"num_ranges\":{num_ranges}")?;
        }
        if *num_rows != 0 {
            write!(f, ", \"num_rows\":{num_rows}")?;
        }
        if !build_reader_cost.is_zero() {
            write!(f, ", \"build_reader_cost\":\"{build_reader_cost:?}\"")?;
        }
        if !scan_cost.is_zero() {
            write!(f, ", \"scan_cost\":\"{scan_cost:?}\"")?;
        }
        f.write_str("}")
    }
}

impl FileScanMetrics {
    /// Adds every counter and duration of `other` into `self`.
    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
        // Exhaustive destructuring: adding a field without merging it becomes a compile error.
        let FileScanMetrics {
            num_ranges,
            num_rows,
            build_part_cost,
            build_reader_cost,
            scan_cost,
        } = other;

        self.num_ranges += *num_ranges;
        self.num_rows += *num_rows;
        self.build_part_cost += *build_part_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
    }
}
106
107#[derive(Default)]
109pub(crate) struct ScanMetricsSet {
110 prepare_scan_cost: Duration,
112 build_reader_cost: Duration,
114 scan_cost: Duration,
116 yield_cost: Duration,
118 convert_cost: Option<Time>,
120 total_cost: Duration,
122 num_rows: usize,
124 num_batches: usize,
126 num_mem_ranges: usize,
128 num_file_ranges: usize,
130
131 mem_scan_cost: Duration,
134 mem_rows: usize,
136 mem_batches: usize,
138 mem_series: usize,
140 mem_prefilter_cost: Duration,
142 mem_prefilter_rows_filtered: usize,
144
145 build_parts_cost: Duration,
148 sst_scan_cost: Duration,
150 rg_total: usize,
152 rg_fulltext_filtered: usize,
154 rg_inverted_filtered: usize,
156 rg_minmax_filtered: usize,
158 rg_bloom_filtered: usize,
160 rg_vector_filtered: usize,
162 rows_before_filter: usize,
164 rows_fulltext_filtered: usize,
166 rows_inverted_filtered: usize,
168 rows_bloom_filtered: usize,
170 rows_vector_filtered: usize,
172 rows_vector_selected: usize,
174 rows_precise_filtered: usize,
176 fulltext_index_cache_hit: usize,
178 fulltext_index_cache_miss: usize,
180 inverted_index_cache_hit: usize,
182 inverted_index_cache_miss: usize,
184 bloom_filter_cache_hit: usize,
186 bloom_filter_cache_miss: usize,
188 minmax_cache_hit: usize,
190 minmax_cache_miss: usize,
192 pruner_cache_hit: usize,
194 pruner_cache_miss: usize,
196 pruner_prune_cost: Duration,
198 files_time_range_pruned: usize,
200 num_sst_record_batches: usize,
202 num_sst_batches: usize,
204 num_sst_rows: usize,
206
207 first_poll: Duration,
209
210 num_series_send_timeout: usize,
212 num_series_send_full: usize,
214 num_distributor_rows: usize,
216 num_distributor_batches: usize,
218 distributor_scan_cost: Duration,
220 distributor_yield_cost: Duration,
222 distributor_divider_cost: Duration,
224
225 merge_metrics: MergeMetrics,
227 dedup_metrics: DedupMetrics,
229
230 stream_eof: bool,
232
233 inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
236 bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
238 fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
240 fetch_metrics: Option<ParquetFetchMetrics>,
242 metadata_cache_metrics: Option<MetadataCacheMetrics>,
244 per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
246
247 build_ranges_mem_size: isize,
249 build_ranges_peak_mem_size: isize,
251 num_range_builders: isize,
253 num_peak_range_builders: isize,
255 range_cache_size: usize,
257 range_cache_hit: usize,
259 range_cache_miss: usize,
261}
262
263struct CompareCostReverse<'a> {
266 total_cost: Duration,
267 file_id: RegionFileId,
268 metrics: &'a FileScanMetrics,
269}
270
271impl Ord for CompareCostReverse<'_> {
272 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
273 other.total_cost.cmp(&self.total_cost)
275 }
276}
277
278impl PartialOrd for CompareCostReverse<'_> {
279 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
280 Some(self.cmp(other))
281 }
282}
283
284impl Eq for CompareCostReverse<'_> {}
285
286impl PartialEq for CompareCostReverse<'_> {
287 fn eq(&self, other: &Self) -> bool {
288 self.total_cost == other.total_cost
289 }
290}
291
292impl fmt::Debug for ScanMetricsSet {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 let ScanMetricsSet {
295 prepare_scan_cost,
296 build_reader_cost,
297 scan_cost,
298 yield_cost,
299 convert_cost,
300 total_cost,
301 num_rows,
302 num_batches,
303 num_mem_ranges,
304 num_file_ranges,
305 build_parts_cost,
306 sst_scan_cost,
307 rg_total,
308 rg_fulltext_filtered,
309 rg_inverted_filtered,
310 rg_minmax_filtered,
311 rg_bloom_filtered,
312 rg_vector_filtered,
313 rows_before_filter,
314 rows_fulltext_filtered,
315 rows_inverted_filtered,
316 rows_bloom_filtered,
317 rows_vector_filtered,
318 rows_vector_selected,
319 rows_precise_filtered,
320 fulltext_index_cache_hit,
321 fulltext_index_cache_miss,
322 inverted_index_cache_hit,
323 inverted_index_cache_miss,
324 bloom_filter_cache_hit,
325 bloom_filter_cache_miss,
326 minmax_cache_hit,
327 minmax_cache_miss,
328 pruner_cache_hit,
329 pruner_cache_miss,
330 pruner_prune_cost,
331 files_time_range_pruned,
332 num_sst_record_batches,
333 num_sst_batches,
334 num_sst_rows,
335 first_poll,
336 num_series_send_timeout,
337 num_series_send_full,
338 num_distributor_rows,
339 num_distributor_batches,
340 distributor_scan_cost,
341 distributor_yield_cost,
342 distributor_divider_cost,
343 merge_metrics,
344 dedup_metrics,
345 stream_eof,
346 mem_scan_cost,
347 mem_rows,
348 mem_batches,
349 mem_series,
350 mem_prefilter_cost,
351 mem_prefilter_rows_filtered,
352 inverted_index_apply_metrics,
353 bloom_filter_apply_metrics,
354 fulltext_index_apply_metrics,
355 fetch_metrics,
356 metadata_cache_metrics,
357 per_file_metrics,
358 build_ranges_mem_size: _,
359 build_ranges_peak_mem_size,
360 num_range_builders: _,
361 num_peak_range_builders,
362 range_cache_size,
363 range_cache_hit,
364 range_cache_miss,
365 } = self;
366
367 write!(
369 f,
370 "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
371 \"build_reader_cost\":\"{build_reader_cost:?}\", \
372 \"scan_cost\":\"{scan_cost:?}\", \
373 \"yield_cost\":\"{yield_cost:?}\", \
374 \"total_cost\":\"{total_cost:?}\", \
375 \"num_rows\":{num_rows}, \
376 \"num_batches\":{num_batches}, \
377 \"num_mem_ranges\":{num_mem_ranges}, \
378 \"num_file_ranges\":{num_file_ranges}, \
379 \"build_parts_cost\":\"{build_parts_cost:?}\", \
380 \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
381 \"rg_total\":{rg_total}, \
382 \"rows_before_filter\":{rows_before_filter}, \
383 \"num_sst_record_batches\":{num_sst_record_batches}, \
384 \"num_sst_batches\":{num_sst_batches}, \
385 \"num_sst_rows\":{num_sst_rows}, \
386 \"first_poll\":\"{first_poll:?}\""
387 )?;
388
389 if let Some(time) = convert_cost {
391 let duration = Duration::from_nanos(time.value() as u64);
392 write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
393 }
394
395 if *files_time_range_pruned > 0 {
397 write!(f, ", \"files_time_range_pruned\":{files_time_range_pruned}")?;
398 }
399 if *rg_fulltext_filtered > 0 {
400 write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
401 }
402 if *rg_inverted_filtered > 0 {
403 write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
404 }
405 if *rg_minmax_filtered > 0 {
406 write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
407 }
408 if *rg_bloom_filtered > 0 {
409 write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
410 }
411 if *rg_vector_filtered > 0 {
412 write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
413 }
414 if *rows_fulltext_filtered > 0 {
415 write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
416 }
417 if *rows_inverted_filtered > 0 {
418 write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
419 }
420 if *rows_bloom_filtered > 0 {
421 write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
422 }
423 if *rows_vector_filtered > 0 {
424 write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
425 }
426 if *rows_vector_selected > 0 {
427 write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
428 }
429 if *rows_precise_filtered > 0 {
430 write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
431 }
432 if *fulltext_index_cache_hit > 0 {
433 write!(
434 f,
435 ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
436 )?;
437 }
438 if *fulltext_index_cache_miss > 0 {
439 write!(
440 f,
441 ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
442 )?;
443 }
444 if *inverted_index_cache_hit > 0 {
445 write!(
446 f,
447 ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
448 )?;
449 }
450 if *inverted_index_cache_miss > 0 {
451 write!(
452 f,
453 ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
454 )?;
455 }
456 if *bloom_filter_cache_hit > 0 {
457 write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
458 }
459 if *bloom_filter_cache_miss > 0 {
460 write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
461 }
462 if *minmax_cache_hit > 0 {
463 write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
464 }
465 if *minmax_cache_miss > 0 {
466 write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
467 }
468 if *pruner_cache_hit > 0 {
469 write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
470 }
471 if *pruner_cache_miss > 0 {
472 write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
473 }
474 if !pruner_prune_cost.is_zero() {
475 write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
476 }
477
478 if *num_series_send_timeout > 0 {
480 write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
481 }
482 if *num_series_send_full > 0 {
483 write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
484 }
485 if *num_distributor_rows > 0 {
486 write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
487 }
488 if *num_distributor_batches > 0 {
489 write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
490 }
491 if !distributor_scan_cost.is_zero() {
492 write!(
493 f,
494 ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
495 )?;
496 }
497 if !distributor_yield_cost.is_zero() {
498 write!(
499 f,
500 ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
501 )?;
502 }
503 if !distributor_divider_cost.is_zero() {
504 write!(
505 f,
506 ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
507 )?;
508 }
509
510 if *mem_rows > 0 {
512 write!(f, ", \"mem_rows\":{mem_rows}")?;
513 }
514 if *mem_batches > 0 {
515 write!(f, ", \"mem_batches\":{mem_batches}")?;
516 }
517 if *mem_series > 0 {
518 write!(f, ", \"mem_series\":{mem_series}")?;
519 }
520 if !mem_scan_cost.is_zero() {
521 write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
522 }
523 if !mem_prefilter_cost.is_zero() {
524 write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?;
525 }
526 if *mem_prefilter_rows_filtered > 0 {
527 write!(
528 f,
529 ", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}"
530 )?;
531 }
532
533 if let Some(metrics) = inverted_index_apply_metrics
535 && !metrics.is_empty()
536 {
537 write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
538 }
539 if let Some(metrics) = bloom_filter_apply_metrics
540 && !metrics.is_empty()
541 {
542 write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
543 }
544 if let Some(metrics) = fulltext_index_apply_metrics
545 && !metrics.is_empty()
546 {
547 write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
548 }
549 if let Some(metrics) = fetch_metrics
550 && !metrics.is_empty()
551 {
552 write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
553 }
554 if let Some(metrics) = metadata_cache_metrics
555 && !metrics.is_empty()
556 {
557 write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
558 }
559
560 if !merge_metrics.scan_cost.is_zero() {
562 write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
563 }
564
565 if !dedup_metrics.dedup_cost.is_zero() {
567 write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
568 }
569
570 if let Some(file_metrics) = per_file_metrics
572 && !file_metrics.is_empty()
573 {
574 let mut heap = BinaryHeap::new();
576 for (file_id, metrics) in file_metrics.iter() {
577 let total_cost =
578 metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;
579
580 if total_cost.is_zero() && metrics.num_ranges == 0 {
583 continue;
584 }
585
586 if heap.len() < 10 {
587 heap.push(CompareCostReverse {
589 total_cost,
590 file_id: *file_id,
591 metrics,
592 });
593 } else if let Some(min_entry) = heap.peek() {
594 if total_cost > min_entry.total_cost {
596 heap.pop();
597 heap.push(CompareCostReverse {
598 total_cost,
599 file_id: *file_id,
600 metrics,
601 });
602 }
603 }
604 }
605
606 let top_files = heap.into_sorted_vec();
607 write!(f, ", \"top_file_metrics\": {{")?;
608 for (i, item) in top_files.iter().enumerate() {
609 let CompareCostReverse {
610 total_cost: _,
611 file_id,
612 metrics,
613 } = item;
614 if i > 0 {
615 write!(f, ", ")?;
616 }
617 write!(f, "\"{}\": {:?}", file_id, metrics)?;
618 }
619 write!(f, "}}")?;
620 }
621
622 if *range_cache_size > 0 {
623 write!(f, ", \"range_cache_size\":{range_cache_size}")?;
624 }
625 if *range_cache_hit > 0 {
626 write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
627 }
628 if *range_cache_miss > 0 {
629 write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
630 }
631
632 write!(
633 f,
634 ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
635 \"num_peak_range_builders\":{num_peak_range_builders}, \
636 \"stream_eof\":{stream_eof}}}"
637 )
638 }
639}
640impl ScanMetricsSet {
641 fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
643 self.prepare_scan_cost += cost;
644 self
645 }
646
647 fn with_convert_cost(mut self, time: Time) -> Self {
649 self.convert_cost = Some(time);
650 self
651 }
652
653 fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
655 let ScannerMetrics {
656 scan_cost,
657 yield_cost,
658 num_batches,
659 num_rows,
660 } = other;
661
662 self.scan_cost += *scan_cost;
663 self.yield_cost += *yield_cost;
664 self.num_rows += *num_rows;
665 self.num_batches += *num_batches;
666 }
667
668 fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
670 let ReaderMetrics {
671 build_cost,
672 filter_metrics:
673 ReaderFilterMetrics {
674 rg_total,
675 rg_fulltext_filtered,
676 rg_inverted_filtered,
677 rg_minmax_filtered,
678 rg_bloom_filtered,
679 rg_vector_filtered,
680 rows_total,
681 rows_fulltext_filtered,
682 rows_inverted_filtered,
683 rows_bloom_filtered,
684 rows_vector_filtered,
685 rows_vector_selected,
686 rows_precise_filtered,
687 fulltext_index_cache_hit,
688 fulltext_index_cache_miss,
689 inverted_index_cache_hit,
690 inverted_index_cache_miss,
691 bloom_filter_cache_hit,
692 bloom_filter_cache_miss,
693 minmax_cache_hit,
694 minmax_cache_miss,
695 pruner_cache_hit,
696 pruner_cache_miss,
697 pruner_prune_cost,
698 files_time_range_pruned,
699 inverted_index_apply_metrics,
700 bloom_filter_apply_metrics,
701 fulltext_index_apply_metrics,
702 },
703 num_record_batches,
704 num_batches,
705 num_rows,
706 scan_cost,
707 metadata_cache_metrics,
708 fetch_metrics,
709 metadata_mem_size,
710 num_range_builders,
711 } = other;
712
713 self.build_parts_cost += *build_cost;
714 self.sst_scan_cost += *scan_cost;
715
716 self.files_time_range_pruned += *files_time_range_pruned;
717
718 self.rg_total += *rg_total;
719 self.rg_fulltext_filtered += *rg_fulltext_filtered;
720 self.rg_inverted_filtered += *rg_inverted_filtered;
721 self.rg_minmax_filtered += *rg_minmax_filtered;
722 self.rg_bloom_filtered += *rg_bloom_filtered;
723 self.rg_vector_filtered += *rg_vector_filtered;
724
725 self.rows_before_filter += *rows_total;
726 self.rows_fulltext_filtered += *rows_fulltext_filtered;
727 self.rows_inverted_filtered += *rows_inverted_filtered;
728 self.rows_bloom_filtered += *rows_bloom_filtered;
729 self.rows_vector_filtered += *rows_vector_filtered;
730 self.rows_vector_selected += *rows_vector_selected;
731 self.rows_precise_filtered += *rows_precise_filtered;
732
733 self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
734 self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
735 self.inverted_index_cache_hit += *inverted_index_cache_hit;
736 self.inverted_index_cache_miss += *inverted_index_cache_miss;
737 self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
738 self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
739 self.minmax_cache_hit += *minmax_cache_hit;
740 self.minmax_cache_miss += *minmax_cache_miss;
741 self.pruner_cache_hit += *pruner_cache_hit;
742 self.pruner_cache_miss += *pruner_cache_miss;
743 self.pruner_prune_cost += *pruner_prune_cost;
744
745 self.num_sst_record_batches += *num_record_batches;
746 self.num_sst_batches += *num_batches;
747 self.num_sst_rows += *num_rows;
748
749 if let Some(metrics) = inverted_index_apply_metrics {
751 self.inverted_index_apply_metrics
752 .get_or_insert_with(InvertedIndexApplyMetrics::default)
753 .merge_from(metrics);
754 }
755 if let Some(metrics) = bloom_filter_apply_metrics {
756 self.bloom_filter_apply_metrics
757 .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
758 .merge_from(metrics);
759 }
760 if let Some(metrics) = fulltext_index_apply_metrics {
761 self.fulltext_index_apply_metrics
762 .get_or_insert_with(FulltextIndexApplyMetrics::default)
763 .merge_from(metrics);
764 }
765 if let Some(metrics) = fetch_metrics {
766 self.fetch_metrics
767 .get_or_insert_with(ParquetFetchMetrics::default)
768 .merge_from(metrics);
769 }
770 self.metadata_cache_metrics
771 .get_or_insert_with(MetadataCacheMetrics::default)
772 .merge_from(metadata_cache_metrics);
773
774 self.build_ranges_mem_size += *metadata_mem_size;
776 if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
777 self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
778 }
779
780 self.num_range_builders += *num_range_builders;
782 if self.num_range_builders > self.num_peak_range_builders {
783 self.num_peak_range_builders = self.num_range_builders;
784 }
785 }
786
787 fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
789 let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
790 for (file_id, metrics) in other {
791 self_file_metrics
792 .entry(*file_id)
793 .or_default()
794 .merge_from(metrics);
795 }
796 }
797
798 fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
800 let SeriesDistributorMetrics {
801 num_series_send_timeout,
802 num_series_send_full,
803 num_rows,
804 num_batches,
805 scan_cost,
806 yield_cost,
807 divider_cost,
808 } = distributor_metrics;
809
810 self.num_series_send_timeout += *num_series_send_timeout;
811 self.num_series_send_full += *num_series_send_full;
812 self.num_distributor_rows += *num_rows;
813 self.num_distributor_batches += *num_batches;
814 self.distributor_scan_cost += *scan_cost;
815 self.distributor_yield_cost += *yield_cost;
816 self.distributor_divider_cost += *divider_cost;
817 }
818
819 fn observe_metrics(&self) {
821 READ_STAGE_ELAPSED
822 .with_label_values(&["prepare_scan"])
823 .observe(self.prepare_scan_cost.as_secs_f64());
824 READ_STAGE_ELAPSED
825 .with_label_values(&["build_reader"])
826 .observe(self.build_reader_cost.as_secs_f64());
827 READ_STAGE_ELAPSED
828 .with_label_values(&["scan"])
829 .observe(self.scan_cost.as_secs_f64());
830 READ_STAGE_ELAPSED
831 .with_label_values(&["yield"])
832 .observe(self.yield_cost.as_secs_f64());
833 if let Some(time) = &self.convert_cost {
834 READ_STAGE_ELAPSED
835 .with_label_values(&["convert"])
836 .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
837 }
838 READ_STAGE_ELAPSED
839 .with_label_values(&["total"])
840 .observe(self.total_cost.as_secs_f64());
841 READ_ROWS_RETURN.observe(self.num_rows as f64);
842 READ_BATCHES_RETURN.observe(self.num_batches as f64);
843
844 READ_STAGE_ELAPSED
845 .with_label_values(&["build_parts"])
846 .observe(self.build_parts_cost.as_secs_f64());
847
848 READ_ROW_GROUPS_TOTAL
849 .with_label_values(&["before_filtering"])
850 .inc_by(self.rg_total as u64);
851 READ_ROW_GROUPS_TOTAL
852 .with_label_values(&["fulltext_index_filtered"])
853 .inc_by(self.rg_fulltext_filtered as u64);
854 READ_ROW_GROUPS_TOTAL
855 .with_label_values(&["inverted_index_filtered"])
856 .inc_by(self.rg_inverted_filtered as u64);
857 READ_ROW_GROUPS_TOTAL
858 .with_label_values(&["minmax_index_filtered"])
859 .inc_by(self.rg_minmax_filtered as u64);
860 READ_ROW_GROUPS_TOTAL
861 .with_label_values(&["bloom_filter_index_filtered"])
862 .inc_by(self.rg_bloom_filtered as u64);
863 #[cfg(feature = "vector_index")]
864 READ_ROW_GROUPS_TOTAL
865 .with_label_values(&["vector_index_filtered"])
866 .inc_by(self.rg_vector_filtered as u64);
867
868 PRECISE_FILTER_ROWS_TOTAL
869 .with_label_values(&["parquet"])
870 .inc_by(self.rows_precise_filtered as u64);
871 READ_ROWS_IN_ROW_GROUP_TOTAL
872 .with_label_values(&["before_filtering"])
873 .inc_by(self.rows_before_filter as u64);
874 READ_ROWS_IN_ROW_GROUP_TOTAL
875 .with_label_values(&["fulltext_index_filtered"])
876 .inc_by(self.rows_fulltext_filtered as u64);
877 READ_ROWS_IN_ROW_GROUP_TOTAL
878 .with_label_values(&["inverted_index_filtered"])
879 .inc_by(self.rows_inverted_filtered as u64);
880 READ_ROWS_IN_ROW_GROUP_TOTAL
881 .with_label_values(&["bloom_filter_index_filtered"])
882 .inc_by(self.rows_bloom_filtered as u64);
883 #[cfg(feature = "vector_index")]
884 READ_ROWS_IN_ROW_GROUP_TOTAL
885 .with_label_values(&["vector_index_filtered"])
886 .inc_by(self.rows_vector_filtered as u64);
887 }
888}
889
890struct PartitionMetricsInner {
891 region_id: RegionId,
892 partition: usize,
894 scanner_type: &'static str,
896 query_start: Instant,
898 explain_verbose: bool,
900 metrics: Mutex<ScanMetricsSet>,
902 in_progress_scan: IntGauge,
903
904 build_parts_cost: Time,
907 build_reader_cost: Time,
909 scan_cost: Time,
911 yield_cost: Time,
913 convert_cost: Time,
915 elapsed_compute: Time,
917}
918
919impl PartitionMetricsInner {
920 fn on_finish(&self, stream_eof: bool) {
921 let mut metrics = self.metrics.lock().unwrap();
922 if metrics.total_cost.is_zero() {
923 metrics.total_cost = self.query_start.elapsed();
924 }
925 if !metrics.stream_eof {
926 metrics.stream_eof = stream_eof;
927 }
928 }
929}
930
931impl MergeMetricsReport for PartitionMetricsInner {
932 fn report(&self, metrics: &mut MergeMetrics) {
933 let mut scan_metrics = self.metrics.lock().unwrap();
934 scan_metrics.merge_metrics.merge(metrics);
936
937 *metrics = MergeMetrics::default();
939 }
940}
941
942impl DedupMetricsReport for PartitionMetricsInner {
943 fn report(&self, metrics: &mut DedupMetrics) {
944 let mut scan_metrics = self.metrics.lock().unwrap();
945 scan_metrics.dedup_metrics.merge(metrics);
947
948 *metrics = DedupMetrics::default();
950 }
951}
952
953impl Drop for PartitionMetricsInner {
954 fn drop(&mut self) {
955 self.on_finish(false);
956 let metrics = self.metrics.lock().unwrap();
957 metrics.observe_metrics();
958 self.in_progress_scan.dec();
959
960 if self.explain_verbose {
961 common_telemetry::info!(
962 "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
963 self.scanner_type,
964 self.region_id,
965 self.partition,
966 metrics,
967 );
968 } else {
969 common_telemetry::debug!(
970 "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
971 self.scanner_type,
972 self.region_id,
973 self.partition,
974 metrics,
975 );
976 }
977 }
978}
979
980#[derive(Default)]
982pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
983
984impl PartitionMetricsList {
985 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
987 let mut list = self.0.lock().unwrap();
988 if list.len() <= partition {
989 list.resize(partition + 1, None);
990 }
991 list[partition] = Some(metrics);
992 }
993
994 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
996 let list = self.0.lock().unwrap();
997 write!(f, ", \"metrics_per_partition\": ")?;
998 f.debug_list()
999 .entries(list.iter().filter_map(|p| p.as_ref()))
1000 .finish()?;
1001 write!(f, "}}")
1002 }
1003}
1004
1005#[derive(Clone)]
1007pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
1008
1009impl PartitionMetrics {
1010 pub(crate) fn new(
1011 region_id: RegionId,
1012 partition: usize,
1013 scanner_type: &'static str,
1014 query_start: Instant,
1015 explain_verbose: bool,
1016 metrics_set: &ExecutionPlanMetricsSet,
1017 ) -> Self {
1018 let partition_str = partition.to_string();
1019 let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
1020 in_progress_scan.inc();
1021 let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
1022 let metrics = ScanMetricsSet::default()
1023 .with_prepare_scan_cost(query_start.elapsed())
1024 .with_convert_cost(convert_cost.clone());
1025 let inner = PartitionMetricsInner {
1026 region_id,
1027 partition,
1028 scanner_type,
1029 query_start,
1030 explain_verbose,
1031 metrics: Mutex::new(metrics),
1032 in_progress_scan,
1033 build_parts_cost: MetricBuilder::new(metrics_set)
1034 .subset_time("build_parts_cost", partition),
1035 build_reader_cost: MetricBuilder::new(metrics_set)
1036 .subset_time("build_reader_cost", partition),
1037 scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
1038 yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
1039 convert_cost,
1040 elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
1041 };
1042 Self(Arc::new(inner))
1043 }
1044
1045 pub(crate) fn on_first_poll(&self) {
1046 let mut metrics = self.0.metrics.lock().unwrap();
1047 metrics.first_poll = self.0.query_start.elapsed();
1048 }
1049
1050 pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
1051 let mut metrics = self.0.metrics.lock().unwrap();
1052 metrics.num_mem_ranges += num;
1053 }
1054
1055 pub fn inc_num_file_ranges(&self, num: usize) {
1056 let mut metrics = self.0.metrics.lock().unwrap();
1057 metrics.num_file_ranges += num;
1058 }
1059
1060 fn record_elapsed_compute(&self, duration: Duration) {
1061 if duration.is_zero() {
1062 return;
1063 }
1064 self.0.elapsed_compute.add_duration(duration);
1065 }
1066
1067 pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
1069 self.0.build_reader_cost.add_duration(cost);
1070
1071 let mut metrics = self.0.metrics.lock().unwrap();
1072 metrics.build_reader_cost += cost;
1073 }
1074
1075 pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
1076 self.0.convert_cost.add_duration(cost);
1077 self.record_elapsed_compute(cost);
1078 }
1079
1080 pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
1082 let mut metrics = self.0.metrics.lock().unwrap();
1083 metrics.mem_scan_cost += data.scan_cost;
1084 metrics.mem_rows += data.num_rows;
1085 metrics.mem_batches += data.num_batches;
1086 metrics.mem_series += data.total_series;
1087 metrics.mem_prefilter_cost += data.prefilter_cost;
1088 metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered;
1089 }
1090
1091 pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
1093 self.0.scan_cost.add_duration(metrics.scan_cost);
1094 self.record_elapsed_compute(metrics.scan_cost);
1095 self.0.yield_cost.add_duration(metrics.yield_cost);
1096 self.record_elapsed_compute(metrics.yield_cost);
1097
1098 let mut metrics_set = self.0.metrics.lock().unwrap();
1099 metrics_set.merge_scanner_metrics(metrics);
1100 }
1101
1102 pub fn merge_reader_metrics(
1104 &self,
1105 metrics: &ReaderMetrics,
1106 per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
1107 ) {
1108 self.0.build_parts_cost.add_duration(metrics.build_cost);
1109
1110 let mut metrics_set = self.0.metrics.lock().unwrap();
1111 metrics_set.merge_reader_metrics(metrics);
1112
1113 if let Some(file_metrics) = per_file_metrics {
1115 metrics_set.merge_per_file_metrics(file_metrics);
1116 }
1117 }
1118
1119 pub(crate) fn on_finish(&self) {
1121 self.0.on_finish(true);
1122 }
1123
1124 pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
1126 let mut metrics_set = self.0.metrics.lock().unwrap();
1127 metrics_set.set_distributor_metrics(metrics);
1128 }
1129
1130 pub(crate) fn explain_verbose(&self) -> bool {
1132 self.0.explain_verbose
1133 }
1134
1135 pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
1137 self.0.clone()
1138 }
1139
1140 pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
1142 self.0.clone()
1143 }
1144
1145 #[allow(dead_code)]
1147 pub(crate) fn inc_range_cache_size(&self, size: usize) {
1148 let mut metrics = self.0.metrics.lock().unwrap();
1149 metrics.range_cache_size += size;
1150 }
1151
1152 #[allow(dead_code)]
1154 pub(crate) fn inc_range_cache_hit(&self) {
1155 let mut metrics = self.0.metrics.lock().unwrap();
1156 metrics.range_cache_hit += 1;
1157 }
1158
1159 #[allow(dead_code)]
1161 pub(crate) fn inc_range_cache_miss(&self) {
1162 let mut metrics = self.0.metrics.lock().unwrap();
1163 metrics.range_cache_miss += 1;
1164 }
1165}
1166
1167impl fmt::Debug for PartitionMetrics {
1168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1169 let metrics = self.0.metrics.lock().unwrap();
1170 write!(
1171 f,
1172 r#"{{"partition":{}, "metrics":{:?}}}"#,
1173 self.0.partition, metrics
1174 )
1175 }
1176}
1177
/// Counters and costs reported by the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Series sends that timed out.
    pub(crate) num_series_send_timeout: usize,
    /// Series sends that found the channel full.
    pub(crate) num_series_send_full: usize,
    /// Rows handled.
    pub(crate) num_rows: usize,
    /// Batches handled.
    pub(crate) num_batches: usize,
    /// Scan time.
    pub(crate) scan_cost: Duration,
    /// Yield time.
    pub(crate) yield_cost: Duration,
    /// Time spent dividing batches.
    pub(crate) divider_cost: Duration,
}
1196
1197#[tracing::instrument(
1199 skip_all,
1200 fields(
1201 region_id = %stream_ctx.input.region_metadata().region_id,
1202 row_group_index = %index.index,
1203 source = "mem_flat"
1204 )
1205)]
1206pub(crate) fn scan_flat_mem_ranges(
1207 stream_ctx: Arc<StreamContext>,
1208 part_metrics: PartitionMetrics,
1209 index: RowGroupIndex,
1210 time_range: FileTimeRange,
1211) -> impl Stream<Item = Result<RecordBatch>> {
1212 try_stream! {
1213 let ranges = stream_ctx.input.build_mem_ranges(index);
1214 part_metrics.inc_num_mem_ranges(ranges.len());
1215 for range in ranges {
1216 let build_reader_start = Instant::now();
1217 let mem_scan_metrics = Some(MemScanMetrics::default());
1218 let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
1219 part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1220
1221 while let Some(record_batch) = iter.next().transpose()? {
1222 yield record_batch;
1223 }
1224
1225 if let Some(ref metrics) = mem_scan_metrics {
1227 let data = metrics.data();
1228 part_metrics.report_mem_scan_metrics(&data);
1229 }
1230 }
1231 }
1232}
1233
1234const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
1236const NUM_SERIES_THRESHOLD: u64 = 10240;
1238const BATCH_SIZE_THRESHOLD: u64 = 50;
1241
1242pub(crate) fn should_split_flat_batches_for_merge(
1245 stream_ctx: &Arc<StreamContext>,
1246 range_meta: &RangeMeta,
1247) -> Option<usize> {
1248 let mut num_files_to_split = 0;
1250 let mut num_mem_rows = 0;
1251 let mut num_mem_series = 0;
1252 let mut total_rows: u64 = 0;
1254 let mut total_series: u64 = 0;
1255 for index in &range_meta.row_group_indices {
1259 if stream_ctx.is_mem_range_index(*index) {
1260 let memtable = &stream_ctx.input.memtables[index.index];
1261 let stats = memtable.stats();
1263 num_mem_rows += stats.num_rows();
1264 num_mem_series += stats.series_count();
1265 } else if stream_ctx.is_file_range_index(*index) {
1266 let file_index = index.index - stream_ctx.input.num_memtables();
1268 let file = &stream_ctx.input.files[file_index];
1269 let file_meta = file.meta_ref();
1270 if file_meta.level == 0 {
1271 num_files_to_split += 1;
1273 continue;
1274 } else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 {
1275 continue;
1277 }
1278 debug_assert!(file_meta.num_rows > 0);
1279 if !can_split_series(file_meta.num_rows, file_meta.num_series) {
1280 common_telemetry::trace!(
1282 "Can't split series for file {}, level: {}, num_rows: {}, num_series: {}",
1283 file_meta.file_id,
1284 file_meta.level,
1285 file_meta.num_rows,
1286 file_meta.num_series,
1287 );
1288 return None;
1289 } else {
1290 num_files_to_split += 1;
1291 total_rows += file.meta_ref().num_rows;
1292 total_series += file.meta_ref().num_series;
1293 }
1294 }
1295 }
1297
1298 let should_split = if num_files_to_split > 0 {
1299 true
1301 } else if num_mem_series > 0
1302 && num_mem_rows > 0
1303 && can_split_series(num_mem_rows as u64, num_mem_series as u64)
1304 {
1305 total_rows += num_mem_rows as u64;
1306 total_series += num_mem_series as u64;
1307 true
1308 } else {
1309 false
1310 };
1311
1312 if !should_split {
1313 return None;
1314 }
1315
1316 let estimated_batch_size = if total_series > 0 && total_rows > 0 {
1318 ((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
1319 } else {
1320 DEFAULT_READ_BATCH_SIZE / 4
1322 };
1323 Some(estimated_batch_size)
1324}
1325
1326pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
1329 let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
1330 size.clamp(2, 64)
1331}
1332
1333pub(crate) fn compute_average_batch_size(
1335 estimated_rows_per_batch: impl IntoIterator<Item = usize>,
1336) -> usize {
1337 let mut total = 0usize;
1338 let mut count = 0usize;
1339 for size in estimated_rows_per_batch {
1340 total += size;
1341 count += 1;
1342 }
1343
1344 if count == 0 {
1345 return DEFAULT_READ_BATCH_SIZE;
1346 }
1347
1348 (total / count).clamp(1, DEFAULT_READ_BATCH_SIZE)
1349}
1350
1351fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1352 if num_rows == 0 || num_series == 0 {
1353 return false;
1354 }
1355
1356 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1358}
1359
#[cfg(test)]
mod split_tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use smallvec::smallvec;
    use store_api::storage::FileId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{ScanInput, StreamContext};
    use crate::sst::file::FileHandle;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    /// Builds a stream context over `files`, with no memtables and no ranges.
    async fn stream_context_over(files: Vec<FileHandle>) -> StreamContext {
        let env = SchedulerEnv::new().await;
        let region_metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&region_metadata, [0, 2, 3]).unwrap();

        StreamContext {
            input: ScanInput::new(env.access_layer.clone(), mapper).with_files(files),
            ranges: vec![],
            scan_fingerprint: None,
            scan_implied_time_range: None,
            query_start: std::time::Instant::now(),
        }
    }

    /// Range meta that covers row group 0 of source 0 over `[0ms, 1000ms]`.
    fn first_row_group_range_meta() -> RangeMeta {
        let start = Timestamp::new_millisecond(0);
        let end = Timestamp::new_millisecond(1000);
        RangeMeta {
            time_range: (start, end),
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 1024,
        }
    }

    #[tokio::test]
    async fn should_split_level_zero_file_even_when_series_stats_are_missing() {
        let mut meta = sst_file_handle_with_file_id(FileId::random(), 0, 1000)
            .meta_ref()
            .clone();
        meta.level = 0;
        meta.num_row_groups = 1;
        meta.num_rows = DEFAULT_ROW_GROUP_SIZE as u64;
        // Level-0 files must qualify even though the series count is unknown.
        meta.num_series = 0;

        let handle = FileHandle::new(meta, crate::test_util::new_noop_file_purger());
        let stream_ctx = Arc::new(stream_context_over(vec![handle]).await);

        let estimate =
            should_split_flat_batches_for_merge(&stream_ctx, &first_row_group_range_meta());
        assert!(estimate.is_some());
    }

    #[test]
    fn can_split_series_returns_false_for_zero_inputs() {
        for (rows, series) in [(0, 1), (1, 0), (0, 0)] {
            assert!(!can_split_series(rows, series));
        }
    }
}
1435
1436pub(crate) fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1439 if explain_verbose {
1440 ReaderFilterMetrics {
1441 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1442 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1443 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1444 ..Default::default()
1445 }
1446 } else {
1447 ReaderFilterMetrics::default()
1448 }
1449}
1450
1451#[tracing::instrument(
1453 skip_all,
1454 fields(
1455 region_id = %stream_ctx.input.region_metadata().region_id,
1456 row_group_index = %index.index,
1457 source = read_type
1458 )
1459)]
1460pub(crate) async fn scan_flat_file_ranges(
1461 stream_ctx: Arc<StreamContext>,
1462 part_metrics: PartitionMetrics,
1463 index: RowGroupIndex,
1464 read_type: &'static str,
1465 partition_pruner: Arc<PartitionPruner>,
1466) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1467 let mut reader_metrics = ReaderMetrics {
1468 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1469 ..Default::default()
1470 };
1471 let ranges = partition_pruner
1472 .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1473 .await?;
1474 part_metrics.inc_num_file_ranges(ranges.len());
1475 part_metrics.merge_reader_metrics(&reader_metrics, None);
1476
1477 let init_per_file_metrics = if part_metrics.explain_verbose() {
1479 let file = stream_ctx.input.file_from_index(index);
1480 let file_id = file.file_id();
1481
1482 let mut map = HashMap::new();
1483 map.insert(
1484 file_id,
1485 FileScanMetrics {
1486 build_part_cost: reader_metrics.build_cost,
1487 ..Default::default()
1488 },
1489 );
1490 Some(map)
1491 } else {
1492 None
1493 };
1494
1495 Ok(build_flat_file_range_scan_stream(
1496 stream_ctx,
1497 part_metrics,
1498 read_type,
1499 ranges,
1500 init_per_file_metrics,
1501 ))
1502}
1503
1504#[tracing::instrument(
1506 skip_all,
1507 fields(read_type = read_type, range_count = ranges.len())
1508)]
1509pub fn build_flat_file_range_scan_stream(
1510 stream_ctx: Arc<StreamContext>,
1511 part_metrics: PartitionMetrics,
1512 read_type: &'static str,
1513 ranges: SmallVec<[FileRange; 2]>,
1514 mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1515) -> impl Stream<Item = Result<RecordBatch>> {
1516 try_stream! {
1517 let fetch_metrics = if part_metrics.explain_verbose() {
1518 Some(Arc::new(ParquetFetchMetrics::default()))
1519 } else {
1520 None
1521 };
1522 let reader_metrics = &mut ReaderMetrics {
1523 fetch_metrics: fetch_metrics.clone(),
1524 ..Default::default()
1525 };
1526 for range in ranges {
1527 let build_reader_start = Instant::now();
1528 let Some(mut reader) = range
1529 .flat_reader(
1530 stream_ctx.input.series_row_selector,
1531 fetch_metrics.as_deref(),
1532 )
1533 .await?
1534 else {
1535 continue;
1536 };
1537 let build_cost = build_reader_start.elapsed();
1538 part_metrics.inc_build_reader_cost(build_cost);
1539
1540 let may_compat = range.compat_batch();
1541
1542 let mapper = range.compaction_projection_mapper();
1543 while let Some(record_batch) = reader.next_batch().await? {
1544 let record_batch = if let Some(mapper) = mapper {
1545 let batch = mapper.project(record_batch)?;
1546 batch
1547 } else {
1548 record_batch
1549 };
1550
1551 if let Some(flat_compat) = may_compat {
1552 let batch = flat_compat.compat(record_batch)?;
1553 yield batch;
1554 } else {
1555 yield record_batch;
1556 }
1557 }
1558
1559 let prune_metrics = reader.metrics();
1560
1561 if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1563 let file_id = range.file_handle().file_id();
1564 let file_metrics = file_metrics_map
1565 .entry(file_id)
1566 .or_insert_with(FileScanMetrics::default);
1567
1568 file_metrics.num_ranges += 1;
1569 file_metrics.num_rows += prune_metrics.num_rows;
1570 file_metrics.build_reader_cost += build_cost;
1571 file_metrics.scan_cost += prune_metrics.scan_cost;
1572 }
1573
1574 reader_metrics.merge_from(&prune_metrics);
1575 }
1576
1577 reader_metrics.observe_rows(read_type);
1579 reader_metrics.filter_metrics.observe();
1580 part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1581 }
1582}
1583
/// Reads the extension range at `index.index` through its flat reader and returns the
/// resulting stream.
///
/// Reader failures are wrapped with the `ScanExternalRangeSnafu` context.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
    options: crate::extension::ExtensionRangeReadOptions,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let extension = context.input.extension_range(index.index);
    let ext_reader = extension.flat_reader(context.as_ref(), options);
    let stream = ext_reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}
1602
1603pub(crate) async fn maybe_scan_flat_other_ranges(
1604 context: &Arc<StreamContext>,
1605 index: RowGroupIndex,
1606 metrics: &PartitionMetrics,
1607 pre_filter_mode: PreFilterMode,
1608) -> Result<BoxedRecordBatchStream> {
1609 #[cfg(feature = "enterprise")]
1610 {
1611 let options = crate::extension::ExtensionRangeReadOptions { pre_filter_mode };
1612 scan_flat_extension_range(context.clone(), index, metrics.clone(), options).await
1613 }
1614
1615 #[cfg(not(feature = "enterprise"))]
1616 {
1617 let _ = context;
1618 let _ = index;
1619 let _ = metrics;
1620 let _ = pre_filter_mode;
1621
1622 crate::error::UnexpectedSnafu {
1623 reason: "no other ranges scannable in flat format",
1624 }
1625 .fail()
1626 }
1627}
1628
1629pub(crate) struct SplitRecordBatchStream<S> {
1631 inner: S,
1633 batches: VecDeque<RecordBatch>,
1635}
1636
1637impl<S> SplitRecordBatchStream<S> {
1638 pub(crate) fn new(inner: S) -> Self {
1640 Self {
1641 inner,
1642 batches: VecDeque::new(),
1643 }
1644 }
1645}
1646
1647impl<S> Stream for SplitRecordBatchStream<S>
1648where
1649 S: Stream<Item = Result<RecordBatch>> + Unpin,
1650{
1651 type Item = Result<RecordBatch>;
1652
1653 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1654 loop {
1655 if let Some(batch) = self.batches.pop_front() {
1657 return Poll::Ready(Some(Ok(batch)));
1658 }
1659
1660 let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1662 Some(Ok(batch)) => batch,
1663 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1664 None => return Poll::Ready(None),
1665 };
1666
1667 split_record_batch(record_batch, &mut self.batches);
1669 }
1671 }
1672}
1673
1674pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1679 let batch_rows = record_batch.num_rows();
1680 if batch_rows == 0 {
1681 return;
1682 }
1683 if batch_rows < 2 {
1684 batches.push_back(record_batch);
1685 return;
1686 }
1687
1688 let time_index_pos = time_index_column_index(record_batch.num_columns());
1689 let timestamps = record_batch.column(time_index_pos);
1690 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1691 let mut offsets = Vec::with_capacity(16);
1692 offsets.push(0);
1693 let values = ts_values.values();
1694 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1695 if value >= values[i + 1] {
1696 offsets.push(i + 1);
1697 }
1698 }
1699 offsets.push(values.len());
1700
1701 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1703 let end = offsets[i + 1];
1704 let rows_in_batch = end - start;
1705 batches.push_back(record_batch.slice(start, rows_in_batch));
1706 }
1707}
1708
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use smallvec::{SmallVec, smallvec};
    use store_api::storage::RegionId;

    use super::*;
    use crate::cache::CacheStrategy;
    use crate::memtable::{
        BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
        MemtableRangeContext, MemtableStats,
    };
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{MemRangeBuilder, SourceIndex};
    use crate::read::scan_region::ScanInput;
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Iterator builder whose iterators never yield anything. The split heuristics
    /// under test only read memtable statistics.
    struct NoRowsIterBuilder;

    impl IterBuilder for NoRowsIterBuilder {
        fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }

        fn is_record_batch(&self) -> bool {
            true
        }

        fn build_record_batch(
            &self,
            _time_range: Option<(Timestamp, Timestamp)>,
            _metrics: Option<MemScanMetrics>,
        ) -> Result<BoxedRecordBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }
    }

    /// Time range `[0ms, 1000ms]` shared by all fixtures in this module.
    fn fixture_time_range() -> (Timestamp, Timestamp) {
        (
            Timestamp::new_millisecond(0),
            Timestamp::new_millisecond(1000),
        )
    }

    /// Builds a stream context over `files` and `memtables`, with caching disabled.
    async fn stream_ctx_with(
        files: Vec<FileHandle>,
        memtables: Vec<MemRangeBuilder>,
    ) -> Arc<StreamContext> {
        let env = SchedulerEnv::new().await;
        let region_metadata = metadata_for_test();
        let mapper = FlatProjectionMapper::new(&region_metadata, [0, 2, 3]).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_cache(CacheStrategy::Disabled)
            .with_memtables(memtables)
            .with_files(files);

        Arc::new(StreamContext {
            input,
            ranges: Vec::new(),
            scan_fingerprint: None,
            scan_implied_time_range: None,
            query_start: Instant::now(),
        })
    }

    /// Level-1 file handle carrying only row and series statistics.
    fn level_one_file(num_rows: u64, num_series: u64) -> FileHandle {
        let meta = FileMeta {
            region_id: RegionId::new(123, 456),
            file_id: Default::default(),
            level: 1,
            time_range: fixture_time_range(),
            num_rows,
            num_series,
            ..Default::default()
        };
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Memtable range builder reporting `num_rows` rows across `series_count` series.
    fn memtable_with_stats(num_rows: usize, series_count: usize) -> MemRangeBuilder {
        let range_ctx = Arc::new(MemtableRangeContext::new(
            0,
            Box::new(NoRowsIterBuilder),
            Default::default(),
        ));
        let stats = MemtableStats {
            time_range: Some(fixture_time_range()),
            num_rows,
            num_ranges: 1,
            series_count,
            ..Default::default()
        };
        MemRangeBuilder::new(MemtableRange::new(range_ctx, stats.clone()), stats)
    }

    /// Row group 0 of source `index`.
    fn row_group(index: usize) -> RowGroupIndex {
        RowGroupIndex {
            index,
            row_group_index: 0,
        }
    }

    /// Range meta over `row_group_indices`, one source per row group and zero rows.
    fn range_meta_over(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
        let mut indices = SmallVec::new();
        for rg in &row_group_indices {
            indices.push(SourceIndex {
                index: rg.index,
                num_row_groups: 1,
            });
        }

        RangeMeta {
            time_range: fixture_time_range(),
            indices,
            row_group_indices,
            num_rows: 0,
        }
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
        let num_rows = SPLIT_ROW_THRESHOLD * 2;
        let num_series = (num_rows / 100).max(1);
        let ctx = stream_ctx_with(vec![level_one_file(num_rows, num_series)], vec![]).await;
        let meta = range_meta_over(smallvec![row_group(0)]);

        let expected = Some((num_rows / num_series) as usize);
        assert_eq!(expected, should_split_flat_batches_for_merge(&ctx, &meta));
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
        let files = vec![
            level_one_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
            level_one_file(SPLIT_ROW_THRESHOLD * 2, 0),
        ];
        let ctx = stream_ctx_with(files, vec![]).await;
        let meta = range_meta_over(smallvec![row_group(0), row_group(1)]);

        assert_eq!(None, should_split_flat_batches_for_merge(&ctx, &meta));
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
        let num_series =
            (SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
        let ctx =
            stream_ctx_with(vec![level_one_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
        let meta = range_meta_over(smallvec![row_group(0)]);

        assert_eq!(None, should_split_flat_batches_for_merge(&ctx, &meta));
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
        let ctx = stream_ctx_with(vec![], vec![memtable_with_stats(5_000, 100)]).await;
        let meta = range_meta_over(smallvec![row_group(0)]);

        assert_eq!(Some(50), should_split_flat_batches_for_merge(&ctx, &meta));
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
        let ctx = stream_ctx_with(vec![level_one_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
        let meta = range_meta_over(smallvec![row_group(0)]);

        assert_eq!(
            Some(DEFAULT_READ_BATCH_SIZE),
            should_split_flat_batches_for_merge(&ctx, &meta)
        );
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
        for rows in [0, 1] {
            assert_eq!(64, compute_parallel_channel_size(rows));
        }
    }

    #[test]
    fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
        let half = DEFAULT_READ_BATCH_SIZE / 2;
        assert_eq!(4, compute_parallel_channel_size(half));
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
        for rows in [DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2] {
            assert_eq!(2, compute_parallel_channel_size(rows));
        }
    }

    #[test]
    fn test_compute_average_batch_size_uses_arithmetic_mean() {
        assert_eq!(24, compute_average_batch_size([16, 24, 32]));
    }

    #[test]
    fn test_compute_average_batch_size_clamps_values() {
        let oversized = [DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2];
        assert_eq!(DEFAULT_READ_BATCH_SIZE, compute_average_batch_size(oversized));
        assert_eq!(1, compute_average_batch_size([0, 1]));
    }

    #[test]
    fn test_compute_average_batch_size_falls_back_when_empty() {
        let empty = std::iter::empty();
        assert_eq!(DEFAULT_READ_BATCH_SIZE, compute_average_batch_size(empty));
    }

    /// Builds a four-column batch with millisecond `timestamps` in the first column and
    /// zero-filled `pk`, `seq` and `op` columns.
    fn flat_ts_batch(timestamps: &[i64]) -> RecordBatch {
        use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
        use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};

        let rows = timestamps.len();
        let fields = vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("pk", DataType::UInt64, false),
            Field::new("seq", DataType::UInt64, false),
            Field::new("op", DataType::UInt8, false),
        ];

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())),
                Arc::new(UInt64Array::from(vec![0u64; rows])),
                Arc::new(UInt64Array::from(vec![0u64; rows])),
                Arc::new(UInt8Array::from(vec![0u8; rows])),
            ],
        )
        .unwrap()
    }

    /// Splits a batch built from `timestamps` and returns each piece's timestamps.
    fn split_ts(timestamps: &[i64]) -> Vec<Vec<i64>> {
        let mut pieces = VecDeque::new();
        split_record_batch(flat_ts_batch(timestamps), &mut pieces);

        let mut out = Vec::with_capacity(pieces.len());
        for piece in &pieces {
            let pos = time_index_column_index(piece.num_columns());
            let (values, _) = timestamp_array_to_primitive(piece.column(pos)).unwrap();
            out.push(values.values().to_vec());
        }
        out
    }

    #[test]
    fn test_split_record_batch_on_equal_timestamps() {
        assert_eq!(
            split_ts(&[1, 2, 2, 3, 1]),
            vec![vec![1, 2], vec![2, 3], vec![1]]
        );
        assert_eq!(split_ts(&[5, 5, 5]), vec![vec![5], vec![5], vec![5]]);
        assert_eq!(split_ts(&[5, 5, 1, 2]), vec![vec![5], vec![5], vec![1, 2]]);
        assert_eq!(split_ts(&[1, 2, 5, 5]), vec![vec![1, 2, 5], vec![5]]);
    }

    #[test]
    fn test_split_record_batch_on_decreasing_timestamps() {
        assert_eq!(split_ts(&[1, 2, 3]), vec![vec![1, 2, 3]]);
        assert_eq!(split_ts(&[1, 3, 2, 4]), vec![vec![1, 3], vec![2, 4]]);
    }

    #[test]
    fn test_split_record_batch_empty_and_single_row() {
        let mut pieces = VecDeque::new();
        split_record_batch(flat_ts_batch(&[]), &mut pieces);
        assert!(pieces.is_empty());

        assert_eq!(split_ts(&[42]), vec![vec![42]]);
    }
}