1use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::OrderOption;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{
39 RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, VectorSearchRequest,
40};
41use table::TableRef;
42use table::metadata::{TableId, TableInfoRef};
43use table::table::scan::RegionScanExec;
44
45use crate::error::{GetRegionMetadataSnafu, Result};
46use crate::options::FlowQueryExtensions;
47
/// A [`CatalogProviderList`] that resolves every catalog name to a single,
/// pre-configured table provider, bypassing real catalog lookup.
#[derive(Clone, Debug)]
pub struct DummyCatalogList {
    /// The only catalog; returned for any requested catalog name.
    catalog: DummyCatalogProvider,
}
53
54impl DummyCatalogList {
55 pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
57 let schema_provider = DummySchemaProvider {
58 table: table_provider,
59 };
60 let catalog_provider = DummyCatalogProvider {
61 schema: schema_provider,
62 };
63 Self {
64 catalog: catalog_provider,
65 }
66 }
67}
68
69impl CatalogProviderList for DummyCatalogList {
70 fn as_any(&self) -> &dyn Any {
71 self
72 }
73
74 fn register_catalog(
75 &self,
76 _name: String,
77 _catalog: Arc<dyn CatalogProvider>,
78 ) -> Option<Arc<dyn CatalogProvider>> {
79 None
80 }
81
82 fn catalog_names(&self) -> Vec<String> {
83 vec![]
84 }
85
86 fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
87 Some(Arc::new(self.catalog.clone()))
88 }
89}
90
/// A [`CatalogProvider`] that resolves every schema name to the same schema
/// provider.
#[derive(Clone, Debug)]
struct DummyCatalogProvider {
    /// The only schema; returned for any requested schema name.
    schema: DummySchemaProvider,
}
96
97impl CatalogProvider for DummyCatalogProvider {
98 fn as_any(&self) -> &dyn Any {
99 self
100 }
101
102 fn schema_names(&self) -> Vec<String> {
103 vec![]
104 }
105
106 fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
107 Some(Arc::new(self.schema.clone()))
108 }
109}
110
/// A [`SchemaProvider`] that resolves every table name to the same table
/// provider.
#[derive(Clone, Debug)]
struct DummySchemaProvider {
    /// The only table; returned for any requested table name.
    table: Arc<dyn TableProvider>,
}
116
117#[async_trait]
118impl SchemaProvider for DummySchemaProvider {
119 fn as_any(&self) -> &dyn Any {
120 self
121 }
122
123 fn table_names(&self) -> Vec<String> {
124 vec![]
125 }
126
127 async fn table(
128 &self,
129 _name: &str,
130 ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
131 Ok(Some(self.table.clone()))
132 }
133
134 fn table_exist(&self, _name: &str) -> bool {
135 true
136 }
137}
138
/// A [`TableProvider`] backed directly by a single region of a region engine.
#[derive(Clone)]
pub struct DummyTableProvider {
    /// Region served by this provider.
    region_id: RegionId,
    /// Engine used to execute scans against the region.
    engine: RegionEngineRef,
    /// Metadata of the region; source of the arrow schema.
    metadata: RegionMetadataRef,
    /// Cached scan request; hints set via `with_*` methods accumulate here
    /// and are merged with planner inputs on each `scan`.
    scan_request: Arc<Mutex<ScanRequest>>,
    /// Optional query context carrying flow/snapshot state.
    query_ctx: Option<QueryContextRef>,
}
149
150impl fmt::Debug for DummyTableProvider {
151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152 f.debug_struct("DummyTableProvider")
153 .field("region_id", &self.region_id)
154 .field("metadata", &self.metadata)
155 .field("scan_request", &self.scan_request)
156 .finish()
157 }
158}
159
160#[async_trait]
161impl TableProvider for DummyTableProvider {
162 fn as_any(&self) -> &dyn Any {
163 self
164 }
165
166 fn schema(&self) -> SchemaRef {
167 self.metadata.schema.arrow_schema().clone()
168 }
169
170 fn table_type(&self) -> TableType {
171 TableType::Base
172 }
173
174 async fn scan(
175 &self,
176 _state: &dyn Session,
177 projection: Option<&Vec<usize>>,
178 filters: &[Expr],
179 limit: Option<usize>,
180 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
181 let mut request = self.scan_request.lock().unwrap().clone();
182 request.projection_input = projection.map(|p| p.clone().into());
183 request.filters = filters.to_vec();
184 request.limit = limit;
185
186 if let Some(query_ctx) = &self.query_ctx {
187 let is_sink_scan = is_sink_scan(query_ctx, self.region_id)
188 .map_err(|e| DataFusionError::External(Box::new(e)))?;
189 apply_cached_snapshot_to_request(query_ctx, self.region_id, is_sink_scan, &mut request);
190 }
191
192 let scanner = self
193 .engine
194 .handle_query(self.region_id, request.clone())
195 .await
196 .map_err(|e| DataFusionError::External(Box::new(e)))?;
197
198 if request.snapshot_on_scan
199 && let Some(query_ctx) = &self.query_ctx
200 && let Some(snapshot_sequence) = scanner.snapshot_sequence()
201 {
202 bind_snapshot_bound_region_seq(query_ctx, self.region_id, snapshot_sequence)
203 .map_err(|e| DataFusionError::External(Box::new(e)))?;
204 }
205
206 let query_memory_tracker = self.engine.query_memory_tracker();
207 let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_tracker)?;
208 if let Some(query_ctx) = &self.query_ctx {
209 scan_exec.set_explain_verbose(query_ctx.explain_verbose());
210 }
211 Ok(Arc::new(scan_exec))
212 }
213
214 fn supports_filters_pushdown(
215 &self,
216 filters: &[&Expr],
217 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
218 let supported = filters
219 .iter()
220 .map(|e| {
221 if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
223 if self
224 .metadata
225 .column_by_name(simple_filter.column_name())
226 .and_then(|c| {
227 (c.semantic_type == SemanticType::Tag
228 || c.semantic_type == SemanticType::Timestamp)
229 .then_some(())
230 })
231 .is_some()
232 {
233 TableProviderFilterPushDown::Exact
234 } else {
235 TableProviderFilterPushDown::Inexact
236 }
237 } else {
238 TableProviderFilterPushDown::Inexact
239 }
240 })
241 .collect();
242 Ok(supported)
243 }
244}
245
impl DummyTableProvider {
    /// Creates a provider with a default (empty) scan request and no query
    /// context.
    pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
        Self {
            region_id,
            engine,
            metadata,
            scan_request: Default::default(),
            query_ctx: None,
        }
    }

    /// Returns the metadata of the region this provider serves.
    pub fn region_metadata(&self) -> RegionMetadataRef {
        self.metadata.clone()
    }

    /// Sets the output ordering hint applied to subsequent scans.
    pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
        self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
    }

    /// Sets the time-series distribution hint applied to subsequent scans.
    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
        self.scan_request.lock().unwrap().distribution = Some(distribution);
    }

    /// Sets the per-series row selector hint applied to subsequent scans.
    pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
        self.scan_request.lock().unwrap().series_row_selector = Some(selector);
    }

    /// Sets the vector-search request applied to subsequent scans.
    pub fn with_vector_search_hint(&self, hint: VectorSearchRequest) {
        self.scan_request.lock().unwrap().vector_search = Some(hint);
    }

    /// Returns the currently configured vector-search request, if any.
    pub fn get_vector_search_hint(&self) -> Option<VectorSearchRequest> {
        self.scan_request.lock().unwrap().vector_search.clone()
    }

    /// Caps memtable reads at `sequence` for subsequent scans.
    pub fn with_sequence(&self, sequence: u64) {
        self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
    }

    /// Returns a snapshot of the current cached scan request (test-only).
    #[cfg(test)]
    pub fn scan_request(&self) -> ScanRequest {
        self.scan_request.lock().unwrap().clone()
    }
}
295
/// Factory that builds [`DummyTableProvider`]s directly from a region engine.
pub struct DummyTableProviderFactory;
297
298impl DummyTableProviderFactory {
299 pub async fn create_table_provider(
300 &self,
301 region_id: RegionId,
302 engine: RegionEngineRef,
303 query_ctx: Option<QueryContextRef>,
304 ) -> Result<DummyTableProvider> {
305 let metadata =
306 engine
307 .get_metadata(region_id)
308 .await
309 .with_context(|_| GetRegionMetadataSnafu {
310 engine: engine.name(),
311 region_id,
312 })?;
313
314 let scan_request = if let Some(ctx) = query_ctx.as_ref() {
315 scan_request_from_query_context(region_id, ctx)?
316 } else {
317 ScanRequest::default()
318 };
319
320 Ok(DummyTableProvider {
321 region_id,
322 engine,
323 metadata,
324 scan_request: Arc::new(Mutex::new(scan_request)),
325 query_ctx,
326 })
327 }
328}
329
330fn scan_request_from_query_context(
331 region_id: RegionId,
332 query_ctx: &QueryContext,
333) -> Result<ScanRequest> {
334 let decision = decide_flow_scan(query_ctx, region_id)?;
335 Ok(build_scan_request(query_ctx, region_id, &decision))
336}
337
/// Outcome of [`decide_flow_scan`]: how a flow query should scan a region.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FlowScanDecision {
    /// True when the region belongs to the flow's sink table; sink scans
    /// skip the SST lower bound and all memtable sequence bounds.
    is_sink_scan: bool,
    /// Whether the scanner should capture a snapshot sequence during the scan.
    snapshot_on_scan: bool,
    /// Lower bound for memtable sequences (incremental scan), if any.
    memtable_min_sequence: Option<u64>,
    /// Upper bound for memtable sequences (cached snapshot), if any.
    memtable_max_sequence: Option<u64>,
}
355
356impl FlowScanDecision {
357 fn plain_scan() -> Self {
358 Self {
359 is_sink_scan: true,
360 snapshot_on_scan: false,
361 memtable_min_sequence: None,
362 memtable_max_sequence: None,
363 }
364 }
365}
366
367fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<FlowScanDecision> {
368 let Some(flow_extensions) =
369 FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
370 else {
371 return Ok(FlowScanDecision {
372 is_sink_scan: false,
373 snapshot_on_scan: false,
374 memtable_min_sequence: None,
375 memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()),
376 });
377 };
378
379 if flow_extensions.sink_table_id == Some(region_id.table_id()) {
383 return Ok(FlowScanDecision::plain_scan());
384 }
385
386 let apply_incremental = flow_extensions.validate_for_scan(region_id)?;
387
388 let memtable_min_sequence = if apply_incremental {
389 flow_extensions
390 .incremental_after_seqs
391 .as_ref()
392 .and_then(|seqs| seqs.get(®ion_id.as_u64()))
393 .copied()
394 } else {
395 None
396 };
397
398 let memtable_max_sequence = query_ctx.get_snapshot(region_id.as_u64());
399
400 Ok(FlowScanDecision {
401 is_sink_scan: false,
402 snapshot_on_scan: memtable_max_sequence.is_none()
403 && flow_extensions.should_collect_region_watermark(),
404 memtable_min_sequence,
405 memtable_max_sequence,
406 })
407}
408
409fn build_scan_request(
410 query_ctx: &QueryContext,
411 region_id: RegionId,
412 decision: &FlowScanDecision,
413) -> ScanRequest {
414 ScanRequest {
418 sst_min_sequence: (!decision.is_sink_scan)
419 .then(|| query_ctx.sst_min_sequence(region_id.as_u64()))
420 .flatten(),
421 snapshot_on_scan: decision.snapshot_on_scan,
422 memtable_min_sequence: decision.memtable_min_sequence,
423 memtable_max_sequence: decision.memtable_max_sequence,
424 ..Default::default()
425 }
426}
427
428fn is_sink_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<bool> {
429 Ok(
430 FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
431 .is_some_and(|exts| exts.sink_table_id == Some(region_id.table_id())),
432 )
433}
434
435fn apply_cached_snapshot_to_request(
436 query_ctx: &QueryContext,
437 region_id: RegionId,
438 is_sink_scan: bool,
439 scan_request: &mut ScanRequest,
440) {
441 if is_sink_scan {
442 return;
443 }
444
445 if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) {
446 scan_request.memtable_max_sequence = Some(snapshot_sequence);
451 scan_request.snapshot_on_scan = false;
452 }
453}
454
455fn bind_snapshot_bound_region_seq(
456 query_ctx: &QueryContext,
457 region_id: RegionId,
458 snapshot_sequence: u64,
459) -> Result<u64> {
460 if let Some(existing) = query_ctx.get_snapshot(region_id.as_u64()) {
461 if existing != snapshot_sequence {
462 return crate::error::ConflictingSnapshotSequenceSnafu {
463 region_id,
464 existing,
465 new: snapshot_sequence,
466 }
467 .fail();
468 }
469 Ok(existing)
470 } else {
471 query_ctx.set_snapshot(region_id.as_u64(), snapshot_sequence);
472 Ok(snapshot_sequence)
473 }
474}
475
476#[async_trait]
477impl TableProviderFactory for DummyTableProviderFactory {
478 async fn create(
479 &self,
480 region_id: RegionId,
481 engine: RegionEngineRef,
482 ctx: Option<QueryContextRef>,
483 ) -> Result<Arc<dyn TableProvider>> {
484 let provider = self.create_table_provider(region_id, engine, ctx).await?;
485 Ok(Arc::new(provider))
486 }
487}
488
/// Factory abstraction for building datafusion [`TableProvider`]s from a
/// region engine, optionally configured by a query context.
#[async_trait]
pub trait TableProviderFactory: Send + Sync {
    /// Creates a table provider for the given region.
    async fn create(
        &self,
        region_id: RegionId,
        engine: RegionEngineRef,
        ctx: Option<QueryContextRef>,
    ) -> Result<Arc<dyn TableProvider>>;
}
498
/// Shared, dynamically dispatched [`TableProviderFactory`].
pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
500
/// A [`CatalogManager`] stub: every listing is empty, every existence check is
/// `false`, and every lookup yields `None`.
pub struct DummyCatalogManager;
505
506impl DummyCatalogManager {
507 pub fn arc() -> CatalogManagerRef {
509 Arc::new(Self)
510 }
511}
512
// All-stub implementation: reports no catalogs, schemas, or tables.
#[async_trait::async_trait]
impl CatalogManager for DummyCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // No catalogs exist.
    async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
        Ok(vec![])
    }

    // No schemas exist in any catalog.
    async fn schema_names(
        &self,
        _catalog: &str,
        _query_ctx: Option<&QueryContext>,
    ) -> CatalogResult<Vec<String>> {
        Ok(vec![])
    }

    // No tables exist in any schema.
    async fn table_names(
        &self,
        _catalog: &str,
        _schema: &str,
        _query_ctx: Option<&QueryContext>,
    ) -> CatalogResult<Vec<String>> {
        Ok(vec![])
    }

    async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
        Ok(false)
    }

    async fn schema_exists(
        &self,
        _catalog: &str,
        _schema: &str,
        _query_ctx: Option<&QueryContext>,
    ) -> CatalogResult<bool> {
        Ok(false)
    }

    async fn table_exists(
        &self,
        _catalog: &str,
        _schema: &str,
        _table: &str,
        _query_ctx: Option<&QueryContext>,
    ) -> CatalogResult<bool> {
        Ok(false)
    }

    // Lookups never resolve.
    async fn table(
        &self,
        _catalog: &str,
        _schema: &str,
        _table_name: &str,
        _query_ctx: Option<&QueryContext>,
    ) -> CatalogResult<Option<TableRef>> {
        Ok(None)
    }

    async fn table_id(
        &self,
        _catalog: &str,
        _schema: &str,
        _table_name: &str,
        _query_ctx: Option<&QueryContext>,
    ) -> CatalogResult<Option<TableId>> {
        Ok(None)
    }

    async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
        Ok(None)
    }

    async fn tables_by_ids(
        &self,
        _catalog: &str,
        _schema: &str,
        _table_ids: &[TableId],
    ) -> CatalogResult<Vec<TableRef>> {
        Ok(vec![])
    }

    // Streaming enumeration yields nothing.
    fn tables<'a>(
        &'a self,
        _catalog: &'a str,
        _schema: &'a str,
        _query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, CatalogResult<TableRef>> {
        Box::pin(futures::stream::empty())
    }
}
605
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use session::context::QueryContextBuilder;

    use super::*;
    use crate::error::Error;
    use crate::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_SINK_TABLE_ID};

    // Common fixture region used by all tests below.
    fn test_region_id() -> RegionId {
        RegionId::new(1024, 1)
    }

    // A cached snapshot (42) and SST lower bound (7) in the context should be
    // carried into the request; the cached snapshot suppresses snapshot_on_scan.
    // NOTE(review): this uses the literal "flow.return_region_seq" instead of a
    // named constant — presumably matching an options key; verify against options.rs.
    #[test]
    fn test_scan_request_from_query_context_uses_snapshot_bound_intent() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                "flow.return_region_seq".to_string(),
                "true".to_string(),
            )]))
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                42_u64,
            )]))))
            .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                7_u64,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();

        assert!(!request.snapshot_on_scan);
        assert_eq!(request.memtable_max_sequence, Some(42));
        assert_eq!(request.sst_min_sequence, Some(7));
    }

    // Incremental extensions without a cached snapshot: the scan is lower-bounded
    // and snapshot collection is requested.
    #[test]
    fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                "flow.incremental_after_seqs".to_string(),
                format!(r#"{{"{}":10}}"#, region_id.as_u64()),
            )]))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();

        assert!(request.snapshot_on_scan);
        assert_eq!(request.memtable_min_sequence, Some(10));
        assert_eq!(request.memtable_max_sequence, None);
    }

    // No flow extensions at all: cached snapshot and SST bounds still apply.
    #[test]
    fn test_scan_request_from_query_context_keeps_snapshot_fields() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                100,
            )]))))
            .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                90,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
        assert_eq!(request.memtable_max_sequence, Some(100));
        assert_eq!(request.sst_min_sequence, Some(90));
        assert_eq!(request.memtable_min_sequence, None);
        assert!(!request.snapshot_on_scan);
    }

    // Incremental scan plus a cached snapshot: both bounds apply and no new
    // snapshot is requested.
    #[test]
    fn test_scan_request_from_query_context_reuses_existing_snapshot_for_incremental_scan() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                format!(r#"{{"{}":10}}"#, region_id.as_u64()),
            )]))
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                42_u64,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();

        assert_eq!(request.memtable_min_sequence, Some(10));
        assert_eq!(request.memtable_max_sequence, Some(42));
        assert!(!request.snapshot_on_scan);
    }

    // Non-sink scan: the cached snapshot is injected into an already-built
    // request and snapshot_on_scan is cleared.
    #[test]
    fn test_apply_cached_snapshot_to_request_updates_cached_scan_request() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                88_u64,
            )]))))
            .build();
        let mut request = ScanRequest {
            snapshot_on_scan: true,
            ..Default::default()
        };

        apply_cached_snapshot_to_request(&query_ctx, region_id, false, &mut request);

        assert_eq!(request.memtable_max_sequence, Some(88));
        assert!(!request.snapshot_on_scan);
    }

    // Sink scan: the cached snapshot is ignored and the request is untouched.
    #[test]
    fn test_apply_cached_snapshot_to_request_skips_sink_scan() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                88_u64,
            )]))))
            .build();
        let mut request = ScanRequest {
            snapshot_on_scan: true,
            ..Default::default()
        };

        apply_cached_snapshot_to_request(&query_ctx, region_id, true, &mut request);

        assert_eq!(request.memtable_max_sequence, None);
        assert!(request.snapshot_on_scan);
    }

    // Binding a sequence that conflicts with the already-bound one fails and
    // leaves the original binding intact.
    #[test]
    fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                42_u64,
            )]))))
            .build();

        let err = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap_err();

        assert!(matches!(err, Error::ConflictingSnapshotSequence { .. }));
        assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(42));
    }

    // First binding succeeds and is recorded in the context.
    #[test]
    fn test_bind_snapshot_bound_region_seq_sets_snapshot_once() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default().build();

        let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap();

        assert_eq!(seq, 99);
        assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(99));
    }

    // memtable_only incremental mode applies the per-region lower bound to a
    // source-table scan.
    #[test]
    fn test_scan_request_from_query_context_applies_incremental_after_seq_for_source_scan() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([
                (
                    FLOW_INCREMENTAL_MODE.to_string(),
                    "memtable_only".to_string(),
                ),
                (
                    FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                    format!(r#"{{"{}":55}}"#, region_id.as_u64()),
                ),
            ]))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
        assert_eq!(request.memtable_min_sequence, Some(55));
    }

    // When the scanned region belongs to the sink table, every bound (snapshot,
    // incremental, SST) is dropped: sink scans are plain scans.
    #[test]
    fn test_scan_request_from_query_context_does_not_apply_incremental_for_sink_table() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([
                (
                    FLOW_INCREMENTAL_MODE.to_string(),
                    "memtable_only".to_string(),
                ),
                (
                    FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                    format!(r#"{{"{}":55}}"#, region_id.as_u64()),
                ),
                (
                    FLOW_SINK_TABLE_ID.to_string(),
                    region_id.table_id().to_string(),
                ),
            ]))
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                88_u64,
            )]))))
            .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                77_u64,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
        assert_eq!(request.memtable_min_sequence, None);
        assert_eq!(request.memtable_max_sequence, None);
        assert_eq!(request.sst_min_sequence, None);
        assert!(!request.snapshot_on_scan);
    }

    // memtable_only mode requires an entry for the scanned region; region 9
    // here does not match the fixture region, so validation must fail.
    #[test]
    fn test_scan_request_from_query_context_rejects_missing_memtable_only_region() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([
                (
                    FLOW_INCREMENTAL_MODE.to_string(),
                    "memtable_only".to_string(),
                ),
                (
                    FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                    r#"{"9":55}"#.to_string(),
                ),
            ]))
            .build();

        let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
        assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
    }

    // Malformed JSON in the incremental map is an invalid-argument error.
    #[test]
    fn test_scan_request_from_query_context_rejects_invalid_incremental_json() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                "not-json".to_string(),
            )]))
            .build();

        let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
        assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    // A non-numeric sink table id is an invalid-argument error.
    #[test]
    fn test_scan_request_from_query_context_rejects_invalid_sink_table_id() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                FLOW_SINK_TABLE_ID.to_string(),
                "abc".to_string(),
            )]))
            .build();

        let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
        assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }
}