// query/dummy_catalog.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Dummy catalog for region server.
17use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::OrderOption;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{
39    RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, VectorSearchRequest,
40};
41use table::TableRef;
42use table::metadata::{TableId, TableInfoRef};
43use table::table::scan::RegionScanExec;
44
45use crate::error::{GetRegionMetadataSnafu, Result};
46use crate::options::FlowQueryExtensions;
47
48/// Resolve to the given region (specified by [RegionId]) unconditionally.
49#[derive(Clone, Debug)]
50pub struct DummyCatalogList {
51    catalog: DummyCatalogProvider,
52}
53
54impl DummyCatalogList {
55    /// Creates a new catalog list with the given table provider.
56    pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
57        let schema_provider = DummySchemaProvider {
58            table: table_provider,
59        };
60        let catalog_provider = DummyCatalogProvider {
61            schema: schema_provider,
62        };
63        Self {
64            catalog: catalog_provider,
65        }
66    }
67}
68
69impl CatalogProviderList for DummyCatalogList {
70    fn as_any(&self) -> &dyn Any {
71        self
72    }
73
74    fn register_catalog(
75        &self,
76        _name: String,
77        _catalog: Arc<dyn CatalogProvider>,
78    ) -> Option<Arc<dyn CatalogProvider>> {
79        None
80    }
81
82    fn catalog_names(&self) -> Vec<String> {
83        vec![]
84    }
85
86    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
87        Some(Arc::new(self.catalog.clone()))
88    }
89}
90
91/// A dummy catalog provider for [DummyCatalogList].
92#[derive(Clone, Debug)]
93struct DummyCatalogProvider {
94    schema: DummySchemaProvider,
95}
96
97impl CatalogProvider for DummyCatalogProvider {
98    fn as_any(&self) -> &dyn Any {
99        self
100    }
101
102    fn schema_names(&self) -> Vec<String> {
103        vec![]
104    }
105
106    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
107        Some(Arc::new(self.schema.clone()))
108    }
109}
110
111/// A dummy schema provider for [DummyCatalogList].
112#[derive(Clone, Debug)]
113struct DummySchemaProvider {
114    table: Arc<dyn TableProvider>,
115}
116
117#[async_trait]
118impl SchemaProvider for DummySchemaProvider {
119    fn as_any(&self) -> &dyn Any {
120        self
121    }
122
123    fn table_names(&self) -> Vec<String> {
124        vec![]
125    }
126
127    async fn table(
128        &self,
129        _name: &str,
130    ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
131        Ok(Some(self.table.clone()))
132    }
133
134    fn table_exist(&self, _name: &str) -> bool {
135        true
136    }
137}
138
/// For [TableProvider] and [DummyCatalogList]
#[derive(Clone)]
pub struct DummyTableProvider {
    // The region this provider unconditionally resolves to.
    region_id: RegionId,
    // Engine used to open scans on the region.
    engine: RegionEngineRef,
    // Region metadata; its arrow schema backs `TableProvider::schema`.
    metadata: RegionMetadataRef,
    /// Keeping a mutable request makes it possible to change in the optimize phase.
    scan_request: Arc<Mutex<ScanRequest>>,
    // Optional query context; when present, flow snapshot semantics are
    // applied during `scan`.
    query_ctx: Option<QueryContextRef>,
}

// Manual `Debug` impl: `engine` and `query_ctx` are intentionally omitted
// from the output.
impl fmt::Debug for DummyTableProvider {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DummyTableProvider")
            .field("region_id", &self.region_id)
            .field("metadata", &self.metadata)
            .field("scan_request", &self.scan_request)
            .finish()
    }
}
159
160#[async_trait]
161impl TableProvider for DummyTableProvider {
162    fn as_any(&self) -> &dyn Any {
163        self
164    }
165
166    fn schema(&self) -> SchemaRef {
167        self.metadata.schema.arrow_schema().clone()
168    }
169
170    fn table_type(&self) -> TableType {
171        TableType::Base
172    }
173
174    async fn scan(
175        &self,
176        _state: &dyn Session,
177        projection: Option<&Vec<usize>>,
178        filters: &[Expr],
179        limit: Option<usize>,
180    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
181        let mut request = self.scan_request.lock().unwrap().clone();
182        request.projection_input = projection.map(|p| p.clone().into());
183        request.filters = filters.to_vec();
184        request.limit = limit;
185
186        if let Some(query_ctx) = &self.query_ctx {
187            let is_sink_scan = is_sink_scan(query_ctx, self.region_id)
188                .map_err(|e| DataFusionError::External(Box::new(e)))?;
189            apply_cached_snapshot_to_request(query_ctx, self.region_id, is_sink_scan, &mut request);
190        }
191
192        let scanner = self
193            .engine
194            .handle_query(self.region_id, request.clone())
195            .await
196            .map_err(|e| DataFusionError::External(Box::new(e)))?;
197
198        if request.snapshot_on_scan
199            && let Some(query_ctx) = &self.query_ctx
200            && let Some(snapshot_sequence) = scanner.snapshot_sequence()
201        {
202            bind_snapshot_bound_region_seq(query_ctx, self.region_id, snapshot_sequence)
203                .map_err(|e| DataFusionError::External(Box::new(e)))?;
204        }
205
206        let query_memory_tracker = self.engine.query_memory_tracker();
207        let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_tracker)?;
208        if let Some(query_ctx) = &self.query_ctx {
209            scan_exec.set_explain_verbose(query_ctx.explain_verbose());
210        }
211        Ok(Arc::new(scan_exec))
212    }
213
214    fn supports_filters_pushdown(
215        &self,
216        filters: &[&Expr],
217    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
218        let supported = filters
219            .iter()
220            .map(|e| {
221                // Simple filter on primary key columns are precisely evaluated.
222                if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
223                    if self
224                        .metadata
225                        .column_by_name(simple_filter.column_name())
226                        .and_then(|c| {
227                            (c.semantic_type == SemanticType::Tag
228                                || c.semantic_type == SemanticType::Timestamp)
229                                .then_some(())
230                        })
231                        .is_some()
232                    {
233                        TableProviderFilterPushDown::Exact
234                    } else {
235                        TableProviderFilterPushDown::Inexact
236                    }
237                } else {
238                    TableProviderFilterPushDown::Inexact
239                }
240            })
241            .collect();
242        Ok(supported)
243    }
244}
245
246impl DummyTableProvider {
247    /// Creates a new provider.
248    pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
249        Self {
250            region_id,
251            engine,
252            metadata,
253            scan_request: Default::default(),
254            query_ctx: None,
255        }
256    }
257
258    pub fn region_metadata(&self) -> RegionMetadataRef {
259        self.metadata.clone()
260    }
261
262    /// Sets the ordering hint of the query to the provider.
263    pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
264        self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
265    }
266
267    /// Sets the distribution hint of the query to the provider.
268    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
269        self.scan_request.lock().unwrap().distribution = Some(distribution);
270    }
271
272    /// Sets the time series selector hint of the query to the provider.
273    pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
274        self.scan_request.lock().unwrap().series_row_selector = Some(selector);
275    }
276
277    pub fn with_vector_search_hint(&self, hint: VectorSearchRequest) {
278        self.scan_request.lock().unwrap().vector_search = Some(hint);
279    }
280
281    pub fn get_vector_search_hint(&self) -> Option<VectorSearchRequest> {
282        self.scan_request.lock().unwrap().vector_search.clone()
283    }
284
285    pub fn with_sequence(&self, sequence: u64) {
286        self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
287    }
288
289    /// Gets the scan request of the provider.
290    #[cfg(test)]
291    pub fn scan_request(&self) -> ScanRequest {
292        self.scan_request.lock().unwrap().clone()
293    }
294}
295
296pub struct DummyTableProviderFactory;
297
298impl DummyTableProviderFactory {
299    pub async fn create_table_provider(
300        &self,
301        region_id: RegionId,
302        engine: RegionEngineRef,
303        query_ctx: Option<QueryContextRef>,
304    ) -> Result<DummyTableProvider> {
305        let metadata =
306            engine
307                .get_metadata(region_id)
308                .await
309                .with_context(|_| GetRegionMetadataSnafu {
310                    engine: engine.name(),
311                    region_id,
312                })?;
313
314        let scan_request = if let Some(ctx) = query_ctx.as_ref() {
315            scan_request_from_query_context(region_id, ctx)?
316        } else {
317            ScanRequest::default()
318        };
319
320        Ok(DummyTableProvider {
321            region_id,
322            engine,
323            metadata,
324            scan_request: Arc::new(Mutex::new(scan_request)),
325            query_ctx,
326        })
327    }
328}
329
330fn scan_request_from_query_context(
331    region_id: RegionId,
332    query_ctx: &QueryContext,
333) -> Result<ScanRequest> {
334    let decision = decide_flow_scan(query_ctx, region_id)?;
335    Ok(build_scan_request(query_ctx, region_id, &decision))
336}
337
/// The per-region scan semantics decided from the flow query extensions.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FlowScanDecision {
    /// Whether this region is the flow sink-table scan.
    /// Sink scans intentionally bypass incremental and snapshot-binding semantics.
    is_sink_scan: bool,
    /// Whether this scan should bind a memtable upper bound when opening the scan.
    /// This is only the initial intent; if a cached bound already exists in `query_ctx`,
    /// we reuse that cached bound instead and clear this flag.
    snapshot_on_scan: bool,
    /// Optional lower exclusive memtable sequence bound for incremental reads.
    /// When set, only rows with sequence strictly greater than this bound are read from memtables.
    memtable_min_sequence: Option<u64>,
    /// Optional cached per-region snapshot already bound in `query_ctx`.
    /// When present, this becomes the effective memtable upper bound and suppresses
    /// binding a new snapshot on scan open.
    memtable_max_sequence: Option<u64>,
}

impl FlowScanDecision {
    /// Decision for a scan that bypasses all flow semantics (used for the
    /// flow sink table): no snapshot binding and no sequence bounds.
    fn plain_scan() -> Self {
        Self {
            is_sink_scan: true,
            snapshot_on_scan: false,
            memtable_min_sequence: None,
            memtable_max_sequence: None,
        }
    }
}
366
367fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<FlowScanDecision> {
368    let Some(flow_extensions) =
369        FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
370    else {
371        return Ok(FlowScanDecision {
372            is_sink_scan: false,
373            snapshot_on_scan: false,
374            memtable_min_sequence: None,
375            memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()),
376        });
377    };
378
379    // Sink-table scans intentionally bypass all flow scan semantics. They should
380    // behave like plain reads and must not participate in incremental lower bounds
381    // or per-region snapshot binding/reuse.
382    if flow_extensions.sink_table_id == Some(region_id.table_id()) {
383        return Ok(FlowScanDecision::plain_scan());
384    }
385
386    let apply_incremental = flow_extensions.validate_for_scan(region_id)?;
387
388    let memtable_min_sequence = if apply_incremental {
389        flow_extensions
390            .incremental_after_seqs
391            .as_ref()
392            .and_then(|seqs| seqs.get(&region_id.as_u64()))
393            .copied()
394    } else {
395        None
396    };
397
398    let memtable_max_sequence = query_ctx.get_snapshot(region_id.as_u64());
399
400    Ok(FlowScanDecision {
401        is_sink_scan: false,
402        snapshot_on_scan: memtable_max_sequence.is_none()
403            && flow_extensions.should_collect_region_watermark(),
404        memtable_min_sequence,
405        memtable_max_sequence,
406    })
407}
408
409fn build_scan_request(
410    query_ctx: &QueryContext,
411    region_id: RegionId,
412    decision: &FlowScanDecision,
413) -> ScanRequest {
414    // Build the initial scan request from the final decision known at provider creation
415    // time. A later scan may still refresh `memtable_max_sequence` if another source scan
416    // has bound a snapshot into `query_ctx` after this provider was created.
417    ScanRequest {
418        sst_min_sequence: (!decision.is_sink_scan)
419            .then(|| query_ctx.sst_min_sequence(region_id.as_u64()))
420            .flatten(),
421        snapshot_on_scan: decision.snapshot_on_scan,
422        memtable_min_sequence: decision.memtable_min_sequence,
423        memtable_max_sequence: decision.memtable_max_sequence,
424        ..Default::default()
425    }
426}
427
428fn is_sink_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<bool> {
429    Ok(
430        FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
431            .is_some_and(|exts| exts.sink_table_id == Some(region_id.table_id())),
432    )
433}
434
435fn apply_cached_snapshot_to_request(
436    query_ctx: &QueryContext,
437    region_id: RegionId,
438    is_sink_scan: bool,
439    scan_request: &mut ScanRequest,
440) {
441    if is_sink_scan {
442        return;
443    }
444
445    if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) {
446        // Reuse the previously bound per-region snapshot instead of rebinding a new
447        // upper bound on scan open. This refresh is still needed at scan time because
448        // the provider's cached request may have been built before another source scan
449        // bound the shared query-level snapshot into `query_ctx`.
450        scan_request.memtable_max_sequence = Some(snapshot_sequence);
451        scan_request.snapshot_on_scan = false;
452    }
453}
454
455fn bind_snapshot_bound_region_seq(
456    query_ctx: &QueryContext,
457    region_id: RegionId,
458    snapshot_sequence: u64,
459) -> Result<u64> {
460    if let Some(existing) = query_ctx.get_snapshot(region_id.as_u64()) {
461        if existing != snapshot_sequence {
462            return crate::error::ConflictingSnapshotSequenceSnafu {
463                region_id,
464                existing,
465                new: snapshot_sequence,
466            }
467            .fail();
468        }
469        Ok(existing)
470    } else {
471        query_ctx.set_snapshot(region_id.as_u64(), snapshot_sequence);
472        Ok(snapshot_sequence)
473    }
474}
475
476#[async_trait]
477impl TableProviderFactory for DummyTableProviderFactory {
478    async fn create(
479        &self,
480        region_id: RegionId,
481        engine: RegionEngineRef,
482        ctx: Option<QueryContextRef>,
483    ) -> Result<Arc<dyn TableProvider>> {
484        let provider = self.create_table_provider(region_id, engine, ctx).await?;
485        Ok(Arc::new(provider))
486    }
487}
488
/// Factory that builds a [TableProvider] for a specific region.
#[async_trait]
pub trait TableProviderFactory: Send + Sync {
    /// Creates a table provider backed by region `region_id` of `engine`.
    ///
    /// `ctx`, when present, supplies per-query scan semantics.
    async fn create(
        &self,
        region_id: RegionId,
        engine: RegionEngineRef,
        ctx: Option<QueryContextRef>,
    ) -> Result<Arc<dyn TableProvider>>;
}

/// Shared reference to a [TableProviderFactory].
pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
500
501/// A dummy catalog manager that always returns empty results.
502///
503/// Used to fill the arg of `QueryEngineFactory::new_with_plugins` in datanode.
504pub struct DummyCatalogManager;
505
506impl DummyCatalogManager {
507    /// Returns a new `CatalogManagerRef` instance.
508    pub fn arc() -> CatalogManagerRef {
509        Arc::new(Self)
510    }
511}
512
513#[async_trait::async_trait]
514impl CatalogManager for DummyCatalogManager {
515    fn as_any(&self) -> &dyn Any {
516        self
517    }
518
519    async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
520        Ok(vec![])
521    }
522
523    async fn schema_names(
524        &self,
525        _catalog: &str,
526        _query_ctx: Option<&QueryContext>,
527    ) -> CatalogResult<Vec<String>> {
528        Ok(vec![])
529    }
530
531    async fn table_names(
532        &self,
533        _catalog: &str,
534        _schema: &str,
535        _query_ctx: Option<&QueryContext>,
536    ) -> CatalogResult<Vec<String>> {
537        Ok(vec![])
538    }
539
540    async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
541        Ok(false)
542    }
543
544    async fn schema_exists(
545        &self,
546        _catalog: &str,
547        _schema: &str,
548        _query_ctx: Option<&QueryContext>,
549    ) -> CatalogResult<bool> {
550        Ok(false)
551    }
552
553    async fn table_exists(
554        &self,
555        _catalog: &str,
556        _schema: &str,
557        _table: &str,
558        _query_ctx: Option<&QueryContext>,
559    ) -> CatalogResult<bool> {
560        Ok(false)
561    }
562
563    async fn table(
564        &self,
565        _catalog: &str,
566        _schema: &str,
567        _table_name: &str,
568        _query_ctx: Option<&QueryContext>,
569    ) -> CatalogResult<Option<TableRef>> {
570        Ok(None)
571    }
572
573    async fn table_id(
574        &self,
575        _catalog: &str,
576        _schema: &str,
577        _table_name: &str,
578        _query_ctx: Option<&QueryContext>,
579    ) -> CatalogResult<Option<TableId>> {
580        Ok(None)
581    }
582
583    async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
584        Ok(None)
585    }
586
587    async fn tables_by_ids(
588        &self,
589        _catalog: &str,
590        _schema: &str,
591        _table_ids: &[TableId],
592    ) -> CatalogResult<Vec<TableRef>> {
593        Ok(vec![])
594    }
595
596    fn tables<'a>(
597        &'a self,
598        _catalog: &'a str,
599        _schema: &'a str,
600        _query_ctx: Option<&'a QueryContext>,
601    ) -> BoxStream<'a, CatalogResult<TableRef>> {
602        Box::pin(futures::stream::empty())
603    }
604}
605
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use session::context::QueryContextBuilder;

    use super::*;
    use crate::error::Error;
    use crate::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_SINK_TABLE_ID};

    // Fixed region used by every test: table 1024, region number 1.
    fn test_region_id() -> RegionId {
        RegionId::new(1024, 1)
    }

    // With a snapshot already cached in the context, the request reuses it as
    // the memtable upper bound and does not ask to bind a new snapshot.
    #[test]
    fn test_scan_request_from_query_context_uses_snapshot_bound_intent() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                "flow.return_region_seq".to_string(),
                "true".to_string(),
            )]))
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                42_u64,
            )]))))
            .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                7_u64,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();

        assert!(!request.snapshot_on_scan);
        assert_eq!(request.memtable_max_sequence, Some(42));
        assert_eq!(request.sst_min_sequence, Some(7));
    }

    // An incremental scan with no cached snapshot must ask to bind one on
    // scan open and set the incremental lower bound.
    #[test]
    fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                "flow.incremental_after_seqs".to_string(),
                format!(r#"{{"{}":10}}"#, region_id.as_u64()),
            )]))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();

        assert!(request.snapshot_on_scan);
        assert_eq!(request.memtable_min_sequence, Some(10));
        assert_eq!(request.memtable_max_sequence, None);
    }

    // Without any flow extensions, cached snapshot/sst bounds still flow into
    // the request, and no new snapshot is bound.
    #[test]
    fn test_scan_request_from_query_context_keeps_snapshot_fields() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                100,
            )]))))
            .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                90,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
        assert_eq!(request.memtable_max_sequence, Some(100));
        assert_eq!(request.sst_min_sequence, Some(90));
        assert_eq!(request.memtable_min_sequence, None);
        assert!(!request.snapshot_on_scan);
    }

    // Incremental scan with a cached snapshot: reuse the cached bound and
    // suppress binding a new one, while keeping the incremental lower bound.
    #[test]
    fn test_scan_request_from_query_context_reuses_existing_snapshot_for_incremental_scan() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                format!(r#"{{"{}":10}}"#, region_id.as_u64()),
            )]))
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                42_u64,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();

        assert_eq!(request.memtable_min_sequence, Some(10));
        assert_eq!(request.memtable_max_sequence, Some(42));
        assert!(!request.snapshot_on_scan);
    }

    // A cached snapshot applied at scan time overrides the request's upper
    // bound and clears `snapshot_on_scan`.
    #[test]
    fn test_apply_cached_snapshot_to_request_updates_cached_scan_request() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                88_u64,
            )]))))
            .build();
        let mut request = ScanRequest {
            snapshot_on_scan: true,
            ..Default::default()
        };

        apply_cached_snapshot_to_request(&query_ctx, region_id, false, &mut request);

        assert_eq!(request.memtable_max_sequence, Some(88));
        assert!(!request.snapshot_on_scan);
    }

    // Sink scans must be left untouched even when a snapshot is cached.
    #[test]
    fn test_apply_cached_snapshot_to_request_skips_sink_scan() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                88_u64,
            )]))))
            .build();
        let mut request = ScanRequest {
            snapshot_on_scan: true,
            ..Default::default()
        };

        apply_cached_snapshot_to_request(&query_ctx, region_id, true, &mut request);

        assert_eq!(request.memtable_max_sequence, None);
        assert!(request.snapshot_on_scan);
    }

    // Binding a different sequence over an existing one is a conflict; the
    // original cached value must be preserved.
    #[test]
    fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                42_u64,
            )]))))
            .build();

        let err = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap_err();

        assert!(matches!(err, Error::ConflictingSnapshotSequence { .. }));
        assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(42));
    }

    // First binding succeeds and records the sequence in the context.
    #[test]
    fn test_bind_snapshot_bound_region_seq_sets_snapshot_once() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default().build();

        let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap();

        assert_eq!(seq, 99);
        assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(99));
    }

    // Source scans in memtable-only incremental mode pick up the per-region
    // lower bound from the extensions.
    #[test]
    fn test_scan_request_from_query_context_applies_incremental_after_seq_for_source_scan() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([
                (
                    FLOW_INCREMENTAL_MODE.to_string(),
                    "memtable_only".to_string(),
                ),
                (
                    FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                    format!(r#"{{"{}":55}}"#, region_id.as_u64()),
                ),
            ]))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
        assert_eq!(request.memtable_min_sequence, Some(55));
    }

    // When the region belongs to the declared sink table, all flow semantics
    // (incremental bounds, cached snapshot/sst bounds, snapshot binding) are
    // bypassed.
    #[test]
    fn test_scan_request_from_query_context_does_not_apply_incremental_for_sink_table() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([
                (
                    FLOW_INCREMENTAL_MODE.to_string(),
                    "memtable_only".to_string(),
                ),
                (
                    FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                    format!(r#"{{"{}":55}}"#, region_id.as_u64()),
                ),
                (
                    FLOW_SINK_TABLE_ID.to_string(),
                    region_id.table_id().to_string(),
                ),
            ]))
            .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                88_u64,
            )]))))
            .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
                region_id.as_u64(),
                77_u64,
            )]))))
            .build();

        let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
        assert_eq!(request.memtable_min_sequence, None);
        assert_eq!(request.memtable_max_sequence, None);
        assert_eq!(request.sst_min_sequence, None);
        assert!(!request.snapshot_on_scan);
    }

    // memtable-only mode requires every scanned region to appear in the
    // incremental map; a missing region is rejected at validation.
    #[test]
    fn test_scan_request_from_query_context_rejects_missing_memtable_only_region() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([
                (
                    FLOW_INCREMENTAL_MODE.to_string(),
                    "memtable_only".to_string(),
                ),
                (
                    FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                    r#"{"9":55}"#.to_string(),
                ),
            ]))
            .build();

        let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
        assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
    }

    // Malformed JSON in the incremental map is an invalid-argument error.
    #[test]
    fn test_scan_request_from_query_context_rejects_invalid_incremental_json() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                "not-json".to_string(),
            )]))
            .build();

        let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
        assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    // A non-numeric sink table id is an invalid-argument error.
    #[test]
    fn test_scan_request_from_query_context_rejects_invalid_sink_table_id() {
        let region_id = test_region_id();
        let query_ctx = QueryContextBuilder::default()
            .extensions(HashMap::from([(
                FLOW_SINK_TABLE_ID.to_string(),
                "abc".to_string(),
            )]))
            .build();

        let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
        assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }
}