1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_telemetry::debug;
23use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
24use futures::Stream;
25use prometheus::IntGauge;
26use smallvec::SmallVec;
27use store_api::storage::RegionId;
28
29use crate::error::Result;
30use crate::metrics::{
31 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
32 READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
33};
34use crate::read::range::{RangeBuilderList, RowGroupIndex};
35use crate::read::scan_region::StreamContext;
36use crate::read::{Batch, BoxedBatchStream, ScannerMetrics, Source};
37use crate::sst::file::FileTimeRange;
38use crate::sst::parquet::file_range::FileRange;
39use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
40
#[derive(Default)]
/// Accumulated metrics for one scan partition.
///
/// Durations and counters are summed as sources report in; the whole set is
/// rendered as a JSON-like string by its `Debug` impl and flushed to global
/// Prometheus metrics via [`ScanMetricsSet::observe_metrics`].
struct ScanMetricsSet {
    /// Time spent preparing the scan before any data is read.
    prepare_scan_cost: Duration,
    /// Time spent building readers/iterators for ranges.
    build_reader_cost: Duration,
    /// Time spent reading batches from sources.
    scan_cost: Duration,
    /// Time spent yielding batches to the consumer.
    yield_cost: Duration,
    /// Total elapsed time of the scan; recorded once on finish (see
    /// `PartitionMetricsInner::on_finish`).
    total_cost: Duration,
    /// Number of rows returned by the scan.
    num_rows: usize,
    /// Number of batches returned by the scan.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Time spent building file parts (file ranges).
    build_parts_cost: Duration,
    /// Row groups considered before any filtering.
    rg_total: usize,
    /// Row groups pruned by the full-text index.
    rg_fulltext_filtered: usize,
    /// Row groups pruned by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups pruned by min-max statistics.
    rg_minmax_filtered: usize,
    /// Row groups pruned by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Rows in selected row groups before filtering.
    rows_before_filter: usize,
    /// Rows pruned by the full-text index.
    rows_fulltext_filtered: usize,
    /// Rows pruned by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows pruned by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows removed by precise (exact predicate) filtering.
    rows_precise_filtered: usize,
    /// Number of record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SSTs.
    num_sst_batches: usize,
    /// Number of rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time from query start until the stream's first poll.
    first_poll: Duration,

    /// Number of send timeouts observed by the series distributor.
    num_series_send_timeout: usize,
    /// Rows processed by the series distributor.
    num_distributor_rows: usize,
    /// Batches processed by the series distributor.
    num_distributor_batches: usize,
    /// Scan time spent inside the series distributor.
    distributor_scan_cost: Duration,
    /// Yield time spent inside the series distributor.
    distributor_yield_cost: Duration,
}
107
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a single-line JSON-like object.
    ///
    /// Durations are rendered with their `Debug` form inside quotes (e.g.
    /// `"1.2ms"`), counters as bare numbers.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring (no `..`) makes adding a new field
        // without printing it a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
        } = self;

        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rg_fulltext_filtered\":{rg_fulltext_filtered}, \
            \"rg_inverted_filtered\":{rg_inverted_filtered}, \
            \"rg_minmax_filtered\":{rg_minmax_filtered}, \
            \"rg_bloom_filtered\":{rg_bloom_filtered}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"rows_fulltext_filtered\":{rows_fulltext_filtered}, \
            \"rows_inverted_filtered\":{rows_inverted_filtered}, \
            \"rows_bloom_filtered\":{rows_bloom_filtered}, \
            \"rows_precise_filtered\":{rows_precise_filtered}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\", \
            \"num_series_send_timeout\":{num_series_send_timeout}, \
            \"num_distributor_rows\":{num_distributor_rows}, \
            \"num_distributor_batches\":{num_distributor_batches}, \
            \"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \
            \"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}"
        )
    }
}
impl ScanMetricsSet {
    /// Adds `cost` to `prepare_scan_cost` and returns `self` (builder style).
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges a [`ScannerMetrics`] into this set by summing every field.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Exhaustive destructuring so new `ScannerMetrics` fields cannot be
        // silently dropped here.
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges a [`ReaderMetrics`] into this set.
    ///
    /// The reader's own `scan_cost` is deliberately ignored: scan time is
    /// tracked via [`ScannerMetrics`] instead.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Merges [`SeriesDistributorMetrics`] into this set.
    ///
    /// NOTE(review): despite the `set_` name this accumulates with `+=`;
    /// presumably it is only called once per partition — confirm with callers.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the collected values to the global Prometheus metrics
    /// (stage histograms, row/batch counters, filter counters).
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
322
/// Shared state behind [`PartitionMetrics`].
///
/// Holds the accumulated [`ScanMetricsSet`] plus DataFusion per-partition
/// timers; metrics are flushed to Prometheus when this is dropped.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition these metrics belong to.
    partition: usize,
    /// Static label describing the scanner (used for gauges and logs).
    scanner_type: &'static str,
    /// Instant the query started; used to derive elapsed durations.
    query_start: Instant,
    /// Accumulated scan metrics, guarded for concurrent reporters.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented on creation, decremented on drop.
    in_progress_scan: IntGauge,

    // DataFusion `Time` counters exposed through the ExecutionPlan metrics set.
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
}
347
348impl PartitionMetricsInner {
349 fn on_finish(&self) {
350 let mut metrics = self.metrics.lock().unwrap();
351 if metrics.total_cost.is_zero() {
352 metrics.total_cost = self.query_start.elapsed();
353 }
354 }
355}
356
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        // Make sure `total_cost` is stamped even if `on_finish()` was never
        // called explicitly (e.g. the stream was dropped early).
        self.on_finish();
        let metrics = self.metrics.lock().unwrap();
        // Flush accumulated values to the global Prometheus metrics.
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        debug!(
            "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
            self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
        );
    }
}
370
#[derive(Default)]
/// List of [`PartitionMetrics`] indexed by partition number.
///
/// Slots for partitions that never reported stay `None`.
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
374
375impl PartitionMetricsList {
376 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
378 let mut list = self.0.lock().unwrap();
379 if list.len() <= partition {
380 list.resize(partition + 1, None);
381 }
382 list[partition] = Some(metrics);
383 }
384
385 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
387 let list = self.0.lock().unwrap();
388 write!(f, ", \"metrics_per_partition\": ")?;
389 f.debug_list()
390 .entries(list.iter().filter_map(|p| p.as_ref()))
391 .finish()?;
392 write!(f, "}}")
393 }
394}
395
#[derive(Clone)]
/// Cheaply cloneable handle to per-partition scan metrics.
///
/// All clones share the same [`PartitionMetricsInner`]; metrics are flushed
/// when the last clone is dropped.
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
399
impl PartitionMetrics {
    /// Creates metrics for `partition` of `scanner_type`.
    ///
    /// Increments the in-progress scan gauge and registers per-partition
    /// timers in the DataFusion `metrics_set`. Time elapsed since
    /// `query_start` is counted as preparation cost.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the elapsed time from query start until the stream's first poll.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the count of scanned memtable ranges.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the count of scanned file ranges.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `cost` to the reader build time, both in the DataFusion timer
    /// and in the accumulated scan metrics.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Adds `cost` to the batch conversion timer (DataFusion timer only;
    /// not part of [`ScanMetricsSet`]).
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
    }

    /// Merges [`ScannerMetrics`] into both the DataFusion timers and the
    /// accumulated scan metrics.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [`ReaderMetrics`] into both the DataFusion build-parts timer
    /// and the accumulated scan metrics.
    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Marks the scan as finished, stamping the total elapsed time once.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish();
    }

    /// Merges [`SeriesDistributorMetrics`] into the accumulated scan metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}
488
489impl fmt::Debug for PartitionMetrics {
490 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
491 let metrics = self.0.metrics.lock().unwrap();
492 write!(
493 f,
494 r#"{{"partition":{}, "metrics":{:?}}}"#,
495 self.0.partition, metrics
496 )
497 }
498}
499
#[derive(Default)]
/// Metrics reported by the series distributor.
pub(crate) struct SeriesDistributorMetrics {
    /// Number of timeouts while sending series to receivers.
    pub(crate) num_series_send_timeout: usize,
    /// Rows processed by the distributor.
    pub(crate) num_rows: usize,
    /// Batches processed by the distributor.
    pub(crate) num_batches: usize,
    /// Time the distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Time the distributor spent yielding batches.
    pub(crate) yield_cost: Duration,
}
514
/// Builds a stream that scans the memtable ranges at `index`,
/// restricted to `time_range`.
///
/// Range counts and iterator build time are recorded into `part_metrics`.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            // Building the iterator counts towards reader build cost.
            let build_reader_start = Instant::now();
            let iter = range.build_iter(time_range)?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }
        }
    }
}
537
538pub(crate) async fn scan_file_ranges(
540 stream_ctx: Arc<StreamContext>,
541 part_metrics: PartitionMetrics,
542 index: RowGroupIndex,
543 read_type: &'static str,
544 range_builder: Arc<RangeBuilderList>,
545) -> Result<impl Stream<Item = Result<Batch>>> {
546 let mut reader_metrics = ReaderMetrics::default();
547 let ranges = range_builder
548 .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
549 .await?;
550 part_metrics.inc_num_file_ranges(ranges.len());
551 part_metrics.merge_reader_metrics(&reader_metrics);
552
553 Ok(build_file_range_scan_stream(
554 stream_ctx,
555 part_metrics,
556 read_type,
557 ranges,
558 ))
559}
560
561pub fn build_file_range_scan_stream(
563 stream_ctx: Arc<StreamContext>,
564 part_metrics: PartitionMetrics,
565 read_type: &'static str,
566 ranges: SmallVec<[FileRange; 2]>,
567) -> impl Stream<Item = Result<Batch>> {
568 try_stream! {
569 let reader_metrics = &mut ReaderMetrics::default();
570 for range in ranges {
571 let build_reader_start = Instant::now();
572 let reader = range.reader(stream_ctx.input.series_row_selector).await?;
573 let build_cost = build_reader_start.elapsed();
574 part_metrics.inc_build_reader_cost(build_cost);
575 let compat_batch = range.compat_batch();
576 let mut source = Source::PruneReader(reader);
577 while let Some(mut batch) = source.next_batch().await? {
578 if let Some(compact_batch) = compat_batch {
579 batch = compact_batch.compat_batch(batch)?;
580 }
581 yield batch;
582 }
583 if let Source::PruneReader(reader) = source {
584 let prune_metrics = reader.metrics();
585 reader_metrics.merge_from(&prune_metrics);
586 }
587 }
588
589 reader_metrics.observe_rows(read_type);
591 reader_metrics.filter_metrics.observe();
592 part_metrics.merge_reader_metrics(reader_metrics);
593 }
594}
595
#[cfg(feature = "enterprise")]
/// Scans the extension range at `index.index` (enterprise only).
///
/// Errors from the extension reader are wrapped in `ScanExternalRange`.
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    reader
        .read(context, metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
612
/// Scans ranges that are neither memtable nor file ranges.
///
/// With the `enterprise` feature this delegates to extension ranges;
/// otherwise reaching this function is unexpected and returns an error.
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        // Silence unused-parameter warnings in the non-enterprise build.
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}