1use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use catalog::error::Result as CatalogResult;
24use catalog::{CatalogManager, CatalogManagerRef};
25use common_recordbatch::OrderOption;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
28use datafusion::datasource::TableProvider;
29use datafusion::physical_plan::ExecutionPlan;
30use datafusion_common::DataFusionError;
31use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
32use datatypes::arrow::datatypes::SchemaRef;
33use futures::stream::BoxStream;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::RegionEngineRef;
38use store_api::storage::{
39 RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, VectorSearchRequest,
40};
41use table::TableRef;
42use table::metadata::{TableId, TableInfoRef};
43use table::table::scan::RegionScanExec;
44
45use crate::error::{GetRegionMetadataSnafu, Result};
46
47#[derive(Clone, Debug)]
49pub struct DummyCatalogList {
50 catalog: DummyCatalogProvider,
51}
52
53impl DummyCatalogList {
54 pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
56 let schema_provider = DummySchemaProvider {
57 table: table_provider,
58 };
59 let catalog_provider = DummyCatalogProvider {
60 schema: schema_provider,
61 };
62 Self {
63 catalog: catalog_provider,
64 }
65 }
66}
67
68impl CatalogProviderList for DummyCatalogList {
69 fn as_any(&self) -> &dyn Any {
70 self
71 }
72
73 fn register_catalog(
74 &self,
75 _name: String,
76 _catalog: Arc<dyn CatalogProvider>,
77 ) -> Option<Arc<dyn CatalogProvider>> {
78 None
79 }
80
81 fn catalog_names(&self) -> Vec<String> {
82 vec![]
83 }
84
85 fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
86 Some(Arc::new(self.catalog.clone()))
87 }
88}
89
90#[derive(Clone, Debug)]
92struct DummyCatalogProvider {
93 schema: DummySchemaProvider,
94}
95
96impl CatalogProvider for DummyCatalogProvider {
97 fn as_any(&self) -> &dyn Any {
98 self
99 }
100
101 fn schema_names(&self) -> Vec<String> {
102 vec![]
103 }
104
105 fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
106 Some(Arc::new(self.schema.clone()))
107 }
108}
109
110#[derive(Clone, Debug)]
112struct DummySchemaProvider {
113 table: Arc<dyn TableProvider>,
114}
115
116#[async_trait]
117impl SchemaProvider for DummySchemaProvider {
118 fn as_any(&self) -> &dyn Any {
119 self
120 }
121
122 fn table_names(&self) -> Vec<String> {
123 vec![]
124 }
125
126 async fn table(
127 &self,
128 _name: &str,
129 ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
130 Ok(Some(self.table.clone()))
131 }
132
133 fn table_exist(&self, _name: &str) -> bool {
134 true
135 }
136}
137
138#[derive(Clone)]
140pub struct DummyTableProvider {
141 region_id: RegionId,
142 engine: RegionEngineRef,
143 metadata: RegionMetadataRef,
144 scan_request: Arc<Mutex<ScanRequest>>,
146 query_ctx: Option<QueryContextRef>,
147}
148
149impl fmt::Debug for DummyTableProvider {
150 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151 f.debug_struct("DummyTableProvider")
152 .field("region_id", &self.region_id)
153 .field("metadata", &self.metadata)
154 .field("scan_request", &self.scan_request)
155 .finish()
156 }
157}
158
159#[async_trait]
160impl TableProvider for DummyTableProvider {
161 fn as_any(&self) -> &dyn Any {
162 self
163 }
164
165 fn schema(&self) -> SchemaRef {
166 self.metadata.schema.arrow_schema().clone()
167 }
168
169 fn table_type(&self) -> TableType {
170 TableType::Base
171 }
172
173 async fn scan(
174 &self,
175 _state: &dyn Session,
176 projection: Option<&Vec<usize>>,
177 filters: &[Expr],
178 limit: Option<usize>,
179 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
180 let mut request = self.scan_request.lock().unwrap().clone();
181 request.projection = projection.cloned();
182 request.filters = filters.to_vec();
183 request.limit = limit;
184
185 let scanner = self
186 .engine
187 .handle_query(self.region_id, request.clone())
188 .await
189 .map_err(|e| DataFusionError::External(Box::new(e)))?;
190 let query_memory_permit = self.engine.register_query_memory_permit();
191 let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?;
192 if let Some(query_ctx) = &self.query_ctx {
193 scan_exec.set_explain_verbose(query_ctx.explain_verbose());
194 }
195 Ok(Arc::new(scan_exec))
196 }
197
198 fn supports_filters_pushdown(
199 &self,
200 filters: &[&Expr],
201 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
202 let supported = filters
203 .iter()
204 .map(|e| {
205 if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
207 if self
208 .metadata
209 .column_by_name(simple_filter.column_name())
210 .and_then(|c| {
211 (c.semantic_type == SemanticType::Tag
212 || c.semantic_type == SemanticType::Timestamp)
213 .then_some(())
214 })
215 .is_some()
216 {
217 TableProviderFilterPushDown::Exact
218 } else {
219 TableProviderFilterPushDown::Inexact
220 }
221 } else {
222 TableProviderFilterPushDown::Inexact
223 }
224 })
225 .collect();
226 Ok(supported)
227 }
228}
229
230impl DummyTableProvider {
231 pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
233 Self {
234 region_id,
235 engine,
236 metadata,
237 scan_request: Default::default(),
238 query_ctx: None,
239 }
240 }
241
242 pub fn region_metadata(&self) -> RegionMetadataRef {
243 self.metadata.clone()
244 }
245
246 pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
248 self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
249 }
250
251 pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
253 self.scan_request.lock().unwrap().distribution = Some(distribution);
254 }
255
256 pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
258 self.scan_request.lock().unwrap().series_row_selector = Some(selector);
259 }
260
261 pub fn with_vector_search_hint(&self, hint: VectorSearchRequest) {
262 self.scan_request.lock().unwrap().vector_search = Some(hint);
263 }
264
265 pub fn get_vector_search_hint(&self) -> Option<VectorSearchRequest> {
266 self.scan_request.lock().unwrap().vector_search.clone()
267 }
268
269 pub fn with_sequence(&self, sequence: u64) {
270 self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
271 }
272
273 #[cfg(test)]
275 pub fn scan_request(&self) -> ScanRequest {
276 self.scan_request.lock().unwrap().clone()
277 }
278}
279
280pub struct DummyTableProviderFactory;
281
282impl DummyTableProviderFactory {
283 pub async fn create_table_provider(
284 &self,
285 region_id: RegionId,
286 engine: RegionEngineRef,
287 query_ctx: Option<QueryContextRef>,
288 ) -> Result<DummyTableProvider> {
289 let metadata =
290 engine
291 .get_metadata(region_id)
292 .await
293 .with_context(|_| GetRegionMetadataSnafu {
294 engine: engine.name(),
295 region_id,
296 })?;
297
298 let scan_request = query_ctx
299 .as_ref()
300 .map(|ctx| ScanRequest {
301 memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()),
302 sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
303 ..Default::default()
304 })
305 .unwrap_or_default();
306
307 Ok(DummyTableProvider {
308 region_id,
309 engine,
310 metadata,
311 scan_request: Arc::new(Mutex::new(scan_request)),
312 query_ctx,
313 })
314 }
315}
316
317#[async_trait]
318impl TableProviderFactory for DummyTableProviderFactory {
319 async fn create(
320 &self,
321 region_id: RegionId,
322 engine: RegionEngineRef,
323 ctx: Option<QueryContextRef>,
324 ) -> Result<Arc<dyn TableProvider>> {
325 let provider = self.create_table_provider(region_id, engine, ctx).await?;
326 Ok(Arc::new(provider))
327 }
328}
329
330#[async_trait]
331pub trait TableProviderFactory: Send + Sync {
332 async fn create(
333 &self,
334 region_id: RegionId,
335 engine: RegionEngineRef,
336 ctx: Option<QueryContextRef>,
337 ) -> Result<Arc<dyn TableProvider>>;
338}
339
340pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
341
342pub struct DummyCatalogManager;
346
347impl DummyCatalogManager {
348 pub fn arc() -> CatalogManagerRef {
350 Arc::new(Self)
351 }
352}
353
354#[async_trait::async_trait]
355impl CatalogManager for DummyCatalogManager {
356 fn as_any(&self) -> &dyn Any {
357 self
358 }
359
360 async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
361 Ok(vec![])
362 }
363
364 async fn schema_names(
365 &self,
366 _catalog: &str,
367 _query_ctx: Option<&QueryContext>,
368 ) -> CatalogResult<Vec<String>> {
369 Ok(vec![])
370 }
371
372 async fn table_names(
373 &self,
374 _catalog: &str,
375 _schema: &str,
376 _query_ctx: Option<&QueryContext>,
377 ) -> CatalogResult<Vec<String>> {
378 Ok(vec![])
379 }
380
381 async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
382 Ok(false)
383 }
384
385 async fn schema_exists(
386 &self,
387 _catalog: &str,
388 _schema: &str,
389 _query_ctx: Option<&QueryContext>,
390 ) -> CatalogResult<bool> {
391 Ok(false)
392 }
393
394 async fn table_exists(
395 &self,
396 _catalog: &str,
397 _schema: &str,
398 _table: &str,
399 _query_ctx: Option<&QueryContext>,
400 ) -> CatalogResult<bool> {
401 Ok(false)
402 }
403
404 async fn table(
405 &self,
406 _catalog: &str,
407 _schema: &str,
408 _table_name: &str,
409 _query_ctx: Option<&QueryContext>,
410 ) -> CatalogResult<Option<TableRef>> {
411 Ok(None)
412 }
413
414 async fn table_id(
415 &self,
416 _catalog: &str,
417 _schema: &str,
418 _table_name: &str,
419 _query_ctx: Option<&QueryContext>,
420 ) -> CatalogResult<Option<TableId>> {
421 Ok(None)
422 }
423
424 async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
425 Ok(None)
426 }
427
428 async fn tables_by_ids(
429 &self,
430 _catalog: &str,
431 _schema: &str,
432 _table_ids: &[TableId],
433 ) -> CatalogResult<Vec<TableRef>> {
434 Ok(vec![])
435 }
436
437 fn tables<'a>(
438 &'a self,
439 _catalog: &'a str,
440 _schema: &'a str,
441 _query_ctx: Option<&'a QueryContext>,
442 ) -> BoxStream<'a, CatalogResult<TableRef>> {
443 Box::pin(futures::stream::empty())
444 }
445}