1use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::OrderOption;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
39use table::TableRef;
40use table::metadata::{TableId, TableInfoRef};
41use table::table::scan::RegionScanExec;
42
43use crate::error::{GetRegionMetadataSnafu, Result};
44
45#[derive(Clone, Debug)]
47pub struct DummyCatalogList {
48 catalog: DummyCatalogProvider,
49}
50
51impl DummyCatalogList {
52 pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
54 let schema_provider = DummySchemaProvider {
55 table: table_provider,
56 };
57 let catalog_provider = DummyCatalogProvider {
58 schema: schema_provider,
59 };
60 Self {
61 catalog: catalog_provider,
62 }
63 }
64}
65
66impl CatalogProviderList for DummyCatalogList {
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70
71 fn register_catalog(
72 &self,
73 _name: String,
74 _catalog: Arc<dyn CatalogProvider>,
75 ) -> Option<Arc<dyn CatalogProvider>> {
76 None
77 }
78
79 fn catalog_names(&self) -> Vec<String> {
80 vec![]
81 }
82
83 fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
84 Some(Arc::new(self.catalog.clone()))
85 }
86}
87
88#[derive(Clone, Debug)]
90struct DummyCatalogProvider {
91 schema: DummySchemaProvider,
92}
93
94impl CatalogProvider for DummyCatalogProvider {
95 fn as_any(&self) -> &dyn Any {
96 self
97 }
98
99 fn schema_names(&self) -> Vec<String> {
100 vec![]
101 }
102
103 fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
104 Some(Arc::new(self.schema.clone()))
105 }
106}
107
108#[derive(Clone, Debug)]
110struct DummySchemaProvider {
111 table: Arc<dyn TableProvider>,
112}
113
114#[async_trait]
115impl SchemaProvider for DummySchemaProvider {
116 fn as_any(&self) -> &dyn Any {
117 self
118 }
119
120 fn table_names(&self) -> Vec<String> {
121 vec![]
122 }
123
124 async fn table(
125 &self,
126 _name: &str,
127 ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
128 Ok(Some(self.table.clone()))
129 }
130
131 fn table_exist(&self, _name: &str) -> bool {
132 true
133 }
134}
135
136#[derive(Clone)]
138pub struct DummyTableProvider {
139 region_id: RegionId,
140 engine: RegionEngineRef,
141 metadata: RegionMetadataRef,
142 scan_request: Arc<Mutex<ScanRequest>>,
144 query_ctx: Option<QueryContextRef>,
145}
146
147impl fmt::Debug for DummyTableProvider {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_struct("DummyTableProvider")
150 .field("region_id", &self.region_id)
151 .field("metadata", &self.metadata)
152 .field("scan_request", &self.scan_request)
153 .finish()
154 }
155}
156
157#[async_trait]
158impl TableProvider for DummyTableProvider {
159 fn as_any(&self) -> &dyn Any {
160 self
161 }
162
163 fn schema(&self) -> SchemaRef {
164 self.metadata.schema.arrow_schema().clone()
165 }
166
167 fn table_type(&self) -> TableType {
168 TableType::Base
169 }
170
171 async fn scan(
172 &self,
173 _state: &dyn Session,
174 projection: Option<&Vec<usize>>,
175 filters: &[Expr],
176 limit: Option<usize>,
177 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
178 let mut request = self.scan_request.lock().unwrap().clone();
179 request.projection = projection.cloned();
180 request.filters = filters.to_vec();
181 request.limit = limit;
182
183 let scanner = self
184 .engine
185 .handle_query(self.region_id, request.clone())
186 .await
187 .map_err(|e| DataFusionError::External(Box::new(e)))?;
188 let query_memory_permit = self.engine.register_query_memory_permit();
189 let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?;
190 if let Some(query_ctx) = &self.query_ctx {
191 scan_exec.set_explain_verbose(query_ctx.explain_verbose());
192 }
193 Ok(Arc::new(scan_exec))
194 }
195
196 fn supports_filters_pushdown(
197 &self,
198 filters: &[&Expr],
199 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
200 let supported = filters
201 .iter()
202 .map(|e| {
203 if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
205 if self
206 .metadata
207 .column_by_name(simple_filter.column_name())
208 .and_then(|c| {
209 (c.semantic_type == SemanticType::Tag
210 || c.semantic_type == SemanticType::Timestamp)
211 .then_some(())
212 })
213 .is_some()
214 {
215 TableProviderFilterPushDown::Exact
216 } else {
217 TableProviderFilterPushDown::Inexact
218 }
219 } else {
220 TableProviderFilterPushDown::Inexact
221 }
222 })
223 .collect();
224 Ok(supported)
225 }
226}
227
228impl DummyTableProvider {
229 pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
231 Self {
232 region_id,
233 engine,
234 metadata,
235 scan_request: Default::default(),
236 query_ctx: None,
237 }
238 }
239
240 pub fn region_metadata(&self) -> RegionMetadataRef {
241 self.metadata.clone()
242 }
243
244 pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
246 self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
247 }
248
249 pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
251 self.scan_request.lock().unwrap().distribution = Some(distribution);
252 }
253
254 pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
256 self.scan_request.lock().unwrap().series_row_selector = Some(selector);
257 }
258
259 pub fn with_sequence(&self, sequence: u64) {
260 self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
261 }
262
263 #[cfg(test)]
265 pub fn scan_request(&self) -> ScanRequest {
266 self.scan_request.lock().unwrap().clone()
267 }
268}
269
270pub struct DummyTableProviderFactory;
271
272impl DummyTableProviderFactory {
273 pub async fn create_table_provider(
274 &self,
275 region_id: RegionId,
276 engine: RegionEngineRef,
277 query_ctx: Option<QueryContextRef>,
278 ) -> Result<DummyTableProvider> {
279 let metadata =
280 engine
281 .get_metadata(region_id)
282 .await
283 .with_context(|_| GetRegionMetadataSnafu {
284 engine: engine.name(),
285 region_id,
286 })?;
287
288 let scan_request = query_ctx
289 .as_ref()
290 .map(|ctx| ScanRequest {
291 memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()),
292 sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
293 ..Default::default()
294 })
295 .unwrap_or_default();
296
297 Ok(DummyTableProvider {
298 region_id,
299 engine,
300 metadata,
301 scan_request: Arc::new(Mutex::new(scan_request)),
302 query_ctx,
303 })
304 }
305}
306
307#[async_trait]
308impl TableProviderFactory for DummyTableProviderFactory {
309 async fn create(
310 &self,
311 region_id: RegionId,
312 engine: RegionEngineRef,
313 ctx: Option<QueryContextRef>,
314 ) -> Result<Arc<dyn TableProvider>> {
315 let provider = self.create_table_provider(region_id, engine, ctx).await?;
316 Ok(Arc::new(provider))
317 }
318}
319
320#[async_trait]
321pub trait TableProviderFactory: Send + Sync {
322 async fn create(
323 &self,
324 region_id: RegionId,
325 engine: RegionEngineRef,
326 ctx: Option<QueryContextRef>,
327 ) -> Result<Arc<dyn TableProvider>>;
328}
329
330pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
331
332pub struct DummyCatalogManager;
336
337impl DummyCatalogManager {
338 pub fn arc() -> CatalogManagerRef {
340 Arc::new(Self)
341 }
342}
343
344#[async_trait::async_trait]
345impl CatalogManager for DummyCatalogManager {
346 fn as_any(&self) -> &dyn Any {
347 self
348 }
349
350 async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
351 Ok(vec![])
352 }
353
354 async fn schema_names(
355 &self,
356 _catalog: &str,
357 _query_ctx: Option<&QueryContext>,
358 ) -> CatalogResult<Vec<String>> {
359 Ok(vec![])
360 }
361
362 async fn table_names(
363 &self,
364 _catalog: &str,
365 _schema: &str,
366 _query_ctx: Option<&QueryContext>,
367 ) -> CatalogResult<Vec<String>> {
368 Ok(vec![])
369 }
370
371 async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
372 Ok(false)
373 }
374
375 async fn schema_exists(
376 &self,
377 _catalog: &str,
378 _schema: &str,
379 _query_ctx: Option<&QueryContext>,
380 ) -> CatalogResult<bool> {
381 Ok(false)
382 }
383
384 async fn table_exists(
385 &self,
386 _catalog: &str,
387 _schema: &str,
388 _table: &str,
389 _query_ctx: Option<&QueryContext>,
390 ) -> CatalogResult<bool> {
391 Ok(false)
392 }
393
394 async fn table(
395 &self,
396 _catalog: &str,
397 _schema: &str,
398 _table_name: &str,
399 _query_ctx: Option<&QueryContext>,
400 ) -> CatalogResult<Option<TableRef>> {
401 Ok(None)
402 }
403
404 async fn table_id(
405 &self,
406 _catalog: &str,
407 _schema: &str,
408 _table_name: &str,
409 _query_ctx: Option<&QueryContext>,
410 ) -> CatalogResult<Option<TableId>> {
411 Ok(None)
412 }
413
414 async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
415 Ok(None)
416 }
417
418 async fn tables_by_ids(
419 &self,
420 _catalog: &str,
421 _schema: &str,
422 _table_ids: &[TableId],
423 ) -> CatalogResult<Vec<TableRef>> {
424 Ok(vec![])
425 }
426
427 fn tables<'a>(
428 &'a self,
429 _catalog: &'a str,
430 _schema: &'a str,
431 _query_ctx: Option<&'a QueryContext>,
432 ) -> BoxStream<'a, CatalogResult<TableRef>> {
433 Box::pin(futures::stream::empty())
434 }
435}