use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_telemetry::debug;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use futures::Stream;
use prometheus::IntGauge;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
/// Verbose scan metrics collected for a single partition.
#[derive(Default)]
struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration to convert batches.
    convert_cost: Duration,
    /// Duration while waiting to yield batches.
    yield_cost: Duration,
    /// Duration of the whole scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filtering.
    rows_precise_filtered: usize,
    /// Number of record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SSTs.
    num_sst_batches: usize,
    /// Number of rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,
}

impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            convert_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
        } = self;

        write!(
            f,
            "{{prepare_scan_cost={prepare_scan_cost:?}, \
            build_reader_cost={build_reader_cost:?}, \
            scan_cost={scan_cost:?}, \
            convert_cost={convert_cost:?}, \
            yield_cost={yield_cost:?}, \
            total_cost={total_cost:?}, \
            num_rows={num_rows}, \
            num_batches={num_batches}, \
            num_mem_ranges={num_mem_ranges}, \
            num_file_ranges={num_file_ranges}, \
            build_parts_cost={build_parts_cost:?}, \
            rg_total={rg_total}, \
            rg_fulltext_filtered={rg_fulltext_filtered}, \
            rg_inverted_filtered={rg_inverted_filtered}, \
            rg_minmax_filtered={rg_minmax_filtered}, \
            rg_bloom_filtered={rg_bloom_filtered}, \
            rows_before_filter={rows_before_filter}, \
            rows_fulltext_filtered={rows_fulltext_filtered}, \
            rows_inverted_filtered={rows_inverted_filtered}, \
            rows_bloom_filtered={rows_bloom_filtered}, \
            rows_precise_filtered={rows_precise_filtered}, \
            num_sst_record_batches={num_sst_record_batches}, \
            num_sst_batches={num_sst_batches}, \
            num_sst_rows={num_sst_rows}, \
            first_poll={first_poll:?}}}"
        )
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the per-poll scanner metrics into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            convert_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.convert_cost += *convert_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges metrics collected from SST readers into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Reports the accumulated metrics to the global Prometheus collectors.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["convert_rb"])
            .observe(self.convert_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
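
// A minimal sketch of how these counters accumulate over the life of a partition. In this
// module they are driven through `PartitionMetrics` below; `per_poll` and `reader_metrics`
// are hypothetical locals collected by the scanner, not part of this file:
//
//     let mut set = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
//     set.merge_scanner_metrics(&per_poll);       // per-poll scanner counters
//     set.merge_reader_metrics(&reader_metrics);  // per-file parquet reader counters
//     set.observe_metrics();                      // export to Prometheus when the scan ends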

/// Inner state shared by all streams of one scan partition.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish the scanner type.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Verbose scan metrics, logged at debug level when the partition finishes.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of partitions with an in-progress scan.
    in_progress_scan: IntGauge,

    // Per-partition timers reported to the DataFusion `ExecutionPlanMetricsSet`:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting to yield batches.
    yield_cost: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish();
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        debug!(
            "{} finished, region_id: {}, partition: {}, metrics: {:?}",
            self.scanner_type, self.region_id, self.partition, metrics
        );
    }
}

/// List of [PartitionMetrics] for all partitions of a scan.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets the [PartitionMetrics] for `partition`, growing the list if necessary.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats the verbose metrics of each partition for an explain-style output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", metrics_per_partition: ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()
    }
}
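
// A hedged illustration of what `format_verbose_metrics` appends to an explain-style
// output, based on the `Debug` impls in this file (values are made up):
//
//     , metrics_per_partition: [[partition=0, {prepare_scan_cost=1.2ms, build_reader_cost=...,
//       ..., first_poll=800µs}], [partition=1, {...}]]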

/// Cheaply cloneable handle to the metrics of one scan partition.
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub(crate) fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds the reader build cost to both the DataFusion timer and the verbose metrics.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Merges the per-poll [ScannerMetrics] into this partition's metrics.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] collected from SST readers into this partition's metrics.
    pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Marks the scan as finished and records the total cost.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish();
    }
}
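
// A hedged sketch of the intended lifecycle. The scanner code that drives this lives
// elsewhere; `query_start`, `metrics_set`, `scanner_metrics`, and the "SeqScan" label are
// illustrative assumptions, not taken from this file:
//
//     let part_metrics = PartitionMetrics::new(region_id, 0, "SeqScan", query_start, &metrics_set);
//     part_metrics.on_first_poll();
//     part_metrics.merge_metrics(&scanner_metrics);   // after each polled batch group
//     part_metrics.on_finish();                       // fixes total_cost; Drop reports and logs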

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
    }
}

/// Scans memtable ranges at `index`.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let iter = range.build_iter(time_range)?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }
        }
    }
}

/// Scans file ranges at `index`.
pub(crate) fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let mut reader_metrics = ReaderMetrics::default();
        let ranges = range_builder
            .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
            .await?;
        part_metrics.inc_num_file_ranges(ranges.len());

        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports the accumulated reader metrics for this group of ranges.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(&reader_metrics);
    }
}
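
// A hedged usage sketch: callers typically pin and poll the returned stream. The read_type
// label and other identifiers below are illustrative, not taken from this module:
//
//     let stream = scan_file_ranges(stream_ctx, part_metrics, index, "file_ranges", range_builder);
//     futures::pin_mut!(stream);
//     while let Some(batch) = futures::TryStreamExt::try_next(&mut stream).await? {
//         // consume `batch`
//     }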