1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_telemetry::debug;
23use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
24use futures::Stream;
25use prometheus::IntGauge;
26use store_api::storage::RegionId;
27
28use crate::error::Result;
29use crate::metrics::{
30 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
31 READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
32};
33use crate::read::range::{RangeBuilderList, RowGroupIndex};
34use crate::read::scan_region::StreamContext;
35use crate::read::{Batch, ScannerMetrics, Source};
36use crate::sst::file::FileTimeRange;
37use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
38
/// Scan metrics accumulated for a single partition.
///
/// Values are merged in from [`ScannerMetrics`], [`ReaderMetrics`] and
/// [`SeriesDistributorMetrics`]. `observe_metrics` reports them, and the `Debug`
/// impl renders them as a single-line JSON-like object.
#[derive(Default)]
struct ScanMetricsSet {
    /// Time from query start until the partition metrics were created, plus any
    /// prepare cost merged from [`ScannerMetrics`].
    prepare_scan_cost: Duration,
    /// Time spent building readers.
    build_reader_cost: Duration,
    /// Scan time merged from [`ScannerMetrics`].
    scan_cost: Duration,
    /// Conversion time merged from [`ScannerMetrics`] (reported as the `convert_rb` stage).
    convert_cost: Duration,
    /// Yield time merged from [`ScannerMetrics`].
    yield_cost: Duration,
    /// Time from query start until the partition finished; zero until recorded.
    total_cost: Duration,
    /// Number of rows merged from [`ScannerMetrics`].
    num_rows: usize,
    /// Number of batches merged from [`ScannerMetrics`].
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // SST read metrics, merged from `ReaderMetrics`.
    /// Time spent building file ranges (`ReaderMetrics::build_cost`).
    build_parts_cost: Duration,
    /// Row groups before filtering.
    rg_total: usize,
    /// Row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Rows in row groups before filtering (`ReaderFilterMetrics::rows_total`).
    rows_before_filter: usize,
    /// Rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows filtered by precise filtering.
    rows_precise_filtered: usize,
    /// Record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Batches read from SSTs.
    num_sst_batches: usize,
    /// Rows read from SSTs.
    num_sst_rows: usize,

    /// Time from query start until `on_first_poll` was called.
    first_poll: Duration,

    // Metrics merged from `SeriesDistributorMetrics`.
    /// Number of series send timeouts.
    num_series_send_timeout: usize,
    /// Rows handled by the series distributor.
    num_distributor_rows: usize,
    /// Batches handled by the series distributor.
    num_distributor_batches: usize,
    /// Scan time of the series distributor.
    distributor_scan_cost: Duration,
    /// Yield time of the series distributor.
    distributor_yield_cost: Duration,
}

impl fmt::Debug for ScanMetricsSet {
    /// Renders the metrics as a single-line JSON-like object.
    ///
    /// Entries appear in declaration order, separated by `", "`. Durations are
    /// quoted `Debug` strings (e.g. `"1.5ms"`); counters are bare integers.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // How a single entry is rendered.
        enum Value<'a> {
            Cost(&'a Duration),
            Count(&'a usize),
        }

        // Exhaustive destructuring: adding a field fails to compile here until it
        // is handled.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            convert_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
        } = self;

        let entries = [
            ("prepare_scan_cost", Value::Cost(prepare_scan_cost)),
            ("build_reader_cost", Value::Cost(build_reader_cost)),
            ("scan_cost", Value::Cost(scan_cost)),
            ("convert_cost", Value::Cost(convert_cost)),
            ("yield_cost", Value::Cost(yield_cost)),
            ("total_cost", Value::Cost(total_cost)),
            ("num_rows", Value::Count(num_rows)),
            ("num_batches", Value::Count(num_batches)),
            ("num_mem_ranges", Value::Count(num_mem_ranges)),
            ("num_file_ranges", Value::Count(num_file_ranges)),
            ("build_parts_cost", Value::Cost(build_parts_cost)),
            ("rg_total", Value::Count(rg_total)),
            ("rg_fulltext_filtered", Value::Count(rg_fulltext_filtered)),
            ("rg_inverted_filtered", Value::Count(rg_inverted_filtered)),
            ("rg_minmax_filtered", Value::Count(rg_minmax_filtered)),
            ("rg_bloom_filtered", Value::Count(rg_bloom_filtered)),
            ("rows_before_filter", Value::Count(rows_before_filter)),
            ("rows_fulltext_filtered", Value::Count(rows_fulltext_filtered)),
            ("rows_inverted_filtered", Value::Count(rows_inverted_filtered)),
            ("rows_bloom_filtered", Value::Count(rows_bloom_filtered)),
            ("rows_precise_filtered", Value::Count(rows_precise_filtered)),
            ("num_sst_record_batches", Value::Count(num_sst_record_batches)),
            ("num_sst_batches", Value::Count(num_sst_batches)),
            ("num_sst_rows", Value::Count(num_sst_rows)),
            ("first_poll", Value::Cost(first_poll)),
            ("num_series_send_timeout", Value::Count(num_series_send_timeout)),
            ("num_distributor_rows", Value::Count(num_distributor_rows)),
            ("num_distributor_batches", Value::Count(num_distributor_batches)),
            ("distributor_scan_cost", Value::Cost(distributor_scan_cost)),
            ("distributor_yield_cost", Value::Cost(distributor_yield_cost)),
        ];

        f.write_str("{")?;
        for (position, (name, value)) in entries.iter().enumerate() {
            if position > 0 {
                f.write_str(", ")?;
            }
            match value {
                Value::Cost(cost) => write!(f, "\"{name}\":\"{cost:?}\"")?,
                Value::Count(count) => write!(f, "\"{name}\":{count}")?,
            }
        }
        f.write_str("}")
    }
}
178impl ScanMetricsSet {
179 fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
181 self.prepare_scan_cost += cost;
182 self
183 }
184
185 fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
187 let ScannerMetrics {
188 prepare_scan_cost,
189 build_reader_cost,
190 scan_cost,
191 convert_cost,
192 yield_cost,
193 num_batches,
194 num_rows,
195 num_mem_ranges,
196 num_file_ranges,
197 } = other;
198
199 self.prepare_scan_cost += *prepare_scan_cost;
200 self.build_reader_cost += *build_reader_cost;
201 self.scan_cost += *scan_cost;
202 self.convert_cost += *convert_cost;
203 self.yield_cost += *yield_cost;
204 self.num_rows += *num_rows;
205 self.num_batches += *num_batches;
206 self.num_mem_ranges += *num_mem_ranges;
207 self.num_file_ranges += *num_file_ranges;
208 }
209
210 fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
212 let ReaderMetrics {
213 build_cost,
214 filter_metrics:
215 ReaderFilterMetrics {
216 rg_total,
217 rg_fulltext_filtered,
218 rg_inverted_filtered,
219 rg_minmax_filtered,
220 rg_bloom_filtered,
221 rows_total,
222 rows_fulltext_filtered,
223 rows_inverted_filtered,
224 rows_bloom_filtered,
225 rows_precise_filtered,
226 },
227 num_record_batches,
228 num_batches,
229 num_rows,
230 scan_cost: _,
231 } = other;
232
233 self.build_parts_cost += *build_cost;
234
235 self.rg_total += *rg_total;
236 self.rg_fulltext_filtered += *rg_fulltext_filtered;
237 self.rg_inverted_filtered += *rg_inverted_filtered;
238 self.rg_minmax_filtered += *rg_minmax_filtered;
239 self.rg_bloom_filtered += *rg_bloom_filtered;
240
241 self.rows_before_filter += *rows_total;
242 self.rows_fulltext_filtered += *rows_fulltext_filtered;
243 self.rows_inverted_filtered += *rows_inverted_filtered;
244 self.rows_bloom_filtered += *rows_bloom_filtered;
245 self.rows_precise_filtered += *rows_precise_filtered;
246
247 self.num_sst_record_batches += *num_record_batches;
248 self.num_sst_batches += *num_batches;
249 self.num_sst_rows += *num_rows;
250 }
251
252 fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
254 let SeriesDistributorMetrics {
255 num_series_send_timeout,
256 num_rows,
257 num_batches,
258 scan_cost,
259 yield_cost,
260 } = distributor_metrics;
261
262 self.num_series_send_timeout += *num_series_send_timeout;
263 self.num_distributor_rows += *num_rows;
264 self.num_distributor_batches += *num_batches;
265 self.distributor_scan_cost += *scan_cost;
266 self.distributor_yield_cost += *yield_cost;
267 }
268
269 fn observe_metrics(&self) {
271 READ_STAGE_ELAPSED
272 .with_label_values(&["prepare_scan"])
273 .observe(self.prepare_scan_cost.as_secs_f64());
274 READ_STAGE_ELAPSED
275 .with_label_values(&["build_reader"])
276 .observe(self.build_reader_cost.as_secs_f64());
277 READ_STAGE_ELAPSED
278 .with_label_values(&["convert_rb"])
279 .observe(self.convert_cost.as_secs_f64());
280 READ_STAGE_ELAPSED
281 .with_label_values(&["scan"])
282 .observe(self.scan_cost.as_secs_f64());
283 READ_STAGE_ELAPSED
284 .with_label_values(&["yield"])
285 .observe(self.yield_cost.as_secs_f64());
286 READ_STAGE_ELAPSED
287 .with_label_values(&["total"])
288 .observe(self.total_cost.as_secs_f64());
289 READ_ROWS_RETURN.observe(self.num_rows as f64);
290 READ_BATCHES_RETURN.observe(self.num_batches as f64);
291
292 READ_STAGE_ELAPSED
293 .with_label_values(&["build_parts"])
294 .observe(self.build_parts_cost.as_secs_f64());
295
296 READ_ROW_GROUPS_TOTAL
297 .with_label_values(&["before_filtering"])
298 .inc_by(self.rg_total as u64);
299 READ_ROW_GROUPS_TOTAL
300 .with_label_values(&["fulltext_index_filtered"])
301 .inc_by(self.rg_fulltext_filtered as u64);
302 READ_ROW_GROUPS_TOTAL
303 .with_label_values(&["inverted_index_filtered"])
304 .inc_by(self.rg_inverted_filtered as u64);
305 READ_ROW_GROUPS_TOTAL
306 .with_label_values(&["minmax_index_filtered"])
307 .inc_by(self.rg_minmax_filtered as u64);
308 READ_ROW_GROUPS_TOTAL
309 .with_label_values(&["bloom_filter_index_filtered"])
310 .inc_by(self.rg_bloom_filtered as u64);
311
312 PRECISE_FILTER_ROWS_TOTAL
313 .with_label_values(&["parquet"])
314 .inc_by(self.rows_precise_filtered as u64);
315 READ_ROWS_IN_ROW_GROUP_TOTAL
316 .with_label_values(&["before_filtering"])
317 .inc_by(self.rows_before_filter as u64);
318 READ_ROWS_IN_ROW_GROUP_TOTAL
319 .with_label_values(&["fulltext_index_filtered"])
320 .inc_by(self.rows_fulltext_filtered as u64);
321 READ_ROWS_IN_ROW_GROUP_TOTAL
322 .with_label_values(&["inverted_index_filtered"])
323 .inc_by(self.rows_inverted_filtered as u64);
324 READ_ROWS_IN_ROW_GROUP_TOTAL
325 .with_label_values(&["bloom_filter_index_filtered"])
326 .inc_by(self.rows_bloom_filtered as u64);
327 }
328}
329
/// Shared state behind [`PartitionMetrics`].
///
/// On drop, it finalizes and reports the accumulated metrics (see its `Drop` impl).
struct PartitionMetricsInner {
    /// Region being scanned; included in the finish log.
    region_id: RegionId,
    /// Index of the scan partition these metrics belong to.
    partition: usize,
    /// Scanner name, used as the in-progress gauge label and in the finish log.
    scanner_type: &'static str,
    /// Start of the query; elapsed times (prepare, first poll, total) are measured from it.
    query_start: Instant,
    /// Metrics accumulated by all clones of the owning `PartitionMetrics`.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge incremented in `PartitionMetrics::new` and decremented on drop.
    in_progress_scan: IntGauge,

    // Per-partition DataFusion timers registered in the plan's `ExecutionPlanMetricsSet`.
    /// Timer named `build_parts_cost`.
    build_parts_cost: Time,
    /// Timer named `build_reader_cost`.
    build_reader_cost: Time,
    /// Timer named `scan_cost`.
    scan_cost: Time,
    /// Timer named `yield_cost`.
    yield_cost: Time,
}
352
353impl PartitionMetricsInner {
354 fn on_finish(&self) {
355 let mut metrics = self.metrics.lock().unwrap();
356 if metrics.total_cost.is_zero() {
357 metrics.total_cost = self.query_start.elapsed();
358 }
359 }
360}
361
362impl Drop for PartitionMetricsInner {
363 fn drop(&mut self) {
364 self.on_finish();
365 let metrics = self.metrics.lock().unwrap();
366 metrics.observe_metrics();
367 self.in_progress_scan.dec();
368
369 debug!(
370 "{} finished, region_id: {}, partition: {}, metrics: {:?}",
371 self.scanner_type, self.region_id, self.partition, metrics
372 );
373 }
374}
375
376#[derive(Default)]
378pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
379
380impl PartitionMetricsList {
381 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
383 let mut list = self.0.lock().unwrap();
384 if list.len() <= partition {
385 list.resize(partition + 1, None);
386 }
387 list[partition] = Some(metrics);
388 }
389
390 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
392 let list = self.0.lock().unwrap();
393 write!(f, ", \"metrics_per_partition\": ")?;
394 f.debug_list()
395 .entries(list.iter().filter_map(|p| p.as_ref()))
396 .finish()?;
397 write!(f, "}}")
398 }
399}
400
/// Metrics of one scan partition.
///
/// Clones share the same inner state through an [`Arc`]. The inner state reports the
/// collected metrics when it is dropped, i.e. when the last clone goes away.
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);
404
405impl PartitionMetrics {
406 pub(crate) fn new(
407 region_id: RegionId,
408 partition: usize,
409 scanner_type: &'static str,
410 query_start: Instant,
411 metrics_set: &ExecutionPlanMetricsSet,
412 ) -> Self {
413 let partition_str = partition.to_string();
414 let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
415 in_progress_scan.inc();
416 let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
417 let inner = PartitionMetricsInner {
418 region_id,
419 partition,
420 scanner_type,
421 query_start,
422 metrics: Mutex::new(metrics),
423 in_progress_scan,
424 build_parts_cost: MetricBuilder::new(metrics_set)
425 .subset_time("build_parts_cost", partition),
426 build_reader_cost: MetricBuilder::new(metrics_set)
427 .subset_time("build_reader_cost", partition),
428 scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
429 yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
430 };
431 Self(Arc::new(inner))
432 }
433
434 pub(crate) fn on_first_poll(&self) {
435 let mut metrics = self.0.metrics.lock().unwrap();
436 metrics.first_poll = self.0.query_start.elapsed();
437 }
438
439 pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
440 let mut metrics = self.0.metrics.lock().unwrap();
441 metrics.num_mem_ranges += num;
442 }
443
444 pub(crate) fn inc_num_file_ranges(&self, num: usize) {
445 let mut metrics = self.0.metrics.lock().unwrap();
446 metrics.num_file_ranges += num;
447 }
448
449 pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
451 self.0.build_reader_cost.add_duration(cost);
452
453 let mut metrics = self.0.metrics.lock().unwrap();
454 metrics.build_reader_cost += cost;
455 }
456
457 pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
459 self.0
460 .build_reader_cost
461 .add_duration(metrics.build_reader_cost);
462 self.0.scan_cost.add_duration(metrics.scan_cost);
463 self.0.yield_cost.add_duration(metrics.yield_cost);
464
465 let mut metrics_set = self.0.metrics.lock().unwrap();
466 metrics_set.merge_scanner_metrics(metrics);
467 }
468
469 pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
471 self.0.build_parts_cost.add_duration(metrics.build_cost);
472
473 let mut metrics_set = self.0.metrics.lock().unwrap();
474 metrics_set.merge_reader_metrics(metrics);
475 }
476
477 pub(crate) fn on_finish(&self) {
479 self.0.on_finish();
480 }
481
482 pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
484 let mut metrics_set = self.0.metrics.lock().unwrap();
485 metrics_set.set_distributor_metrics(metrics);
486 }
487}
488
489impl fmt::Debug for PartitionMetrics {
490 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
491 let metrics = self.0.metrics.lock().unwrap();
492 write!(
493 f,
494 r#"{{"partition":{}, "metrics":{:?}}}"#,
495 self.0.partition, metrics
496 )
497 }
498}
499
/// Metrics collected by the series distributor.
///
/// They are added to a partition's metrics through
/// `PartitionMetrics::set_distributor_metrics`. The fields are populated outside this
/// file; their descriptions below follow the field names.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of series sends that timed out.
    pub(crate) num_series_send_timeout: usize,
    /// Number of rows handled by the distributor.
    pub(crate) num_rows: usize,
    /// Number of batches handled by the distributor.
    pub(crate) num_batches: usize,
    /// Scan time spent by the distributor.
    pub(crate) scan_cost: Duration,
    /// Yield time spent by the distributor.
    pub(crate) yield_cost: Duration,
}
514
515pub(crate) fn scan_mem_ranges(
517 stream_ctx: Arc<StreamContext>,
518 part_metrics: PartitionMetrics,
519 index: RowGroupIndex,
520 time_range: FileTimeRange,
521) -> impl Stream<Item = Result<Batch>> {
522 try_stream! {
523 let ranges = stream_ctx.input.build_mem_ranges(index);
524 part_metrics.inc_num_mem_ranges(ranges.len());
525 for range in ranges {
526 let build_reader_start = Instant::now();
527 let iter = range.build_iter(time_range)?;
528 part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
529
530 let mut source = Source::Iter(iter);
531 while let Some(batch) = source.next_batch().await? {
532 yield batch;
533 }
534 }
535 }
536}
537
538pub(crate) fn scan_file_ranges(
540 stream_ctx: Arc<StreamContext>,
541 part_metrics: PartitionMetrics,
542 index: RowGroupIndex,
543 read_type: &'static str,
544 range_builder: Arc<RangeBuilderList>,
545) -> impl Stream<Item = Result<Batch>> {
546 try_stream! {
547 let mut reader_metrics = ReaderMetrics::default();
548 let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?;
549 part_metrics.inc_num_file_ranges(ranges.len());
550
551 for range in ranges {
552 let build_reader_start = Instant::now();
553 let reader = range.reader(stream_ctx.input.series_row_selector).await?;
554 let build_cost = build_reader_start.elapsed();
555 part_metrics.inc_build_reader_cost(build_cost);
556 let compat_batch = range.compat_batch();
557 let mut source = Source::PruneReader(reader);
558 while let Some(mut batch) = source.next_batch().await? {
559 if let Some(compact_batch) = compat_batch {
560 batch = compact_batch.compat_batch(batch)?;
561 }
562 yield batch;
563 }
564 if let Source::PruneReader(reader) = source {
565 let prune_metrics = reader.metrics();
566 reader_metrics.merge_from(&prune_metrics);
567 }
568 }
569
570 reader_metrics.observe_rows(read_type);
572 reader_metrics.filter_metrics.observe();
573 part_metrics.merge_reader_metrics(&reader_metrics);
574 }
575}