datanode/region_server/
catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use datafusion::catalog::{
19    CatalogProvider, CatalogProviderList, MemTable, SchemaProvider, TableProvider,
20};
21use datafusion::datasource::provider_as_source;
22use datafusion::error as df_error;
23use datafusion::error::Result as DfResult;
24use datafusion_common::DataFusionError;
25use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
26use datafusion_expr::{LogicalPlan, TableSource};
27use futures::TryStreamExt;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
31use store_api::storage::RegionId;
32
33use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu};
34use crate::region_server::RegionServer;
35
36/// Reserved internal table kinds used.
37/// These are recognized by reserved table names and mapped to providers.
38#[allow(clippy::enum_variant_names)]
39#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
40enum InternalTableKind {
41    InspectSstManifest,
42    InspectSstStorage,
43    InspectSstIndexMeta,
44}
45
46impl InternalTableKind {
47    /// Determine if the name is a reserved internal table (case-insensitive).
48    pub fn from_table_name(name: &str) -> Option<Self> {
49        if name.eq_ignore_ascii_case(ManifestSstEntry::reserved_table_name_for_inspection()) {
50            return Some(Self::InspectSstManifest);
51        }
52        if name.eq_ignore_ascii_case(StorageSstEntry::reserved_table_name_for_inspection()) {
53            return Some(Self::InspectSstStorage);
54        }
55        if name.eq_ignore_ascii_case(PuffinIndexMetaEntry::reserved_table_name_for_inspection()) {
56            return Some(Self::InspectSstIndexMeta);
57        }
58        None
59    }
60
61    /// Return the `TableProvider` for the internal table.
62    pub async fn table_provider(&self, server: &RegionServer) -> Result<Arc<dyn TableProvider>> {
63        match self {
64            Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
65            Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
66            Self::InspectSstIndexMeta => server.inspect_sst_index_meta_provider().await,
67        }
68    }
69}
70
71impl RegionServer {
72    /// Expose SSTs listed in Manifest as an in-memory table for inspection.
73    pub async fn inspect_sst_manifest_provider(&self) -> Result<Arc<dyn TableProvider>> {
74        let mito = {
75            let guard = self.inner.mito_engine.read().unwrap();
76            guard.as_ref().cloned().context(UnexpectedSnafu {
77                violated: "mito engine not available",
78            })?
79        };
80
81        let entries = mito.all_ssts_from_manifest().await;
82        let schema = ManifestSstEntry::schema().arrow_schema().clone();
83        let batch = ManifestSstEntry::to_record_batch(&entries)
84            .map_err(DataFusionError::from)
85            .context(DataFusionSnafu)?;
86
87        let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
88        Ok(Arc::new(table))
89    }
90
91    /// Expose SSTs found in storage as an in-memory table for inspection.
92    pub async fn inspect_sst_storage_provider(&self) -> Result<Arc<dyn TableProvider>> {
93        let mito = {
94            let guard = self.inner.mito_engine.read().unwrap();
95            guard.as_ref().cloned().context(UnexpectedSnafu {
96                violated: "mito engine not available",
97            })?
98        };
99        let entries = mito
100            .all_ssts_from_storage()
101            .try_collect::<Vec<_>>()
102            .await
103            .context(ListStorageSstsSnafu)?;
104        let schema = StorageSstEntry::schema().arrow_schema().clone();
105        let batch = StorageSstEntry::to_record_batch(&entries)
106            .map_err(DataFusionError::from)
107            .context(DataFusionSnafu)?;
108
109        let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
110        Ok(Arc::new(table))
111    }
112
113    /// Expose index metadata across the engine as an in-memory table.
114    pub async fn inspect_sst_index_meta_provider(&self) -> Result<Arc<dyn TableProvider>> {
115        let mito = {
116            let guard = self.inner.mito_engine.read().unwrap();
117            guard.as_ref().cloned().context(UnexpectedSnafu {
118                violated: "mito engine not available",
119            })?
120        };
121
122        let entries = mito.all_index_metas().await;
123        let schema = PuffinIndexMetaEntry::schema().arrow_schema().clone();
124        let batch = PuffinIndexMetaEntry::to_record_batch(&entries)
125            .map_err(DataFusionError::from)
126            .context(DataFusionSnafu)?;
127
128        let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
129        Ok(Arc::new(table))
130    }
131}
132
133/// A catalog list that resolves `TableProvider` by table name:
134/// - For reserved internal names, return inspection providers;
135/// - Otherwise, fall back to the Region provider.
136#[derive(Clone, Debug)]
137pub(crate) struct NameAwareCatalogList {
138    catalog: NameAwareCatalogProvider,
139}
140
141impl NameAwareCatalogList {
142    /// Creates the catalog list.
143    pub fn new(server: RegionServer, region_id: RegionId, query_ctx: QueryContextRef) -> Self {
144        let schema_provider = NameAwareSchemaProvider {
145            server,
146            region_id,
147            query_ctx,
148        };
149        let catalog = NameAwareCatalogProvider {
150            schema: schema_provider,
151        };
152        Self { catalog }
153    }
154}
155
156impl CatalogProviderList for NameAwareCatalogList {
157    fn as_any(&self) -> &dyn std::any::Any {
158        self
159    }
160    fn register_catalog(
161        &self,
162        _name: String,
163        _catalog: Arc<dyn CatalogProvider>,
164    ) -> Option<Arc<dyn CatalogProvider>> {
165        None
166    }
167    fn catalog_names(&self) -> Vec<String> {
168        vec![]
169    }
170    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
171        Some(Arc::new(self.catalog.clone()))
172    }
173}
174
175#[derive(Clone, Debug)]
176struct NameAwareCatalogProvider {
177    schema: NameAwareSchemaProvider,
178}
179
180impl CatalogProvider for NameAwareCatalogProvider {
181    fn as_any(&self) -> &dyn std::any::Any {
182        self
183    }
184    fn schema_names(&self) -> Vec<String> {
185        vec![]
186    }
187    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
188        Some(Arc::new(self.schema.clone()))
189    }
190}
191
192#[derive(Clone)]
193struct NameAwareSchemaProvider {
194    server: RegionServer,
195    region_id: RegionId,
196    query_ctx: QueryContextRef,
197}
198
199impl std::fmt::Debug for NameAwareSchemaProvider {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        write!(f, "NameAwareSchemaProvider")
202    }
203}
204
205#[async_trait::async_trait]
206impl SchemaProvider for NameAwareSchemaProvider {
207    fn as_any(&self) -> &dyn std::any::Any {
208        self
209    }
210    fn table_names(&self) -> Vec<String> {
211        vec![]
212    }
213
214    async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
215        // Resolve inspect providers by reserved names.
216        if let Some(kind) = InternalTableKind::from_table_name(name) {
217            return kind
218                .table_provider(&self.server)
219                .await
220                .map(Some)
221                .map_err(|e| df_error::DataFusionError::External(Box::new(e)));
222        }
223
224        // Fallback to region provider for any other table name.
225        let provider = self
226            .server
227            .table_provider(self.region_id, Some(self.query_ctx.clone()))
228            .await
229            .map_err(|e| df_error::DataFusionError::External(Box::new(e)))?;
230        Ok(Some(provider))
231    }
232
233    fn table_exist(&self, _name: &str) -> bool {
234        true
235    }
236}
237/// Builds a `NameAwareDataSourceInjector` from a logical plan.
238///
239/// It scans the plan to determine:
240/// - whether a Region `TableSource` is required, and
241/// - which internal inspection sources are referenced.
242pub(crate) struct NameAwareDataSourceInjectorBuilder {
243    /// Whether the plan requires a Region `TableSource`.
244    need_region_provider: bool,
245    /// Internal table kinds referenced by the plan.
246    reserved_table_needed: Vec<InternalTableKind>,
247}
248
249impl NameAwareDataSourceInjectorBuilder {
250    /// Walk the `LogicalPlan` to determine whether a Region source is needed,
251    /// and collect the kinds of internal sources required.
252    pub fn from_plan(plan: &LogicalPlan) -> DfResult<Self> {
253        let mut need_region_provider = false;
254        let mut reserved_table_needed = Vec::new();
255        plan.apply(|node| {
256            if let LogicalPlan::TableScan(ts) = node {
257                let name = ts.table_name.to_string();
258                if let Some(kind) = InternalTableKind::from_table_name(&name) {
259                    if !reserved_table_needed.contains(&kind) {
260                        reserved_table_needed.push(kind);
261                    }
262                } else {
263                    // Any normal table scan implies a Region source is needed.
264                    need_region_provider = true;
265                }
266            }
267            Ok(TreeNodeRecursion::Continue)
268        })?;
269
270        Ok(Self {
271            need_region_provider,
272            reserved_table_needed,
273        })
274    }
275
276    pub async fn build(
277        self,
278        server: &RegionServer,
279        region_id: RegionId,
280        query_ctx: QueryContextRef,
281    ) -> Result<NameAwareDataSourceInjector> {
282        let region = if self.need_region_provider {
283            let provider = server.table_provider(region_id, Some(query_ctx)).await?;
284            Some(provider_as_source(provider))
285        } else {
286            None
287        };
288
289        let mut reserved_sources = HashMap::new();
290        for kind in &self.reserved_table_needed {
291            let provider = kind.table_provider(server).await?;
292            reserved_sources.insert(*kind, provider_as_source(provider));
293        }
294
295        Ok(NameAwareDataSourceInjector {
296            reserved_sources,
297            region_source: region,
298        })
299    }
300}
301
302/// Rewrites `LogicalPlan` to inject proper data sources for `TableScan`.
303/// Uses internal sources for reserved tables; otherwise uses the Region source.
304pub(crate) struct NameAwareDataSourceInjector {
305    /// Sources for reserved internal tables, keyed by kind.
306    reserved_sources: HashMap<InternalTableKind, Arc<dyn TableSource>>,
307    /// Optional Region-level source used for normal tables.
308    region_source: Option<Arc<dyn TableSource>>,
309}
310
311impl TreeNodeRewriter for NameAwareDataSourceInjector {
312    type Node = LogicalPlan;
313
314    fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
315        Ok(match node {
316            LogicalPlan::TableScan(mut scan) => {
317                let name = scan.table_name.to_string();
318                if let Some(kind) = InternalTableKind::from_table_name(&name)
319                    && let Some(source) = self.reserved_sources.get(&kind)
320                {
321                    // Matched a reserved internal table: rewrite to its dedicated source.
322                    scan.source = source.clone();
323                } else {
324                    let Some(region) = &self.region_source else {
325                        // Region source required but not constructed; this is unexpected.
326                        return Err(datafusion::error::DataFusionError::Plan(
327                            "region provider not available".to_string(),
328                        ));
329                    };
330                    // Normal table: rewrite to the Region source.
331                    scan.source = region.clone();
332                }
333                Transformed::yes(LogicalPlan::TableScan(scan))
334            }
335            _ => Transformed::no(node),
336        })
337    }
338}
339
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::catalog::MemTable as DfMemTable;
    use datafusion_common::tree_node::TreeNode;
    use datafusion_expr::{LogicalPlanBuilder, table_scan};
    use datatypes::arrow::array::Int32Array;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;

    // Brings the types under test and the parent module's imports into scope.
    use super::*;

    /// Name of a table that is not reserved for inspection.
    const NORMAL_TABLE: &str = "normal_table";

    /// Single nullable `Int32` column named `a`.
    fn test_schema() -> Schema {
        Schema::new(vec![Field::new("a", DataType::Int32, true)])
    }

    /// Builds a plan consisting of one unprojected scan of `table`.
    fn scan_plan(table: &str) -> LogicalPlan {
        table_scan(Some(table), &test_schema(), None)
            .unwrap()
            .build()
            .unwrap()
    }

    /// In-memory table with the test schema and a single batch holding no rows.
    fn empty_mem_table() -> Arc<DfMemTable> {
        let schema = Arc::new(test_schema());
        let no_rows = Int32Array::from(Vec::<i32>::new());
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(no_rows)]).unwrap();
        Arc::new(DfMemTable::try_new(schema, vec![vec![batch]]).unwrap())
    }

    /// Asserts that `plan` is a `TableScan` whose source is the very same
    /// allocation as `expected`.
    fn assert_scan_source_is(plan: LogicalPlan, expected: &Arc<dyn TableSource>) {
        let LogicalPlan::TableScan(scan) = plan else {
            panic!("expected TableScan after rewrite");
        };
        // Compare the underlying Arc pointers to ensure the replacement happened.
        assert!(std::ptr::eq(
            Arc::as_ptr(&scan.source),
            Arc::as_ptr(expected)
        ));
    }

    #[test]
    fn test_injector_builder_from_plan_flags() {
        let reserved = ManifestSstEntry::reserved_table_name_for_inspection();

        // Reserved table scan only.
        let only_reserved =
            NameAwareDataSourceInjectorBuilder::from_plan(&scan_plan(reserved)).unwrap();
        assert!(!only_reserved.need_region_provider);
        assert_eq!(
            only_reserved.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );

        // Normal table scan only.
        let only_normal =
            NameAwareDataSourceInjectorBuilder::from_plan(&scan_plan(NORMAL_TABLE)).unwrap();
        assert!(only_normal.need_region_provider);
        assert!(only_normal.reserved_table_needed.is_empty());

        // Both reserved and normal scans, combined via UNION.
        let union_plan = LogicalPlanBuilder::from(scan_plan(reserved))
            .union(scan_plan(NORMAL_TABLE))
            .unwrap()
            .build()
            .unwrap();
        let mixed = NameAwareDataSourceInjectorBuilder::from_plan(&union_plan).unwrap();
        assert!(mixed.need_region_provider);
        assert_eq!(
            mixed.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );
    }

    #[test]
    fn test_rewriter_replaces_with_reserved_source() {
        let source = provider_as_source(empty_mem_table());
        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::from([(
                InternalTableKind::InspectSstManifest,
                source.clone(),
            )]),
            region_source: None,
        };

        let plan = scan_plan(ManifestSstEntry::reserved_table_name_for_inspection());
        let rewritten = plan.rewrite(&mut injector).unwrap().data;

        assert_scan_source_is(rewritten, &source);
    }

    #[test]
    fn test_rewriter_replaces_with_region_source_for_normal() {
        let region_source = provider_as_source(empty_mem_table());
        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::new(),
            region_source: Some(region_source.clone()),
        };

        let rewritten = scan_plan(NORMAL_TABLE).rewrite(&mut injector).unwrap().data;

        assert_scan_source_is(rewritten, &region_source);
    }
}