query/
dummy_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dummy catalog for region server.
16
17use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::OrderOption;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{
39    RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, VectorSearchRequest,
40};
41use table::TableRef;
42use table::metadata::{TableId, TableInfoRef};
43use table::table::scan::RegionScanExec;
44
45use crate::error::{GetRegionMetadataSnafu, Result};
46
/// A catalog list that resolves to the given region (specified by [RegionId])
/// unconditionally.
///
/// The catalog name passed to a lookup is ignored: the single wrapped catalog
/// provider is always returned, which in turn always leads to the one table
/// provider given at construction time.
#[derive(Clone, Debug)]
pub struct DummyCatalogList {
    /// The only catalog provider this list ever hands out.
    catalog: DummyCatalogProvider,
}
52
53impl DummyCatalogList {
54    /// Creates a new catalog list with the given table provider.
55    pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
56        let schema_provider = DummySchemaProvider {
57            table: table_provider,
58        };
59        let catalog_provider = DummyCatalogProvider {
60            schema: schema_provider,
61        };
62        Self {
63            catalog: catalog_provider,
64        }
65    }
66}
67
68impl CatalogProviderList for DummyCatalogList {
69    fn as_any(&self) -> &dyn Any {
70        self
71    }
72
73    fn register_catalog(
74        &self,
75        _name: String,
76        _catalog: Arc<dyn CatalogProvider>,
77    ) -> Option<Arc<dyn CatalogProvider>> {
78        None
79    }
80
81    fn catalog_names(&self) -> Vec<String> {
82        vec![]
83    }
84
85    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
86        Some(Arc::new(self.catalog.clone()))
87    }
88}
89
/// A dummy catalog provider for [DummyCatalogList].
///
/// Every schema lookup returns the wrapped schema provider, whatever name is
/// asked for.
#[derive(Clone, Debug)]
struct DummyCatalogProvider {
    /// The only schema provider this catalog ever hands out.
    schema: DummySchemaProvider,
}
95
96impl CatalogProvider for DummyCatalogProvider {
97    fn as_any(&self) -> &dyn Any {
98        self
99    }
100
101    fn schema_names(&self) -> Vec<String> {
102        vec![]
103    }
104
105    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
106        Some(Arc::new(self.schema.clone()))
107    }
108}
109
/// A dummy schema provider for [DummyCatalogList].
///
/// Every table lookup returns the wrapped table provider, and every table
/// name is reported as existing.
#[derive(Clone, Debug)]
struct DummySchemaProvider {
    /// The table provider returned for any table name.
    table: Arc<dyn TableProvider>,
}
115
116#[async_trait]
117impl SchemaProvider for DummySchemaProvider {
118    fn as_any(&self) -> &dyn Any {
119        self
120    }
121
122    fn table_names(&self) -> Vec<String> {
123        vec![]
124    }
125
126    async fn table(
127        &self,
128        _name: &str,
129    ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
130        Ok(Some(self.table.clone()))
131    }
132
133    fn table_exist(&self, _name: &str) -> bool {
134        true
135    }
136}
137
/// A [TableProvider] bound to a single region, for use with [DummyCatalogList].
///
/// Scans are delegated to the region engine using a copy of the stored scan
/// request, with the projection, filters and limit of the scan filled in.
#[derive(Clone)]
pub struct DummyTableProvider {
    /// Region that is queried through `engine` on scan.
    region_id: RegionId,
    /// Engine used to handle the scan and to register the query memory permit.
    engine: RegionEngineRef,
    /// Region metadata; supplies the arrow schema and the column lookup used
    /// to decide filter pushdown.
    metadata: RegionMetadataRef,
    /// Base scan request that hints are written into.
    /// Keeping a mutable request makes it possible to change it in the optimize phase.
    scan_request: Arc<Mutex<ScanRequest>>,
    /// Optional query context; when present, its `explain_verbose` flag is
    /// copied onto the scan plan.
    query_ctx: Option<QueryContextRef>,
}
148
149impl fmt::Debug for DummyTableProvider {
150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151        f.debug_struct("DummyTableProvider")
152            .field("region_id", &self.region_id)
153            .field("metadata", &self.metadata)
154            .field("scan_request", &self.scan_request)
155            .finish()
156    }
157}
158
159#[async_trait]
160impl TableProvider for DummyTableProvider {
161    fn as_any(&self) -> &dyn Any {
162        self
163    }
164
165    fn schema(&self) -> SchemaRef {
166        self.metadata.schema.arrow_schema().clone()
167    }
168
169    fn table_type(&self) -> TableType {
170        TableType::Base
171    }
172
173    async fn scan(
174        &self,
175        _state: &dyn Session,
176        projection: Option<&Vec<usize>>,
177        filters: &[Expr],
178        limit: Option<usize>,
179    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
180        let mut request = self.scan_request.lock().unwrap().clone();
181        request.projection = projection.cloned();
182        request.filters = filters.to_vec();
183        request.limit = limit;
184
185        let scanner = self
186            .engine
187            .handle_query(self.region_id, request.clone())
188            .await
189            .map_err(|e| DataFusionError::External(Box::new(e)))?;
190        let query_memory_permit = self.engine.register_query_memory_permit();
191        let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?;
192        if let Some(query_ctx) = &self.query_ctx {
193            scan_exec.set_explain_verbose(query_ctx.explain_verbose());
194        }
195        Ok(Arc::new(scan_exec))
196    }
197
198    fn supports_filters_pushdown(
199        &self,
200        filters: &[&Expr],
201    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
202        let supported = filters
203            .iter()
204            .map(|e| {
205                // Simple filter on primary key columns are precisely evaluated.
206                if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
207                    if self
208                        .metadata
209                        .column_by_name(simple_filter.column_name())
210                        .and_then(|c| {
211                            (c.semantic_type == SemanticType::Tag
212                                || c.semantic_type == SemanticType::Timestamp)
213                                .then_some(())
214                        })
215                        .is_some()
216                    {
217                        TableProviderFilterPushDown::Exact
218                    } else {
219                        TableProviderFilterPushDown::Inexact
220                    }
221                } else {
222                    TableProviderFilterPushDown::Inexact
223                }
224            })
225            .collect();
226        Ok(supported)
227    }
228}
229
230impl DummyTableProvider {
231    /// Creates a new provider.
232    pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
233        Self {
234            region_id,
235            engine,
236            metadata,
237            scan_request: Default::default(),
238            query_ctx: None,
239        }
240    }
241
242    pub fn region_metadata(&self) -> RegionMetadataRef {
243        self.metadata.clone()
244    }
245
246    /// Sets the ordering hint of the query to the provider.
247    pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
248        self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
249    }
250
251    /// Sets the distribution hint of the query to the provider.
252    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
253        self.scan_request.lock().unwrap().distribution = Some(distribution);
254    }
255
256    /// Sets the time series selector hint of the query to the provider.
257    pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
258        self.scan_request.lock().unwrap().series_row_selector = Some(selector);
259    }
260
261    pub fn with_vector_search_hint(&self, hint: VectorSearchRequest) {
262        self.scan_request.lock().unwrap().vector_search = Some(hint);
263    }
264
265    pub fn get_vector_search_hint(&self) -> Option<VectorSearchRequest> {
266        self.scan_request.lock().unwrap().vector_search.clone()
267    }
268
269    pub fn with_sequence(&self, sequence: u64) {
270        self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
271    }
272
273    /// Gets the scan request of the provider.
274    #[cfg(test)]
275    pub fn scan_request(&self) -> ScanRequest {
276        self.scan_request.lock().unwrap().clone()
277    }
278}
279
/// A stateless [TableProviderFactory] that builds [DummyTableProvider]s.
pub struct DummyTableProviderFactory;
281
282impl DummyTableProviderFactory {
283    pub async fn create_table_provider(
284        &self,
285        region_id: RegionId,
286        engine: RegionEngineRef,
287        query_ctx: Option<QueryContextRef>,
288    ) -> Result<DummyTableProvider> {
289        let metadata =
290            engine
291                .get_metadata(region_id)
292                .await
293                .with_context(|_| GetRegionMetadataSnafu {
294                    engine: engine.name(),
295                    region_id,
296                })?;
297
298        let scan_request = query_ctx
299            .as_ref()
300            .map(|ctx| ScanRequest {
301                memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()),
302                sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
303                ..Default::default()
304            })
305            .unwrap_or_default();
306
307        Ok(DummyTableProvider {
308            region_id,
309            engine,
310            metadata,
311            scan_request: Arc::new(Mutex::new(scan_request)),
312            query_ctx,
313        })
314    }
315}
316
317#[async_trait]
318impl TableProviderFactory for DummyTableProviderFactory {
319    async fn create(
320        &self,
321        region_id: RegionId,
322        engine: RegionEngineRef,
323        ctx: Option<QueryContextRef>,
324    ) -> Result<Arc<dyn TableProvider>> {
325        let provider = self.create_table_provider(region_id, engine, ctx).await?;
326        Ok(Arc::new(provider))
327    }
328}
329
/// Factory that creates a [TableProvider] for a region.
#[async_trait]
pub trait TableProviderFactory: Send + Sync {
    /// Creates a table provider for the region `region_id` served by `engine`.
    ///
    /// `ctx` is an optional query context that implementations may use when
    /// building the provider.
    async fn create(
        &self,
        region_id: RegionId,
        engine: RegionEngineRef,
        ctx: Option<QueryContextRef>,
    ) -> Result<Arc<dyn TableProvider>>;
}
339
/// Shared, reference-counted handle to a [TableProviderFactory].
pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
341
/// A dummy catalog manager that always returns empty results.
///
/// Name listings are empty, existence checks answer `false`, and lookups
/// return `None`.
///
/// Used to fill the arg of `QueryEngineFactory::new_with_plugins` in datanode.
pub struct DummyCatalogManager;
346
347impl DummyCatalogManager {
348    /// Returns a new `CatalogManagerRef` instance.
349    pub fn arc() -> CatalogManagerRef {
350        Arc::new(Self)
351    }
352}
353
354#[async_trait::async_trait]
355impl CatalogManager for DummyCatalogManager {
356    fn as_any(&self) -> &dyn Any {
357        self
358    }
359
360    async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
361        Ok(vec![])
362    }
363
364    async fn schema_names(
365        &self,
366        _catalog: &str,
367        _query_ctx: Option<&QueryContext>,
368    ) -> CatalogResult<Vec<String>> {
369        Ok(vec![])
370    }
371
372    async fn table_names(
373        &self,
374        _catalog: &str,
375        _schema: &str,
376        _query_ctx: Option<&QueryContext>,
377    ) -> CatalogResult<Vec<String>> {
378        Ok(vec![])
379    }
380
381    async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
382        Ok(false)
383    }
384
385    async fn schema_exists(
386        &self,
387        _catalog: &str,
388        _schema: &str,
389        _query_ctx: Option<&QueryContext>,
390    ) -> CatalogResult<bool> {
391        Ok(false)
392    }
393
394    async fn table_exists(
395        &self,
396        _catalog: &str,
397        _schema: &str,
398        _table: &str,
399        _query_ctx: Option<&QueryContext>,
400    ) -> CatalogResult<bool> {
401        Ok(false)
402    }
403
404    async fn table(
405        &self,
406        _catalog: &str,
407        _schema: &str,
408        _table_name: &str,
409        _query_ctx: Option<&QueryContext>,
410    ) -> CatalogResult<Option<TableRef>> {
411        Ok(None)
412    }
413
414    async fn table_id(
415        &self,
416        _catalog: &str,
417        _schema: &str,
418        _table_name: &str,
419        _query_ctx: Option<&QueryContext>,
420    ) -> CatalogResult<Option<TableId>> {
421        Ok(None)
422    }
423
424    async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
425        Ok(None)
426    }
427
428    async fn tables_by_ids(
429        &self,
430        _catalog: &str,
431        _schema: &str,
432        _table_ids: &[TableId],
433    ) -> CatalogResult<Vec<TableRef>> {
434        Ok(vec![])
435    }
436
437    fn tables<'a>(
438        &'a self,
439        _catalog: &'a str,
440        _schema: &'a str,
441        _query_ctx: Option<&'a QueryContext>,
442    ) -> BoxStream<'a, CatalogResult<TableRef>> {
443        Box::pin(futures::stream::empty())
444    }
445}