Skip to main content

query/
metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
21use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
22use common_telemetry::warn;
23use datafusion::physical_plan::ExecutionPlan;
24use datatypes::schema::SchemaRef;
25use futures::Stream;
26use futures_util::ready;
27use lazy_static::lazy_static;
28use prometheus::*;
29use session::context::QueryContextRef;
30
31use crate::dist_plan::MergeScanExec;
32use crate::error::Result;
33use crate::options::FlowQueryExtensions;
34
/// Intermediate merge state for one participating region while collecting
/// terminal correctness watermarks across merge-scan sub-stages.
///
/// Only [`MergeState::Proved`] finalizes to a watermark; every other state
/// finalizes to `None` (unproved).
#[derive(Debug, Clone, PartialEq, Eq)]
enum MergeState {
    /// At least one branch reported that this region cannot prove a safe
    /// checkpoint watermark for the current query round.
    Unproved,
    /// All seen branches agree the region can advance safely to this sequence.
    Proved(u64),
    /// Different proved sequences were reported for the same region. The final
    /// result is degraded to `None`, and the collected values are logged.
    Conflict {
        /// Distinct proved watermark candidates reported for the region, in the
        /// order they were first seen.
        watermarks: Vec<u64>,
    },
}
50
51lazy_static! {
52    /// Timer of different stages in query.
53    pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
54        "greptime_query_stage_elapsed",
55        "query engine time elapsed during each stage",
56        &["stage"],
57        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
58    )
59    .unwrap();
60    pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
61        .with_label_values(&["parse_sql"]);
62    pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
63        .with_label_values(&["parse_promql"]);
64    pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
65        .with_label_values(&["optimize_logicalplan"]);
66    pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
67        .with_label_values(&["optimize_physicalplan"]);
68    pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
69        .with_label_values(&["create_physicalplan"]);
70    pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
71        .with_label_values(&["execute_plan"]);
72    pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
73        .with_label_values(&["merge_scan_poll"]);
74
75    pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!(
76        "greptime_query_merge_scan_regions",
77        "query merge scan regions"
78    )
79    .unwrap();
80    pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!(
81        "greptime_query_merge_scan_errors_total",
82        "query merge scan errors total"
83    )
84    .unwrap();
85    pub static ref PUSH_DOWN_FALLBACK_ERRORS_TOTAL: IntCounter = register_int_counter!(
86        "greptime_push_down_fallback_errors_total",
87        "query push down fallback errors total"
88    )
89    .unwrap();
90
91    pub static ref QUERY_MEMORY_POOL_USAGE_BYTES: IntGauge = register_int_gauge!(
92        "greptime_query_memory_pool_usage_bytes",
93        "current query memory pool usage in bytes"
94    )
95    .unwrap();
96
97    pub static ref QUERY_MEMORY_POOL_REJECTED_TOTAL: IntCounter = register_int_counter!(
98        "greptime_query_memory_pool_rejected_total",
99        "total number of query memory allocations rejected"
100    )
101    .unwrap();
102}
103
104/// A stream to call the callback once a RecordBatch stream is done.
105pub struct OnDone<F> {
106    stream: SendableRecordBatchStream,
107    callback: Option<F>,
108}
109
110impl<F> OnDone<F> {
111    /// Attaches a `callback` to invoke once the `stream` is terminated.
112    pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self {
113        Self {
114            stream,
115            callback: Some(callback),
116        }
117    }
118}
119
120impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
121    fn name(&self) -> &str {
122        self.stream.name()
123    }
124
125    fn schema(&self) -> SchemaRef {
126        self.stream.schema()
127    }
128
129    fn output_ordering(&self) -> Option<&[OrderOption]> {
130        self.stream.output_ordering()
131    }
132
133    fn metrics(&self) -> Option<RecordBatchMetrics> {
134        self.stream.metrics()
135    }
136}
137
138impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
139    type Item = common_recordbatch::error::Result<RecordBatch>;
140
141    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142        match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
143            Some(rb) => Poll::Ready(Some(rb)),
144            None => {
145                if let Some(callback) = self.callback.take() {
146                    callback();
147                }
148                Poll::Ready(None)
149            }
150        }
151    }
152
153    fn size_hint(&self) -> (usize, Option<usize>) {
154        self.stream.size_hint()
155    }
156}
157
158pub struct RegionWatermarkMetricsStream {
159    stream: SendableRecordBatchStream,
160    plan: Arc<dyn ExecutionPlan>,
161}
162
163impl RegionWatermarkMetricsStream {
164    pub fn new(stream: SendableRecordBatchStream, plan: Arc<dyn ExecutionPlan>) -> Self {
165        Self { stream, plan }
166    }
167}
168
169impl RecordBatchStream for RegionWatermarkMetricsStream {
170    fn name(&self) -> &str {
171        self.stream.name()
172    }
173
174    fn schema(&self) -> SchemaRef {
175        self.stream.schema()
176    }
177
178    fn output_ordering(&self) -> Option<&[OrderOption]> {
179        self.stream.output_ordering()
180    }
181
182    fn metrics(&self) -> Option<RecordBatchMetrics> {
183        let mut metrics = self.stream.metrics()?;
184        let region_watermarks = collect_region_watermarks(self.plan.clone());
185        if !region_watermarks.is_empty() {
186            metrics.region_watermarks = region_watermarks;
187        }
188        Some(metrics)
189    }
190}
191
192impl Stream for RegionWatermarkMetricsStream {
193    type Item = common_recordbatch::error::Result<RecordBatch>;
194
195    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
196        Pin::new(&mut self.stream).poll_next(cx)
197    }
198
199    fn size_hint(&self) -> (usize, Option<usize>) {
200        self.stream.size_hint()
201    }
202}
203
204/// Returns whether terminal region watermark metrics should be collected for the query context.
205pub fn should_collect_region_watermark_from_query_ctx(query_ctx: &QueryContextRef) -> Result<bool> {
206    Ok(
207        FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
208            .is_some_and(|extensions| extensions.should_collect_region_watermark()),
209    )
210}
211
212/// Attaches terminal region watermark metrics to `stream` when collection is requested.
213pub fn maybe_attach_region_watermark_metrics(
214    stream: SendableRecordBatchStream,
215    plan: Arc<dyn ExecutionPlan>,
216    should_collect_region_watermark: bool,
217) -> SendableRecordBatchStream {
218    if should_collect_region_watermark {
219        Box::pin(RegionWatermarkMetricsStream::new(stream, plan))
220    } else {
221        stream
222    }
223}
224
225pub fn terminal_recordbatch_metrics_from_plan(
226    plan: Arc<dyn ExecutionPlan>,
227) -> Option<RecordBatchMetrics> {
228    let region_watermarks = collect_region_watermarks(plan);
229    if region_watermarks.is_empty() {
230        None
231    } else {
232        Some(RecordBatchMetrics {
233            region_watermarks,
234            ..Default::default()
235        })
236    }
237}
238
239/// Collects terminal record-batch metrics from `plan` only when requested.
240pub fn terminal_recordbatch_metrics_from_plan_if_requested(
241    plan: Option<Arc<dyn ExecutionPlan>>,
242    should_collect_region_watermark: bool,
243) -> Option<RecordBatchMetrics> {
244    if should_collect_region_watermark {
245        plan.and_then(terminal_recordbatch_metrics_from_plan)
246    } else {
247        None
248    }
249}
250
251fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermarkEntry> {
252    let mut merged = BTreeMap::<u64, MergeState>::new();
253    let mut stack = vec![plan];
254
255    while let Some(plan) = stack.pop() {
256        if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>()
257            && !merge_scan.is_flow_sink_scan()
258        {
259            merge_merge_scan_region_watermarks(
260                &mut merged,
261                merge_scan
262                    .regions()
263                    .iter()
264                    .map(|region_id| region_id.as_u64()),
265                merge_scan.sub_stage_metrics(),
266            );
267        }
268        stack.extend(plan.children().into_iter().cloned());
269    }
270
271    finalize_region_watermarks(merged)
272}
273
274/// Merge a batch of per-region watermark entries into the global merged state.
275///
276/// # Merge strategy: correctness over maximum
277///
278/// Flow checkpoint advancement requires provable watermarks so that incremental
279/// queries never miss rows. This merge uses correctness-first semantics:
280///
281/// | Current state  | New entry       | Result            | Rationale |
282/// |---------------|-----------------|-------------------|-----------|
283/// | Proved(old)   | Proved(same)    | Proved(old)       | Convergent proof, keep |
284/// | Proved(old)   | Proved(diff)    | Conflict([old,diff]) | Ambiguous → degrade to unproved |
285/// | Unproved      | _anything_      | Unproved          | Already unsafe, stays unsafe |
286/// | Conflict{..}  | Proved(seq)     | Conflict[...seq]  | Record for diagnostics |
287///
288/// Using `max(old, new)` would be incorrect because it could advance a
289/// checkpoint past rows that a competing MergeScan sub-stage has not yet
290/// scanned, causing Flow to skip data.
291fn merge_region_watermark_entries(
292    merged: &mut BTreeMap<u64, MergeState>,
293    entries: impl IntoIterator<Item = RegionWatermarkEntry>,
294) {
295    for entry in entries {
296        merged
297            .entry(entry.region_id)
298            .and_modify(|existing| match entry.watermark {
299                None => match existing {
300                    MergeState::Proved(_) => {
301                        *existing = MergeState::Unproved;
302                    }
303                    MergeState::Unproved | MergeState::Conflict { .. } => {}
304                },
305                Some(seq) => match existing {
306                    MergeState::Unproved => {}
307                    MergeState::Proved(existing_seq) if *existing_seq == seq => {}
308                    MergeState::Proved(existing_seq) => {
309                        let old_seq = *existing_seq;
310                        *existing = MergeState::Conflict {
311                            watermarks: vec![old_seq, seq],
312                        };
313                    }
314                    MergeState::Conflict { watermarks } => {
315                        if !watermarks.contains(&seq) {
316                            watermarks.push(seq);
317                        }
318                    }
319                },
320            })
321            .or_insert(match entry.watermark {
322                Some(seq) => MergeState::Proved(seq),
323                None => MergeState::Unproved,
324            });
325    }
326}
327
328fn merge_merge_scan_region_watermarks(
329    merged: &mut BTreeMap<u64, MergeState>,
330    regions: impl IntoIterator<Item = u64>,
331    sub_stage_metrics: impl IntoIterator<Item = RecordBatchMetrics>,
332) {
333    let regions = regions.into_iter().collect::<Vec<_>>();
334    let mut proved_or_unproved_regions = BTreeSet::new();
335    for metrics in sub_stage_metrics {
336        proved_or_unproved_regions.extend(
337            metrics
338                .region_watermarks
339                .iter()
340                .map(|entry| entry.region_id),
341        );
342        merge_region_watermark_entries(merged, metrics.region_watermarks);
343    }
344
345    // Regions listed by a MergeScanExec participated even when no sub-stage can
346    // prove a watermark. Merge missing per-scan region entries as explicit
347    // `None` entries so an unproved participating branch vetoes any proof from
348    // another branch for the same region.
349    merge_region_watermark_entries(
350        merged,
351        regions
352            .into_iter()
353            .filter(|region_id| !proved_or_unproved_regions.contains(region_id))
354            .map(|region_id| RegionWatermarkEntry {
355                region_id,
356                watermark: None,
357            }),
358    );
359}
360
361fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWatermarkEntry> {
362    merged
363        .into_iter()
364        .map(|(region_id, state)| RegionWatermarkEntry {
365            region_id,
366            watermark: match state {
367                MergeState::Unproved => None,
368                MergeState::Proved(seq) => Some(seq),
369                MergeState::Conflict { watermarks } => {
370                    warn!(
371                        "Conflicting proved watermarks for region {}: {:?}; degrading to unproved",
372                        region_id, watermarks
373                    );
374                    None
375                }
376            },
377        })
378        .collect()
379}
380
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use api::v1::region::{RemoteDynFilterUnregister, RemoteDynFilterUpdate};
    use async_trait::async_trait;
    use datafusion::arrow::datatypes::Schema as ArrowSchema;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::physical_plan::empty::EmptyExec;
    use datafusion_expr::LogicalPlanBuilder;
    use session::ReadPreference;
    use session::context::QueryContextBuilder;
    use store_api::storage::RegionId;
    use table::table_name::TableName;

    use super::*;
    use crate::dist_plan::RemoteDynFilterProducerId;
    use crate::options::{FLOW_RETURN_REGION_SEQ, FLOW_SINK_TABLE_ID};
    use crate::region_query::RegionQueryHandler;

    /// Region query handler for plans that are only built, never executed;
    /// every method panics if it is reached.
    struct NoopRegionQueryHandler;

    #[async_trait]
    impl RegionQueryHandler for NoopRegionQueryHandler {
        async fn do_get(
            &self,
            _read_preference: ReadPreference,
            _request: common_query::request::QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!("metrics tests should not execute remote queries")
        }

        async fn handle_remote_dyn_filter_update(
            &self,
            _region_id: RegionId,
            _query_id: String,
            _update: RemoteDynFilterUpdate,
        ) -> Result<()> {
            unreachable!("metrics tests should not send remote dyn filter updates")
        }

        async fn handle_remote_dyn_filter_unregister(
            &self,
            _region_id: RegionId,
            _query_id: String,
            _unregister: RemoteDynFilterUnregister,
        ) -> Result<()> {
            unreachable!("metrics tests should not send remote dyn filter unregisters")
        }
    }

    /// Builds sub-stage metrics that carry only the given
    /// `(region_id, watermark)` entries.
    fn metrics_with_region_watermarks(entries: &[(u64, Option<u64>)]) -> RecordBatchMetrics {
        RecordBatchMetrics {
            region_watermarks: entries
                .iter()
                .map(|(region_id, watermark)| RegionWatermarkEntry {
                    region_id: *region_id,
                    watermark: *watermark,
                })
                .collect(),
            ..Default::default()
        }
    }

    /// Builds a `MergeScanExec` over the single region `RegionId::new(table_id, 0)`.
    fn test_merge_scan_exec(table_id: u32, query_ctx: QueryContextRef) -> Arc<dyn ExecutionPlan> {
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let plan = LogicalPlanBuilder::empty(false).build().unwrap();
        let schema = ArrowSchema::empty();

        Arc::new(
            MergeScanExec::new(
                &session_state,
                TableName::new("greptime", "public", "test"),
                vec![RegionId::new(table_id, 0)],
                plan,
                &schema,
                Arc::new(NoopRegionQueryHandler),
                query_ctx,
                1,
                BTreeMap::<String, BTreeSet<datafusion_common::Column>>::new(),
                Some(RemoteDynFilterProducerId::new(0)),
            )
            .unwrap(),
        )
    }

    /// Builds a query context with `FLOW_RETURN_REGION_SEQ=true` and
    /// `FLOW_SINK_TABLE_ID=sink_table_id` set as extensions.
    fn flow_query_ctx_with_sink_table_id(sink_table_id: u32) -> QueryContextRef {
        Arc::new(
            QueryContextBuilder::default()
                .set_extension(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())
                .set_extension(FLOW_SINK_TABLE_ID.to_string(), sink_table_id.to_string())
                .build(),
        )
    }

    #[test]
    fn terminal_metrics_returns_none_without_merge_scan() {
        let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(ArrowSchema::empty())));
        assert!(terminal_recordbatch_metrics_from_plan(plan).is_none());
    }

    #[test]
    fn terminal_metrics_skip_flow_sink_merge_scan_regions() {
        let query_ctx = flow_query_ctx_with_sink_table_id(42);
        let plan = test_merge_scan_exec(42, query_ctx);

        assert!(terminal_recordbatch_metrics_from_plan(plan).is_none());
    }

    #[test]
    fn terminal_metrics_keep_source_merge_scan_regions_with_sink_extension() {
        let query_ctx = flow_query_ctx_with_sink_table_id(42);
        let plan = test_merge_scan_exec(43, query_ctx);

        assert_eq!(
            terminal_recordbatch_metrics_from_plan(plan)
                .unwrap()
                .region_watermarks,
            vec![RegionWatermarkEntry {
                region_id: RegionId::new(43, 0).as_u64(),
                watermark: None,
            }]
        );
    }

    #[test]
    fn terminal_metrics_if_requested_skips_collection_when_not_requested() {
        let query_ctx = flow_query_ctx_with_sink_table_id(42);
        let plan = test_merge_scan_exec(43, query_ctx);

        assert!(terminal_recordbatch_metrics_from_plan_if_requested(Some(plan), false).is_none());
    }

    #[test]
    fn terminal_metrics_if_requested_returns_none_without_plan() {
        assert!(terminal_recordbatch_metrics_from_plan_if_requested(None, true).is_none());
    }

    #[test]
    fn terminal_metrics_if_requested_collects_when_requested() {
        let query_ctx = flow_query_ctx_with_sink_table_id(42);
        let plan = test_merge_scan_exec(43, query_ctx);

        assert_eq!(
            terminal_recordbatch_metrics_from_plan_if_requested(Some(plan), true)
                .unwrap()
                .region_watermarks,
            vec![RegionWatermarkEntry {
                region_id: RegionId::new(43, 0).as_u64(),
                watermark: None,
            }]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_marks_missing_watermarks_unproved() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(&mut merged, [1, 2], std::iter::empty());

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![
                RegionWatermarkEntry {
                    region_id: 1,
                    watermark: None,
                },
                RegionWatermarkEntry {
                    region_id: 2,
                    watermark: None,
                },
            ]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_keeps_matching_proved_values() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(
            &mut merged,
            [42],
            [
                metrics_with_region_watermarks(&[(42, Some(7))]),
                metrics_with_region_watermarks(&[(42, Some(7))]),
            ],
        );

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![RegionWatermarkEntry {
                region_id: 42,
                watermark: Some(7),
            }]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_degrades_conflicting_proved_values() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(
            &mut merged,
            [7],
            [
                metrics_with_region_watermarks(&[(7, Some(11))]),
                metrics_with_region_watermarks(&[(7, Some(13))]),
            ],
        );

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![RegionWatermarkEntry {
                region_id: 7,
                watermark: None,
            }]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_conflict_then_none_stays_unproved() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(
            &mut merged,
            [5],
            [
                metrics_with_region_watermarks(&[(5, Some(1))]),
                metrics_with_region_watermarks(&[(5, Some(2))]),
                metrics_with_region_watermarks(&[(5, None)]),
            ],
        );

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![RegionWatermarkEntry {
                region_id: 5,
                watermark: None,
            }]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(
            &mut merged,
            [9],
            [
                metrics_with_region_watermarks(&[(9, Some(21))]),
                metrics_with_region_watermarks(&[(9, None)]),
            ],
        );

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![RegionWatermarkEntry {
                region_id: 9,
                watermark: None,
            }]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value_regardless_of_order() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(
            &mut merged,
            [9],
            [
                metrics_with_region_watermarks(&[(9, None)]),
                metrics_with_region_watermarks(&[(9, Some(21))]),
            ],
        );

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![RegionWatermarkEntry {
                region_id: 9,
                watermark: None,
            }]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_missing_branch_vetoes_proved_value() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(
            &mut merged,
            [9],
            [metrics_with_region_watermarks(&[(9, Some(21))])],
        );
        merge_merge_scan_region_watermarks(&mut merged, [9], std::iter::empty());

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![RegionWatermarkEntry {
                region_id: 9,
                watermark: None,
            }]
        );
    }

    #[test]
    fn merge_merge_scan_region_watermarks_missing_branch_vetoes_proved_value_regardless_of_order() {
        let mut merged = BTreeMap::new();

        merge_merge_scan_region_watermarks(&mut merged, [9], std::iter::empty());
        merge_merge_scan_region_watermarks(
            &mut merged,
            [9],
            [metrics_with_region_watermarks(&[(9, Some(21))])],
        );

        assert_eq!(
            finalize_region_watermarks(merged),
            vec![RegionWatermarkEntry {
                region_id: 9,
                watermark: None,
            }]
        );
    }
}