query/
dummy_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dummy catalog for region server.
16
17use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_recordbatch::OrderOption;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
39use table::metadata::{TableId, TableInfoRef};
40use table::table::scan::RegionScanExec;
41use table::TableRef;
42
43use crate::error::{GetRegionMetadataSnafu, Result};
44
45/// Resolve to the given region (specified by [RegionId]) unconditionally.
46#[derive(Clone, Debug)]
47pub struct DummyCatalogList {
48    catalog: DummyCatalogProvider,
49}
50
51impl DummyCatalogList {
52    /// Creates a new catalog list with the given table provider.
53    pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
54        let schema_provider = DummySchemaProvider {
55            table: table_provider,
56        };
57        let catalog_provider = DummyCatalogProvider {
58            schema: schema_provider,
59        };
60        Self {
61            catalog: catalog_provider,
62        }
63    }
64}
65
66impl CatalogProviderList for DummyCatalogList {
67    fn as_any(&self) -> &dyn Any {
68        self
69    }
70
71    fn register_catalog(
72        &self,
73        _name: String,
74        _catalog: Arc<dyn CatalogProvider>,
75    ) -> Option<Arc<dyn CatalogProvider>> {
76        None
77    }
78
79    fn catalog_names(&self) -> Vec<String> {
80        vec![]
81    }
82
83    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
84        Some(Arc::new(self.catalog.clone()))
85    }
86}
87
88/// A dummy catalog provider for [DummyCatalogList].
89#[derive(Clone, Debug)]
90struct DummyCatalogProvider {
91    schema: DummySchemaProvider,
92}
93
94impl CatalogProvider for DummyCatalogProvider {
95    fn as_any(&self) -> &dyn Any {
96        self
97    }
98
99    fn schema_names(&self) -> Vec<String> {
100        vec![]
101    }
102
103    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
104        Some(Arc::new(self.schema.clone()))
105    }
106}
107
108/// A dummy schema provider for [DummyCatalogList].
109#[derive(Clone, Debug)]
110struct DummySchemaProvider {
111    table: Arc<dyn TableProvider>,
112}
113
114#[async_trait]
115impl SchemaProvider for DummySchemaProvider {
116    fn as_any(&self) -> &dyn Any {
117        self
118    }
119
120    fn table_names(&self) -> Vec<String> {
121        vec![]
122    }
123
124    async fn table(
125        &self,
126        _name: &str,
127    ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
128        Ok(Some(self.table.clone()))
129    }
130
131    fn table_exist(&self, _name: &str) -> bool {
132        true
133    }
134}
135
136/// For [TableProvider] and [DummyCatalogList]
137#[derive(Clone)]
138pub struct DummyTableProvider {
139    region_id: RegionId,
140    engine: RegionEngineRef,
141    metadata: RegionMetadataRef,
142    /// Keeping a mutable request makes it possible to change in the optimize phase.
143    scan_request: Arc<Mutex<ScanRequest>>,
144    query_ctx: Option<QueryContextRef>,
145}
146
147impl fmt::Debug for DummyTableProvider {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        f.debug_struct("DummyTableProvider")
150            .field("region_id", &self.region_id)
151            .field("metadata", &self.metadata)
152            .field("scan_request", &self.scan_request)
153            .finish()
154    }
155}
156
157#[async_trait]
158impl TableProvider for DummyTableProvider {
159    fn as_any(&self) -> &dyn Any {
160        self
161    }
162
163    fn schema(&self) -> SchemaRef {
164        self.metadata.schema.arrow_schema().clone()
165    }
166
167    fn table_type(&self) -> TableType {
168        TableType::Base
169    }
170
171    async fn scan(
172        &self,
173        _state: &dyn Session,
174        projection: Option<&Vec<usize>>,
175        filters: &[Expr],
176        limit: Option<usize>,
177    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
178        let mut request = self.scan_request.lock().unwrap().clone();
179        request.projection = projection.cloned();
180        request.filters = filters.to_vec();
181        request.limit = limit;
182
183        let scanner = self
184            .engine
185            .handle_query(self.region_id, request.clone())
186            .await
187            .map_err(|e| DataFusionError::External(Box::new(e)))?;
188        let mut scan_exec = RegionScanExec::new(scanner, request)?;
189        if let Some(query_ctx) = &self.query_ctx {
190            scan_exec.set_explain_verbose(query_ctx.explain_verbose());
191        }
192        Ok(Arc::new(scan_exec))
193    }
194
195    fn supports_filters_pushdown(
196        &self,
197        filters: &[&Expr],
198    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
199        let supported = filters
200            .iter()
201            .map(|e| {
202                // Simple filter on primary key columns are precisely evaluated.
203                if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
204                    if self
205                        .metadata
206                        .column_by_name(simple_filter.column_name())
207                        .and_then(|c| {
208                            (c.semantic_type == SemanticType::Tag
209                                || c.semantic_type == SemanticType::Timestamp)
210                                .then_some(())
211                        })
212                        .is_some()
213                    {
214                        TableProviderFilterPushDown::Exact
215                    } else {
216                        TableProviderFilterPushDown::Inexact
217                    }
218                } else {
219                    TableProviderFilterPushDown::Inexact
220                }
221            })
222            .collect();
223        Ok(supported)
224    }
225}
226
227impl DummyTableProvider {
228    /// Creates a new provider.
229    pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
230        Self {
231            region_id,
232            engine,
233            metadata,
234            scan_request: Default::default(),
235            query_ctx: None,
236        }
237    }
238
239    pub fn region_metadata(&self) -> RegionMetadataRef {
240        self.metadata.clone()
241    }
242
243    /// Sets the ordering hint of the query to the provider.
244    pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
245        self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
246    }
247
248    /// Sets the distribution hint of the query to the provider.
249    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
250        self.scan_request.lock().unwrap().distribution = Some(distribution);
251    }
252
253    /// Sets the time series selector hint of the query to the provider.
254    pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
255        self.scan_request.lock().unwrap().series_row_selector = Some(selector);
256    }
257
258    pub fn with_sequence(&self, sequence: u64) {
259        self.scan_request.lock().unwrap().sequence = Some(sequence);
260    }
261
262    /// Gets the scan request of the provider.
263    #[cfg(test)]
264    pub fn scan_request(&self) -> ScanRequest {
265        self.scan_request.lock().unwrap().clone()
266    }
267}
268
/// Stateless factory that creates [DummyTableProvider]s.
#[derive(Debug, Clone, Copy, Default)]
pub struct DummyTableProviderFactory;
270
271impl DummyTableProviderFactory {
272    pub async fn create_table_provider(
273        &self,
274        region_id: RegionId,
275        engine: RegionEngineRef,
276        query_ctx: Option<QueryContextRef>,
277    ) -> Result<DummyTableProvider> {
278        let metadata =
279            engine
280                .get_metadata(region_id)
281                .await
282                .with_context(|_| GetRegionMetadataSnafu {
283                    engine: engine.name(),
284                    region_id,
285                })?;
286
287        let scan_request = query_ctx
288            .as_ref()
289            .map(|ctx| ScanRequest {
290                sequence: ctx.get_snapshot(region_id.as_u64()),
291                sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
292                ..Default::default()
293            })
294            .unwrap_or_default();
295
296        Ok(DummyTableProvider {
297            region_id,
298            engine,
299            metadata,
300            scan_request: Arc::new(Mutex::new(scan_request)),
301            query_ctx,
302        })
303    }
304}
305
306#[async_trait]
307impl TableProviderFactory for DummyTableProviderFactory {
308    async fn create(
309        &self,
310        region_id: RegionId,
311        engine: RegionEngineRef,
312        ctx: Option<QueryContextRef>,
313    ) -> Result<Arc<dyn TableProvider>> {
314        let provider = self.create_table_provider(region_id, engine, ctx).await?;
315        Ok(Arc::new(provider))
316    }
317}
318
319#[async_trait]
320pub trait TableProviderFactory: Send + Sync {
321    async fn create(
322        &self,
323        region_id: RegionId,
324        engine: RegionEngineRef,
325        ctx: Option<QueryContextRef>,
326    ) -> Result<Arc<dyn TableProvider>>;
327}
328
329pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
330
/// A dummy catalog manager that always returns empty results.
///
/// Used to fill the arg of `QueryEngineFactory::new_with_plugins` in datanode.
#[derive(Debug, Clone, Copy, Default)]
pub struct DummyCatalogManager;
335
336impl DummyCatalogManager {
337    /// Returns a new `CatalogManagerRef` instance.
338    pub fn arc() -> CatalogManagerRef {
339        Arc::new(Self)
340    }
341}
342
343#[async_trait::async_trait]
344impl CatalogManager for DummyCatalogManager {
345    fn as_any(&self) -> &dyn Any {
346        self
347    }
348
349    async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
350        Ok(vec![])
351    }
352
353    async fn schema_names(
354        &self,
355        _catalog: &str,
356        _query_ctx: Option<&QueryContext>,
357    ) -> CatalogResult<Vec<String>> {
358        Ok(vec![])
359    }
360
361    async fn table_names(
362        &self,
363        _catalog: &str,
364        _schema: &str,
365        _query_ctx: Option<&QueryContext>,
366    ) -> CatalogResult<Vec<String>> {
367        Ok(vec![])
368    }
369
370    async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
371        Ok(false)
372    }
373
374    async fn schema_exists(
375        &self,
376        _catalog: &str,
377        _schema: &str,
378        _query_ctx: Option<&QueryContext>,
379    ) -> CatalogResult<bool> {
380        Ok(false)
381    }
382
383    async fn table_exists(
384        &self,
385        _catalog: &str,
386        _schema: &str,
387        _table: &str,
388        _query_ctx: Option<&QueryContext>,
389    ) -> CatalogResult<bool> {
390        Ok(false)
391    }
392
393    async fn table(
394        &self,
395        _catalog: &str,
396        _schema: &str,
397        _table_name: &str,
398        _query_ctx: Option<&QueryContext>,
399    ) -> CatalogResult<Option<TableRef>> {
400        Ok(None)
401    }
402
403    async fn table_id(
404        &self,
405        _catalog: &str,
406        _schema: &str,
407        _table_name: &str,
408        _query_ctx: Option<&QueryContext>,
409    ) -> CatalogResult<Option<TableId>> {
410        Ok(None)
411    }
412
413    async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
414        Ok(None)
415    }
416
417    async fn tables_by_ids(
418        &self,
419        _catalog: &str,
420        _schema: &str,
421        _table_ids: &[TableId],
422    ) -> CatalogResult<Vec<TableRef>> {
423        Ok(vec![])
424    }
425
426    fn tables<'a>(
427        &'a self,
428        _catalog: &'a str,
429        _schema: &'a str,
430        _query_ctx: Option<&'a QueryContext>,
431    ) -> BoxStream<'a, CatalogResult<TableRef>> {
432        Box::pin(futures::stream::empty())
433    }
434}