use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

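/// Metrics accumulated while scanning one partition, merged from the scanner,
/// the memtable iterators, the SST readers and the series distributor.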
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration spent preparing the scan.
    prepare_scan_cost: Duration,
    /// Duration spent building readers.
    build_reader_cost: Duration,
    /// Duration spent scanning data.
    scan_cost: Duration,
    /// Duration spent yielding batches to the caller.
    yield_cost: Duration,
    /// Total duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Duration of memtable scans.
    mem_scan_cost: Duration,
    /// Rows read from memtables.
    mem_rows: usize,
    /// Batches read from memtables.
    mem_batches: usize,
    /// Series read from memtables.
    mem_series: usize,

    /// Duration spent building file ranges (parts).
    build_parts_cost: Duration,
    /// Row groups before filtering.
    rg_total: usize,
    /// Row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups filtered by the min-max index.
    rg_minmax_filtered: usize,
    /// Row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Rows in row groups before filtering.
    rows_before_filter: usize,
    /// Rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows filtered by precise filtering.
    rows_precise_filtered: usize,
    /// Record batches read from SST files.
    num_sst_record_batches: usize,
    /// Batches decoded from SST files.
    num_sst_batches: usize,
    /// Rows read from SST files.
    num_sst_rows: usize,

    /// Elapsed time before the first poll of the stream.
    first_poll: Duration,

    /// Number of timeouts while the distributor sends series.
    num_series_send_timeout: usize,
    /// Number of times the distributor found the send channel full.
    num_series_send_full: usize,
    /// Rows sent by the series distributor.
    num_distributor_rows: usize,
    /// Batches sent by the series distributor.
    num_distributor_batches: usize,
    /// Scan duration in the series distributor.
    distributor_scan_cost: Duration,
    /// Yield duration in the series distributor.
    distributor_yield_cost: Duration,

    /// Whether the stream reached EOF.
    stream_eof: bool,
}

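// Renders the metrics as a single-line JSON-style object. Optional counters and
// durations are only included when they are non-zero. Example (illustrative values):
// {"prepare_scan_cost":"1.2ms", "num_rows":4096, ..., "stream_eof":true}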
impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
impl ScanMetricsSet {
    /// Attaches the cost of preparing the scan.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges metrics collected by a scanner.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges metrics collected by an SST reader.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Merges metrics collected by the series distributor.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the accumulated metrics to the global Prometheus metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

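/// Inner state shared by all clones of a [`PartitionMetrics`] handle.
///
/// When the last clone is dropped, the accumulated metrics are reported to
/// Prometheus and logged together with the region id and partition index.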
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition.
    partition: usize,
    /// Type of the scanner.
    scanner_type: &'static str,
    /// Instant at which the query started.
    query_start: Instant,
    /// Whether to log the metrics at info level for verbose explain.
    explain_verbose: bool,
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans.
    in_progress_scan: IntGauge,

    /// Per-partition DataFusion timers registered in the plan's metrics set.
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
}

impl PartitionMetricsInner {
    /// Records the total cost and whether the stream reached EOF.
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        }
    }
}

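/// List of [`PartitionMetrics`] indexed by partition, used to render
/// per-partition metrics in verbose explain output.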
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets the metrics of `partition`, growing the list if necessary.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats the metrics of each partition and closes the surrounding object.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}

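/// Metrics of a partition scan, shared between the scan stream and the
/// scanner via cheap clones of an inner [`Arc`].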
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    /// Creates metrics for `partition` and increases the in-progress scan gauge.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the elapsed time of the first poll.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `cost` to the reader build time.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
    }

    /// Reports metrics collected by a memtable scan.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges metrics collected by a scanner.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges metrics collected by an SST reader.
    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Marks the stream as finished (EOF reached).
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets metrics collected by the series distributor.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

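/// Metrics collected by the series distributor.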
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of timeouts while sending series.
    pub(crate) num_series_send_timeout: usize,
    /// Number of times the send channel was full.
    pub(crate) num_series_send_full: usize,
    pub(crate) num_rows: usize,
    pub(crate) num_batches: usize,
    pub(crate) scan_cost: Duration,
    pub(crate) yield_cost: Duration,
}

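/// Scans memtable ranges at `index` and reports the memtable scan metrics
/// once each iterator is exhausted.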
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

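/// Builds the file ranges at `index` and returns a stream that scans them.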
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

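/// Builds a stream that scans the given file ranges, converting batches for
/// compatibility when needed and merging reader metrics on completion.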
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

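/// Scans an extension range (enterprise builds only).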
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

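/// Scans ranges other than memtable and file ranges. Only extension ranges are
/// supported, so this returns an error when the `enterprise` feature is disabled.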
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}
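
#[cfg(test)]
mod tests {
    // A minimal sketch of how the JSON-style Debug output can be checked. It
    // only exercises `ScanMetricsSet::default()` and its formatting; it touches
    // no Prometheus or DataFusion state.
    use super::*;

    #[test]
    fn debug_output_of_default_scan_metrics() {
        let metrics = ScanMetricsSet::default();
        let output = format!("{:?}", metrics);
        // Mandatory fields are always present.
        assert!(output.starts_with("{\"prepare_scan_cost\""));
        assert!(output.contains("\"num_rows\":0"));
        // Optional counters are omitted when zero, and the object is closed
        // with the stream_eof flag.
        assert!(!output.contains("rows_precise_filtered"));
        assert!(output.ends_with("\"stream_eof\":false}"));
    }
}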