query/
dummy_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dummy catalog for region server.
16
17use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::OrderOption;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
39use table::TableRef;
40use table::metadata::{TableId, TableInfoRef};
41use table::table::scan::RegionScanExec;
42
43use crate::error::{GetRegionMetadataSnafu, Result};
44
45/// Resolve to the given region (specified by [RegionId]) unconditionally.
46#[derive(Clone, Debug)]
47pub struct DummyCatalogList {
48    catalog: DummyCatalogProvider,
49}
50
51impl DummyCatalogList {
52    /// Creates a new catalog list with the given table provider.
53    pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
54        let schema_provider = DummySchemaProvider {
55            table: table_provider,
56        };
57        let catalog_provider = DummyCatalogProvider {
58            schema: schema_provider,
59        };
60        Self {
61            catalog: catalog_provider,
62        }
63    }
64}
65
66impl CatalogProviderList for DummyCatalogList {
67    fn as_any(&self) -> &dyn Any {
68        self
69    }
70
71    fn register_catalog(
72        &self,
73        _name: String,
74        _catalog: Arc<dyn CatalogProvider>,
75    ) -> Option<Arc<dyn CatalogProvider>> {
76        None
77    }
78
79    fn catalog_names(&self) -> Vec<String> {
80        vec![]
81    }
82
83    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
84        Some(Arc::new(self.catalog.clone()))
85    }
86}
87
88/// A dummy catalog provider for [DummyCatalogList].
89#[derive(Clone, Debug)]
90struct DummyCatalogProvider {
91    schema: DummySchemaProvider,
92}
93
94impl CatalogProvider for DummyCatalogProvider {
95    fn as_any(&self) -> &dyn Any {
96        self
97    }
98
99    fn schema_names(&self) -> Vec<String> {
100        vec![]
101    }
102
103    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
104        Some(Arc::new(self.schema.clone()))
105    }
106}
107
108/// A dummy schema provider for [DummyCatalogList].
109#[derive(Clone, Debug)]
110struct DummySchemaProvider {
111    table: Arc<dyn TableProvider>,
112}
113
114#[async_trait]
115impl SchemaProvider for DummySchemaProvider {
116    fn as_any(&self) -> &dyn Any {
117        self
118    }
119
120    fn table_names(&self) -> Vec<String> {
121        vec![]
122    }
123
124    async fn table(
125        &self,
126        _name: &str,
127    ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
128        Ok(Some(self.table.clone()))
129    }
130
131    fn table_exist(&self, _name: &str) -> bool {
132        true
133    }
134}
135
136/// For [TableProvider] and [DummyCatalogList]
137#[derive(Clone)]
138pub struct DummyTableProvider {
139    region_id: RegionId,
140    engine: RegionEngineRef,
141    metadata: RegionMetadataRef,
142    /// Keeping a mutable request makes it possible to change in the optimize phase.
143    scan_request: Arc<Mutex<ScanRequest>>,
144    query_ctx: Option<QueryContextRef>,
145}
146
147impl fmt::Debug for DummyTableProvider {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        f.debug_struct("DummyTableProvider")
150            .field("region_id", &self.region_id)
151            .field("metadata", &self.metadata)
152            .field("scan_request", &self.scan_request)
153            .finish()
154    }
155}
156
157#[async_trait]
158impl TableProvider for DummyTableProvider {
159    fn as_any(&self) -> &dyn Any {
160        self
161    }
162
163    fn schema(&self) -> SchemaRef {
164        self.metadata.schema.arrow_schema().clone()
165    }
166
167    fn table_type(&self) -> TableType {
168        TableType::Base
169    }
170
171    async fn scan(
172        &self,
173        _state: &dyn Session,
174        projection: Option<&Vec<usize>>,
175        filters: &[Expr],
176        limit: Option<usize>,
177    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
178        let mut request = self.scan_request.lock().unwrap().clone();
179        request.projection = projection.cloned();
180        request.filters = filters.to_vec();
181        request.limit = limit;
182
183        let scanner = self
184            .engine
185            .handle_query(self.region_id, request.clone())
186            .await
187            .map_err(|e| DataFusionError::External(Box::new(e)))?;
188        let query_memory_permit = self.engine.register_query_memory_permit();
189        let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?;
190        if let Some(query_ctx) = &self.query_ctx {
191            scan_exec.set_explain_verbose(query_ctx.explain_verbose());
192        }
193        Ok(Arc::new(scan_exec))
194    }
195
196    fn supports_filters_pushdown(
197        &self,
198        filters: &[&Expr],
199    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
200        let supported = filters
201            .iter()
202            .map(|e| {
203                // Simple filter on primary key columns are precisely evaluated.
204                if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
205                    if self
206                        .metadata
207                        .column_by_name(simple_filter.column_name())
208                        .and_then(|c| {
209                            (c.semantic_type == SemanticType::Tag
210                                || c.semantic_type == SemanticType::Timestamp)
211                                .then_some(())
212                        })
213                        .is_some()
214                    {
215                        TableProviderFilterPushDown::Exact
216                    } else {
217                        TableProviderFilterPushDown::Inexact
218                    }
219                } else {
220                    TableProviderFilterPushDown::Inexact
221                }
222            })
223            .collect();
224        Ok(supported)
225    }
226}
227
228impl DummyTableProvider {
229    /// Creates a new provider.
230    pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
231        Self {
232            region_id,
233            engine,
234            metadata,
235            scan_request: Default::default(),
236            query_ctx: None,
237        }
238    }
239
240    pub fn region_metadata(&self) -> RegionMetadataRef {
241        self.metadata.clone()
242    }
243
244    /// Sets the ordering hint of the query to the provider.
245    pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
246        self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
247    }
248
249    /// Sets the distribution hint of the query to the provider.
250    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
251        self.scan_request.lock().unwrap().distribution = Some(distribution);
252    }
253
254    /// Sets the time series selector hint of the query to the provider.
255    pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
256        self.scan_request.lock().unwrap().series_row_selector = Some(selector);
257    }
258
259    pub fn with_sequence(&self, sequence: u64) {
260        self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
261    }
262
263    /// Gets the scan request of the provider.
264    #[cfg(test)]
265    pub fn scan_request(&self) -> ScanRequest {
266        self.scan_request.lock().unwrap().clone()
267    }
268}
269
270pub struct DummyTableProviderFactory;
271
272impl DummyTableProviderFactory {
273    pub async fn create_table_provider(
274        &self,
275        region_id: RegionId,
276        engine: RegionEngineRef,
277        query_ctx: Option<QueryContextRef>,
278    ) -> Result<DummyTableProvider> {
279        let metadata =
280            engine
281                .get_metadata(region_id)
282                .await
283                .with_context(|_| GetRegionMetadataSnafu {
284                    engine: engine.name(),
285                    region_id,
286                })?;
287
288        let scan_request = query_ctx
289            .as_ref()
290            .map(|ctx| ScanRequest {
291                memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()),
292                sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
293                ..Default::default()
294            })
295            .unwrap_or_default();
296
297        Ok(DummyTableProvider {
298            region_id,
299            engine,
300            metadata,
301            scan_request: Arc::new(Mutex::new(scan_request)),
302            query_ctx,
303        })
304    }
305}
306
307#[async_trait]
308impl TableProviderFactory for DummyTableProviderFactory {
309    async fn create(
310        &self,
311        region_id: RegionId,
312        engine: RegionEngineRef,
313        ctx: Option<QueryContextRef>,
314    ) -> Result<Arc<dyn TableProvider>> {
315        let provider = self.create_table_provider(region_id, engine, ctx).await?;
316        Ok(Arc::new(provider))
317    }
318}
319
320#[async_trait]
321pub trait TableProviderFactory: Send + Sync {
322    async fn create(
323        &self,
324        region_id: RegionId,
325        engine: RegionEngineRef,
326        ctx: Option<QueryContextRef>,
327    ) -> Result<Arc<dyn TableProvider>>;
328}
329
330pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
331
332/// A dummy catalog manager that always returns empty results.
333///
334/// Used to fill the arg of `QueryEngineFactory::new_with_plugins` in datanode.
335pub struct DummyCatalogManager;
336
337impl DummyCatalogManager {
338    /// Returns a new `CatalogManagerRef` instance.
339    pub fn arc() -> CatalogManagerRef {
340        Arc::new(Self)
341    }
342}
343
344#[async_trait::async_trait]
345impl CatalogManager for DummyCatalogManager {
346    fn as_any(&self) -> &dyn Any {
347        self
348    }
349
350    async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
351        Ok(vec![])
352    }
353
354    async fn schema_names(
355        &self,
356        _catalog: &str,
357        _query_ctx: Option<&QueryContext>,
358    ) -> CatalogResult<Vec<String>> {
359        Ok(vec![])
360    }
361
362    async fn table_names(
363        &self,
364        _catalog: &str,
365        _schema: &str,
366        _query_ctx: Option<&QueryContext>,
367    ) -> CatalogResult<Vec<String>> {
368        Ok(vec![])
369    }
370
371    async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
372        Ok(false)
373    }
374
375    async fn schema_exists(
376        &self,
377        _catalog: &str,
378        _schema: &str,
379        _query_ctx: Option<&QueryContext>,
380    ) -> CatalogResult<bool> {
381        Ok(false)
382    }
383
384    async fn table_exists(
385        &self,
386        _catalog: &str,
387        _schema: &str,
388        _table: &str,
389        _query_ctx: Option<&QueryContext>,
390    ) -> CatalogResult<bool> {
391        Ok(false)
392    }
393
394    async fn table(
395        &self,
396        _catalog: &str,
397        _schema: &str,
398        _table_name: &str,
399        _query_ctx: Option<&QueryContext>,
400    ) -> CatalogResult<Option<TableRef>> {
401        Ok(None)
402    }
403
404    async fn table_id(
405        &self,
406        _catalog: &str,
407        _schema: &str,
408        _table_name: &str,
409        _query_ctx: Option<&QueryContext>,
410    ) -> CatalogResult<Option<TableId>> {
411        Ok(None)
412    }
413
414    async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
415        Ok(None)
416    }
417
418    async fn tables_by_ids(
419        &self,
420        _catalog: &str,
421        _schema: &str,
422        _table_ids: &[TableId],
423    ) -> CatalogResult<Vec<TableRef>> {
424        Ok(vec![])
425    }
426
427    fn tables<'a>(
428        &'a self,
429        _catalog: &'a str,
430        _schema: &'a str,
431        _query_ctx: Option<&'a QueryContext>,
432    ) -> BoxStream<'a, CatalogResult<TableRef>> {
433        Box::pin(futures::stream::empty())
434    }
435}