Skip to main content

query/
metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
21use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
22use common_telemetry::warn;
23use datafusion::physical_plan::ExecutionPlan;
24use datatypes::schema::SchemaRef;
25use futures::Stream;
26use futures_util::ready;
27use lazy_static::lazy_static;
28use prometheus::*;
29
30use crate::dist_plan::MergeScanExec;
31
/// Intermediate merge state for one participating region while collecting
/// terminal correctness watermarks across merge-scan sub-stages.
///
/// The derives make the state printable in diagnostics and directly
/// comparable in assertions; they do not change how states are merged.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MergeState {
    /// The region participated, but no explicit watermark result has been seen
    /// yet for this merge.
    Participated,
    /// At least one branch reported that this region cannot prove a safe
    /// checkpoint watermark for the current query round.
    Unproved,
    /// All seen branches agree the region can advance safely to this sequence.
    Proved(u64),
    /// Different proved sequences were reported for the same region. The final
    /// result is degraded to `None`, and the collected values are logged.
    Conflict {
        /// Distinct proved watermark candidates reported for the region, in
        /// the order they were first seen.
        watermarks: Vec<u64>,
    },
}
50
51lazy_static! {
52    /// Timer of different stages in query.
53    pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
54        "greptime_query_stage_elapsed",
55        "query engine time elapsed during each stage",
56        &["stage"],
57        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
58    )
59    .unwrap();
60    pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
61        .with_label_values(&["parse_sql"]);
62    pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
63        .with_label_values(&["parse_promql"]);
64    pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
65        .with_label_values(&["optimize_logicalplan"]);
66    pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
67        .with_label_values(&["optimize_physicalplan"]);
68    pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
69        .with_label_values(&["create_physicalplan"]);
70    pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
71        .with_label_values(&["execute_plan"]);
72    pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
73        .with_label_values(&["merge_scan_poll"]);
74
75    pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!(
76        "greptime_query_merge_scan_regions",
77        "query merge scan regions"
78    )
79    .unwrap();
80    pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!(
81        "greptime_query_merge_scan_errors_total",
82        "query merge scan errors total"
83    )
84    .unwrap();
85    pub static ref PUSH_DOWN_FALLBACK_ERRORS_TOTAL: IntCounter = register_int_counter!(
86        "greptime_push_down_fallback_errors_total",
87        "query push down fallback errors total"
88    )
89    .unwrap();
90
91    pub static ref QUERY_MEMORY_POOL_USAGE_BYTES: IntGauge = register_int_gauge!(
92        "greptime_query_memory_pool_usage_bytes",
93        "current query memory pool usage in bytes"
94    )
95    .unwrap();
96
97    pub static ref QUERY_MEMORY_POOL_REJECTED_TOTAL: IntCounter = register_int_counter!(
98        "greptime_query_memory_pool_rejected_total",
99        "total number of query memory allocations rejected"
100    )
101    .unwrap();
102}
103
104/// A stream to call the callback once a RecordBatch stream is done.
105pub struct OnDone<F> {
106    stream: SendableRecordBatchStream,
107    callback: Option<F>,
108}
109
110impl<F> OnDone<F> {
111    /// Attaches a `callback` to invoke once the `stream` is terminated.
112    pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self {
113        Self {
114            stream,
115            callback: Some(callback),
116        }
117    }
118}
119
120impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
121    fn name(&self) -> &str {
122        self.stream.name()
123    }
124
125    fn schema(&self) -> SchemaRef {
126        self.stream.schema()
127    }
128
129    fn output_ordering(&self) -> Option<&[OrderOption]> {
130        self.stream.output_ordering()
131    }
132
133    fn metrics(&self) -> Option<RecordBatchMetrics> {
134        self.stream.metrics()
135    }
136}
137
138impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
139    type Item = common_recordbatch::error::Result<RecordBatch>;
140
141    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142        match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
143            Some(rb) => Poll::Ready(Some(rb)),
144            None => {
145                if let Some(callback) = self.callback.take() {
146                    callback();
147                }
148                Poll::Ready(None)
149            }
150        }
151    }
152
153    fn size_hint(&self) -> (usize, Option<usize>) {
154        self.stream.size_hint()
155    }
156}
157
158pub struct RegionWatermarkMetricsStream {
159    stream: SendableRecordBatchStream,
160    plan: Arc<dyn ExecutionPlan>,
161}
162
163impl RegionWatermarkMetricsStream {
164    pub fn new(stream: SendableRecordBatchStream, plan: Arc<dyn ExecutionPlan>) -> Self {
165        Self { stream, plan }
166    }
167}
168
169impl RecordBatchStream for RegionWatermarkMetricsStream {
170    fn name(&self) -> &str {
171        self.stream.name()
172    }
173
174    fn schema(&self) -> SchemaRef {
175        self.stream.schema()
176    }
177
178    fn output_ordering(&self) -> Option<&[OrderOption]> {
179        self.stream.output_ordering()
180    }
181
182    fn metrics(&self) -> Option<RecordBatchMetrics> {
183        let mut metrics = self.stream.metrics()?;
184        let region_watermarks = collect_region_watermarks(self.plan.clone());
185        if !region_watermarks.is_empty() {
186            metrics.region_watermarks = region_watermarks;
187        }
188        Some(metrics)
189    }
190}
191
192impl Stream for RegionWatermarkMetricsStream {
193    type Item = common_recordbatch::error::Result<RecordBatch>;
194
195    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
196        Pin::new(&mut self.stream).poll_next(cx)
197    }
198
199    fn size_hint(&self) -> (usize, Option<usize>) {
200        self.stream.size_hint()
201    }
202}
203
204pub fn terminal_recordbatch_metrics_from_plan(
205    plan: Arc<dyn ExecutionPlan>,
206) -> Option<RecordBatchMetrics> {
207    let region_watermarks = collect_region_watermarks(plan);
208    if region_watermarks.is_empty() {
209        None
210    } else {
211        Some(RecordBatchMetrics {
212            region_watermarks,
213            ..Default::default()
214        })
215    }
216}
217
218fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermarkEntry> {
219    let mut merged = BTreeMap::<u64, MergeState>::new();
220    let mut stack = vec![plan];
221
222    while let Some(plan) = stack.pop() {
223        if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
224            merge_merge_scan_region_watermarks(
225                &mut merged,
226                merge_scan
227                    .regions()
228                    .iter()
229                    .map(|region_id| region_id.as_u64()),
230                merge_scan.sub_stage_metrics(),
231            );
232        }
233
234        stack.extend(plan.children().into_iter().cloned());
235    }
236
237    finalize_region_watermarks(merged)
238}
239
240fn merge_merge_scan_region_watermarks(
241    merged: &mut BTreeMap<u64, MergeState>,
242    regions: impl IntoIterator<Item = u64>,
243    sub_stage_metrics: impl IntoIterator<Item = RecordBatchMetrics>,
244) {
245    for region_id in regions {
246        merged.entry(region_id).or_insert(MergeState::Participated);
247    }
248
249    for metrics in sub_stage_metrics {
250        for entry in metrics.region_watermarks {
251            merged
252                .entry(entry.region_id)
253                .and_modify(|existing| match entry.watermark {
254                    None => match existing {
255                        MergeState::Participated | MergeState::Proved(_) => {
256                            *existing = MergeState::Unproved;
257                        }
258                        MergeState::Unproved | MergeState::Conflict { .. } => {}
259                    },
260                    Some(seq) => match existing {
261                        MergeState::Participated => {
262                            *existing = MergeState::Proved(seq);
263                        }
264                        MergeState::Unproved => {}
265                        MergeState::Proved(existing_seq) if *existing_seq == seq => {}
266                        MergeState::Proved(existing_seq) => {
267                            let old_seq = *existing_seq;
268                            *existing = MergeState::Conflict {
269                                watermarks: vec![old_seq, seq],
270                            };
271                        }
272                        MergeState::Conflict { watermarks } => {
273                            if !watermarks.contains(&seq) {
274                                watermarks.push(seq);
275                            }
276                        }
277                    },
278                })
279                .or_insert(match entry.watermark {
280                    Some(seq) => MergeState::Proved(seq),
281                    None => MergeState::Unproved,
282                });
283        }
284    }
285}
286
287fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWatermarkEntry> {
288    merged
289        .into_iter()
290        .map(|(region_id, state)| RegionWatermarkEntry {
291            region_id,
292            watermark: match state {
293                MergeState::Participated => None,
294                MergeState::Unproved => None,
295                MergeState::Proved(seq) => Some(seq),
296                MergeState::Conflict { watermarks } => {
297                    warn!(
298                        "Conflicting proved watermarks for region {}: {:?}; degrading to unproved",
299                        region_id, watermarks
300                    );
301                    None
302                }
303            },
304        })
305        .collect()
306}
307
#[cfg(test)]
mod tests {
    use datafusion::arrow::datatypes::Schema as ArrowSchema;
    use datafusion::physical_plan::empty::EmptyExec;

    use super::*;

    /// Shorthand for one expected output entry.
    fn entry(region_id: u64, watermark: Option<u64>) -> RegionWatermarkEntry {
        RegionWatermarkEntry {
            region_id,
            watermark,
        }
    }

    /// Builds sub-stage metrics that carry only the given
    /// `(region_id, watermark)` pairs.
    fn metrics_with_region_watermarks(entries: &[(u64, Option<u64>)]) -> RecordBatchMetrics {
        let region_watermarks = entries
            .iter()
            .map(|&(region_id, watermark)| entry(region_id, watermark))
            .collect();

        RecordBatchMetrics {
            region_watermarks,
            ..Default::default()
        }
    }

    /// Merges `regions` with one sub-stage report per element of `reports`
    /// into a fresh map and returns the finalized entries.
    fn merge_and_finalize(
        regions: impl IntoIterator<Item = u64>,
        reports: &[&[(u64, Option<u64>)]],
    ) -> Vec<RegionWatermarkEntry> {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(
            &mut merged,
            regions,
            reports
                .iter()
                .map(|&entries| metrics_with_region_watermarks(entries)),
        );

        finalize_region_watermarks(merged)
    }

    #[test]
    fn terminal_metrics_returns_none_without_merge_scan() {
        let schema = Arc::new(ArrowSchema::empty());
        let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));

        assert!(terminal_recordbatch_metrics_from_plan(plan).is_none());
    }

    #[test]
    fn merge_merge_scan_region_watermarks_marks_missing_watermarks_unproved() {
        // No sub-stage reports anything, so both regions stay without a watermark.
        let finalized = merge_and_finalize([1, 2], &[]);

        assert_eq!(finalized, vec![entry(1, None), entry(2, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_keeps_matching_proved_values() {
        let finalized = merge_and_finalize([42], &[&[(42, Some(7))], &[(42, Some(7))]]);

        assert_eq!(finalized, vec![entry(42, Some(7))]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_degrades_conflicting_proved_values() {
        let finalized = merge_and_finalize([7], &[&[(7, Some(11))], &[(7, Some(13))]]);

        assert_eq!(finalized, vec![entry(7, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value() {
        let finalized = merge_and_finalize([9], &[&[(9, Some(21))], &[(9, None)]]);

        assert_eq!(finalized, vec![entry(9, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value_regardless_of_order() {
        let finalized = merge_and_finalize([9], &[&[(9, None)], &[(9, Some(21))]]);

        assert_eq!(finalized, vec![entry(9, None)]);
    }
}
442}