// Source path: query/options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_base::memory_limit::MemoryLimit;
18use serde::{Deserialize, Serialize};
19use store_api::storage::RegionId;
20use table::metadata::TableId;
21
22use crate::error::{Error, InvalidQueryContextExtensionSnafu, Result};
23
/// Query context extension key whose value is a JSON object mapping region ids to
/// sequence numbers (numbers or numeric strings).
pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs";
/// Query context extension key selecting the incremental read mode.
pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode";
/// The only value currently accepted for [`FLOW_INCREMENTAL_MODE`] (matched ASCII
/// case-insensitively).
pub const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only";
/// Query context extension key holding a `true`/`false` flag that asks for per-region
/// sequence (watermark) metadata.
pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq";
/// Query context extension key holding the sink table id.
pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id";

/// Enable by default, set to false to explicitly disable.
pub const QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN: &str =
    "query.enable_remote_dynamic_filter_pushdown";
33
34/// Query engine config
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36#[serde(default)]
37pub struct QueryOptions {
38    /// Parallelism of query engine. Default to 0, which implies the number of logical CPUs.
39    pub parallelism: usize,
40    /// Whether to allow query fallback when push down fails.
41    pub allow_query_fallback: bool,
42    /// Memory pool size for query execution. Setting it to 0 disables the limit (unbounded).
43    /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
44    /// When this limit is reached, queries will fail with ResourceExhausted error.
45    pub memory_pool_size: MemoryLimit,
46}
47
48#[allow(clippy::derivable_impls)]
49impl Default for QueryOptions {
50    fn default() -> Self {
51        Self {
52            parallelism: 0,
53            allow_query_fallback: false,
54            memory_pool_size: MemoryLimit::default(),
55        }
56    }
57}
58
/// Incremental read mode a flow query can request through the
/// `flow.incremental_mode` query context extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowIncrementalMode {
    /// Selected by the `memtable_only` extension value.
    ///
    /// NOTE(review): presumably restricts incremental reads to memtable data; the
    /// actual semantics live in the scan layer, not in this module — confirm there.
    MemtableOnly,
}
63
64#[derive(Debug, Clone, PartialEq, Eq, Default)]
65pub struct FlowQueryExtensions {
66    /// Maps region id -> lower exclusive sequence bound for incremental reads.
67    pub incremental_after_seqs: Option<HashMap<u64, u64>>,
68    /// Incremental read mode requested by the caller.
69    pub incremental_mode: Option<FlowIncrementalMode>,
70    /// Whether the caller expects per-region watermark metadata in terminal metrics.
71    pub return_region_seq: bool,
72    /// Optional sink table id used to distinguish source scans from sink reads.
73    pub sink_table_id: Option<TableId>,
74}
75
76impl FlowQueryExtensions {
77    /// Parses flow-specific query extensions when any flow key is present.
78    ///
79    /// Returns `Ok(None)` for ordinary queries with no flow-related extensions,
80    /// `Ok(Some(_))` when flow context is present and valid, and `Err(_)` when a
81    /// flow-related extension is present but malformed or incomplete.
82    pub fn parse_flow_extensions(extensions: &HashMap<String, String>) -> Result<Option<Self>> {
83        let has_flow_context = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS)
84            || extensions.contains_key(FLOW_INCREMENTAL_MODE)
85            || extensions.contains_key(FLOW_RETURN_REGION_SEQ)
86            || extensions.contains_key(FLOW_SINK_TABLE_ID);
87
88        if !has_flow_context {
89            return Ok(None);
90        }
91
92        let incremental_mode = extensions
93            .get(FLOW_INCREMENTAL_MODE)
94            .map(|value| match value.as_str() {
95                v if v.eq_ignore_ascii_case(FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY) => {
96                    Ok(FlowIncrementalMode::MemtableOnly)
97                }
98                _ => Err(invalid_query_context_extension(format!(
99                    "Invalid value for {}: {}",
100                    FLOW_INCREMENTAL_MODE, value
101                ))),
102            })
103            .transpose()?;
104
105        let incremental_after_seqs = extensions
106            .get(FLOW_INCREMENTAL_AFTER_SEQS)
107            .map(|value| parse_incremental_after_seqs(value.as_str()))
108            .transpose()?;
109
110        let return_region_seq = extensions
111            .get(FLOW_RETURN_REGION_SEQ)
112            .map(|value| parse_bool(FLOW_RETURN_REGION_SEQ, value.as_str()))
113            .transpose()?
114            .unwrap_or(false);
115
116        let sink_table_id = extensions
117            .get(FLOW_SINK_TABLE_ID)
118            .map(|value| {
119                value.parse::<TableId>().map_err(|_| {
120                    invalid_query_context_extension(format!(
121                        "Invalid value for {}: {}",
122                        FLOW_SINK_TABLE_ID, value
123                    ))
124                })
125            })
126            .transpose()?;
127
128        if matches!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly)) {
129            let after_seqs = incremental_after_seqs.as_ref().ok_or_else(|| {
130                invalid_query_context_extension(format!(
131                    "{} is required when {}={}.",
132                    FLOW_INCREMENTAL_AFTER_SEQS,
133                    FLOW_INCREMENTAL_MODE,
134                    FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
135                ))
136            })?;
137            if after_seqs.is_empty() {
138                return Err(invalid_query_context_extension(format!(
139                    "{} must not be empty when {}={}.",
140                    FLOW_INCREMENTAL_AFTER_SEQS,
141                    FLOW_INCREMENTAL_MODE,
142                    FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
143                )));
144            }
145        }
146
147        Ok(Some(Self {
148            incremental_after_seqs,
149            incremental_mode,
150            return_region_seq,
151            sink_table_id,
152        }))
153    }
154
155    pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result<bool> {
156        if self.sink_table_id.is_some() && self.sink_table_id == Some(source_region_id.table_id()) {
157            return Ok(false);
158        }
159
160        if matches!(
161            self.incremental_mode,
162            Some(FlowIncrementalMode::MemtableOnly)
163        ) {
164            let after_seqs = self.incremental_after_seqs.as_ref().ok_or_else(|| {
165                invalid_query_context_extension(format!(
166                    "{} is required when {}=memtable_only.",
167                    FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
168                ))
169            })?;
170
171            if !after_seqs.contains_key(&source_region_id.as_u64()) {
172                return Err(invalid_query_context_extension(format!(
173                    "Missing region {} in {} when {}=memtable_only.",
174                    source_region_id, FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
175                )));
176            }
177        }
178
179        Ok(self.incremental_after_seqs.is_some())
180    }
181
182    pub fn should_collect_region_watermark(&self) -> bool {
183        should_collect_region_watermark(
184            self.return_region_seq,
185            self.incremental_after_seqs.is_some(),
186        )
187    }
188}
189
190/// Returns whether query-level remote dynamic filter propagation is enabled.
191///
192/// The option defaults to enabled to preserve existing behavior. Callers may set
193/// `query.enable_remote_dynamic_filter_pushdown=false` in query context
194/// extensions to disable FE->DN remote dynamic filter propagation for a single
195/// query.
196pub fn remote_dyn_filter_pushdown_enabled_from_extensions(
197    extensions: &HashMap<String, String>,
198) -> Result<bool> {
199    extensions
200        .get(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN)
201        .map(|value| parse_bool(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, value.as_str()))
202        .transpose()
203        .map(|value| value.unwrap_or(true))
204}
205
206/// Returns whether raw Flow query extensions request terminal region watermark collection.
207///
208/// This is only an intent/presence check for transport/scan plumbing; callers that need
209/// validated Flow options must still use [`FlowQueryExtensions::parse_flow_extensions`].
210pub fn should_collect_region_watermark_from_extensions(
211    extensions: &HashMap<String, String>,
212) -> bool {
213    let return_region_seq = extensions
214        .get(FLOW_RETURN_REGION_SEQ)
215        .is_some_and(|value| value.eq_ignore_ascii_case("true"));
216    let has_incremental_after_seqs = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS);
217
218    should_collect_region_watermark(return_region_seq, has_incremental_after_seqs)
219}
220
/// Single source of truth for the watermark-collection rule: collect when the
/// caller asked for region sequences or when incremental bounds are present.
fn should_collect_region_watermark(
    return_region_seq: bool,
    has_incremental_after_seqs: bool,
) -> bool {
    has_incremental_after_seqs || return_region_seq
}
227
228fn parse_incremental_after_seqs(value: &str) -> Result<HashMap<u64, u64>> {
229    let raw = serde_json::from_str::<HashMap<String, serde_json::Value>>(value).map_err(|e| {
230        invalid_query_context_extension(format!(
231            "Invalid JSON for {}: {} ({})",
232            FLOW_INCREMENTAL_AFTER_SEQS, value, e
233        ))
234    })?;
235
236    raw.into_iter()
237        .map(|(region_id, raw_seq)| {
238            let region_id = region_id.parse::<u64>().map_err(|_| {
239                invalid_query_context_extension(format!(
240                    "Invalid region id in {}: {}",
241                    FLOW_INCREMENTAL_AFTER_SEQS, region_id
242                ))
243            })?;
244
245            let seq = match raw_seq {
246                serde_json::Value::Number(num) => num.as_u64().ok_or_else(|| {
247                    invalid_query_context_extension(format!(
248                        "Invalid sequence value in {} for region {}: {}",
249                        FLOW_INCREMENTAL_AFTER_SEQS, region_id, num
250                    ))
251                })?,
252                serde_json::Value::String(s) => s.parse::<u64>().map_err(|_| {
253                    invalid_query_context_extension(format!(
254                        "Invalid sequence string in {} for region {}: {}",
255                        FLOW_INCREMENTAL_AFTER_SEQS, region_id, s
256                    ))
257                })?,
258                _ => {
259                    return Err(invalid_query_context_extension(format!(
260                        "Invalid sequence value type in {} for region {}",
261                        FLOW_INCREMENTAL_AFTER_SEQS, region_id
262                    )));
263                }
264            };
265
266            Ok((region_id, seq))
267        })
268        .collect()
269}
270
271fn parse_bool(option_name: &str, value: &str) -> Result<bool> {
272    match value {
273        v if v.eq_ignore_ascii_case("true") => Ok(true),
274        v if v.eq_ignore_ascii_case("false") => Ok(false),
275        _ => Err(invalid_query_context_extension(format!(
276            "Invalid value for {}: {}",
277            option_name, value
278        ))),
279    }
280}
281
282fn invalid_query_context_extension(reason: String) -> Error {
283    InvalidQueryContextExtensionSnafu { reason }.build()
284}
285
#[cfg(test)]
mod flow_extension_tests {
    use super::*;

    /// Builds an owned extension map from borrowed key/value pairs.
    fn extensions(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Parses `exts`, expecting valid flow context to be present.
    fn parse_some(exts: &HashMap<String, String>) -> FlowQueryExtensions {
        FlowQueryExtensions::parse_flow_extensions(exts)
            .unwrap()
            .unwrap()
    }

    /// Parses `exts`, expecting a failure, and returns the rendered error message.
    fn parse_err(exts: &HashMap<String, String>) -> String {
        let err = FlowQueryExtensions::parse_flow_extensions(exts).unwrap_err();
        format!("{err}")
    }

    #[test]
    fn test_parse_flow_extensions_returns_none_for_non_flow_query() {
        let parsed = FlowQueryExtensions::parse_flow_extensions(&HashMap::new()).unwrap();

        assert!(parsed.is_none());
    }

    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_defaults_true() {
        let enabled = remote_dyn_filter_pushdown_enabled_from_extensions(&HashMap::new()).unwrap();

        assert!(enabled);
    }

    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_parses_bool() {
        for (raw, expected) in [("false", false), ("true", true)] {
            let exts = extensions(&[(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, raw)]);
            let enabled = remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap();

            assert_eq!(enabled, expected);
        }
    }

    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_rejects_invalid_bool() {
        let exts = extensions(&[(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, "invalid")]);

        let err = remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN));
    }

    #[test]
    fn test_parse_flow_extensions_memtable_only_success() {
        let exts = extensions(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":10,"2":20}"#),
            (FLOW_RETURN_REGION_SEQ, "true"),
            (FLOW_SINK_TABLE_ID, "1024"),
        ]);

        let parsed = parse_some(&exts);

        assert_eq!(
            parsed.incremental_mode,
            Some(FlowIncrementalMode::MemtableOnly)
        );
        assert_eq!(
            parsed.incremental_after_seqs.unwrap(),
            HashMap::from([(1, 10), (2, 20)])
        );
        assert!(parsed.return_region_seq);
        assert_eq!(parsed.sink_table_id, Some(1024));
    }

    #[test]
    fn test_parse_flow_extensions_mode_requires_after_seqs() {
        let exts = extensions(&[(FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY)]);

        assert!(parse_err(&exts).contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    #[test]
    fn test_parse_flow_extensions_invalid_mode() {
        let exts = extensions(&[(FLOW_INCREMENTAL_MODE, "foo")]);

        assert!(parse_err(&exts).contains(FLOW_INCREMENTAL_MODE));
    }

    #[test]
    fn test_parse_flow_extensions_invalid_after_seqs_json() {
        let exts = extensions(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, "not-json"),
        ]);

        assert!(parse_err(&exts).contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    #[test]
    fn test_parse_flow_extensions_after_seqs_string_values() {
        let exts = extensions(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":"10","2":"20"}"#)]);

        let parsed = parse_some(&exts);

        assert_eq!(
            parsed.incremental_after_seqs.unwrap(),
            HashMap::from([(1, 10), (2, 20)])
        );
    }

    #[test]
    fn test_parse_flow_extensions_after_seqs_invalid_value_type() {
        let exts = extensions(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":true}"#)]);

        assert!(parse_err(&exts).contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    #[test]
    fn test_parse_flow_extensions_invalid_sink_table_id() {
        let exts = extensions(&[(FLOW_SINK_TABLE_ID, "x")]);

        assert!(parse_err(&exts).contains(FLOW_SINK_TABLE_ID));
    }

    #[test]
    fn test_validate_for_scan_missing_source_region() {
        let source_region_id = RegionId::new(100, 2);
        let existing_region_id = RegionId::new(100, 1);
        let after_seqs = format!(r#"{{"{}":10}}"#, existing_region_id.as_u64());
        let exts = extensions(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, after_seqs.as_str()),
        ]);

        let err = parse_some(&exts)
            .validate_for_scan(source_region_id)
            .unwrap_err();

        assert!(format!("{err}").contains("Missing region"));
    }

    #[test]
    fn test_validate_for_scan_sink_table_excluded() {
        let source_region_id = RegionId::new(1024, 1);
        let after_seqs = format!(r#"{{"{}":10}}"#, source_region_id.as_u64());
        let exts = extensions(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, after_seqs.as_str()),
            (FLOW_SINK_TABLE_ID, "1024"),
        ]);

        let apply_incremental = parse_some(&exts)
            .validate_for_scan(source_region_id)
            .unwrap();

        assert!(!apply_incremental);
    }

    #[test]
    fn test_should_collect_region_watermark_defaults_false() {
        assert!(!FlowQueryExtensions::default().should_collect_region_watermark());
    }

    #[test]
    fn test_should_collect_region_watermark_true_for_return_region_seq() {
        let parsed = FlowQueryExtensions {
            return_region_seq: true,
            ..Default::default()
        };

        assert!(parsed.should_collect_region_watermark());
    }

    #[test]
    fn test_should_collect_region_watermark_true_for_incremental_query() {
        let parsed = FlowQueryExtensions {
            incremental_after_seqs: Some(HashMap::from([(1, 10)])),
            ..Default::default()
        };

        assert!(parsed.should_collect_region_watermark());
    }

    #[test]
    fn test_should_collect_region_watermark_from_extensions() {
        let explicit_request = extensions(&[(FLOW_RETURN_REGION_SEQ, "true")]);
        assert!(should_collect_region_watermark_from_extensions(
            &explicit_request
        ));

        let incremental = extensions(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":10}"#)]);
        assert!(should_collect_region_watermark_from_extensions(
            &incremental
        ));

        let explicit_opt_out = extensions(&[(FLOW_RETURN_REGION_SEQ, "false")]);
        assert!(!should_collect_region_watermark_from_extensions(
            &explicit_opt_out
        ));
        assert!(!should_collect_region_watermark_from_extensions(
            &HashMap::new()
        ));
    }

    #[test]
    fn test_parse_flow_extensions_return_region_seq_only_returns_some() {
        let exts = extensions(&[(FLOW_RETURN_REGION_SEQ, "true")]);

        assert!(parse_some(&exts).return_region_seq);
    }

    #[test]
    fn test_parse_flow_extensions_sink_table_only_returns_some() {
        let exts = extensions(&[(FLOW_SINK_TABLE_ID, "1024")]);

        assert_eq!(parse_some(&exts).sink_table_id, Some(1024));
    }

    #[test]
    fn test_parse_flow_extensions_incremental_after_seqs_only_returns_some() {
        let exts = extensions(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":10}"#)]);

        assert_eq!(
            parse_some(&exts).incremental_after_seqs,
            Some(HashMap::from([(1, 10)]))
        );
    }
}