1use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::timestamp::timestamp_array_to_primitive;
28use futures::Stream;
29use prometheus::IntGauge;
30use smallvec::SmallVec;
31use snafu::OptionExt;
32use store_api::storage::RegionId;
33
34use crate::error::{Result, UnexpectedSnafu};
35use crate::memtable::MemScanMetrics;
36use crate::metrics::{
37 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
38 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
39};
40use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
41use crate::read::merge::{MergeMetrics, MergeMetricsReport};
42use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
43use crate::read::scan_region::StreamContext;
44use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
45use crate::sst::file::{FileTimeRange, RegionFileId};
46use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
47use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
48use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
49use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
50use crate::sst::parquet::file_range::FileRange;
51use crate::sst::parquet::flat_format::time_index_column_index;
52use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
53use crate::sst::parquet::row_group::ParquetFetchMetrics;
54
/// Metrics collected for a single SST file during a scan.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of file ranges scanned in this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building the file parts (ranges) of this file.
    pub build_part_cost: Duration,
    /// Time spent building readers for this file.
    pub build_reader_cost: Duration,
    /// Time spent reading batches from this file.
    pub scan_cost: Duration,
}
69
70impl fmt::Debug for FileScanMetrics {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
73
74 if self.num_ranges > 0 {
75 write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
76 }
77 if self.num_rows > 0 {
78 write!(f, ", \"num_rows\":{}", self.num_rows)?;
79 }
80 if !self.build_reader_cost.is_zero() {
81 write!(
82 f,
83 ", \"build_reader_cost\":\"{:?}\"",
84 self.build_reader_cost
85 )?;
86 }
87 if !self.scan_cost.is_zero() {
88 write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
89 }
90
91 write!(f, "}}")
92 }
93}
94
95impl FileScanMetrics {
96 pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
98 self.num_ranges += other.num_ranges;
99 self.num_rows += other.num_rows;
100 self.build_part_cost += other.build_part_cost;
101 self.build_reader_cost += other.build_reader_cost;
102 self.scan_cost += other.scan_cost;
103 }
104}
105
/// Accumulated metrics of a whole scan partition, merged from the scanner,
/// SST readers, memtables, the series distributor and merge/dedup operators.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Time to prepare the scan (elapsed before the scanner starts).
    prepare_scan_cost: Duration,
    /// Time to build readers.
    build_reader_cost: Duration,
    /// Time spent scanning data.
    scan_cost: Duration,
    /// Time attributed to the "yield" read stage.
    yield_cost: Duration,
    /// Total duration of the scan (set once when the scan finishes).
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Time spent scanning memtables (reported by `MemScanMetrics`).
    mem_scan_cost: Duration,
    /// Rows read from memtables.
    mem_rows: usize,
    /// Batches read from memtables.
    mem_batches: usize,
    /// Series read from memtables.
    mem_series: usize,

    /// Time to build file parts (ranges) for SSTs.
    build_parts_cost: Duration,
    /// Time spent scanning SST files.
    sst_scan_cost: Duration,
    /// Row groups considered before filtering.
    rg_total: usize,
    /// Row groups pruned by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Row groups pruned by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups pruned by min-max statistics.
    rg_minmax_filtered: usize,
    /// Row groups pruned by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Rows in row groups before filtering.
    rows_before_filter: usize,
    /// Rows pruned by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Rows pruned by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows pruned by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows pruned by precise (exact predicate) filtering.
    rows_precise_filtered: usize,
    /// Arrow record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Batches decoded from SSTs.
    num_sst_batches: usize,
    /// Rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time from query start until the stream's first poll.
    first_poll: Duration,

    /// Timeouts while the series distributor sends series.
    num_series_send_timeout: usize,
    /// Full events while the series distributor sends series.
    num_series_send_full: usize,
    /// Rows processed by the series distributor.
    num_distributor_rows: usize,
    /// Batches processed by the series distributor.
    num_distributor_batches: usize,
    /// Time the distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Time the distributor spent yielding.
    distributor_yield_cost: Duration,

    /// Metrics from the merge operator.
    merge_metrics: MergeMetrics,
    /// Metrics from the dedup operator.
    dedup_metrics: DedupMetrics,

    /// Whether the scan stream reached EOF (finished normally rather than
    /// being dropped early).
    stream_eof: bool,

    // The following metrics are only collected in verbose explain mode and
    // stay `None` otherwise.
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Parquet metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-SST-file scan metrics.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
}
208
/// Heap entry whose ordering is *reversed* on `total_cost`, so a
/// `BinaryHeap<CompareCostReverse>` (a max-heap) behaves as a min-heap keyed
/// by cost — used to keep the most expensive files.
struct CompareCostReverse<'a> {
    /// Sum of build-part, build-reader and scan cost of the file.
    total_cost: Duration,
    /// Id of the file the metrics belong to.
    file_id: RegionFileId,
    /// Borrowed per-file metrics.
    metrics: &'a FileScanMetrics,
}
216
// `Ord` is intentionally reversed (`other` compared against `self`): this
// turns `BinaryHeap`'s max-heap into a min-heap over `total_cost`, so the
// cheapest entry can be popped when keeping the top-N most expensive files.
impl Ord for CompareCostReverse<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.total_cost.cmp(&self.total_cost)
    }
}

impl PartialOrd for CompareCostReverse<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for CompareCostReverse<'_> {}

// Equality uses only `total_cost`, consistent with `Ord`'s comparison key.
impl PartialEq for CompareCostReverse<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}
237
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metric set as a JSON-like object for explain output.
    ///
    /// Core metrics are always printed; optional counters and nested metric
    /// groups are printed only when non-zero/non-empty to keep the output
    /// compact.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to `ScanMetricsSet`
        // without handling it here becomes a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
        } = self;

        // Always-printed core metrics.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Filter counters: only printed when the filter actually pruned
        // something.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Series distributor metrics.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Memtable metrics.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Verbose-mode metrics, printed only when present and non-empty.
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Merge/dedup metrics, skipped when those operators did no work.
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Keep only the 10 most expensive files. `CompareCostReverse`
            // reverses the ordering, so the heap is a min-heap by cost and
            // `peek()` exposes the cheapest retained file for replacement.
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                if heap.len() < 10 {
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // Ascending by reversed ordering == descending by actual cost.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
461impl ScanMetricsSet {
462 fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
464 self.prepare_scan_cost += cost;
465 self
466 }
467
468 fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
470 let ScannerMetrics {
471 prepare_scan_cost,
472 build_reader_cost,
473 scan_cost,
474 yield_cost,
475 num_batches,
476 num_rows,
477 num_mem_ranges,
478 num_file_ranges,
479 } = other;
480
481 self.prepare_scan_cost += *prepare_scan_cost;
482 self.build_reader_cost += *build_reader_cost;
483 self.scan_cost += *scan_cost;
484 self.yield_cost += *yield_cost;
485 self.num_rows += *num_rows;
486 self.num_batches += *num_batches;
487 self.num_mem_ranges += *num_mem_ranges;
488 self.num_file_ranges += *num_file_ranges;
489 }
490
491 fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
493 let ReaderMetrics {
494 build_cost,
495 filter_metrics:
496 ReaderFilterMetrics {
497 rg_total,
498 rg_fulltext_filtered,
499 rg_inverted_filtered,
500 rg_minmax_filtered,
501 rg_bloom_filtered,
502 rows_total,
503 rows_fulltext_filtered,
504 rows_inverted_filtered,
505 rows_bloom_filtered,
506 rows_precise_filtered,
507 inverted_index_apply_metrics,
508 bloom_filter_apply_metrics,
509 fulltext_index_apply_metrics,
510 },
511 num_record_batches,
512 num_batches,
513 num_rows,
514 scan_cost,
515 metadata_cache_metrics,
516 fetch_metrics,
517 } = other;
518
519 self.build_parts_cost += *build_cost;
520 self.sst_scan_cost += *scan_cost;
521
522 self.rg_total += *rg_total;
523 self.rg_fulltext_filtered += *rg_fulltext_filtered;
524 self.rg_inverted_filtered += *rg_inverted_filtered;
525 self.rg_minmax_filtered += *rg_minmax_filtered;
526 self.rg_bloom_filtered += *rg_bloom_filtered;
527
528 self.rows_before_filter += *rows_total;
529 self.rows_fulltext_filtered += *rows_fulltext_filtered;
530 self.rows_inverted_filtered += *rows_inverted_filtered;
531 self.rows_bloom_filtered += *rows_bloom_filtered;
532 self.rows_precise_filtered += *rows_precise_filtered;
533
534 self.num_sst_record_batches += *num_record_batches;
535 self.num_sst_batches += *num_batches;
536 self.num_sst_rows += *num_rows;
537
538 if let Some(metrics) = inverted_index_apply_metrics {
540 self.inverted_index_apply_metrics
541 .get_or_insert_with(InvertedIndexApplyMetrics::default)
542 .merge_from(metrics);
543 }
544 if let Some(metrics) = bloom_filter_apply_metrics {
545 self.bloom_filter_apply_metrics
546 .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
547 .merge_from(metrics);
548 }
549 if let Some(metrics) = fulltext_index_apply_metrics {
550 self.fulltext_index_apply_metrics
551 .get_or_insert_with(FulltextIndexApplyMetrics::default)
552 .merge_from(metrics);
553 }
554 if let Some(metrics) = fetch_metrics {
555 self.fetch_metrics
556 .get_or_insert_with(ParquetFetchMetrics::default)
557 .merge_from(metrics);
558 }
559 self.metadata_cache_metrics
560 .get_or_insert_with(MetadataCacheMetrics::default)
561 .merge_from(metadata_cache_metrics);
562 }
563
564 fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
566 let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
567 for (file_id, metrics) in other {
568 self_file_metrics
569 .entry(*file_id)
570 .or_default()
571 .merge_from(metrics);
572 }
573 }
574
575 fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
577 let SeriesDistributorMetrics {
578 num_series_send_timeout,
579 num_series_send_full,
580 num_rows,
581 num_batches,
582 scan_cost,
583 yield_cost,
584 } = distributor_metrics;
585
586 self.num_series_send_timeout += *num_series_send_timeout;
587 self.num_series_send_full += *num_series_send_full;
588 self.num_distributor_rows += *num_rows;
589 self.num_distributor_batches += *num_batches;
590 self.distributor_scan_cost += *scan_cost;
591 self.distributor_yield_cost += *yield_cost;
592 }
593
594 fn observe_metrics(&self) {
596 READ_STAGE_ELAPSED
597 .with_label_values(&["prepare_scan"])
598 .observe(self.prepare_scan_cost.as_secs_f64());
599 READ_STAGE_ELAPSED
600 .with_label_values(&["build_reader"])
601 .observe(self.build_reader_cost.as_secs_f64());
602 READ_STAGE_ELAPSED
603 .with_label_values(&["scan"])
604 .observe(self.scan_cost.as_secs_f64());
605 READ_STAGE_ELAPSED
606 .with_label_values(&["yield"])
607 .observe(self.yield_cost.as_secs_f64());
608 READ_STAGE_ELAPSED
609 .with_label_values(&["total"])
610 .observe(self.total_cost.as_secs_f64());
611 READ_ROWS_RETURN.observe(self.num_rows as f64);
612 READ_BATCHES_RETURN.observe(self.num_batches as f64);
613
614 READ_STAGE_ELAPSED
615 .with_label_values(&["build_parts"])
616 .observe(self.build_parts_cost.as_secs_f64());
617
618 READ_ROW_GROUPS_TOTAL
619 .with_label_values(&["before_filtering"])
620 .inc_by(self.rg_total as u64);
621 READ_ROW_GROUPS_TOTAL
622 .with_label_values(&["fulltext_index_filtered"])
623 .inc_by(self.rg_fulltext_filtered as u64);
624 READ_ROW_GROUPS_TOTAL
625 .with_label_values(&["inverted_index_filtered"])
626 .inc_by(self.rg_inverted_filtered as u64);
627 READ_ROW_GROUPS_TOTAL
628 .with_label_values(&["minmax_index_filtered"])
629 .inc_by(self.rg_minmax_filtered as u64);
630 READ_ROW_GROUPS_TOTAL
631 .with_label_values(&["bloom_filter_index_filtered"])
632 .inc_by(self.rg_bloom_filtered as u64);
633
634 PRECISE_FILTER_ROWS_TOTAL
635 .with_label_values(&["parquet"])
636 .inc_by(self.rows_precise_filtered as u64);
637 READ_ROWS_IN_ROW_GROUP_TOTAL
638 .with_label_values(&["before_filtering"])
639 .inc_by(self.rows_before_filter as u64);
640 READ_ROWS_IN_ROW_GROUP_TOTAL
641 .with_label_values(&["fulltext_index_filtered"])
642 .inc_by(self.rows_fulltext_filtered as u64);
643 READ_ROWS_IN_ROW_GROUP_TOTAL
644 .with_label_values(&["inverted_index_filtered"])
645 .inc_by(self.rows_inverted_filtered as u64);
646 READ_ROWS_IN_ROW_GROUP_TOTAL
647 .with_label_values(&["bloom_filter_index_filtered"])
648 .inc_by(self.rows_bloom_filtered as u64);
649 }
650}
651
/// Shared state behind [`PartitionMetrics`]; reports its accumulated metrics
/// to Prometheus and logs them when dropped.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition this scan works on.
    partition: usize,
    /// Scanner type label used for metrics and log messages.
    scanner_type: &'static str,
    /// Instant the query started; used to derive `total_cost` and
    /// `first_poll`.
    query_start: Instant,
    /// Whether verbose explain is enabled (extra metrics, info-level logs).
    explain_verbose: bool,
    /// Accumulated scan metrics, guarded for cross-thread updates.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented on creation, decremented on
    /// drop.
    in_progress_scan: IntGauge,

    // DataFusion execution-plan timers for this partition.
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
    elapsed_compute: Time,
}
680
681impl PartitionMetricsInner {
682 fn on_finish(&self, stream_eof: bool) {
683 let mut metrics = self.metrics.lock().unwrap();
684 if metrics.total_cost.is_zero() {
685 metrics.total_cost = self.query_start.elapsed();
686 }
687 if !metrics.stream_eof {
688 metrics.stream_eof = stream_eof;
689 }
690 }
691}
692
693impl MergeMetricsReport for PartitionMetricsInner {
694 fn report(&self, metrics: &mut MergeMetrics) {
695 let mut scan_metrics = self.metrics.lock().unwrap();
696 scan_metrics.merge_metrics.merge(metrics);
698
699 *metrics = MergeMetrics::default();
701 }
702}
703
704impl DedupMetricsReport for PartitionMetricsInner {
705 fn report(&self, metrics: &mut DedupMetrics) {
706 let mut scan_metrics = self.metrics.lock().unwrap();
707 scan_metrics.dedup_metrics.merge(metrics);
709
710 *metrics = DedupMetrics::default();
712 }
713}
714
impl Drop for PartitionMetricsInner {
    /// Finalizes and reports the metrics when the partition scan ends.
    ///
    /// `on_finish(false)` fills in `total_cost` only if it wasn't set yet,
    /// and leaves `stream_eof` as-is — so a stream dropped before EOF keeps
    /// `stream_eof == false` in the logged metrics.
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        // Verbose explain logs at info level so the metrics are visible;
        // otherwise keep them at debug level.
        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}
743
/// Per-partition metrics indexed by partition id; slots stay `None` until a
/// partition registers its metrics.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
747
748impl PartitionMetricsList {
749 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
751 let mut list = self.0.lock().unwrap();
752 if list.len() <= partition {
753 list.resize(partition + 1, None);
754 }
755 list[partition] = Some(metrics);
756 }
757
758 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
760 let list = self.0.lock().unwrap();
761 write!(f, ", \"metrics_per_partition\": ")?;
762 f.debug_list()
763 .entries(list.iter().filter_map(|p| p.as_ref()))
764 .finish()?;
765 write!(f, "}}")
766 }
767}
768
/// Cheaply cloneable handle to the metrics of a single scan partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
772
impl PartitionMetrics {
    /// Creates metrics for one partition, registers the per-partition
    /// DataFusion timers on `metrics_set`, bumps the in-progress gauge and
    /// records the elapsed time since `query_start` as `prepare_scan_cost`.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the elapsed time from query start to the first stream poll.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the count of scanned memtable ranges.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the count of scanned file ranges.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to the DataFusion `elapsed_compute` timer, skipping
    /// zero durations.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Adds `cost` to both the DataFusion timer and the internal
    /// `build_reader_cost` metric.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Records the cost of converting batches; counted as compute time.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Merges memtable scan metrics (cost, rows, batches, series).
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges scanner metrics into the DataFusion timers (scan and yield
    /// costs also count as compute time) and the internal metric set.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges SST reader metrics and, when provided, per-file metrics.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Marks the stream as finished normally (EOF reached).
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Merges series distributor metrics into the internal metric set.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain is enabled for this scan.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a reporter handle for merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a reporter handle for dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }
}
907
908impl fmt::Debug for PartitionMetrics {
909 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
910 let metrics = self.0.metrics.lock().unwrap();
911 write!(
912 f,
913 r#"{{"partition":{}, "metrics":{:?}}}"#,
914 self.0.partition, metrics
915 )
916 }
917}
918
/// Metrics collected by the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Timeouts while sending series.
    pub(crate) num_series_send_timeout: usize,
    /// Full events while sending series.
    pub(crate) num_series_send_full: usize,
    /// Rows processed by the distributor.
    pub(crate) num_rows: usize,
    /// Batches processed by the distributor.
    pub(crate) num_batches: usize,
    /// Time the distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Time the distributor spent yielding.
    pub(crate) yield_cost: Duration,
}
935
/// Scans the memtable ranges at `index`, yielding [`Batch`]es pruned to
/// `time_range`.
///
/// Reports the reader-build cost and the memtable scan metrics to
/// `part_metrics` as each range completes.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report after the range is exhausted so the metrics are final.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
965
/// Scans the memtable ranges at `index` in flat format, yielding Arrow
/// [`RecordBatch`]es.
///
/// Reports the reader-build cost and the memtable scan metrics to
/// `part_metrics` as each range completes.
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report after the range is exhausted so the metrics are final.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
993
/// Files with fewer rows than one default row group are never considered for
/// splitting.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Below this series count, splitting is always considered worthwhile (see
/// `can_split_series`).
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows per series required to split when the series count
/// is at or above `NUM_SERIES_THRESHOLD`.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1001
/// Returns whether flat batches should be split by series before merging.
///
/// Any qualifying file that fails `can_split_series` vetoes splitting for
/// the whole range; otherwise splitting is enabled if at least one file
/// qualifies, falling back to the aggregated memtable statistics.
pub(crate) fn should_split_flat_batches_for_merge(
    stream_ctx: &Arc<StreamContext>,
    range_meta: &RangeMeta,
) -> bool {
    let mut num_files_to_split = 0;
    let mut num_mem_rows = 0;
    let mut num_mem_series = 0;
    for index in &range_meta.row_group_indices {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable stats are aggregated and evaluated once after the loop.
            let memtable = &stream_ctx.input.memtables[index.index];
            let stats = memtable.stats();
            num_mem_rows += stats.num_rows();
            num_mem_series += stats.series_count();
        } else if stream_ctx.is_file_range_index(*index) {
            // File ranges are indexed after the memtable ranges.
            let file_index = index.index - stream_ctx.input.num_memtables();
            let file = &stream_ctx.input.files[file_index];
            // Skip small files and files without series statistics.
            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
                continue;
            }
            debug_assert!(file.meta_ref().num_rows > 0);
            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
                // A single unsplittable file disables splitting entirely.
                return false;
            } else {
                num_files_to_split += 1;
            }
        }
    }

    if num_files_to_split > 0 {
        true
    } else if num_mem_series > 0 && num_mem_rows > 0 {
        // No qualifying files: decide from the aggregated memtable stats.
        can_split_series(num_mem_rows as u64, num_mem_series as u64)
    } else {
        false
    }
}
1050
1051fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1052 assert!(num_series > 0);
1053 assert!(num_rows > 0);
1054
1055 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1057}
1058
1059fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1062 if explain_verbose {
1063 ReaderFilterMetrics {
1064 inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1065 bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1066 fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1067 ..Default::default()
1068 }
1069 } else {
1070 ReaderFilterMetrics::default()
1071 }
1072}
1073
/// Builds the file ranges at `index` and returns a stream that scans them,
/// yielding [`Batch`]es.
///
/// `read_type` labels the returned-row metrics. Range-build metrics are
/// reported immediately; scan metrics are reported by the returned stream as
/// it runs.
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    // Collect per-index apply metrics only in verbose mode.
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    // Report the build metrics now; per-file metrics are merged later by the
    // scan stream itself.
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // In verbose mode, seed the per-file metrics with this file's build cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}
1118
/// Builds the file ranges at `index` and returns a stream that scans them in
/// flat format, yielding Arrow [`RecordBatch`]es.
///
/// `read_type` labels the returned-row metrics. Range-build metrics are
/// reported immediately; scan metrics are reported by the returned stream as
/// it runs.
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    // Collect per-index apply metrics only in verbose mode.
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    // Report the build metrics now; per-file metrics are merged later by the
    // scan stream itself.
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // In verbose mode, seed the per-file metrics with this file's build cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}
1163
/// Builds a stream that scans the given file `ranges` sequentially.
///
/// Yields every [`Batch`] read (after compat conversion when present) and
/// merges the reader metrics — plus optional per-file metrics — into
/// `part_metrics` when all ranges are exhausted.
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        // Track parquet fetch details only for verbose explain.
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            // Recover the reader from the source to collect its metrics once
            // the range is exhausted.
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();

                // Attribute this range's work to its file (verbose mode only).
                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                    let file_id = range.file_handle().file_id();
                    let file_metrics = file_metrics_map
                        .entry(file_id)
                        .or_insert_with(FileScanMetrics::default);

                    file_metrics.num_ranges += 1;
                    file_metrics.num_rows += prune_metrics.num_rows;
                    file_metrics.build_reader_cost += build_cost;
                    file_metrics.scan_cost += prune_metrics.scan_cost;
                }

                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Report once after all ranges finish.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
1221
1222pub fn build_flat_file_range_scan_stream(
1224 _stream_ctx: Arc<StreamContext>,
1225 part_metrics: PartitionMetrics,
1226 read_type: &'static str,
1227 ranges: SmallVec<[FileRange; 2]>,
1228 mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1229) -> impl Stream<Item = Result<RecordBatch>> {
1230 try_stream! {
1231 let fetch_metrics = if part_metrics.explain_verbose() {
1232 Some(Arc::new(ParquetFetchMetrics::default()))
1233 } else {
1234 None
1235 };
1236 let reader_metrics = &mut ReaderMetrics {
1237 fetch_metrics: fetch_metrics.clone(),
1238 ..Default::default()
1239 };
1240 for range in ranges {
1241 let build_reader_start = Instant::now();
1242 let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?;
1243 let build_cost = build_reader_start.elapsed();
1244 part_metrics.inc_build_reader_cost(build_cost);
1245
1246 let may_compat = range
1247 .compat_batch()
1248 .map(|compat| {
1249 compat.as_flat().context(UnexpectedSnafu {
1250 reason: "Invalid compat for flat format",
1251 })
1252 })
1253 .transpose()?;
1254 while let Some(record_batch) = reader.next_batch()? {
1255 if let Some(flat_compat) = may_compat {
1256 let batch = flat_compat.compat(record_batch)?;
1257 yield batch;
1258 } else {
1259 yield record_batch;
1260 }
1261 }
1262
1263 let prune_metrics = reader.metrics();
1264
1265 if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1267 let file_id = range.file_handle().file_id();
1268 let file_metrics = file_metrics_map
1269 .entry(file_id)
1270 .or_insert_with(FileScanMetrics::default);
1271
1272 file_metrics.num_ranges += 1;
1273 file_metrics.num_rows += prune_metrics.num_rows;
1274 file_metrics.build_reader_cost += build_cost;
1275 file_metrics.scan_cost += prune_metrics.scan_cost;
1276 }
1277
1278 reader_metrics.merge_from(&prune_metrics);
1279 }
1280
1281 reader_metrics.observe_rows(read_type);
1283 reader_metrics.filter_metrics.observe();
1284 part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1285 }
1286}
1287
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    // Resolves the extension range addressed by `index` and builds its reader.
    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    // Failures from the external reader are wrapped into a scan error.
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1305
1306pub(crate) async fn maybe_scan_other_ranges(
1307 context: &Arc<StreamContext>,
1308 index: RowGroupIndex,
1309 metrics: &PartitionMetrics,
1310) -> Result<BoxedBatchStream> {
1311 #[cfg(feature = "enterprise")]
1312 {
1313 scan_extension_range(context.clone(), index, metrics.clone()).await
1314 }
1315
1316 #[cfg(not(feature = "enterprise"))]
1317 {
1318 let _ = context;
1319 let _ = index;
1320 let _ = metrics;
1321
1322 crate::error::UnexpectedSnafu {
1323 reason: "no other ranges scannable",
1324 }
1325 .fail()
1326 }
1327}
1328
1329pub(crate) async fn maybe_scan_flat_other_ranges(
1330 context: &Arc<StreamContext>,
1331 index: RowGroupIndex,
1332 metrics: &PartitionMetrics,
1333) -> Result<BoxedRecordBatchStream> {
1334 let _ = context;
1335 let _ = index;
1336 let _ = metrics;
1337
1338 crate::error::UnexpectedSnafu {
1339 reason: "no other ranges scannable in flat format",
1340 }
1341 .fail()
1342}
1343
/// Stream adapter that splits each [RecordBatch] from the inner stream into
/// slices whose time index values are non-decreasing (see [split_record_batch]).
pub(crate) struct SplitRecordBatchStream<S> {
    /// The underlying record batch stream.
    inner: S,
    /// Slices split from the last batch of `inner`, waiting to be yielded.
    batches: VecDeque<RecordBatch>,
}
1351
1352impl<S> SplitRecordBatchStream<S> {
1353 pub(crate) fn new(inner: S) -> Self {
1355 Self {
1356 inner,
1357 batches: VecDeque::new(),
1358 }
1359 }
1360}
1361
1362impl<S> Stream for SplitRecordBatchStream<S>
1363where
1364 S: Stream<Item = Result<RecordBatch>> + Unpin,
1365{
1366 type Item = Result<RecordBatch>;
1367
1368 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1369 loop {
1370 if let Some(batch) = self.batches.pop_front() {
1372 return Poll::Ready(Some(Ok(batch)));
1373 }
1374
1375 let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1377 Some(Ok(batch)) => batch,
1378 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1379 None => return Poll::Ready(None),
1380 };
1381
1382 split_record_batch(record_batch, &mut self.batches);
1384 }
1386 }
1387}
1388
1389pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1394 let batch_rows = record_batch.num_rows();
1395 if batch_rows == 0 {
1396 return;
1397 }
1398 if batch_rows < 2 {
1399 batches.push_back(record_batch);
1400 return;
1401 }
1402
1403 let time_index_pos = time_index_column_index(record_batch.num_columns());
1404 let timestamps = record_batch.column(time_index_pos);
1405 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1406 let mut offsets = Vec::with_capacity(16);
1407 offsets.push(0);
1408 let values = ts_values.values();
1409 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1410 if value > values[i + 1] {
1411 offsets.push(i + 1);
1412 }
1413 }
1414 offsets.push(values.len());
1415
1416 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1418 let end = offsets[i + 1];
1419 let rows_in_batch = end - start;
1420 batches.push_back(record_batch.slice(start, rows_in_batch));
1421 }
1422}