use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_telemetry::debug;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use futures::Stream;
use prometheus::IntGauge;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Accumulated metrics for a scan on one partition.
#[derive(Default)]
struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration to convert batches.
    convert_cost: Duration,
    /// Duration spent waiting for the consumer while yielding batches.
    yield_cost: Duration,
    /// Duration of the whole scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Duration to build file ranges (parts).
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filtering.
    rows_precise_filtered: usize,
    /// Number of record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SSTs.
    num_sst_batches: usize,
    /// Number of rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time before the first poll of the stream.
    first_poll: Duration,

    /// Number of send timeouts in the series distributor.
    num_series_send_timeout: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Duration the series distributor spent yielding batches.
    distributor_yield_cost: Duration,
}
107
108impl fmt::Debug for ScanMetricsSet {
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 let ScanMetricsSet {
111 prepare_scan_cost,
112 build_reader_cost,
113 scan_cost,
114 convert_cost,
115 yield_cost,
116 total_cost,
117 num_rows,
118 num_batches,
119 num_mem_ranges,
120 num_file_ranges,
121 build_parts_cost,
122 rg_total,
123 rg_fulltext_filtered,
124 rg_inverted_filtered,
125 rg_minmax_filtered,
126 rg_bloom_filtered,
127 rows_before_filter,
128 rows_fulltext_filtered,
129 rows_inverted_filtered,
130 rows_bloom_filtered,
131 rows_precise_filtered,
132 num_sst_record_batches,
133 num_sst_batches,
134 num_sst_rows,
135 first_poll,
136 num_series_send_timeout,
137 num_distributor_rows,
138 num_distributor_batches,
139 distributor_scan_cost,
140 distributor_yield_cost,
141 } = self;
142
143 write!(
144 f,
145 "{{prepare_scan_cost={prepare_scan_cost:?}, \
146 build_reader_cost={build_reader_cost:?}, \
147 scan_cost={scan_cost:?}, \
148 convert_cost={convert_cost:?}, \
149 yield_cost={yield_cost:?}, \
150 total_cost={total_cost:?}, \
151 num_rows={num_rows}, \
152 num_batches={num_batches}, \
153 num_mem_ranges={num_mem_ranges}, \
154 num_file_ranges={num_file_ranges}, \
155 build_parts_cost={build_parts_cost:?}, \
156 rg_total={rg_total}, \
157 rg_fulltext_filtered={rg_fulltext_filtered}, \
158 rg_inverted_filtered={rg_inverted_filtered}, \
159 rg_minmax_filtered={rg_minmax_filtered}, \
160 rg_bloom_filtered={rg_bloom_filtered}, \
161 rows_before_filter={rows_before_filter}, \
162 rows_fulltext_filtered={rows_fulltext_filtered}, \
163 rows_inverted_filtered={rows_inverted_filtered}, \
164 rows_bloom_filtered={rows_bloom_filtered}, \
165 rows_precise_filtered={rows_precise_filtered}, \
166 num_sst_record_batches={num_sst_record_batches}, \
167 num_sst_batches={num_sst_batches}, \
168 num_sst_rows={num_sst_rows}, \
169 first_poll={first_poll:?}, \
170 num_series_send_timeout={num_series_send_timeout}, \
171 num_distributor_rows={num_distributor_rows}, \
172 num_distributor_batches={num_distributor_batches}, \
173 distributor_scan_cost={distributor_scan_cost:?}, \
174 distributor_yield_cost={distributor_yield_cost:?}}},"
175 )
176 }
177}
impl ScanMetricsSet {
    /// Attaches the given prepare scan cost to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the local scanner metrics into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            convert_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.convert_cost += *convert_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the reader metrics into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Merges the series distributor metrics into this set.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the accumulated metrics to the global Prometheus metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["convert_rb"])
            .observe(self.convert_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
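
// A minimal sketch of how one partition's totals are meant to accumulate, assuming
// hypothetical `prepare_elapsed`, `scanner_metrics` and `reader_metrics` values from a
// scan loop (in this module the set lives inside `PartitionMetricsInner` and
// `observe_metrics` is invoked from its `Drop` impl):
//
//     let mut set = ScanMetricsSet::default().with_prepare_scan_cost(prepare_elapsed);
//     set.merge_scanner_metrics(&scanner_metrics); // per-range scanner costs and counts
//     set.merge_reader_metrics(&reader_metrics);   // SST reader and filter statistics
//     set.observe_metrics();                       // report totals to the Prometheus metrics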

/// Inner state shared by all clones of a [PartitionMetrics].
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operators.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Accumulated metrics of the partition.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of scans in progress; decreased on drop.
    in_progress_scan: IntGauge,

    /// Duration to build file ranges (parts), reported to the DataFusion metrics set.
    build_parts_cost: Time,
    /// Duration to build the reader, reported to the DataFusion metrics set.
    build_reader_cost: Time,
    /// Duration to scan data, reported to the DataFusion metrics set.
    scan_cost: Time,
    /// Duration to yield batches, reported to the DataFusion metrics set.
    yield_cost: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish();
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        debug!(
            "{} finished, region_id: {}, partition: {}, metrics: {:?}",
            self.scanner_type, self.region_id, self.partition, metrics
        );
    }
}
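
// Metrics are flushed exactly once: this `Drop` impl runs when the last
// `PartitionMetrics` clone releases the shared `Arc`. A minimal sketch, assuming a
// hypothetical `part_metrics: PartitionMetrics` handle:
//
//     let held_by_stream = part_metrics.clone(); // e.g. moved into a scan stream
//     drop(part_metrics);                        // inner state is still alive
//     drop(held_by_stream);                      // last clone: observe_metrics() and the debug log run here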

/// Metrics of all partitions, used to format verbose scan output.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets the partition metrics at `partition`, growing the list if needed.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats the metrics of all partitions that have been set.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", metrics_per_partition: ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()
    }
}
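
// A minimal usage sketch, assuming a hypothetical scanner that registers one
// `PartitionMetrics` per partition and later prints verbose metrics through a
// `fmt::Formatter` `f` (e.g. from a `Debug` impl):
//
//     let list = PartitionMetricsList::default();
//     list.set(partition, part_metrics.clone()); // keep a handle for verbose EXPLAIN output
//     ...
//     list.format_verbose_metrics(f)?;           // appends ", metrics_per_partition: [...]"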

/// Metrics while reading a partition. Cheap to clone; all clones share the same
/// inner state.
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub(crate) fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `cost` to the build reader cost.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Merges the local scanner metrics.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges the reader metrics.
    pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Marks the scan as finished and records the total cost.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish();
    }

    /// Merges the series distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}
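
// A minimal lifecycle sketch for one partition, assuming a hypothetical scan loop
// (`region_id`, `query_start`, `exec_metrics`, `ranges`, `scanner_metrics` and the
// "SeqScan" label are placeholders, not call sites in this crate):
//
//     let part_metrics =
//         PartitionMetrics::new(region_id, partition, "SeqScan", query_start, &exec_metrics);
//     part_metrics.on_first_poll();                   // record time-to-first-poll
//     part_metrics.inc_num_file_ranges(ranges.len()); // ranges discovered for this partition
//     part_metrics.merge_metrics(&scanner_metrics);   // fold in per-source costs and counts
//     part_metrics.on_finish();                       // freeze total_cost; Prometheus flush happens on drop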

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
    }
}

/// Metrics collected by the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of timeouts while sending series.
    pub(crate) num_series_send_timeout: usize,
    /// Number of rows the distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration the distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Duration the distributor spent yielding batches.
    pub(crate) yield_cost: Duration,
}

/// Scans memtable ranges at `index`.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let iter = range.build_iter(time_range)?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }
        }
    }
}
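
// A minimal consumption sketch for the returned stream, assuming it runs inside an
// async function that returns `Result<()>` and that `stream_ctx`, `part_metrics`,
// `index` and `time_range` are hypothetical values already in scope:
//
//     use futures::StreamExt;
//
//     let stream = scan_mem_ranges(stream_ctx, part_metrics, index, time_range);
//     futures::pin_mut!(stream);
//     while let Some(batch) = stream.next().await {
//         let batch = batch?; // each item is a Result<Batch>
//         // process the batch...
//     }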

/// Scans file ranges at `index`.
pub(crate) fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let mut reader_metrics = ReaderMetrics::default();
        let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?;
        part_metrics.inc_num_file_ranges(ranges.len());

        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.compat_batch(batch)?;
                }
                yield batch;
            }
            // Collect the reader's own metrics before the source goes out of scope.
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Report the accumulated reader metrics for this partition.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(&reader_metrics);
    }
}
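
// A minimal sketch of how the two range streams above might be combined by a scanner,
// assuming hypothetical `stream_ctx`, `part_metrics`, `mem_index`, `file_index`,
// `time_range`, `range_builder` and `read_type` values (the real partitioning and
// ordering logic lives elsewhere):
//
//     use futures::StreamExt;
//
//     let mem = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), mem_index, time_range);
//     let files = scan_file_ranges(stream_ctx, part_metrics, file_index, read_type, range_builder);
//     let combined = mem.chain(files); // memtable batches first, then SST batches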