1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
44use crate::read::scan_region::StreamContext;
45use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
46use crate::sst::file::{FileTimeRange, RegionFileId};
47use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
48use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
49use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
50use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
51use crate::sst::parquet::file_range::FileRange;
52use crate::sst::parquet::flat_format::time_index_column_index;
53use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
54use crate::sst::parquet::row_group::ParquetFetchMetrics;
55
/// Scan metrics collected for a single SST file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of file ranges scanned in this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Cost to build the file parts (ranges) to scan.
    pub build_part_cost: Duration,
    /// Cost to build readers over the ranges of this file.
    pub build_reader_cost: Duration,
    /// Cost spent scanning the ranges of this file.
    pub scan_cost: Duration,
}
70
71impl fmt::Debug for FileScanMetrics {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
74
75 if self.num_ranges > 0 {
76 write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
77 }
78 if self.num_rows > 0 {
79 write!(f, ", \"num_rows\":{}", self.num_rows)?;
80 }
81 if !self.build_reader_cost.is_zero() {
82 write!(
83 f,
84 ", \"build_reader_cost\":\"{:?}\"",
85 self.build_reader_cost
86 )?;
87 }
88 if !self.scan_cost.is_zero() {
89 write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
90 }
91
92 write!(f, "}}")
93 }
94}
95
96impl FileScanMetrics {
97 pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
99 self.num_ranges += other.num_ranges;
100 self.num_rows += other.num_rows;
101 self.build_part_cost += other.build_part_cost;
102 self.build_reader_cost += other.build_reader_cost;
103 self.scan_cost += other.scan_cost;
104 }
105}
106
/// Metrics of a scan partition, aggregated from scanners, SST readers,
/// memtable iterators, the series distributor and merge/dedup stages.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Cost to prepare the scan before polling data.
    prepare_scan_cost: Duration,
    /// Cost to build readers.
    build_reader_cost: Duration,
    /// Cost spent scanning data.
    scan_cost: Duration,
    /// Cost spent yielding batches to the caller.
    yield_cost: Duration,
    /// Total cost of the scan; set once when the scan finishes (see `on_finish`).
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Cost spent scanning memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    /// Cost to build SST file parts.
    build_parts_cost: Duration,
    /// Cost spent scanning SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows removed by precise filtering.
    rows_precise_filtered: usize,
    /// Number of arrow record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SSTs.
    num_sst_batches: usize,
    /// Number of rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time from query start until the first poll (see `on_first_poll`).
    first_poll: Duration,

    /// Number of series the distributor timed out sending.
    num_series_send_timeout: usize,
    /// Number of times the distributor found the send channel full.
    num_series_send_full: usize,
    /// Number of rows that went through the distributor.
    num_distributor_rows: usize,
    /// Number of batches that went through the distributor.
    num_distributor_batches: usize,
    /// Scan cost measured by the distributor.
    distributor_scan_cost: Duration,
    /// Yield cost measured by the distributor.
    distributor_yield_cost: Duration,

    /// Metrics of the merge stage.
    merge_metrics: MergeMetrics,
    /// Metrics of the dedup stage.
    dedup_metrics: DedupMetrics,

    /// Whether the scan stream reached EOF (latched in `on_finish`).
    stream_eof: bool,

    // The following metrics are only collected when verbose explain is
    // enabled (see `new_filter_metrics` and `scan_file_ranges`).
    /// Inverted index apply metrics, if collected.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics, if collected.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics, if collected.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics, if collected.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Parquet metadata cache metrics, if collected.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, if collected.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
}
209
/// Heap entry that orders per-file metrics by `total_cost` in reverse, so a
/// `BinaryHeap` (a max-heap) behaves as a min-heap on cost.
struct CompareCostReverse<'a> {
    /// Sum of build part, build reader and scan costs of the file.
    total_cost: Duration,
    /// Id of the file the metrics belong to.
    file_id: RegionFileId,
    /// Borrowed metrics of the file.
    metrics: &'a FileScanMetrics,
}
217
218impl Ord for CompareCostReverse<'_> {
219 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
220 other.total_cost.cmp(&self.total_cost)
222 }
223}
224
225impl PartialOrd for CompareCostReverse<'_> {
226 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
227 Some(self.cmp(other))
228 }
229}
230
// Equality only depends on `total_cost` (see `PartialEq`), which is total.
impl Eq for CompareCostReverse<'_> {}
232
233impl PartialEq for CompareCostReverse<'_> {
234 fn eq(&self, other: &Self) -> bool {
235 self.total_cost == other.total_cost
236 }
237}
238
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a JSON-like object.
    ///
    /// Core metrics are always printed; optional or usually-zero metrics are
    /// printed only when non-zero / non-empty so the explain output stays
    /// compact. Per-file metrics are limited to the 10 most expensive files.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring so adding a field without updating this
        // formatter becomes a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
        } = self;

        // Metrics that are always printed.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Filter counters, printed only when filtering actually happened.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Series distributor metrics, printed only when the distributor ran.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Memtable scan metrics, printed only when memtables were read.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Optional verbose metrics, printed only when collected and non-empty.
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Keeps the 10 most expensive files using a size-bounded heap.
            // `CompareCostReverse` reverses the ordering, so `peek()` returns
            // the cheapest retained entry.
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                if heap.len() < 10 {
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // Replaces the cheapest retained file if this one costs more.
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // With the reversed ordering, the sorted vec lists the most
            // expensive files first.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
impl ScanMetricsSet {
    /// Adds the given prepare scan cost and returns the updated set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Accumulates the metrics of a scanner into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Exhaustive destructuring so new fields can't be silently dropped.
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Accumulates the metrics of an SST reader into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Exhaustive destructuring (including nested filter metrics) so new
        // fields can't be silently dropped.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Optional verbose metrics: lazily initialize the target and merge.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        // Metadata cache metrics are not optional on the reader side and are
        // always merged.
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);
    }

    /// Merges per-file scan metrics into this set, keyed by file id.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Accumulates the metrics of the series distributor into this set.
    ///
    /// NOTE: despite the `set_` name, values are added (`+=`), not overwritten.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        // Exhaustive destructuring so new fields can't be silently dropped.
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the collected metrics to the global Prometheus collectors.
    fn observe_metrics(&self) {
        // Stage elapsed histograms.
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        // Row group counters, before and after each filter kind.
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        // Row counters, before and after each filter kind.
        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
652
/// Inner state shared by all clones of a [`PartitionMetrics`].
struct PartitionMetricsInner {
    /// Id of the region being scanned.
    region_id: RegionId,
    /// Index of the partition these metrics belong to.
    partition: usize,
    /// Static scanner type name, used as a metric label and in logs.
    scanner_type: &'static str,
    /// Time when the query started; used for first-poll and total costs.
    query_start: Instant,
    /// Whether verbose explain is on (enables extra metrics and info-level logs).
    explain_verbose: bool,
    /// Aggregated scan metrics, guarded by a mutex for shared mutation.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented in `new`, decremented on drop.
    in_progress_scan: IntGauge,

    // DataFusion `Time` metrics exposed via the execution plan metrics set.
    /// Cost to build file parts.
    build_parts_cost: Time,
    /// Cost to build readers.
    build_reader_cost: Time,
    /// Cost spent scanning.
    scan_cost: Time,
    /// Cost spent yielding batches.
    yield_cost: Time,
    /// Cost spent converting batches.
    convert_cost: Time,
    /// Total compute time reported to DataFusion.
    elapsed_compute: Time,
}
681
682impl PartitionMetricsInner {
683 fn on_finish(&self, stream_eof: bool) {
684 let mut metrics = self.metrics.lock().unwrap();
685 if metrics.total_cost.is_zero() {
686 metrics.total_cost = self.query_start.elapsed();
687 }
688 if !metrics.stream_eof {
689 metrics.stream_eof = stream_eof;
690 }
691 }
692}
693
impl MergeMetricsReport for PartitionMetricsInner {
    /// Merges `metrics` into the partition's merge metrics, then resets
    /// `metrics` so the caller can keep reporting without double counting.
    fn report(&self, metrics: &mut MergeMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        scan_metrics.merge_metrics.merge(metrics);

        *metrics = MergeMetrics::default();
    }
}
704
impl DedupMetricsReport for PartitionMetricsInner {
    /// Merges `metrics` into the partition's dedup metrics, then resets
    /// `metrics` so the caller can keep reporting without double counting.
    fn report(&self, metrics: &mut DedupMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        scan_metrics.dedup_metrics.merge(metrics);

        *metrics = DedupMetrics::default();
    }
}
715
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        // Ensures the total cost is recorded even if the stream never
        // reached EOF (`on_finish` is a no-op if already finalized).
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        // Flushes the aggregated metrics to the global Prometheus collectors.
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        // Logs at info level for verbose explain so the metrics are always
        // visible; otherwise logs at debug level. The message is identical.
        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}
744
/// List of optional [`PartitionMetrics`], one slot per partition, collected
/// for verbose explain output.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
748
749impl PartitionMetricsList {
750 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
752 let mut list = self.0.lock().unwrap();
753 if list.len() <= partition {
754 list.resize(partition + 1, None);
755 }
756 list[partition] = Some(metrics);
757 }
758
759 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
761 let list = self.0.lock().unwrap();
762 write!(f, ", \"metrics_per_partition\": ")?;
763 f.debug_list()
764 .entries(list.iter().filter_map(|p| p.as_ref()))
765 .finish()?;
766 write!(f, "}}")
767 }
768}
769
/// Cheaply cloneable, shared handle to the metrics of one scan partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
773
impl PartitionMetrics {
    /// Creates metrics for one partition and registers the DataFusion
    /// sub-metrics under `metrics_set`.
    ///
    /// Increments the in-progress scan gauge; the matching decrement happens
    /// when the last clone is dropped.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        // Time spent before this point counts as prepare cost.
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the elapsed time from query start to the first poll.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the number of memtable ranges scanned.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the number of file ranges scanned.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to the DataFusion elapsed-compute metric, skipping
    /// zero durations.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Adds `cost` to both the DataFusion metric and the internal
    /// build-reader cost.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Adds `cost` to the batch conversion cost and to elapsed compute.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Accumulates memtable scan metrics reported by a memtable iterator.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges scanner metrics into both the DataFusion metrics and the
    /// internal metrics set. Scan and yield costs also count as compute time.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges SST reader metrics and, when provided, per-file metrics.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Marks the stream as finished (EOF) and records the total cost.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Accumulates the metrics of the series distributor.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain is enabled.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a reporter handle for merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a reporter handle for dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }
}
908
909impl fmt::Debug for PartitionMetrics {
910 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911 let metrics = self.0.metrics.lock().unwrap();
912 write!(
913 f,
914 r#"{{"partition":{}, "metrics":{:?}}}"#,
915 self.0.partition, metrics
916 )
917 }
918}
919
/// Metrics collected by the series distributor and merged into
/// [`ScanMetricsSet`] via `set_distributor_metrics`.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of series sends that timed out.
    pub(crate) num_series_send_timeout: usize,
    /// Number of sends that found the channel full.
    pub(crate) num_series_send_full: usize,
    /// Number of rows that went through the distributor.
    pub(crate) num_rows: usize,
    /// Number of batches that went through the distributor.
    pub(crate) num_batches: usize,
    /// Cost spent scanning inside the distributor.
    pub(crate) scan_cost: Duration,
    /// Cost spent yielding from the distributor.
    pub(crate) yield_cost: Duration,
}
936
/// Scans the memtable ranges at `index` and yields [`Batch`]es, pruned by
/// `time_range`.
///
/// Reports build-reader cost and memtable scan metrics to `part_metrics`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        file_or_mem_index = %index.index,
        row_group_index = %index.row_group_index,
        source = "mem"
    )
)]
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            // Collector handed to the iterator so the memtable can record its
            // own scan metrics.
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Reports the collected memtable metrics after the range is drained.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
975
/// Scans the memtable ranges at `index` in flat format and yields arrow
/// [`RecordBatch`]es.
///
/// Reports build-reader cost and memtable scan metrics to `part_metrics`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = "mem_flat"
    )
)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            // Collector handed to the iterator so the memtable can record its
            // own scan metrics.
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Reports the collected memtable metrics after the range is drained.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
1011
/// Minimum number of rows a file must have before it is considered for
/// splitting batches for merge.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Series count below which splitting is always allowed (see `can_split_series`).
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows per series required to split when there are many series.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1019
/// Decides whether flat batches should be split per series before merging.
///
/// Returns `false` as soon as any qualifying file fails [`can_split_series`].
/// Otherwise returns `true` when at least one file qualifies for splitting,
/// or falls back to the aggregated memtable stats.
pub(crate) fn should_split_flat_batches_for_merge(
    stream_ctx: &Arc<StreamContext>,
    range_meta: &RangeMeta,
) -> bool {
    let mut num_files_to_split = 0;
    let mut num_mem_rows = 0;
    let mut num_mem_series = 0;
    for index in &range_meta.row_group_indices {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable stats are aggregated and evaluated once after the loop.
            let memtable = &stream_ctx.input.memtables[index.index];
            let stats = memtable.stats();
            num_mem_rows += stats.num_rows();
            num_mem_series += stats.series_count();
        } else if stream_ctx.is_file_range_index(*index) {
            // File indices come after the memtable indices.
            let file_index = index.index - stream_ctx.input.num_memtables();
            let file = &stream_ctx.input.files[file_index];
            // Skips small files and files without series statistics.
            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
                continue;
            }
            debug_assert!(file.meta_ref().num_rows > 0);
            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
                return false;
            } else {
                num_files_to_split += 1;
            }
        }
    }

    if num_files_to_split > 0 {
        true
    } else if num_mem_series > 0 && num_mem_rows > 0 {
        can_split_series(num_mem_rows as u64, num_mem_series as u64)
    } else {
        false
    }
}
1068
1069fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1070 assert!(num_series > 0);
1071 assert!(num_rows > 0);
1072
1073 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1075}
1076
1077fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1080 if explain_verbose {
1081 ReaderFilterMetrics {
1082 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1083 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1084 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1085 ..Default::default()
1086 }
1087 } else {
1088 ReaderFilterMetrics::default()
1089 }
1090}
1091
/// Builds the file ranges at `index` and returns a stream scanning them as
/// [`Batch`]es.
///
/// Reports range counts and reader metrics to `part_metrics`; per-file
/// metrics are tracked only when verbose explain is enabled.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    // Index-apply metrics are only collected for verbose explain.
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Seeds the per-file metrics with the build cost when verbose explain is on.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}
1144
/// Builds the file ranges at `index` and returns a stream scanning them in
/// flat format as arrow [`RecordBatch`]es.
///
/// Reports range counts and reader metrics to `part_metrics`; per-file
/// metrics are tracked only when verbose explain is enabled.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    // Index-apply metrics are only collected for verbose explain.
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Seeds the per-file metrics with the build cost when verbose explain is on.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}
1197
/// Builds a stream that scans the given file `ranges` and yields [`Batch`]es.
///
/// Reader metrics are accumulated across ranges and reported to
/// `part_metrics` (together with the optional per-file metrics) after the
/// last range is drained.
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        // Parquet fetch metrics are only collected for verbose explain.
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            // `None` means the range was pruned entirely; skip it.
            let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
                continue;
            };
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                // Adapts the batch to the expected schema when a compat
                // adapter is present.
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            // Collects the reader's metrics after the range is drained.
            if let Some(Source::PruneReader(reader)) = Some(source) {
                let prune_metrics = reader.metrics();

                // Updates the per-file metrics for verbose explain.
                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                    let file_id = range.file_handle().file_id();
                    let file_metrics = file_metrics_map
                        .entry(file_id)
                        .or_insert_with(FileScanMetrics::default);

                    file_metrics.num_ranges += 1;
                    file_metrics.num_rows += prune_metrics.num_rows;
                    file_metrics.build_reader_cost += build_cost;
                    file_metrics.scan_cost += prune_metrics.scan_cost;
                }

                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Flushes accumulated metrics once the stream is exhausted.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
1261
1262#[tracing::instrument(
1264 skip_all,
1265 fields(read_type = read_type, range_count = ranges.len())
1266)]
1267pub fn build_flat_file_range_scan_stream(
1268 _stream_ctx: Arc<StreamContext>,
1269 part_metrics: PartitionMetrics,
1270 read_type: &'static str,
1271 ranges: SmallVec<[FileRange; 2]>,
1272 mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1273) -> impl Stream<Item = Result<RecordBatch>> {
1274 try_stream! {
1275 let fetch_metrics = if part_metrics.explain_verbose() {
1276 Some(Arc::new(ParquetFetchMetrics::default()))
1277 } else {
1278 None
1279 };
1280 let reader_metrics = &mut ReaderMetrics {
1281 fetch_metrics: fetch_metrics.clone(),
1282 ..Default::default()
1283 };
1284 for range in ranges {
1285 let build_reader_start = Instant::now();
1286 let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else{continue};
1287 let build_cost = build_reader_start.elapsed();
1288 part_metrics.inc_build_reader_cost(build_cost);
1289
1290 let may_compat = range
1291 .compat_batch()
1292 .map(|compat| {
1293 compat.as_flat().context(UnexpectedSnafu {
1294 reason: "Invalid compat for flat format",
1295 })
1296 })
1297 .transpose()?;
1298 while let Some(record_batch) = reader.next_batch()? {
1299 if let Some(flat_compat) = may_compat {
1300 let batch = flat_compat.compat(record_batch)?;
1301 yield batch;
1302 } else {
1303 yield record_batch;
1304 }
1305 }
1306
1307 let prune_metrics = reader.metrics();
1308
1309 if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1311 let file_id = range.file_handle().file_id();
1312 let file_metrics = file_metrics_map
1313 .entry(file_id)
1314 .or_insert_with(FileScanMetrics::default);
1315
1316 file_metrics.num_ranges += 1;
1317 file_metrics.num_rows += prune_metrics.num_rows;
1318 file_metrics.build_reader_cost += build_cost;
1319 file_metrics.scan_cost += prune_metrics.scan_cost;
1320 }
1321
1322 reader_metrics.merge_from(&prune_metrics);
1323 }
1324
1325 reader_metrics.observe_rows(read_type);
1327 reader_metrics.filter_metrics.observe();
1328 part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1329 }
1330}
1331
/// Scans the extension range addressed by `index.index` (enterprise only).
///
/// Builds the range's reader from the stream context and delegates the read to
/// it; any read failure is wrapped in `ScanExternalRangeSnafu`.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    // `index.index` selects the extension range within the scan input.
    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    // NOTE(review): `read` consumes the context `Arc` after `reader` was built
    // from a borrow of it — presumably `reader` does not keep that borrow.
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}
1349
1350pub(crate) async fn maybe_scan_other_ranges(
1351 context: &Arc<StreamContext>,
1352 index: RowGroupIndex,
1353 metrics: &PartitionMetrics,
1354) -> Result<BoxedBatchStream> {
1355 #[cfg(feature = "enterprise")]
1356 {
1357 scan_extension_range(context.clone(), index, metrics.clone()).await
1358 }
1359
1360 #[cfg(not(feature = "enterprise"))]
1361 {
1362 let _ = context;
1363 let _ = index;
1364 let _ = metrics;
1365
1366 crate::error::UnexpectedSnafu {
1367 reason: "no other ranges scannable",
1368 }
1369 .fail()
1370 }
1371}
1372
1373pub(crate) async fn maybe_scan_flat_other_ranges(
1374 context: &Arc<StreamContext>,
1375 index: RowGroupIndex,
1376 metrics: &PartitionMetrics,
1377) -> Result<BoxedRecordBatchStream> {
1378 let _ = context;
1379 let _ = index;
1380 let _ = metrics;
1381
1382 crate::error::UnexpectedSnafu {
1383 reason: "no other ranges scannable in flat format",
1384 }
1385 .fail()
1386}
1387
/// Stream adapter that splits each incoming [`RecordBatch`] into runs of
/// non-decreasing timestamps (via [`split_record_batch`]) before yielding them.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The underlying stream of record batches.
    inner: S,
    /// Already-split batches waiting to be emitted.
    batches: VecDeque<RecordBatch>,
}
1395
1396impl<S> SplitRecordBatchStream<S> {
1397 pub(crate) fn new(inner: S) -> Self {
1399 Self {
1400 inner,
1401 batches: VecDeque::new(),
1402 }
1403 }
1404}
1405
1406impl<S> Stream for SplitRecordBatchStream<S>
1407where
1408 S: Stream<Item = Result<RecordBatch>> + Unpin,
1409{
1410 type Item = Result<RecordBatch>;
1411
1412 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1413 loop {
1414 if let Some(batch) = self.batches.pop_front() {
1416 return Poll::Ready(Some(Ok(batch)));
1417 }
1418
1419 let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1421 Some(Ok(batch)) => batch,
1422 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1423 None => return Poll::Ready(None),
1424 };
1425
1426 split_record_batch(record_batch, &mut self.batches);
1428 }
1430 }
1431}
1432
1433pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1438 let batch_rows = record_batch.num_rows();
1439 if batch_rows == 0 {
1440 return;
1441 }
1442 if batch_rows < 2 {
1443 batches.push_back(record_batch);
1444 return;
1445 }
1446
1447 let time_index_pos = time_index_column_index(record_batch.num_columns());
1448 let timestamps = record_batch.column(time_index_pos);
1449 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1450 let mut offsets = Vec::with_capacity(16);
1451 offsets.push(0);
1452 let values = ts_values.values();
1453 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1454 if value > values[i + 1] {
1455 offsets.push(i + 1);
1456 }
1457 }
1458 offsets.push(values.len());
1459
1460 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1462 let end = offsets[i + 1];
1463 let rows_in_batch = end - start;
1464 batches.push_back(record_batch.slice(start, rows_in_batch));
1465 }
1466}