1use std::collections::HashMap;
16use std::sync::Arc;
17
18use datafusion::catalog::{
19 CatalogProvider, CatalogProviderList, MemTable, SchemaProvider, TableProvider,
20};
21use datafusion::datasource::provider_as_source;
22use datafusion::error as df_error;
23use datafusion::error::Result as DfResult;
24use datafusion_common::DataFusionError;
25use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
26use datafusion_expr::{LogicalPlan, TableSource};
27use futures::TryStreamExt;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
31use store_api::storage::RegionId;
32
33use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu};
34use crate::region_server::RegionServer;
35
36#[allow(clippy::enum_variant_names)]
39#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
40enum InternalTableKind {
41 InspectSstManifest,
42 InspectSstStorage,
43 InspectSstIndexMeta,
44}
45
46impl InternalTableKind {
47 pub fn from_table_name(name: &str) -> Option<Self> {
49 if name.eq_ignore_ascii_case(ManifestSstEntry::reserved_table_name_for_inspection()) {
50 return Some(Self::InspectSstManifest);
51 }
52 if name.eq_ignore_ascii_case(StorageSstEntry::reserved_table_name_for_inspection()) {
53 return Some(Self::InspectSstStorage);
54 }
55 if name.eq_ignore_ascii_case(PuffinIndexMetaEntry::reserved_table_name_for_inspection()) {
56 return Some(Self::InspectSstIndexMeta);
57 }
58 None
59 }
60
61 pub async fn table_provider(&self, server: &RegionServer) -> Result<Arc<dyn TableProvider>> {
63 match self {
64 Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
65 Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
66 Self::InspectSstIndexMeta => server.inspect_sst_index_meta_provider().await,
67 }
68 }
69}
70
71impl RegionServer {
72 pub async fn inspect_sst_manifest_provider(&self) -> Result<Arc<dyn TableProvider>> {
74 let mito = {
75 let guard = self.inner.mito_engine.read().unwrap();
76 guard.as_ref().cloned().context(UnexpectedSnafu {
77 violated: "mito engine not available",
78 })?
79 };
80
81 let entries = mito.all_ssts_from_manifest().await;
82 let schema = ManifestSstEntry::schema().arrow_schema().clone();
83 let batch = ManifestSstEntry::to_record_batch(&entries)
84 .map_err(DataFusionError::from)
85 .context(DataFusionSnafu)?;
86
87 let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
88 Ok(Arc::new(table))
89 }
90
91 pub async fn inspect_sst_storage_provider(&self) -> Result<Arc<dyn TableProvider>> {
93 let mito = {
94 let guard = self.inner.mito_engine.read().unwrap();
95 guard.as_ref().cloned().context(UnexpectedSnafu {
96 violated: "mito engine not available",
97 })?
98 };
99 let entries = mito
100 .all_ssts_from_storage()
101 .try_collect::<Vec<_>>()
102 .await
103 .context(ListStorageSstsSnafu)?;
104 let schema = StorageSstEntry::schema().arrow_schema().clone();
105 let batch = StorageSstEntry::to_record_batch(&entries)
106 .map_err(DataFusionError::from)
107 .context(DataFusionSnafu)?;
108
109 let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
110 Ok(Arc::new(table))
111 }
112
113 pub async fn inspect_sst_index_meta_provider(&self) -> Result<Arc<dyn TableProvider>> {
115 let mito = {
116 let guard = self.inner.mito_engine.read().unwrap();
117 guard.as_ref().cloned().context(UnexpectedSnafu {
118 violated: "mito engine not available",
119 })?
120 };
121
122 let entries = mito.all_index_metas().await;
123 let schema = PuffinIndexMetaEntry::schema().arrow_schema().clone();
124 let batch = PuffinIndexMetaEntry::to_record_batch(&entries)
125 .map_err(DataFusionError::from)
126 .context(DataFusionSnafu)?;
127
128 let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
129 Ok(Arc::new(table))
130 }
131}
132
133#[derive(Clone, Debug)]
137pub(crate) struct NameAwareCatalogList {
138 catalog: NameAwareCatalogProvider,
139}
140
141impl NameAwareCatalogList {
142 pub fn new(server: RegionServer, region_id: RegionId, query_ctx: QueryContextRef) -> Self {
144 let schema_provider = NameAwareSchemaProvider {
145 server,
146 region_id,
147 query_ctx,
148 };
149 let catalog = NameAwareCatalogProvider {
150 schema: schema_provider,
151 };
152 Self { catalog }
153 }
154}
155
156impl CatalogProviderList for NameAwareCatalogList {
157 fn as_any(&self) -> &dyn std::any::Any {
158 self
159 }
160 fn register_catalog(
161 &self,
162 _name: String,
163 _catalog: Arc<dyn CatalogProvider>,
164 ) -> Option<Arc<dyn CatalogProvider>> {
165 None
166 }
167 fn catalog_names(&self) -> Vec<String> {
168 vec![]
169 }
170 fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
171 Some(Arc::new(self.catalog.clone()))
172 }
173}
174
175#[derive(Clone, Debug)]
176struct NameAwareCatalogProvider {
177 schema: NameAwareSchemaProvider,
178}
179
180impl CatalogProvider for NameAwareCatalogProvider {
181 fn as_any(&self) -> &dyn std::any::Any {
182 self
183 }
184 fn schema_names(&self) -> Vec<String> {
185 vec![]
186 }
187 fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
188 Some(Arc::new(self.schema.clone()))
189 }
190}
191
192#[derive(Clone)]
193struct NameAwareSchemaProvider {
194 server: RegionServer,
195 region_id: RegionId,
196 query_ctx: QueryContextRef,
197}
198
199impl std::fmt::Debug for NameAwareSchemaProvider {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 write!(f, "NameAwareSchemaProvider")
202 }
203}
204
205#[async_trait::async_trait]
206impl SchemaProvider for NameAwareSchemaProvider {
207 fn as_any(&self) -> &dyn std::any::Any {
208 self
209 }
210 fn table_names(&self) -> Vec<String> {
211 vec![]
212 }
213
214 async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
215 if let Some(kind) = InternalTableKind::from_table_name(name) {
217 return kind
218 .table_provider(&self.server)
219 .await
220 .map(Some)
221 .map_err(|e| df_error::DataFusionError::External(Box::new(e)));
222 }
223
224 let provider = self
226 .server
227 .table_provider(self.region_id, Some(self.query_ctx.clone()))
228 .await
229 .map_err(|e| df_error::DataFusionError::External(Box::new(e)))?;
230 Ok(Some(provider))
231 }
232
233 fn table_exist(&self, _name: &str) -> bool {
234 true
235 }
236}
237pub(crate) struct NameAwareDataSourceInjectorBuilder {
243 need_region_provider: bool,
245 reserved_table_needed: Vec<InternalTableKind>,
247}
248
249impl NameAwareDataSourceInjectorBuilder {
250 pub fn from_plan(plan: &LogicalPlan) -> DfResult<Self> {
253 let mut need_region_provider = false;
254 let mut reserved_table_needed = Vec::new();
255 plan.apply(|node| {
256 if let LogicalPlan::TableScan(ts) = node {
257 let name = ts.table_name.to_string();
258 if let Some(kind) = InternalTableKind::from_table_name(&name) {
259 if !reserved_table_needed.contains(&kind) {
260 reserved_table_needed.push(kind);
261 }
262 } else {
263 need_region_provider = true;
265 }
266 }
267 Ok(TreeNodeRecursion::Continue)
268 })?;
269
270 Ok(Self {
271 need_region_provider,
272 reserved_table_needed,
273 })
274 }
275
276 pub async fn build(
277 self,
278 server: &RegionServer,
279 region_id: RegionId,
280 query_ctx: QueryContextRef,
281 ) -> Result<NameAwareDataSourceInjector> {
282 let region = if self.need_region_provider {
283 let provider = server.table_provider(region_id, Some(query_ctx)).await?;
284 Some(provider_as_source(provider))
285 } else {
286 None
287 };
288
289 let mut reserved_sources = HashMap::new();
290 for kind in &self.reserved_table_needed {
291 let provider = kind.table_provider(server).await?;
292 reserved_sources.insert(*kind, provider_as_source(provider));
293 }
294
295 Ok(NameAwareDataSourceInjector {
296 reserved_sources,
297 region_source: region,
298 })
299 }
300}
301
302pub(crate) struct NameAwareDataSourceInjector {
305 reserved_sources: HashMap<InternalTableKind, Arc<dyn TableSource>>,
307 region_source: Option<Arc<dyn TableSource>>,
309}
310
311impl TreeNodeRewriter for NameAwareDataSourceInjector {
312 type Node = LogicalPlan;
313
314 fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
315 Ok(match node {
316 LogicalPlan::TableScan(mut scan) => {
317 let name = scan.table_name.to_string();
318 if let Some(kind) = InternalTableKind::from_table_name(&name)
319 && let Some(source) = self.reserved_sources.get(&kind)
320 {
321 scan.source = source.clone();
323 } else {
324 let Some(region) = &self.region_source else {
325 return Err(datafusion::error::DataFusionError::Plan(
327 "region provider not available".to_string(),
328 ));
329 };
330 scan.source = region.clone();
332 }
333 Transformed::yes(LogicalPlan::TableScan(scan))
334 }
335 _ => Transformed::no(node),
336 })
337 }
338}
339
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::catalog::MemTable as DfMemTable;
    use datafusion_common::tree_node::TreeNode;
    use datafusion_expr::{LogicalPlanBuilder, table_scan};
    use datatypes::arrow::array::Int32Array;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Single nullable `Int32` column named `a`, shared by all test plans.
    fn test_schema() -> Schema {
        Schema::new(vec![Field::new("a", DataType::Int32, true)])
    }

    /// An empty in-memory table over [`test_schema`], used as a stand-in source.
    fn empty_mem_table() -> Arc<DfMemTable> {
        let schema = Arc::new(test_schema());
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
        )
        .unwrap();
        Arc::new(DfMemTable::try_new(schema, vec![vec![batch]]).unwrap())
    }

    #[test]
    fn test_injector_builder_from_plan_flags() {
        let schema = test_schema();
        let reserved = ManifestSstEntry::reserved_table_name_for_inspection();

        // Only a reserved table: no region provider needed.
        let plan1 = table_scan(Some(reserved), &schema, None)
            .unwrap()
            .build()
            .unwrap();
        let b1 = NameAwareDataSourceInjectorBuilder::from_plan(&plan1).unwrap();
        assert!(!b1.need_region_provider);
        assert_eq!(
            b1.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );

        // Only a normal table: region provider needed, no reserved tables.
        let plan2 = table_scan(Some("normal_table"), &schema, None)
            .unwrap()
            .build()
            .unwrap();
        let b2 = NameAwareDataSourceInjectorBuilder::from_plan(&plan2).unwrap();
        assert!(b2.need_region_provider);
        assert!(b2.reserved_table_needed.is_empty());

        // Union of both: both kinds of source are needed.
        let p_res = table_scan(Some(reserved), &schema, None)
            .unwrap()
            .build()
            .unwrap();
        let p_norm = table_scan(Some("normal_table"), &schema, None)
            .unwrap()
            .build()
            .unwrap();
        let plan3 = LogicalPlanBuilder::from(p_res)
            .union(LogicalPlanBuilder::from(p_norm).build().unwrap())
            .unwrap()
            .build()
            .unwrap();
        let b3 = NameAwareDataSourceInjectorBuilder::from_plan(&plan3).unwrap();
        assert!(b3.need_region_provider);
        assert_eq!(
            b3.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );
    }

    #[test]
    fn test_rewriter_replaces_with_reserved_source() {
        let schema = test_schema();
        let table_name = ManifestSstEntry::reserved_table_name_for_inspection();
        let plan = table_scan(Some(table_name), &schema, None)
            .unwrap()
            .build()
            .unwrap();

        let provider = empty_mem_table();
        let source = provider_as_source(provider);

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: {
                let mut m = HashMap::new();
                m.insert(InternalTableKind::InspectSstManifest, source.clone());
                m
            },
            region_source: None,
        };

        let transformed = plan.rewrite(&mut injector).unwrap();
        let new_plan = transformed.data;

        if let LogicalPlan::TableScan(scan) = new_plan {
            // Compare allocations only; `Arc::ptr_eq` ignores vtable metadata.
            assert!(Arc::ptr_eq(&scan.source, &source));
        } else {
            panic!("expected TableScan after rewrite");
        }
    }

    #[test]
    fn test_rewriter_replaces_with_region_source_for_normal() {
        let schema = test_schema();
        let plan = table_scan(Some("normal_table"), &schema, None)
            .unwrap()
            .build()
            .unwrap();

        let provider = empty_mem_table();
        let region_source = provider_as_source(provider);

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::new(),
            region_source: Some(region_source.clone()),
        };

        let transformed = plan.rewrite(&mut injector).unwrap();
        let new_plan = transformed.data;

        if let LogicalPlan::TableScan(scan) = new_plan {
            assert!(Arc::ptr_eq(&scan.source, &region_source));
        } else {
            panic!("expected TableScan after rewrite");
        }
    }
}