1use std::collections::HashMap;
16
17use common_base::memory_limit::MemoryLimit;
18use serde::{Deserialize, Serialize};
19use store_api::storage::RegionId;
20use table::metadata::TableId;
21
22use crate::error::{Error, InvalidQueryContextExtensionSnafu, Result};
23
/// Extension key whose value is a JSON object mapping region ids to sequences
/// (decoded by `parse_incremental_after_seqs`).
pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs";
/// Extension key selecting the incremental mode. The only accepted value is
/// [`FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY`], compared ASCII case-insensitively.
pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode";
/// Extension key holding a boolean (`true`/`false`, ASCII case-insensitive) that is stored in
/// [`FlowQueryExtensions::return_region_seq`].
pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq";
/// Extension key holding the sink table id, parsed as a [`TableId`].
pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id";
/// Extension key holding a boolean read by
/// [`remote_dyn_filter_pushdown_enabled_from_extensions`]; treated as `true` when absent.
pub const QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN: &str =
    "query.enable_remote_dynamic_filter_pushdown";

/// Value of [`FLOW_INCREMENTAL_MODE`] that maps to [`FlowIncrementalMode::MemtableOnly`].
pub const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only";
33
/// Query options, (de)serializable with serde.
///
/// Because of `#[serde(default)]`, any field missing from the serialized form takes its value
/// from the `Default` implementation of this struct.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct QueryOptions {
    /// Query parallelism; defaults to `0`.
    /// NOTE(review): `0` presumably means "pick automatically" — confirm with the consumer.
    pub parallelism: usize,
    /// Whether query fallback is allowed; defaults to `false`.
    /// NOTE(review): what the fallback does is not visible in this file — confirm.
    pub allow_query_fallback: bool,
    /// Memory pool size limit; defaults to `MemoryLimit::default()`.
    /// NOTE(review): presumably bounds query execution memory — confirm.
    pub memory_pool_size: MemoryLimit,
}
47
48#[allow(clippy::derivable_impls)]
49impl Default for QueryOptions {
50 fn default() -> Self {
51 Self {
52 parallelism: 0,
53 allow_query_fallback: false,
54 memory_pool_size: MemoryLimit::default(),
55 }
56 }
57}
58
/// Incremental mode requested through the [`FLOW_INCREMENTAL_MODE`] extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowIncrementalMode {
    /// Selected by the value [`FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY`].
    /// NOTE(review): presumably restricts the scan to memtable data — confirm with the scan path.
    MemtableOnly,
}
63
/// Flow-related options decoded from a query context extension map by
/// [`FlowQueryExtensions::parse_flow_extensions`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FlowQueryExtensions {
    /// Sequence per region, keyed by the region id as `u64`; taken from
    /// [`FLOW_INCREMENTAL_AFTER_SEQS`], `None` when that key is absent.
    /// NOTE(review): presumably only data after the given sequence is read — confirm.
    pub incremental_after_seqs: Option<HashMap<u64, u64>>,
    /// Mode taken from [`FLOW_INCREMENTAL_MODE`]; `None` when that key is absent.
    pub incremental_mode: Option<FlowIncrementalMode>,
    /// Flag taken from [`FLOW_RETURN_REGION_SEQ`]; `false` when that key is absent.
    pub return_region_seq: bool,
    /// Table id taken from [`FLOW_SINK_TABLE_ID`]; `validate_for_scan` returns `false` for
    /// regions that belong to this table.
    pub sink_table_id: Option<TableId>,
}
75
76impl FlowQueryExtensions {
77 pub fn parse_flow_extensions(extensions: &HashMap<String, String>) -> Result<Option<Self>> {
83 let has_flow_context = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS)
84 || extensions.contains_key(FLOW_INCREMENTAL_MODE)
85 || extensions.contains_key(FLOW_RETURN_REGION_SEQ)
86 || extensions.contains_key(FLOW_SINK_TABLE_ID);
87
88 if !has_flow_context {
89 return Ok(None);
90 }
91
92 let incremental_mode = extensions
93 .get(FLOW_INCREMENTAL_MODE)
94 .map(|value| match value.as_str() {
95 v if v.eq_ignore_ascii_case(FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY) => {
96 Ok(FlowIncrementalMode::MemtableOnly)
97 }
98 _ => Err(invalid_query_context_extension(format!(
99 "Invalid value for {}: {}",
100 FLOW_INCREMENTAL_MODE, value
101 ))),
102 })
103 .transpose()?;
104
105 let incremental_after_seqs = extensions
106 .get(FLOW_INCREMENTAL_AFTER_SEQS)
107 .map(|value| parse_incremental_after_seqs(value.as_str()))
108 .transpose()?;
109
110 let return_region_seq = extensions
111 .get(FLOW_RETURN_REGION_SEQ)
112 .map(|value| parse_bool(FLOW_RETURN_REGION_SEQ, value.as_str()))
113 .transpose()?
114 .unwrap_or(false);
115
116 let sink_table_id = extensions
117 .get(FLOW_SINK_TABLE_ID)
118 .map(|value| {
119 value.parse::<TableId>().map_err(|_| {
120 invalid_query_context_extension(format!(
121 "Invalid value for {}: {}",
122 FLOW_SINK_TABLE_ID, value
123 ))
124 })
125 })
126 .transpose()?;
127
128 if matches!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly)) {
129 let after_seqs = incremental_after_seqs.as_ref().ok_or_else(|| {
130 invalid_query_context_extension(format!(
131 "{} is required when {}={}.",
132 FLOW_INCREMENTAL_AFTER_SEQS,
133 FLOW_INCREMENTAL_MODE,
134 FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
135 ))
136 })?;
137 if after_seqs.is_empty() {
138 return Err(invalid_query_context_extension(format!(
139 "{} must not be empty when {}={}.",
140 FLOW_INCREMENTAL_AFTER_SEQS,
141 FLOW_INCREMENTAL_MODE,
142 FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
143 )));
144 }
145 }
146
147 Ok(Some(Self {
148 incremental_after_seqs,
149 incremental_mode,
150 return_region_seq,
151 sink_table_id,
152 }))
153 }
154
155 pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result<bool> {
156 if self.sink_table_id.is_some() && self.sink_table_id == Some(source_region_id.table_id()) {
157 return Ok(false);
158 }
159
160 if matches!(
161 self.incremental_mode,
162 Some(FlowIncrementalMode::MemtableOnly)
163 ) {
164 let after_seqs = self.incremental_after_seqs.as_ref().ok_or_else(|| {
165 invalid_query_context_extension(format!(
166 "{} is required when {}=memtable_only.",
167 FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
168 ))
169 })?;
170
171 if !after_seqs.contains_key(&source_region_id.as_u64()) {
172 return Err(invalid_query_context_extension(format!(
173 "Missing region {} in {} when {}=memtable_only.",
174 source_region_id, FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
175 )));
176 }
177 }
178
179 Ok(self.incremental_after_seqs.is_some())
180 }
181
182 pub fn should_collect_region_watermark(&self) -> bool {
183 should_collect_region_watermark(
184 self.return_region_seq,
185 self.incremental_after_seqs.is_some(),
186 )
187 }
188}
189
190pub fn remote_dyn_filter_pushdown_enabled_from_extensions(
197 extensions: &HashMap<String, String>,
198) -> Result<bool> {
199 extensions
200 .get(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN)
201 .map(|value| parse_bool(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, value.as_str()))
202 .transpose()
203 .map(|value| value.unwrap_or(true))
204}
205
206pub fn should_collect_region_watermark_from_extensions(
211 extensions: &HashMap<String, String>,
212) -> bool {
213 let return_region_seq = extensions
214 .get(FLOW_RETURN_REGION_SEQ)
215 .is_some_and(|value| value.eq_ignore_ascii_case("true"));
216 let has_incremental_after_seqs = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS);
217
218 should_collect_region_watermark(return_region_seq, has_incremental_after_seqs)
219}
220
/// Returns `true` when at least one of the two flags is set.
fn should_collect_region_watermark(
    return_region_seq: bool,
    has_incremental_after_seqs: bool,
) -> bool {
    matches!(
        (return_region_seq, has_incremental_after_seqs),
        (true, _) | (_, true)
    )
}
227
228fn parse_incremental_after_seqs(value: &str) -> Result<HashMap<u64, u64>> {
229 let raw = serde_json::from_str::<HashMap<String, serde_json::Value>>(value).map_err(|e| {
230 invalid_query_context_extension(format!(
231 "Invalid JSON for {}: {} ({})",
232 FLOW_INCREMENTAL_AFTER_SEQS, value, e
233 ))
234 })?;
235
236 raw.into_iter()
237 .map(|(region_id, raw_seq)| {
238 let region_id = region_id.parse::<u64>().map_err(|_| {
239 invalid_query_context_extension(format!(
240 "Invalid region id in {}: {}",
241 FLOW_INCREMENTAL_AFTER_SEQS, region_id
242 ))
243 })?;
244
245 let seq = match raw_seq {
246 serde_json::Value::Number(num) => num.as_u64().ok_or_else(|| {
247 invalid_query_context_extension(format!(
248 "Invalid sequence value in {} for region {}: {}",
249 FLOW_INCREMENTAL_AFTER_SEQS, region_id, num
250 ))
251 })?,
252 serde_json::Value::String(s) => s.parse::<u64>().map_err(|_| {
253 invalid_query_context_extension(format!(
254 "Invalid sequence string in {} for region {}: {}",
255 FLOW_INCREMENTAL_AFTER_SEQS, region_id, s
256 ))
257 })?,
258 _ => {
259 return Err(invalid_query_context_extension(format!(
260 "Invalid sequence value type in {} for region {}",
261 FLOW_INCREMENTAL_AFTER_SEQS, region_id
262 )));
263 }
264 };
265
266 Ok((region_id, seq))
267 })
268 .collect()
269}
270
271fn parse_bool(option_name: &str, value: &str) -> Result<bool> {
272 match value {
273 v if v.eq_ignore_ascii_case("true") => Ok(true),
274 v if v.eq_ignore_ascii_case("false") => Ok(false),
275 _ => Err(invalid_query_context_extension(format!(
276 "Invalid value for {}: {}",
277 option_name, value
278 ))),
279 }
280}
281
282fn invalid_query_context_extension(reason: String) -> Error {
283 InvalidQueryContextExtensionSnafu { reason }.build()
284}
285
#[cfg(test)]
mod flow_extension_tests {
    use super::*;

    /// Builds an extension map from borrowed key/value pairs.
    fn ext_map(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Parses `exts` and unwraps both the `Result` and the `Option`.
    fn parse_some(exts: &HashMap<String, String>) -> FlowQueryExtensions {
        FlowQueryExtensions::parse_flow_extensions(exts)
            .unwrap()
            .unwrap()
    }

    /// Parses `exts`, expecting failure, and returns the rendered error message.
    fn parse_err_message(exts: &HashMap<String, String>) -> String {
        let err = FlowQueryExtensions::parse_flow_extensions(exts).unwrap_err();
        format!("{err}")
    }

    #[test]
    fn test_parse_flow_extensions_returns_none_for_non_flow_query() {
        let parsed = FlowQueryExtensions::parse_flow_extensions(&HashMap::new()).unwrap();

        assert_eq!(parsed, None);
    }

    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_defaults_true() {
        let enabled = remote_dyn_filter_pushdown_enabled_from_extensions(&HashMap::new()).unwrap();

        assert!(enabled);
    }

    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_parses_bool() {
        for (raw, expected) in [("false", false), ("true", true)] {
            let exts = ext_map(&[(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, raw)]);

            assert_eq!(
                remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap(),
                expected
            );
        }
    }

    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_rejects_invalid_bool() {
        let exts = ext_map(&[(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, "invalid")]);

        let err = remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN));
    }

    #[test]
    fn test_parse_flow_extensions_memtable_only_success() {
        let exts = ext_map(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":10,"2":20}"#),
            (FLOW_RETURN_REGION_SEQ, "true"),
            (FLOW_SINK_TABLE_ID, "1024"),
        ]);

        let FlowQueryExtensions {
            incremental_after_seqs,
            incremental_mode,
            return_region_seq,
            sink_table_id,
        } = parse_some(&exts);

        assert_eq!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly));
        assert_eq!(
            incremental_after_seqs,
            Some(HashMap::from([(1, 10), (2, 20)]))
        );
        assert!(return_region_seq);
        assert_eq!(sink_table_id, Some(1024));
    }

    #[test]
    fn test_parse_flow_extensions_mode_requires_after_seqs() {
        let exts = ext_map(&[(FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY)]);

        assert!(parse_err_message(&exts).contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    #[test]
    fn test_parse_flow_extensions_invalid_mode() {
        let exts = ext_map(&[(FLOW_INCREMENTAL_MODE, "foo")]);

        assert!(parse_err_message(&exts).contains(FLOW_INCREMENTAL_MODE));
    }

    #[test]
    fn test_parse_flow_extensions_invalid_after_seqs_json() {
        let exts = ext_map(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, "not-json"),
        ]);

        assert!(parse_err_message(&exts).contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    #[test]
    fn test_parse_flow_extensions_after_seqs_string_values() {
        let exts = ext_map(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":"10","2":"20"}"#)]);

        let parsed = parse_some(&exts);
        assert_eq!(
            parsed.incremental_after_seqs,
            Some(HashMap::from([(1, 10), (2, 20)]))
        );
    }

    #[test]
    fn test_parse_flow_extensions_after_seqs_invalid_value_type() {
        let exts = ext_map(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":true}"#)]);

        assert!(parse_err_message(&exts).contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    #[test]
    fn test_parse_flow_extensions_invalid_sink_table_id() {
        let exts = ext_map(&[(FLOW_SINK_TABLE_ID, "x")]);

        assert!(parse_err_message(&exts).contains(FLOW_SINK_TABLE_ID));
    }

    #[test]
    fn test_validate_for_scan_missing_source_region() {
        let source_region_id = RegionId::new(100, 2);
        let existing_region_id = RegionId::new(100, 1);
        // Only the *other* region of the table is listed.
        let after_seqs = format!(r#"{{"{}":10}}"#, existing_region_id.as_u64());
        let exts = ext_map(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, after_seqs.as_str()),
        ]);

        let err = parse_some(&exts)
            .validate_for_scan(source_region_id)
            .unwrap_err();
        assert!(format!("{err}").contains("Missing region"));
    }

    #[test]
    fn test_validate_for_scan_sink_table_excluded() {
        let source_region_id = RegionId::new(1024, 1);
        let after_seqs = format!(r#"{{"{}":10}}"#, source_region_id.as_u64());
        let exts = ext_map(&[
            (FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY),
            (FLOW_INCREMENTAL_AFTER_SEQS, after_seqs.as_str()),
            (FLOW_SINK_TABLE_ID, "1024"),
        ]);

        let apply_incremental = parse_some(&exts)
            .validate_for_scan(source_region_id)
            .unwrap();
        assert!(!apply_incremental);
    }

    #[test]
    fn test_should_collect_region_watermark_defaults_false() {
        assert!(!FlowQueryExtensions::default().should_collect_region_watermark());
    }

    #[test]
    fn test_should_collect_region_watermark_true_for_return_region_seq() {
        let parsed = FlowQueryExtensions {
            return_region_seq: true,
            ..Default::default()
        };

        assert!(parsed.should_collect_region_watermark());
    }

    #[test]
    fn test_should_collect_region_watermark_true_for_incremental_query() {
        let parsed = FlowQueryExtensions {
            incremental_after_seqs: Some(HashMap::from([(1, 10)])),
            ..Default::default()
        };

        assert!(parsed.should_collect_region_watermark());
    }

    #[test]
    fn test_should_collect_region_watermark_from_extensions() {
        let collects = |entries: &[(&str, &str)]| {
            should_collect_region_watermark_from_extensions(&ext_map(entries))
        };

        assert!(collects(&[(FLOW_RETURN_REGION_SEQ, "true")]));
        assert!(collects(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":10}"#)]));
        assert!(!collects(&[(FLOW_RETURN_REGION_SEQ, "false")]));
        assert!(!collects(&[]));
    }

    #[test]
    fn test_parse_flow_extensions_return_region_seq_only_returns_some() {
        let exts = ext_map(&[(FLOW_RETURN_REGION_SEQ, "true")]);

        assert!(parse_some(&exts).return_region_seq);
    }

    #[test]
    fn test_parse_flow_extensions_sink_table_only_returns_some() {
        let exts = ext_map(&[(FLOW_SINK_TABLE_ID, "1024")]);

        assert_eq!(parse_some(&exts).sink_table_id, Some(1024));
    }

    #[test]
    fn test_parse_flow_extensions_incremental_after_seqs_only_returns_some() {
        let exts = ext_map(&[(FLOW_INCREMENTAL_AFTER_SEQS, r#"{"1":10}"#)]);

        assert_eq!(
            parse_some(&exts).incremental_after_seqs,
            Some(HashMap::from([(1, 10)]))
        );
    }
}