1use std::any::Any;
18use std::fmt;
19use std::sync::{Arc, Mutex};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use common_recordbatch::filter::SimpleFilterEvaluator;
24use common_recordbatch::OrderOption;
25use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
26use datafusion::datasource::TableProvider;
27use datafusion::physical_plan::ExecutionPlan;
28use datafusion_common::DataFusionError;
29use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
30use datatypes::arrow::datatypes::SchemaRef;
31use snafu::ResultExt;
32use store_api::metadata::RegionMetadataRef;
33use store_api::region_engine::RegionEngineRef;
34use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
35use table::table::scan::RegionScanExec;
36
37use crate::error::{GetRegionMetadataSnafu, Result};
38
39#[derive(Clone, Debug)]
41pub struct DummyCatalogList {
42 catalog: DummyCatalogProvider,
43}
44
45impl DummyCatalogList {
46 pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
48 let schema_provider = DummySchemaProvider {
49 table: table_provider,
50 };
51 let catalog_provider = DummyCatalogProvider {
52 schema: schema_provider,
53 };
54 Self {
55 catalog: catalog_provider,
56 }
57 }
58}
59
60impl CatalogProviderList for DummyCatalogList {
61 fn as_any(&self) -> &dyn Any {
62 self
63 }
64
65 fn register_catalog(
66 &self,
67 _name: String,
68 _catalog: Arc<dyn CatalogProvider>,
69 ) -> Option<Arc<dyn CatalogProvider>> {
70 None
71 }
72
73 fn catalog_names(&self) -> Vec<String> {
74 vec![]
75 }
76
77 fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
78 Some(Arc::new(self.catalog.clone()))
79 }
80}
81
82#[derive(Clone, Debug)]
84struct DummyCatalogProvider {
85 schema: DummySchemaProvider,
86}
87
88impl CatalogProvider for DummyCatalogProvider {
89 fn as_any(&self) -> &dyn Any {
90 self
91 }
92
93 fn schema_names(&self) -> Vec<String> {
94 vec![]
95 }
96
97 fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
98 Some(Arc::new(self.schema.clone()))
99 }
100}
101
102#[derive(Clone, Debug)]
104struct DummySchemaProvider {
105 table: Arc<dyn TableProvider>,
106}
107
108#[async_trait]
109impl SchemaProvider for DummySchemaProvider {
110 fn as_any(&self) -> &dyn Any {
111 self
112 }
113
114 fn table_names(&self) -> Vec<String> {
115 vec![]
116 }
117
118 async fn table(
119 &self,
120 _name: &str,
121 ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
122 Ok(Some(self.table.clone()))
123 }
124
125 fn table_exist(&self, _name: &str) -> bool {
126 true
127 }
128}
129
130#[derive(Clone)]
132pub struct DummyTableProvider {
133 region_id: RegionId,
134 engine: RegionEngineRef,
135 metadata: RegionMetadataRef,
136 scan_request: Arc<Mutex<ScanRequest>>,
138}
139
140impl fmt::Debug for DummyTableProvider {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 f.debug_struct("DummyTableProvider")
143 .field("region_id", &self.region_id)
144 .field("metadata", &self.metadata)
145 .field("scan_request", &self.scan_request)
146 .finish()
147 }
148}
149
150#[async_trait]
151impl TableProvider for DummyTableProvider {
152 fn as_any(&self) -> &dyn Any {
153 self
154 }
155
156 fn schema(&self) -> SchemaRef {
157 self.metadata.schema.arrow_schema().clone()
158 }
159
160 fn table_type(&self) -> TableType {
161 TableType::Base
162 }
163
164 async fn scan(
165 &self,
166 _state: &dyn Session,
167 projection: Option<&Vec<usize>>,
168 filters: &[Expr],
169 limit: Option<usize>,
170 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
171 let mut request = self.scan_request.lock().unwrap().clone();
172 request.projection = projection.cloned();
173 request.filters = filters.to_vec();
174 request.limit = limit;
175
176 let scanner = self
177 .engine
178 .handle_query(self.region_id, request.clone())
179 .await
180 .map_err(|e| DataFusionError::External(Box::new(e)))?;
181 Ok(Arc::new(RegionScanExec::new(scanner, request)?))
182 }
183
184 fn supports_filters_pushdown(
185 &self,
186 filters: &[&Expr],
187 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
188 let supported = filters
189 .iter()
190 .map(|e| {
191 if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
193 if self
194 .metadata
195 .column_by_name(simple_filter.column_name())
196 .and_then(|c| {
197 (c.semantic_type == SemanticType::Tag
198 || c.semantic_type == SemanticType::Timestamp)
199 .then_some(())
200 })
201 .is_some()
202 {
203 TableProviderFilterPushDown::Exact
204 } else {
205 TableProviderFilterPushDown::Inexact
206 }
207 } else {
208 TableProviderFilterPushDown::Inexact
209 }
210 })
211 .collect();
212 Ok(supported)
213 }
214}
215
216impl DummyTableProvider {
217 pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
219 Self {
220 region_id,
221 engine,
222 metadata,
223 scan_request: Default::default(),
224 }
225 }
226
227 pub fn region_metadata(&self) -> RegionMetadataRef {
228 self.metadata.clone()
229 }
230
231 pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
233 self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
234 }
235
236 pub fn with_distribution(&self, distribution: TimeSeriesDistribution) {
238 self.scan_request.lock().unwrap().distribution = Some(distribution);
239 }
240
241 pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
243 self.scan_request.lock().unwrap().series_row_selector = Some(selector);
244 }
245
246 pub fn with_sequence(&self, sequence: u64) {
247 self.scan_request.lock().unwrap().sequence = Some(sequence);
248 }
249
250 #[cfg(test)]
252 pub fn scan_request(&self) -> ScanRequest {
253 self.scan_request.lock().unwrap().clone()
254 }
255}
256
257pub struct DummyTableProviderFactory;
258
259#[async_trait]
260impl TableProviderFactory for DummyTableProviderFactory {
261 async fn create(
262 &self,
263 region_id: RegionId,
264 engine: RegionEngineRef,
265 ctx: Option<&session::context::QueryContext>,
266 ) -> Result<Arc<dyn TableProvider>> {
267 let metadata =
268 engine
269 .get_metadata(region_id)
270 .await
271 .with_context(|_| GetRegionMetadataSnafu {
272 engine: engine.name(),
273 region_id,
274 })?;
275
276 let scan_request = ctx
277 .and_then(|c| c.get_snapshot(region_id.as_u64()))
278 .map(|seq| ScanRequest {
279 sequence: Some(seq),
280 ..Default::default()
281 })
282 .unwrap_or_default();
283
284 Ok(Arc::new(DummyTableProvider {
285 region_id,
286 engine,
287 metadata,
288 scan_request: Arc::new(Mutex::new(scan_request)),
289 }))
290 }
291}
292
293#[async_trait]
294pub trait TableProviderFactory: Send + Sync {
295 async fn create(
296 &self,
297 region_id: RegionId,
298 engine: RegionEngineRef,
299 ctx: Option<&session::context::QueryContext>,
300 ) -> Result<Arc<dyn TableProvider>>;
301}
302
303pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;