query/
dummy_catalog.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Dummy catalog for the region server.
16
17use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use common_recordbatch::filter::SimpleFilterEvaluator;
24use common_recordbatch::OrderOption;
25use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
26use datafusion::datasource::TableProvider;
27use datafusion::physical_plan::ExecutionPlan;
28use datafusion_common::DataFusionError;
29use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
30use datatypes::arrow::datatypes::SchemaRef;
31use snafu::ResultExt;
32use store_api::metadata::RegionMetadataRef;
33use store_api::region_engine::RegionEngineRef;
34use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
35use table::table::scan::RegionScanExec;
36
37use crate::error::{GetRegionMetadataSnafu, Result};
38
39/// Resolve to the given region (specified by [RegionId]) unconditionally.
40#[derive(Clone, Debug)]
41pub struct DummyCatalogList {
42    catalog: DummyCatalogProvider,
43}
44
45impl DummyCatalogList {
46    /// Creates a new catalog list with the given table provider.
47    pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
48        let schema_provider = DummySchemaProvider {
49            table: table_provider,
50        };
51        let catalog_provider = DummyCatalogProvider {
52            schema: schema_provider,
53        };
54        Self {
55            catalog: catalog_provider,
56        }
57    }
58}
59
60impl CatalogProviderList for DummyCatalogList {
61    fn as_any(&self) -> &dyn Any {
62        self
63    }
64
65    fn register_catalog(
66        &self,
67        _name: String,
68        _catalog: Arc<dyn CatalogProvider>,
69    ) -> Option<Arc<dyn CatalogProvider>> {
70        None
71    }
72
73    fn catalog_names(&self) -> Vec<String> {
74        vec![]
75    }
76
77    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
78        Some(Arc::new(self.catalog.clone()))
79    }
80}
81
82/// A dummy catalog provider for [DummyCatalogList].
83#[derive(Clone, Debug)]
84struct DummyCatalogProvider {
85    schema: DummySchemaProvider,
86}
87
88impl CatalogProvider for DummyCatalogProvider {
89    fn as_any(&self) -> &dyn Any {
90        self
91    }
92
93    fn schema_names(&self) -> Vec<String> {
94        vec![]
95    }
96
97    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
98        Some(Arc::new(self.schema.clone()))
99    }
100}
101
102/// A dummy schema provider for [DummyCatalogList].
103#[derive(Clone, Debug)]
104struct DummySchemaProvider {
105    table: Arc<dyn TableProvider>,
106}
107
108#[async_trait]
109impl SchemaProvider for DummySchemaProvider {
110    fn as_any(&self) -> &dyn Any {
111        self
112    }
113
114    fn table_names(&self) -> Vec<String> {
115        vec![]
116    }
117
118    async fn table(
119        &self,
120        _name: &str,
121    ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
122        Ok(Some(self.table.clone()))
123    }
124
125    fn table_exist(&self, _name: &str) -> bool {
126        true
127    }
128}
129
130/// For [TableProvider] and [DummyCatalogList]
131#[derive(Clone)]
132pub struct DummyTableProvider {
133    region_id: RegionId,
134    engine: RegionEngineRef,
135    metadata: RegionMetadataRef,
136    /// Keeping a mutable request makes it possible to change in the optimize phase.
137    scan_request: Arc<Mutex<ScanRequest>>,
138}
139
140impl fmt::Debug for DummyTableProvider {
141    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142        f.debug_struct("DummyTableProvider")
143            .field("region_id", &self.region_id)
144            .field("metadata", &self.metadata)
145            .field("scan_request", &self.scan_request)
146            .finish()
147    }
148}
149
150#[async_trait]
151impl TableProvider for DummyTableProvider {
152    fn as_any(&self) -> &dyn Any {
153        self
154    }
155
156    fn schema(&self) -> SchemaRef {
157        self.metadata.schema.arrow_schema().clone()
158    }
159
160    fn table_type(&self) -> TableType {
161        TableType::Base
162    }
163
164    async fn scan(
165        &self,
166        _state: &dyn Session,
167        projection: Option<&Vec<usize>>,
168        filters: &[Expr],
169        limit: Option<usize>,
170    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
171        let mut request = self.scan_request.lock().unwrap().clone();
172        request.projection = projection.cloned();
173        request.filters = filters.to_vec();
174        request.limit = limit;
175
176        let scanner = self
177            .engine
178            .handle_query(self.region_id, request.clone())
179            .await
180            .map_err(|e| DataFusionError::External(Box::new(e)))?;
181        Ok(Arc::new(RegionScanExec::new(scanner, request)?))
182    }
183
184    fn supports_filters_pushdown(
185        &self,
186        filters: &[&Expr],
187    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
188        let supported = filters
189            .iter()
190            .map(|e| {
191                // Simple filter on primary key columns are precisely evaluated.
192                if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
193                    if self
194                        .metadata
195                        .column_by_name(simple_filter.column_name())
196                        .and_then(|c| {
197                            (c.semantic_type == SemanticType::Tag
198                                || c.semantic_type == SemanticType::Timestamp)
199                                .then_some(())
200                        })
201                        .is_some()
202                    {
203                        TableProviderFilterPushDown::Exact
204                    } else {
205                        TableProviderFilterPushDown::Inexact
206                    }
207                } else {
208                    TableProviderFilterPushDown::Inexact
209                }
210            })
211            .collect();
212        Ok(supported)
213    }
214}
215
216impl DummyTableProvider {
217    /// Creates a new provider.
218    pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
219        Self {
220            region_id,
221            engine,
222            metadata,
223            scan_request: Default::default(),
224        }
225    }
226
227    pub fn region_metadata(&self) -> RegionMetadataRef {
228        self.metadata.clone()
229    }
230
231    /// Sets the ordering hint of the query to the provider.
232    pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
233        self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
234    }
235
236    /// Sets the distribution hint of the query to the provider.
237    pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
238        self.scan_request.lock().unwrap().distribution = Some(distribution);
239    }
240
241    /// Sets the time series selector hint of the query to the provider.
242    pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
243        self.scan_request.lock().unwrap().series_row_selector = Some(selector);
244    }
245
246    pub fn with_sequence(&self, sequence: u64) {
247        self.scan_request.lock().unwrap().sequence = Some(sequence);
248    }
249
250    /// Gets the scan request of the provider.
251    #[cfg(test)]
252    pub fn scan_request(&self) -> ScanRequest {
253        self.scan_request.lock().unwrap().clone()
254    }
255}
256
/// A [TableProviderFactory] that wraps the requested region in a [DummyTableProvider].
///
/// Stateless, so it is cheap to copy and construct.
#[derive(Debug, Default, Clone, Copy)]
pub struct DummyTableProviderFactory;
258
259#[async_trait]
260impl TableProviderFactory for DummyTableProviderFactory {
261    async fn create(
262        &self,
263        region_id: RegionId,
264        engine: RegionEngineRef,
265        ctx: Option<&session::context::QueryContext>,
266    ) -> Result<Arc<dyn TableProvider>> {
267        let metadata =
268            engine
269                .get_metadata(region_id)
270                .await
271                .with_context(|_| GetRegionMetadataSnafu {
272                    engine: engine.name(),
273                    region_id,
274                })?;
275
276        let scan_request = ctx
277            .and_then(|c| c.get_snapshot(region_id.as_u64()))
278            .map(|seq| ScanRequest {
279                sequence: Some(seq),
280                ..Default::default()
281            })
282            .unwrap_or_default();
283
284        Ok(Arc::new(DummyTableProvider {
285            region_id,
286            engine,
287            metadata,
288            scan_request: Arc::new(Mutex::new(scan_request)),
289        }))
290    }
291}
292
293#[async_trait]
294pub trait TableProviderFactory: Send + Sync {
295    async fn create(
296        &self,
297        region_id: RegionId,
298        engine: RegionEngineRef,
299        ctx: Option<&session::context::QueryContext>,
300    ) -> Result<Arc<dyn TableProvider>>;
301}
302
303pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;