1use std::collections::HashMap;
16use std::sync::Arc;
17
18use datafusion::catalog::{
19 CatalogProvider, CatalogProviderList, MemTable, SchemaProvider, TableProvider,
20};
21use datafusion::datasource::provider_as_source;
22use datafusion::error as df_error;
23use datafusion::error::Result as DfResult;
24use datafusion_common::DataFusionError;
25use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
26use datafusion_expr::{LogicalPlan, TableSource};
27use futures::TryStreamExt;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
31use store_api::storage::RegionId;
32
33use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu};
34use crate::region_server::RegionServer;
35
/// Identifies a reserved internal table that is answered from SST inspection
/// data instead of a region's regular table provider.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum InternalTableKind {
    /// Served by [`RegionServer::inspect_sst_manifest_provider`].
    InspectSstManifest,
    /// Served by [`RegionServer::inspect_sst_storage_provider`].
    InspectSstStorage,
}
43
44impl InternalTableKind {
45 pub fn from_table_name(name: &str) -> Option<Self> {
47 if name.eq_ignore_ascii_case(ManifestSstEntry::reserved_table_name_for_inspection()) {
48 return Some(Self::InspectSstManifest);
49 }
50 if name.eq_ignore_ascii_case(StorageSstEntry::reserved_table_name_for_inspection()) {
51 return Some(Self::InspectSstStorage);
52 }
53 None
54 }
55
56 pub async fn table_provider(&self, server: &RegionServer) -> Result<Arc<dyn TableProvider>> {
58 match self {
59 Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
60 Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
61 }
62 }
63}
64
65impl RegionServer {
66 pub async fn inspect_sst_manifest_provider(&self) -> Result<Arc<dyn TableProvider>> {
68 let mito = {
69 let guard = self.inner.mito_engine.read().unwrap();
70 guard.as_ref().cloned().context(UnexpectedSnafu {
71 violated: "mito engine not available",
72 })?
73 };
74
75 let entries = mito.all_ssts_from_manifest().await;
76 let schema = ManifestSstEntry::schema().arrow_schema().clone();
77 let batch = ManifestSstEntry::to_record_batch(&entries)
78 .map_err(DataFusionError::from)
79 .context(DataFusionSnafu)?;
80
81 let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
82 Ok(Arc::new(table))
83 }
84
85 pub async fn inspect_sst_storage_provider(&self) -> Result<Arc<dyn TableProvider>> {
87 let mito = {
88 let guard = self.inner.mito_engine.read().unwrap();
89 guard.as_ref().cloned().context(UnexpectedSnafu {
90 violated: "mito engine not available",
91 })?
92 };
93 let entries = mito
94 .all_ssts_from_storage()
95 .try_collect::<Vec<_>>()
96 .await
97 .context(ListStorageSstsSnafu)?;
98 let schema = StorageSstEntry::schema().arrow_schema().clone();
99 let batch = StorageSstEntry::to_record_batch(&entries)
100 .map_err(DataFusionError::from)
101 .context(DataFusionSnafu)?;
102
103 let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
104 Ok(Arc::new(table))
105 }
106}
107
108#[derive(Clone, Debug)]
112pub(crate) struct NameAwareCatalogList {
113 catalog: NameAwareCatalogProvider,
114}
115
116impl NameAwareCatalogList {
117 pub fn new(server: RegionServer, region_id: RegionId, query_ctx: QueryContextRef) -> Self {
119 let schema_provider = NameAwareSchemaProvider {
120 server,
121 region_id,
122 query_ctx,
123 };
124 let catalog = NameAwareCatalogProvider {
125 schema: schema_provider,
126 };
127 Self { catalog }
128 }
129}
130
131impl CatalogProviderList for NameAwareCatalogList {
132 fn as_any(&self) -> &dyn std::any::Any {
133 self
134 }
135 fn register_catalog(
136 &self,
137 _name: String,
138 _catalog: Arc<dyn CatalogProvider>,
139 ) -> Option<Arc<dyn CatalogProvider>> {
140 None
141 }
142 fn catalog_names(&self) -> Vec<String> {
143 vec![]
144 }
145 fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
146 Some(Arc::new(self.catalog.clone()))
147 }
148}
149
150#[derive(Clone, Debug)]
151struct NameAwareCatalogProvider {
152 schema: NameAwareSchemaProvider,
153}
154
155impl CatalogProvider for NameAwareCatalogProvider {
156 fn as_any(&self) -> &dyn std::any::Any {
157 self
158 }
159 fn schema_names(&self) -> Vec<String> {
160 vec![]
161 }
162 fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
163 Some(Arc::new(self.schema.clone()))
164 }
165}
166
167#[derive(Clone)]
168struct NameAwareSchemaProvider {
169 server: RegionServer,
170 region_id: RegionId,
171 query_ctx: QueryContextRef,
172}
173
174impl std::fmt::Debug for NameAwareSchemaProvider {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 write!(f, "NameAwareSchemaProvider")
177 }
178}
179
180#[async_trait::async_trait]
181impl SchemaProvider for NameAwareSchemaProvider {
182 fn as_any(&self) -> &dyn std::any::Any {
183 self
184 }
185 fn table_names(&self) -> Vec<String> {
186 vec![]
187 }
188
189 async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
190 if let Some(kind) = InternalTableKind::from_table_name(name) {
192 return kind
193 .table_provider(&self.server)
194 .await
195 .map(Some)
196 .map_err(|e| df_error::DataFusionError::External(Box::new(e)));
197 }
198
199 let provider = self
201 .server
202 .table_provider(self.region_id, Some(self.query_ctx.clone()))
203 .await
204 .map_err(|e| df_error::DataFusionError::External(Box::new(e)))?;
205 Ok(Some(provider))
206 }
207
208 fn table_exist(&self, _name: &str) -> bool {
209 true
210 }
211}
212pub(crate) struct NameAwareDataSourceInjectorBuilder {
218 need_region_provider: bool,
220 reserved_table_needed: Vec<InternalTableKind>,
222}
223
224impl NameAwareDataSourceInjectorBuilder {
225 pub fn from_plan(plan: &LogicalPlan) -> DfResult<Self> {
228 let mut need_region_provider = false;
229 let mut reserved_table_needed = Vec::new();
230 plan.apply(|node| {
231 if let LogicalPlan::TableScan(ts) = node {
232 let name = ts.table_name.to_string();
233 if let Some(kind) = InternalTableKind::from_table_name(&name) {
234 if !reserved_table_needed.contains(&kind) {
235 reserved_table_needed.push(kind);
236 }
237 } else {
238 need_region_provider = true;
240 }
241 }
242 Ok(TreeNodeRecursion::Continue)
243 })?;
244
245 Ok(Self {
246 need_region_provider,
247 reserved_table_needed,
248 })
249 }
250
251 pub async fn build(
252 self,
253 server: &RegionServer,
254 region_id: RegionId,
255 query_ctx: QueryContextRef,
256 ) -> Result<NameAwareDataSourceInjector> {
257 let region = if self.need_region_provider {
258 let provider = server.table_provider(region_id, Some(query_ctx)).await?;
259 Some(provider_as_source(provider))
260 } else {
261 None
262 };
263
264 let mut reserved_sources = HashMap::new();
265 for kind in &self.reserved_table_needed {
266 let provider = kind.table_provider(server).await?;
267 reserved_sources.insert(*kind, provider_as_source(provider));
268 }
269
270 Ok(NameAwareDataSourceInjector {
271 reserved_sources,
272 region_source: region,
273 })
274 }
275}
276
277pub(crate) struct NameAwareDataSourceInjector {
280 reserved_sources: HashMap<InternalTableKind, Arc<dyn TableSource>>,
282 region_source: Option<Arc<dyn TableSource>>,
284}
285
286impl TreeNodeRewriter for NameAwareDataSourceInjector {
287 type Node = LogicalPlan;
288
289 fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
290 Ok(match node {
291 LogicalPlan::TableScan(mut scan) => {
292 let name = scan.table_name.to_string();
293 if let Some(kind) = InternalTableKind::from_table_name(&name)
294 && let Some(source) = self.reserved_sources.get(&kind)
295 {
296 scan.source = source.clone();
298 } else {
299 let Some(region) = &self.region_source else {
300 return Err(datafusion::error::DataFusionError::Plan(
302 "region provider not available".to_string(),
303 ));
304 };
305 scan.source = region.clone();
307 }
308 Transformed::yes(LogicalPlan::TableScan(scan))
309 }
310 _ => Transformed::no(node),
311 })
312 }
313}
314
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::catalog::MemTable as DfMemTable;
    use datafusion_common::tree_node::TreeNode;
    use datafusion_expr::{LogicalPlanBuilder, table_scan};
    use datatypes::arrow::array::Int32Array;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Single nullable `Int32` column schema shared by all tests.
    fn test_schema() -> Schema {
        Schema::new(vec![Field::new("a", DataType::Int32, true)])
    }

    /// In-memory table with the test schema and one empty batch.
    fn empty_mem_table() -> Arc<DfMemTable> {
        let schema = Arc::new(test_schema());
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
        )
        .unwrap();
        Arc::new(DfMemTable::try_new(schema, vec![vec![batch]]).unwrap())
    }

    /// Builds a plan consisting of a single scan of `table_name`.
    fn scan_plan(table_name: &str) -> LogicalPlan {
        table_scan(Some(table_name), &test_schema(), None)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Asserts that `plan` is a table scan whose source is the very same
    /// allocation as `expected`.
    fn assert_scan_source_is(plan: LogicalPlan, expected: &Arc<dyn TableSource>) {
        let LogicalPlan::TableScan(scan) = plan else {
            panic!("expected TableScan after rewrite");
        };
        // `Arc::ptr_eq` compares allocation addresses only, avoiding the
        // unreliable vtable comparison of raw fat-pointer equality.
        assert!(Arc::ptr_eq(&scan.source, expected));
    }

    #[test]
    fn test_injector_builder_from_plan_flags() {
        let reserved = ManifestSstEntry::reserved_table_name_for_inspection();

        // Only a reserved table: no region provider needed.
        let b1 = NameAwareDataSourceInjectorBuilder::from_plan(&scan_plan(reserved)).unwrap();
        assert!(!b1.need_region_provider);
        assert_eq!(
            b1.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );

        // Only a normal table: region provider needed, nothing reserved.
        let b2 =
            NameAwareDataSourceInjectorBuilder::from_plan(&scan_plan("normal_table")).unwrap();
        assert!(b2.need_region_provider);
        assert!(b2.reserved_table_needed.is_empty());

        // Union of both: both kinds of source are required.
        let plan3 = LogicalPlanBuilder::from(scan_plan(reserved))
            .union(scan_plan("normal_table"))
            .unwrap()
            .build()
            .unwrap();
        let b3 = NameAwareDataSourceInjectorBuilder::from_plan(&plan3).unwrap();
        assert!(b3.need_region_provider);
        assert_eq!(
            b3.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );
    }

    #[test]
    fn test_rewriter_replaces_with_reserved_source() {
        let plan = scan_plan(ManifestSstEntry::reserved_table_name_for_inspection());
        let source = provider_as_source(empty_mem_table());

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::from([(
                InternalTableKind::InspectSstManifest,
                source.clone(),
            )]),
            region_source: None,
        };

        let rewritten = plan.rewrite(&mut injector).unwrap().data;
        assert_scan_source_is(rewritten, &source);
    }

    #[test]
    fn test_rewriter_replaces_with_region_source_for_normal() {
        let plan = scan_plan("normal_table");
        let region_source = provider_as_source(empty_mem_table());

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::new(),
            region_source: Some(region_source.clone()),
        };

        let rewritten = plan.rewrite(&mut injector).unwrap().data;
        assert_scan_source_is(rewritten, &region_source);
    }
}