1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
44use crate::read::scan_region::StreamContext;
45use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
46use crate::sst::file::{FileTimeRange, RegionFileId};
47use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
48use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
49use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
50use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
51use crate::sst::parquet::file_range::FileRange;
52use crate::sst::parquet::flat_format::time_index_column_index;
53use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
54use crate::sst::parquet::row_group::ParquetFetchMetrics;
55
/// Per-file metrics collected while scanning a single SST file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges scanned for this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Duration spent building the file parts (ranges).
    pub build_part_cost: Duration,
    /// Duration spent building readers for this file.
    pub build_reader_cost: Duration,
    /// Duration spent scanning this file.
    pub scan_cost: Duration,
}
70
71impl fmt::Debug for FileScanMetrics {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
74
75 if self.num_ranges > 0 {
76 write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
77 }
78 if self.num_rows > 0 {
79 write!(f, ", \"num_rows\":{}", self.num_rows)?;
80 }
81 if !self.build_reader_cost.is_zero() {
82 write!(
83 f,
84 ", \"build_reader_cost\":\"{:?}\"",
85 self.build_reader_cost
86 )?;
87 }
88 if !self.scan_cost.is_zero() {
89 write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
90 }
91
92 write!(f, "}}")
93 }
94}
95
96impl FileScanMetrics {
97 pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
99 self.num_ranges += other.num_ranges;
100 self.num_rows += other.num_rows;
101 self.build_part_cost += other.build_part_cost;
102 self.build_reader_cost += other.build_reader_cost;
103 self.scan_cost += other.scan_cost;
104 }
105}
106
/// Aggregated metrics for one scan partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan before reading starts.
    prepare_scan_cost: Duration,
    /// Duration to build readers.
    build_reader_cost: Duration,
    /// Duration of scanning the sources.
    scan_cost: Duration,
    /// Duration spent yielding batches to the consumer.
    yield_cost: Duration,
    /// Batch conversion cost, backed by a DataFusion `Time` (nanoseconds).
    convert_cost: Option<Time>,
    /// Total duration of the scan; set once when the stream finishes or is dropped.
    total_cost: Duration,
    /// Rows returned by this partition.
    num_rows: usize,
    /// Batches returned by this partition.
    num_batches: usize,
    /// Memtable ranges scanned.
    num_mem_ranges: usize,
    /// File ranges scanned.
    num_file_ranges: usize,

    // --- Memtable scan metrics ---
    /// Duration of scanning memtables.
    mem_scan_cost: Duration,
    /// Rows read from memtables.
    mem_rows: usize,
    /// Batches read from memtables.
    mem_batches: usize,
    /// Series read from memtables.
    mem_series: usize,

    // --- SST scan metrics ---
    /// Duration of building file parts.
    build_parts_cost: Duration,
    /// Duration of scanning SST files.
    sst_scan_cost: Duration,
    /// Row groups considered before filtering.
    rg_total: usize,
    /// Row groups pruned by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Row groups pruned by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups pruned by min-max statistics.
    rg_minmax_filtered: usize,
    /// Row groups pruned by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Row groups pruned by the vector index.
    rg_vector_filtered: usize,
    /// Rows considered before filtering.
    rows_before_filter: usize,
    /// Rows pruned by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Rows pruned by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows pruned by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows pruned by the vector index.
    rows_vector_filtered: usize,
    /// Rows selected by the vector index.
    rows_vector_selected: usize,
    /// Rows pruned by precise (exact predicate) filtering.
    rows_precise_filtered: usize,
    /// Fulltext index cache hits.
    fulltext_index_cache_hit: usize,
    /// Fulltext index cache misses.
    fulltext_index_cache_miss: usize,
    /// Inverted index cache hits.
    inverted_index_cache_hit: usize,
    /// Inverted index cache misses.
    inverted_index_cache_miss: usize,
    /// Bloom filter cache hits.
    bloom_filter_cache_hit: usize,
    /// Bloom filter cache misses.
    bloom_filter_cache_miss: usize,
    /// Arrow record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Batches decoded from SSTs.
    num_sst_batches: usize,
    /// Rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time until the stream is polled for the first time.
    first_poll: Duration,

    // --- Series distributor metrics ---
    /// Number of send timeouts in the series distributor.
    num_series_send_timeout: usize,
    /// Number of full-channel events in the series distributor.
    num_series_send_full: usize,
    /// Rows that went through the series distributor.
    num_distributor_rows: usize,
    /// Batches that went through the series distributor.
    num_distributor_batches: usize,
    /// Distributor scan cost.
    distributor_scan_cost: Duration,
    /// Distributor yield cost.
    distributor_yield_cost: Duration,
    /// Distributor divider cost.
    distributor_divider_cost: Duration,

    /// Metrics of the merge reader.
    merge_metrics: MergeMetrics,
    /// Metrics of the dedup reader.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF (as opposed to being dropped early).
    stream_eof: bool,

    // Optional detailed metrics, populated only in verbose explain mode.
    /// Inverted index apply metrics, if collected.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics, if collected.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics, if collected.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics, if collected.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Parquet metadata cache metrics, if collected.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, if collected.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    // Signed so concurrent add/release accounting can transiently go negative.
    /// Current memory used while building ranges.
    build_ranges_mem_size: isize,
    /// Peak memory used while building ranges.
    build_ranges_peak_mem_size: isize,
    /// Current number of live range builders.
    num_range_builders: isize,
    /// Peak number of live range builders.
    num_peak_range_builders: isize,
}
240
/// Heap entry used to pick the most expensive files: its `Ord` impl reverses
/// the comparison so a max-`BinaryHeap` behaves as a min-heap over `total_cost`.
struct CompareCostReverse<'a> {
    /// Sum of build-part, build-reader and scan costs for the file.
    total_cost: Duration,
    file_id: RegionFileId,
    metrics: &'a FileScanMetrics,
}
248
249impl Ord for CompareCostReverse<'_> {
250 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
251 other.total_cost.cmp(&self.total_cost)
253 }
254}
255
impl PartialOrd for CompareCostReverse<'_> {
    // Delegates to `Ord` so the two orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for CompareCostReverse<'_> {}

// Equality considers only `total_cost`, consistent with the `Ord` impl.
impl PartialEq for CompareCostReverse<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}
269
impl fmt::Debug for ScanMetricsSet {
    /// Renders the metrics as a compact JSON-like object. Fields that are
    /// zero/empty are omitted to keep explain/log output readable.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructure: adding a struct field without deciding how
        // to format it becomes a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
        } = self;

        // Always-emitted fields.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // `Time` stores nanoseconds; render it as a `Duration` for readability.
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Filter counters are emitted only when the corresponding filter fired.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }

        // Series distributor metrics, only when the distributor was active.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Memtable metrics, only when memtables were scanned.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Detailed index/fetch metrics are present only in verbose mode and
        // are skipped when they recorded nothing.
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Keep only the 10 most expensive files. `CompareCostReverse`
            // reverses `Ord`, so this max-heap acts as a min-heap whose root
            // is the cheapest of the current top-10.
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                if heap.len() < 10 {
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // Replace the cheapest retained file when this one costs more.
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // Ascending in the reversed order == descending by cost.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
            \"num_peak_range_builders\":{num_peak_range_builders}, \
            \"stream_eof\":{stream_eof}}}"
        )
    }
}
impl ScanMetricsSet {
    /// Adds the cost to prepare the scan (builder style).
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Attaches the DataFusion timer that tracks batch conversion cost
    /// (builder style).
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }

    /// Merges metrics reported by a scanner into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Destructure so a newly added field cannot be silently ignored.
        let ScannerMetrics {
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
        } = other;

        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
    }

    /// Merges metrics reported by an SST reader into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Destructure so a newly added field cannot be silently ignored.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Optional detailed metrics are lazily initialized on first merge.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track current and peak memory used while building ranges.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track current and peak number of live range builders.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }

    /// Merges per-file metrics from `other`, lazily creating the map.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Accumulates the series distributor metrics into this set.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        // Destructure so a newly added field cannot be silently ignored.
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
            divider_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
        self.distributor_divider_cost += *divider_cost;
    }

    /// Exports the accumulated values to the process-wide Prometheus metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        if let Some(time) = &self.convert_cost {
            // `Time` stores nanoseconds.
            READ_STAGE_ELAPSED
                .with_label_values(&["convert"])
                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
        }
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        // Vector index counters are only exported when the feature is enabled.
        #[cfg(feature = "vector_index")]
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }
}
800
/// Shared state behind [`PartitionMetrics`]; metrics are observed and logged
/// when the last handle is dropped.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition this metrics set belongs to.
    partition: usize,
    /// Label identifying the scanner implementation, used in metrics and logs.
    scanner_type: &'static str,
    /// Instant the query started; basis for prepare/total/first-poll costs.
    query_start: Instant,
    /// Whether verbose explain output was requested.
    explain_verbose: bool,
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented on creation, decremented on drop.
    in_progress_scan: IntGauge,

    // DataFusion `Time` metrics exposed through the physical plan.
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
    elapsed_compute: Time,
}
829
830impl PartitionMetricsInner {
831 fn on_finish(&self, stream_eof: bool) {
832 let mut metrics = self.metrics.lock().unwrap();
833 if metrics.total_cost.is_zero() {
834 metrics.total_cost = self.query_start.elapsed();
835 }
836 if !metrics.stream_eof {
837 metrics.stream_eof = stream_eof;
838 }
839 }
840}
841
842impl MergeMetricsReport for PartitionMetricsInner {
843 fn report(&self, metrics: &mut MergeMetrics) {
844 let mut scan_metrics = self.metrics.lock().unwrap();
845 scan_metrics.merge_metrics.merge(metrics);
847
848 *metrics = MergeMetrics::default();
850 }
851}
852
853impl DedupMetricsReport for PartitionMetricsInner {
854 fn report(&self, metrics: &mut DedupMetrics) {
855 let mut scan_metrics = self.metrics.lock().unwrap();
856 scan_metrics.dedup_metrics.merge(metrics);
858
859 *metrics = DedupMetrics::default();
861 }
862}
863
864impl Drop for PartitionMetricsInner {
865 fn drop(&mut self) {
866 self.on_finish(false);
867 let metrics = self.metrics.lock().unwrap();
868 metrics.observe_metrics();
869 self.in_progress_scan.dec();
870
871 if self.explain_verbose {
872 common_telemetry::info!(
873 "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
874 self.scanner_type,
875 self.region_id,
876 self.partition,
877 metrics,
878 );
879 } else {
880 common_telemetry::debug!(
881 "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
882 self.scanner_type,
883 self.region_id,
884 self.partition,
885 metrics,
886 );
887 }
888 }
889}
890
/// List of [`PartitionMetrics`] indexed by partition, used to render
/// per-partition metrics in verbose explain output.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
894
895impl PartitionMetricsList {
896 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
898 let mut list = self.0.lock().unwrap();
899 if list.len() <= partition {
900 list.resize(partition + 1, None);
901 }
902 list[partition] = Some(metrics);
903 }
904
905 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
907 let list = self.0.lock().unwrap();
908 write!(f, ", \"metrics_per_partition\": ")?;
909 f.debug_list()
910 .entries(list.iter().filter_map(|p| p.as_ref()))
911 .finish()?;
912 write!(f, "}}")
913 }
914}
915
/// Cheaply cloneable handle to the metrics of a single scan partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
919
920impl PartitionMetrics {
921 pub(crate) fn new(
922 region_id: RegionId,
923 partition: usize,
924 scanner_type: &'static str,
925 query_start: Instant,
926 explain_verbose: bool,
927 metrics_set: &ExecutionPlanMetricsSet,
928 ) -> Self {
929 let partition_str = partition.to_string();
930 let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
931 in_progress_scan.inc();
932 let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
933 let metrics = ScanMetricsSet::default()
934 .with_prepare_scan_cost(query_start.elapsed())
935 .with_convert_cost(convert_cost.clone());
936 let inner = PartitionMetricsInner {
937 region_id,
938 partition,
939 scanner_type,
940 query_start,
941 explain_verbose,
942 metrics: Mutex::new(metrics),
943 in_progress_scan,
944 build_parts_cost: MetricBuilder::new(metrics_set)
945 .subset_time("build_parts_cost", partition),
946 build_reader_cost: MetricBuilder::new(metrics_set)
947 .subset_time("build_reader_cost", partition),
948 scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
949 yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
950 convert_cost,
951 elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
952 };
953 Self(Arc::new(inner))
954 }
955
956 pub(crate) fn on_first_poll(&self) {
957 let mut metrics = self.0.metrics.lock().unwrap();
958 metrics.first_poll = self.0.query_start.elapsed();
959 }
960
961 pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
962 let mut metrics = self.0.metrics.lock().unwrap();
963 metrics.num_mem_ranges += num;
964 }
965
966 pub fn inc_num_file_ranges(&self, num: usize) {
967 let mut metrics = self.0.metrics.lock().unwrap();
968 metrics.num_file_ranges += num;
969 }
970
971 fn record_elapsed_compute(&self, duration: Duration) {
972 if duration.is_zero() {
973 return;
974 }
975 self.0.elapsed_compute.add_duration(duration);
976 }
977
978 pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
980 self.0.build_reader_cost.add_duration(cost);
981
982 let mut metrics = self.0.metrics.lock().unwrap();
983 metrics.build_reader_cost += cost;
984 }
985
986 pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
987 self.0.convert_cost.add_duration(cost);
988 self.record_elapsed_compute(cost);
989 }
990
991 pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
993 let mut metrics = self.0.metrics.lock().unwrap();
994 metrics.mem_scan_cost += data.scan_cost;
995 metrics.mem_rows += data.num_rows;
996 metrics.mem_batches += data.num_batches;
997 metrics.mem_series += data.total_series;
998 }
999
1000 pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
1002 self.0.scan_cost.add_duration(metrics.scan_cost);
1003 self.record_elapsed_compute(metrics.scan_cost);
1004 self.0.yield_cost.add_duration(metrics.yield_cost);
1005 self.record_elapsed_compute(metrics.yield_cost);
1006
1007 let mut metrics_set = self.0.metrics.lock().unwrap();
1008 metrics_set.merge_scanner_metrics(metrics);
1009 }
1010
1011 pub fn merge_reader_metrics(
1013 &self,
1014 metrics: &ReaderMetrics,
1015 per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
1016 ) {
1017 self.0.build_parts_cost.add_duration(metrics.build_cost);
1018
1019 let mut metrics_set = self.0.metrics.lock().unwrap();
1020 metrics_set.merge_reader_metrics(metrics);
1021
1022 if let Some(file_metrics) = per_file_metrics {
1024 metrics_set.merge_per_file_metrics(file_metrics);
1025 }
1026 }
1027
1028 pub(crate) fn on_finish(&self) {
1030 self.0.on_finish(true);
1031 }
1032
1033 pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
1035 let mut metrics_set = self.0.metrics.lock().unwrap();
1036 metrics_set.set_distributor_metrics(metrics);
1037 }
1038
1039 pub(crate) fn explain_verbose(&self) -> bool {
1041 self.0.explain_verbose
1042 }
1043
1044 pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
1046 self.0.clone()
1047 }
1048
1049 pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
1051 self.0.clone()
1052 }
1053}
1054
1055impl fmt::Debug for PartitionMetrics {
1056 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1057 let metrics = self.0.metrics.lock().unwrap();
1058 write!(
1059 f,
1060 r#"{{"partition":{}, "metrics":{:?}}}"#,
1061 self.0.partition, metrics
1062 )
1063 }
1064}
1065
/// Metrics reported by the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of timed-out sends to per-series channels.
    pub(crate) num_series_send_timeout: usize,
    /// Number of sends that found the channel full.
    pub(crate) num_series_send_full: usize,
    /// Rows that went through the distributor.
    pub(crate) num_rows: usize,
    /// Batches that went through the distributor.
    pub(crate) num_batches: usize,
    /// Duration spent scanning inputs.
    pub(crate) scan_cost: Duration,
    /// Duration spent yielding output.
    pub(crate) yield_cost: Duration,
    /// Duration spent dividing batches into series.
    pub(crate) divider_cost: Duration,
}
1084
1085#[tracing::instrument(
1087 skip_all,
1088 fields(
1089 region_id = %stream_ctx.input.region_metadata().region_id,
1090 file_or_mem_index = %index.index,
1091 row_group_index = %index.row_group_index,
1092 source = "mem"
1093 )
1094)]
1095pub(crate) fn scan_mem_ranges(
1096 stream_ctx: Arc<StreamContext>,
1097 part_metrics: PartitionMetrics,
1098 index: RowGroupIndex,
1099 time_range: FileTimeRange,
1100) -> impl Stream<Item = Result<Batch>> {
1101 try_stream! {
1102 let ranges = stream_ctx.input.build_mem_ranges(index);
1103 part_metrics.inc_num_mem_ranges(ranges.len());
1104 for range in ranges {
1105 let build_reader_start = Instant::now();
1106 let mem_scan_metrics = Some(MemScanMetrics::default());
1107 let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
1108 part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1109
1110 let mut source = Source::Iter(iter);
1111 while let Some(batch) = source.next_batch().await? {
1112 yield batch;
1113 }
1114
1115 if let Some(ref metrics) = mem_scan_metrics {
1117 let data = metrics.data();
1118 part_metrics.report_mem_scan_metrics(&data);
1119 }
1120 }
1121 }
1122}
1123
1124#[tracing::instrument(
1126 skip_all,
1127 fields(
1128 region_id = %stream_ctx.input.region_metadata().region_id,
1129 row_group_index = %index.index,
1130 source = "mem_flat"
1131 )
1132)]
1133pub(crate) fn scan_flat_mem_ranges(
1134 stream_ctx: Arc<StreamContext>,
1135 part_metrics: PartitionMetrics,
1136 index: RowGroupIndex,
1137) -> impl Stream<Item = Result<RecordBatch>> {
1138 try_stream! {
1139 let ranges = stream_ctx.input.build_mem_ranges(index);
1140 part_metrics.inc_num_mem_ranges(ranges.len());
1141 for range in ranges {
1142 let build_reader_start = Instant::now();
1143 let mem_scan_metrics = Some(MemScanMetrics::default());
1144 let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
1145 part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1146
1147 while let Some(record_batch) = iter.next().transpose()? {
1148 yield record_batch;
1149 }
1150
1151 if let Some(ref metrics) = mem_scan_metrics {
1153 let data = metrics.data();
1154 part_metrics.report_mem_scan_metrics(&data);
1155 }
1156 }
1157 }
1158}
1159
/// Minimum number of rows a file must have before splitting is considered.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Below this series count, splitting is always allowed regardless of the
/// average rows per series (see `can_split_series`).
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows per series required to split when the series count is
/// at or above `NUM_SERIES_THRESHOLD`.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1167
1168pub(crate) fn should_split_flat_batches_for_merge(
1170 stream_ctx: &Arc<StreamContext>,
1171 range_meta: &RangeMeta,
1172) -> bool {
1173 let mut num_files_to_split = 0;
1175 let mut num_mem_rows = 0;
1176 let mut num_mem_series = 0;
1177 for index in &range_meta.row_group_indices {
1181 if stream_ctx.is_mem_range_index(*index) {
1182 let memtable = &stream_ctx.input.memtables[index.index];
1183 let stats = memtable.stats();
1185 num_mem_rows += stats.num_rows();
1186 num_mem_series += stats.series_count();
1187 } else if stream_ctx.is_file_range_index(*index) {
1188 let file_index = index.index - stream_ctx.input.num_memtables();
1190 let file = &stream_ctx.input.files[file_index];
1191 if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
1192 continue;
1194 }
1195 debug_assert!(file.meta_ref().num_rows > 0);
1196 if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
1197 return false;
1199 } else {
1200 num_files_to_split += 1;
1201 }
1202 }
1203 }
1205
1206 if num_files_to_split > 0 {
1207 true
1209 } else if num_mem_series > 0 && num_mem_rows > 0 {
1210 can_split_series(num_mem_rows as u64, num_mem_series as u64)
1212 } else {
1213 false
1214 }
1215}
1216
1217fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1218 assert!(num_series > 0);
1219 assert!(num_rows > 0);
1220
1221 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1223}
1224
1225fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1228 if explain_verbose {
1229 ReaderFilterMetrics {
1230 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1231 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1232 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1233 ..Default::default()
1234 }
1235 } else {
1236 ReaderFilterMetrics::default()
1237 }
1238}
1239
1240#[tracing::instrument(
1242 skip_all,
1243 fields(
1244 region_id = %stream_ctx.input.region_metadata().region_id,
1245 row_group_index = %index.index,
1246 source = read_type
1247 )
1248)]
1249pub(crate) async fn scan_file_ranges(
1250 stream_ctx: Arc<StreamContext>,
1251 part_metrics: PartitionMetrics,
1252 index: RowGroupIndex,
1253 read_type: &'static str,
1254 range_builder: Arc<RangeBuilderList>,
1255) -> Result<impl Stream<Item = Result<Batch>>> {
1256 let mut reader_metrics = ReaderMetrics {
1257 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1258 ..Default::default()
1259 };
1260 let ranges = range_builder
1261 .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
1262 .await?;
1263 part_metrics.inc_num_file_ranges(ranges.len());
1264 part_metrics.merge_reader_metrics(&reader_metrics, None);
1265
1266 let init_per_file_metrics = if part_metrics.explain_verbose() {
1268 let file = stream_ctx.input.file_from_index(index);
1269 let file_id = file.file_id();
1270
1271 let mut map = HashMap::new();
1272 map.insert(
1273 file_id,
1274 FileScanMetrics {
1275 build_part_cost: reader_metrics.build_cost,
1276 ..Default::default()
1277 },
1278 );
1279 Some(map)
1280 } else {
1281 None
1282 };
1283
1284 Ok(build_file_range_scan_stream(
1285 stream_ctx,
1286 part_metrics,
1287 read_type,
1288 ranges,
1289 init_per_file_metrics,
1290 ))
1291}
1292
1293#[tracing::instrument(
1295 skip_all,
1296 fields(
1297 region_id = %stream_ctx.input.region_metadata().region_id,
1298 row_group_index = %index.index,
1299 source = read_type
1300 )
1301)]
1302pub(crate) async fn scan_flat_file_ranges(
1303 stream_ctx: Arc<StreamContext>,
1304 part_metrics: PartitionMetrics,
1305 index: RowGroupIndex,
1306 read_type: &'static str,
1307 range_builder: Arc<RangeBuilderList>,
1308) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1309 let mut reader_metrics = ReaderMetrics {
1310 filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1311 ..Default::default()
1312 };
1313 let ranges = range_builder
1314 .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
1315 .await?;
1316 part_metrics.inc_num_file_ranges(ranges.len());
1317 part_metrics.merge_reader_metrics(&reader_metrics, None);
1318
1319 let init_per_file_metrics = if part_metrics.explain_verbose() {
1321 let file = stream_ctx.input.file_from_index(index);
1322 let file_id = file.file_id();
1323
1324 let mut map = HashMap::new();
1325 map.insert(
1326 file_id,
1327 FileScanMetrics {
1328 build_part_cost: reader_metrics.build_cost,
1329 ..Default::default()
1330 },
1331 );
1332 Some(map)
1333 } else {
1334 None
1335 };
1336
1337 Ok(build_flat_file_range_scan_stream(
1338 stream_ctx,
1339 part_metrics,
1340 read_type,
1341 ranges,
1342 init_per_file_metrics,
1343 ))
1344}
1345
/// Builds a stream yielding primary-key format [Batch]es from the given file
/// `ranges`.
///
/// `read_type` labels metric observations; `per_file_metrics`, when present
/// (verbose explain mode), accumulates per-file scan statistics that are
/// merged into `part_metrics` once the stream is exhausted.
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        // Detailed fetch metrics are only collected in verbose explain mode.
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            // The range may prune everything, in which case there is no reader.
            let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
                continue;
            };
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                // Adapt batches from older file schemas to the expected schema.
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            // Recover the reader from the source to collect its metrics; the
            // variant is unchanged since we constructed it above.
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();

                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                    let file_id = range.file_handle().file_id();
                    let file_metrics = file_metrics_map
                        .entry(file_id)
                        .or_insert_with(FileScanMetrics::default);

                    file_metrics.num_ranges += 1;
                    file_metrics.num_rows += prune_metrics.num_rows;
                    file_metrics.build_reader_cost += build_cost;
                    file_metrics.scan_cost += prune_metrics.scan_cost;
                }

                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Report the accumulated metrics only after all ranges are drained.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
1409
1410#[tracing::instrument(
1412 skip_all,
1413 fields(read_type = read_type, range_count = ranges.len())
1414)]
1415pub fn build_flat_file_range_scan_stream(
1416 _stream_ctx: Arc<StreamContext>,
1417 part_metrics: PartitionMetrics,
1418 read_type: &'static str,
1419 ranges: SmallVec<[FileRange; 2]>,
1420 mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1421) -> impl Stream<Item = Result<RecordBatch>> {
1422 try_stream! {
1423 let fetch_metrics = if part_metrics.explain_verbose() {
1424 Some(Arc::new(ParquetFetchMetrics::default()))
1425 } else {
1426 None
1427 };
1428 let reader_metrics = &mut ReaderMetrics {
1429 fetch_metrics: fetch_metrics.clone(),
1430 ..Default::default()
1431 };
1432 for range in ranges {
1433 let build_reader_start = Instant::now();
1434 let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else{continue};
1435 let build_cost = build_reader_start.elapsed();
1436 part_metrics.inc_build_reader_cost(build_cost);
1437
1438 let may_compat = range
1439 .compat_batch()
1440 .map(|compat| {
1441 compat.as_flat().context(UnexpectedSnafu {
1442 reason: "Invalid compat for flat format",
1443 })
1444 })
1445 .transpose()?;
1446
1447 let mapper = range.compaction_projection_mapper();
1448 while let Some(record_batch) = reader.next_batch()? {
1449 let record_batch = if let Some(mapper) = mapper {
1450 let batch = mapper.project(record_batch)?;
1451 batch
1452 } else {
1453 record_batch
1454 };
1455
1456 if let Some(flat_compat) = may_compat {
1457 let batch = flat_compat.compat(record_batch)?;
1458 yield batch;
1459 } else {
1460 yield record_batch;
1461 }
1462 }
1463
1464 let prune_metrics = reader.metrics();
1465
1466 if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1468 let file_id = range.file_handle().file_id();
1469 let file_metrics = file_metrics_map
1470 .entry(file_id)
1471 .or_insert_with(FileScanMetrics::default);
1472
1473 file_metrics.num_ranges += 1;
1474 file_metrics.num_rows += prune_metrics.num_rows;
1475 file_metrics.build_reader_cost += build_cost;
1476 file_metrics.scan_cost += prune_metrics.scan_cost;
1477 }
1478
1479 reader_metrics.merge_from(&prune_metrics);
1480 }
1481
1482 reader_metrics.observe_rows(read_type);
1484 reader_metrics.filter_metrics.observe();
1485 part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1486 }
1487}
1488
/// Scans an enterprise extension range identified by `index`.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let ext_range = context.input.extension_range(index.index);
    let reader = ext_range.reader(context.as_ref());
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1506
/// Scans a range that is neither a memtable nor a file range.
///
/// With the `enterprise` feature this delegates to `scan_extension_range`;
/// otherwise no such ranges can exist, so reaching here is an internal error.
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        // Suppress unused-argument warnings in the non-enterprise build.
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}
1529
1530pub(crate) async fn maybe_scan_flat_other_ranges(
1531 context: &Arc<StreamContext>,
1532 index: RowGroupIndex,
1533 metrics: &PartitionMetrics,
1534) -> Result<BoxedRecordBatchStream> {
1535 let _ = context;
1536 let _ = index;
1537 let _ = metrics;
1538
1539 crate::error::UnexpectedSnafu {
1540 reason: "no other ranges scannable in flat format",
1541 }
1542 .fail()
1543}
1544
/// A stream adapter that splits each incoming [RecordBatch] wherever its time
/// index value decreases (see [split_record_batch]) and yields the pieces.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The underlying stream of record batches.
    inner: S,
    /// Split pieces buffered for emission, yielded front to back.
    batches: VecDeque<RecordBatch>,
}
1552
1553impl<S> SplitRecordBatchStream<S> {
1554 pub(crate) fn new(inner: S) -> Self {
1556 Self {
1557 inner,
1558 batches: VecDeque::new(),
1559 }
1560 }
1561}
1562
1563impl<S> Stream for SplitRecordBatchStream<S>
1564where
1565 S: Stream<Item = Result<RecordBatch>> + Unpin,
1566{
1567 type Item = Result<RecordBatch>;
1568
1569 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1570 loop {
1571 if let Some(batch) = self.batches.pop_front() {
1573 return Poll::Ready(Some(Ok(batch)));
1574 }
1575
1576 let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1578 Some(Ok(batch)) => batch,
1579 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1580 None => return Poll::Ready(None),
1581 };
1582
1583 split_record_batch(record_batch, &mut self.batches);
1585 }
1587 }
1588}
1589
1590pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1595 let batch_rows = record_batch.num_rows();
1596 if batch_rows == 0 {
1597 return;
1598 }
1599 if batch_rows < 2 {
1600 batches.push_back(record_batch);
1601 return;
1602 }
1603
1604 let time_index_pos = time_index_column_index(record_batch.num_columns());
1605 let timestamps = record_batch.column(time_index_pos);
1606 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1607 let mut offsets = Vec::with_capacity(16);
1608 offsets.push(0);
1609 let values = ts_values.values();
1610 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1611 if value > values[i + 1] {
1612 offsets.push(i + 1);
1613 }
1614 }
1615 offsets.push(values.len());
1616
1617 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1619 let end = offsets[i + 1];
1620 let rows_in_batch = end - start;
1621 batches.push_back(record_batch.slice(start, rows_in_batch));
1622 }
1623}