1use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_recordbatch::OrderOption;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
39use table::metadata::{TableId, TableInfoRef};
40use table::table::scan::RegionScanExec;
41use table::TableRef;
42
43use crate::error::{GetRegionMetadataSnafu, Result};
44
45#[derive(Clone, Debug)]
47pub struct DummyCatalogList {
48 catalog: DummyCatalogProvider,
49}
50
51impl DummyCatalogList {
52 pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
54 let schema_provider = DummySchemaProvider {
55 table: table_provider,
56 };
57 let catalog_provider = DummyCatalogProvider {
58 schema: schema_provider,
59 };
60 Self {
61 catalog: catalog_provider,
62 }
63 }
64}
65
66impl CatalogProviderList for DummyCatalogList {
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70
71 fn register_catalog(
72 &self,
73 _name: String,
74 _catalog: Arc<dyn CatalogProvider>,
75 ) -> Option<Arc<dyn CatalogProvider>> {
76 None
77 }
78
79 fn catalog_names(&self) -> Vec<String> {
80 vec![]
81 }
82
83 fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
84 Some(Arc::new(self.catalog.clone()))
85 }
86}
87
88#[derive(Clone, Debug)]
90struct DummyCatalogProvider {
91 schema: DummySchemaProvider,
92}
93
94impl CatalogProvider for DummyCatalogProvider {
95 fn as_any(&self) -> &dyn Any {
96 self
97 }
98
99 fn schema_names(&self) -> Vec<String> {
100 vec![]
101 }
102
103 fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
104 Some(Arc::new(self.schema.clone()))
105 }
106}
107
108#[derive(Clone, Debug)]
110struct DummySchemaProvider {
111 table: Arc<dyn TableProvider>,
112}
113
114#[async_trait]
115impl SchemaProvider for DummySchemaProvider {
116 fn as_any(&self) -> &dyn Any {
117 self
118 }
119
120 fn table_names(&self) -> Vec<String> {
121 vec![]
122 }
123
124 async fn table(
125 &self,
126 _name: &str,
127 ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
128 Ok(Some(self.table.clone()))
129 }
130
131 fn table_exist(&self, _name: &str) -> bool {
132 true
133 }
134}
135
136#[derive(Clone)]
138pub struct DummyTableProvider {
139 region_id: RegionId,
140 engine: RegionEngineRef,
141 metadata: RegionMetadataRef,
142 scan_request: Arc<Mutex<ScanRequest>>,
144 query_ctx: Option<QueryContextRef>,
145}
146
147impl fmt::Debug for DummyTableProvider {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_struct("DummyTableProvider")
150 .field("region_id", &self.region_id)
151 .field("metadata", &self.metadata)
152 .field("scan_request", &self.scan_request)
153 .finish()
154 }
155}
156
157#[async_trait]
158impl TableProvider for DummyTableProvider {
159 fn as_any(&self) -> &dyn Any {
160 self
161 }
162
163 fn schema(&self) -> SchemaRef {
164 self.metadata.schema.arrow_schema().clone()
165 }
166
167 fn table_type(&self) -> TableType {
168 TableType::Base
169 }
170
171 async fn scan(
172 &self,
173 _state: &dyn Session,
174 projection: Option<&Vec<usize>>,
175 filters: &[Expr],
176 limit: Option<usize>,
177 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
178 let mut request = self.scan_request.lock().unwrap().clone();
179 request.projection = projection.cloned();
180 request.filters = filters.to_vec();
181 request.limit = limit;
182
183 let scanner = self
184 .engine
185 .handle_query(self.region_id, request.clone())
186 .await
187 .map_err(|e| DataFusionError::External(Box::new(e)))?;
188 let mut scan_exec = RegionScanExec::new(scanner, request)?;
189 if let Some(query_ctx) = &self.query_ctx {
190 scan_exec.set_explain_verbose(query_ctx.explain_verbose());
191 }
192 Ok(Arc::new(scan_exec))
193 }
194
195 fn supports_filters_pushdown(
196 &self,
197 filters: &[&Expr],
198 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
199 let supported = filters
200 .iter()
201 .map(|e| {
202 if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
204 if self
205 .metadata
206 .column_by_name(simple_filter.column_name())
207 .and_then(|c| {
208 (c.semantic_type == SemanticType::Tag
209 || c.semantic_type == SemanticType::Timestamp)
210 .then_some(())
211 })
212 .is_some()
213 {
214 TableProviderFilterPushDown::Exact
215 } else {
216 TableProviderFilterPushDown::Inexact
217 }
218 } else {
219 TableProviderFilterPushDown::Inexact
220 }
221 })
222 .collect();
223 Ok(supported)
224 }
225}
226
227impl DummyTableProvider {
228 pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
230 Self {
231 region_id,
232 engine,
233 metadata,
234 scan_request: Default::default(),
235 query_ctx: None,
236 }
237 }
238
239 pub fn region_metadata(&self) -> RegionMetadataRef {
240 self.metadata.clone()
241 }
242
243 pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
245 self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
246 }
247
248 pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
250 self.scan_request.lock().unwrap().distribution = Some(distribution);
251 }
252
253 pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
255 self.scan_request.lock().unwrap().series_row_selector = Some(selector);
256 }
257
258 pub fn with_sequence(&self, sequence: u64) {
259 self.scan_request.lock().unwrap().sequence = Some(sequence);
260 }
261
262 #[cfg(test)]
264 pub fn scan_request(&self) -> ScanRequest {
265 self.scan_request.lock().unwrap().clone()
266 }
267}
268
269pub struct DummyTableProviderFactory;
270
271impl DummyTableProviderFactory {
272 pub async fn create_table_provider(
273 &self,
274 region_id: RegionId,
275 engine: RegionEngineRef,
276 query_ctx: Option<QueryContextRef>,
277 ) -> Result<DummyTableProvider> {
278 let metadata =
279 engine
280 .get_metadata(region_id)
281 .await
282 .with_context(|_| GetRegionMetadataSnafu {
283 engine: engine.name(),
284 region_id,
285 })?;
286
287 let scan_request = query_ctx
288 .as_ref()
289 .map(|ctx| ScanRequest {
290 sequence: ctx.get_snapshot(region_id.as_u64()),
291 sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
292 ..Default::default()
293 })
294 .unwrap_or_default();
295
296 Ok(DummyTableProvider {
297 region_id,
298 engine,
299 metadata,
300 scan_request: Arc::new(Mutex::new(scan_request)),
301 query_ctx,
302 })
303 }
304}
305
306#[async_trait]
307impl TableProviderFactory for DummyTableProviderFactory {
308 async fn create(
309 &self,
310 region_id: RegionId,
311 engine: RegionEngineRef,
312 ctx: Option<QueryContextRef>,
313 ) -> Result<Arc<dyn TableProvider>> {
314 let provider = self.create_table_provider(region_id, engine, ctx).await?;
315 Ok(Arc::new(provider))
316 }
317}
318
319#[async_trait]
320pub trait TableProviderFactory: Send + Sync {
321 async fn create(
322 &self,
323 region_id: RegionId,
324 engine: RegionEngineRef,
325 ctx: Option<QueryContextRef>,
326 ) -> Result<Arc<dyn TableProvider>>;
327}
328
329pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
330
331pub struct DummyCatalogManager;
335
336impl DummyCatalogManager {
337 pub fn arc() -> CatalogManagerRef {
339 Arc::new(Self)
340 }
341}
342
343#[async_trait::async_trait]
344impl CatalogManager for DummyCatalogManager {
345 fn as_any(&self) -> &dyn Any {
346 self
347 }
348
349 async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
350 Ok(vec![])
351 }
352
353 async fn schema_names(
354 &self,
355 _catalog: &str,
356 _query_ctx: Option<&QueryContext>,
357 ) -> CatalogResult<Vec<String>> {
358 Ok(vec![])
359 }
360
361 async fn table_names(
362 &self,
363 _catalog: &str,
364 _schema: &str,
365 _query_ctx: Option<&QueryContext>,
366 ) -> CatalogResult<Vec<String>> {
367 Ok(vec![])
368 }
369
370 async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
371 Ok(false)
372 }
373
374 async fn schema_exists(
375 &self,
376 _catalog: &str,
377 _schema: &str,
378 _query_ctx: Option<&QueryContext>,
379 ) -> CatalogResult<bool> {
380 Ok(false)
381 }
382
383 async fn table_exists(
384 &self,
385 _catalog: &str,
386 _schema: &str,
387 _table: &str,
388 _query_ctx: Option<&QueryContext>,
389 ) -> CatalogResult<bool> {
390 Ok(false)
391 }
392
393 async fn table(
394 &self,
395 _catalog: &str,
396 _schema: &str,
397 _table_name: &str,
398 _query_ctx: Option<&QueryContext>,
399 ) -> CatalogResult<Option<TableRef>> {
400 Ok(None)
401 }
402
403 async fn table_id(
404 &self,
405 _catalog: &str,
406 _schema: &str,
407 _table_name: &str,
408 _query_ctx: Option<&QueryContext>,
409 ) -> CatalogResult<Option<TableId>> {
410 Ok(None)
411 }
412
413 async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
414 Ok(None)
415 }
416
417 async fn tables_by_ids(
418 &self,
419 _catalog: &str,
420 _schema: &str,
421 _table_ids: &[TableId],
422 ) -> CatalogResult<Vec<TableRef>> {
423 Ok(vec![])
424 }
425
426 fn tables<'a>(
427 &'a self,
428 _catalog: &'a str,
429 _schema: &'a str,
430 _query_ctx: Option<&'a QueryContext>,
431 ) -> BoxStream<'a, CatalogResult<TableRef>> {
432 Box::pin(futures::stream::empty())
433 }
434}