datanode/region_server/
catalog.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use datafusion::catalog::{
19    CatalogProvider, CatalogProviderList, MemTable, SchemaProvider, TableProvider,
20};
21use datafusion::datasource::provider_as_source;
22use datafusion::error as df_error;
23use datafusion::error::Result as DfResult;
24use datafusion_common::DataFusionError;
25use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
26use datafusion_expr::{LogicalPlan, TableSource};
27use futures::TryStreamExt;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
31use store_api::storage::RegionId;
32
33use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu};
34use crate::region_server::RegionServer;
35
/// The reserved internal tables that this module can serve.
///
/// Each variant is recognized from one reserved table name and is backed by a
/// dedicated inspection `TableProvider` instead of the Region provider.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
enum InternalTableKind {
    /// SST files as listed in the region manifests.
    InspectSstManifest,
    /// SST files as found in storage.
    InspectSstStorage,
}
43
44impl InternalTableKind {
45    /// Determine if the name is a reserved internal table (case-insensitive).
46    pub fn from_table_name(name: &str) -> Option<Self> {
47        if name.eq_ignore_ascii_case(ManifestSstEntry::reserved_table_name_for_inspection()) {
48            return Some(Self::InspectSstManifest);
49        }
50        if name.eq_ignore_ascii_case(StorageSstEntry::reserved_table_name_for_inspection()) {
51            return Some(Self::InspectSstStorage);
52        }
53        None
54    }
55
56    /// Return the `TableProvider` for the internal table.
57    pub async fn table_provider(&self, server: &RegionServer) -> Result<Arc<dyn TableProvider>> {
58        match self {
59            Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
60            Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
61        }
62    }
63}
64
65impl RegionServer {
66    /// Expose SSTs listed in Manifest as an in-memory table for inspection.
67    pub async fn inspect_sst_manifest_provider(&self) -> Result<Arc<dyn TableProvider>> {
68        let mito = {
69            let guard = self.inner.mito_engine.read().unwrap();
70            guard.as_ref().cloned().context(UnexpectedSnafu {
71                violated: "mito engine not available",
72            })?
73        };
74
75        let entries = mito.all_ssts_from_manifest().await;
76        let schema = ManifestSstEntry::schema().arrow_schema().clone();
77        let batch = ManifestSstEntry::to_record_batch(&entries)
78            .map_err(DataFusionError::from)
79            .context(DataFusionSnafu)?;
80
81        let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
82        Ok(Arc::new(table))
83    }
84
85    /// Expose SSTs found in storage as an in-memory table for inspection.
86    pub async fn inspect_sst_storage_provider(&self) -> Result<Arc<dyn TableProvider>> {
87        let mito = {
88            let guard = self.inner.mito_engine.read().unwrap();
89            guard.as_ref().cloned().context(UnexpectedSnafu {
90                violated: "mito engine not available",
91            })?
92        };
93        let entries = mito
94            .all_ssts_from_storage()
95            .try_collect::<Vec<_>>()
96            .await
97            .context(ListStorageSstsSnafu)?;
98        let schema = StorageSstEntry::schema().arrow_schema().clone();
99        let batch = StorageSstEntry::to_record_batch(&entries)
100            .map_err(DataFusionError::from)
101            .context(DataFusionSnafu)?;
102
103        let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
104        Ok(Arc::new(table))
105    }
106}
107
108/// A catalog list that resolves `TableProvider` by table name:
109/// - For reserved internal names, return inspection providers;
110/// - Otherwise, fall back to the Region provider.
111#[derive(Clone, Debug)]
112pub(crate) struct NameAwareCatalogList {
113    catalog: NameAwareCatalogProvider,
114}
115
116impl NameAwareCatalogList {
117    /// Creates the catalog list.
118    pub fn new(server: RegionServer, region_id: RegionId, query_ctx: QueryContextRef) -> Self {
119        let schema_provider = NameAwareSchemaProvider {
120            server,
121            region_id,
122            query_ctx,
123        };
124        let catalog = NameAwareCatalogProvider {
125            schema: schema_provider,
126        };
127        Self { catalog }
128    }
129}
130
131impl CatalogProviderList for NameAwareCatalogList {
132    fn as_any(&self) -> &dyn std::any::Any {
133        self
134    }
135    fn register_catalog(
136        &self,
137        _name: String,
138        _catalog: Arc<dyn CatalogProvider>,
139    ) -> Option<Arc<dyn CatalogProvider>> {
140        None
141    }
142    fn catalog_names(&self) -> Vec<String> {
143        vec![]
144    }
145    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
146        Some(Arc::new(self.catalog.clone()))
147    }
148}
149
150#[derive(Clone, Debug)]
151struct NameAwareCatalogProvider {
152    schema: NameAwareSchemaProvider,
153}
154
155impl CatalogProvider for NameAwareCatalogProvider {
156    fn as_any(&self) -> &dyn std::any::Any {
157        self
158    }
159    fn schema_names(&self) -> Vec<String> {
160        vec![]
161    }
162    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
163        Some(Arc::new(self.schema.clone()))
164    }
165}
166
167#[derive(Clone)]
168struct NameAwareSchemaProvider {
169    server: RegionServer,
170    region_id: RegionId,
171    query_ctx: QueryContextRef,
172}
173
174impl std::fmt::Debug for NameAwareSchemaProvider {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        write!(f, "NameAwareSchemaProvider")
177    }
178}
179
180#[async_trait::async_trait]
181impl SchemaProvider for NameAwareSchemaProvider {
182    fn as_any(&self) -> &dyn std::any::Any {
183        self
184    }
185    fn table_names(&self) -> Vec<String> {
186        vec![]
187    }
188
189    async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
190        // Resolve inspect providers by reserved names.
191        if let Some(kind) = InternalTableKind::from_table_name(name) {
192            return kind
193                .table_provider(&self.server)
194                .await
195                .map(Some)
196                .map_err(|e| df_error::DataFusionError::External(Box::new(e)));
197        }
198
199        // Fallback to region provider for any other table name.
200        let provider = self
201            .server
202            .table_provider(self.region_id, Some(self.query_ctx.clone()))
203            .await
204            .map_err(|e| df_error::DataFusionError::External(Box::new(e)))?;
205        Ok(Some(provider))
206    }
207
208    fn table_exist(&self, _name: &str) -> bool {
209        true
210    }
211}
212/// Builds a `NameAwareDataSourceInjector` from a logical plan.
213///
214/// It scans the plan to determine:
215/// - whether a Region `TableSource` is required, and
216/// - which internal inspection sources are referenced.
217pub(crate) struct NameAwareDataSourceInjectorBuilder {
218    /// Whether the plan requires a Region `TableSource`.
219    need_region_provider: bool,
220    /// Internal table kinds referenced by the plan.
221    reserved_table_needed: Vec<InternalTableKind>,
222}
223
224impl NameAwareDataSourceInjectorBuilder {
225    /// Walk the `LogicalPlan` to determine whether a Region source is needed,
226    /// and collect the kinds of internal sources required.
227    pub fn from_plan(plan: &LogicalPlan) -> DfResult<Self> {
228        let mut need_region_provider = false;
229        let mut reserved_table_needed = Vec::new();
230        plan.apply(|node| {
231            if let LogicalPlan::TableScan(ts) = node {
232                let name = ts.table_name.to_string();
233                if let Some(kind) = InternalTableKind::from_table_name(&name) {
234                    if !reserved_table_needed.contains(&kind) {
235                        reserved_table_needed.push(kind);
236                    }
237                } else {
238                    // Any normal table scan implies a Region source is needed.
239                    need_region_provider = true;
240                }
241            }
242            Ok(TreeNodeRecursion::Continue)
243        })?;
244
245        Ok(Self {
246            need_region_provider,
247            reserved_table_needed,
248        })
249    }
250
251    pub async fn build(
252        self,
253        server: &RegionServer,
254        region_id: RegionId,
255        query_ctx: QueryContextRef,
256    ) -> Result<NameAwareDataSourceInjector> {
257        let region = if self.need_region_provider {
258            let provider = server.table_provider(region_id, Some(query_ctx)).await?;
259            Some(provider_as_source(provider))
260        } else {
261            None
262        };
263
264        let mut reserved_sources = HashMap::new();
265        for kind in &self.reserved_table_needed {
266            let provider = kind.table_provider(server).await?;
267            reserved_sources.insert(*kind, provider_as_source(provider));
268        }
269
270        Ok(NameAwareDataSourceInjector {
271            reserved_sources,
272            region_source: region,
273        })
274    }
275}
276
277/// Rewrites `LogicalPlan` to inject proper data sources for `TableScan`.
278/// Uses internal sources for reserved tables; otherwise uses the Region source.
279pub(crate) struct NameAwareDataSourceInjector {
280    /// Sources for reserved internal tables, keyed by kind.
281    reserved_sources: HashMap<InternalTableKind, Arc<dyn TableSource>>,
282    /// Optional Region-level source used for normal tables.
283    region_source: Option<Arc<dyn TableSource>>,
284}
285
286impl TreeNodeRewriter for NameAwareDataSourceInjector {
287    type Node = LogicalPlan;
288
289    fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
290        Ok(match node {
291            LogicalPlan::TableScan(mut scan) => {
292                let name = scan.table_name.to_string();
293                if let Some(kind) = InternalTableKind::from_table_name(&name)
294                    && let Some(source) = self.reserved_sources.get(&kind)
295                {
296                    // Matched a reserved internal table: rewrite to its dedicated source.
297                    scan.source = source.clone();
298                } else {
299                    let Some(region) = &self.region_source else {
300                        // Region source required but not constructed; this is unexpected.
301                        return Err(datafusion::error::DataFusionError::Plan(
302                            "region provider not available".to_string(),
303                        ));
304                    };
305                    // Normal table: rewrite to the Region source.
306                    scan.source = region.clone();
307                }
308                Transformed::yes(LogicalPlan::TableScan(scan))
309            }
310            _ => Transformed::no(node),
311        })
312    }
313}
314
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::catalog::MemTable as DfMemTable;
    use datafusion_common::tree_node::TreeNode;
    use datafusion_expr::{LogicalPlanBuilder, table_scan};
    use datatypes::arrow::array::Int32Array;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;

    // Brings the items under test into scope, together with the parent module's
    // imports (e.g. `HashMap`, `LogicalPlan`, `provider_as_source`).
    use super::*;

    /// Schema with a single nullable `Int32` column `a`, shared by every test plan.
    fn test_schema() -> Schema {
        Schema::new(vec![Field::new("a", DataType::Int32, true)])
    }

    /// Builds a plain `TableScan` plan over `table` using `test_schema`.
    fn scan_plan(table: &str) -> LogicalPlan {
        table_scan(Some(table), &test_schema(), None)
            .unwrap()
            .build()
            .unwrap()
    }

    /// An empty in-memory table with `test_schema`; the tests only use its identity.
    fn empty_mem_table() -> Arc<DfMemTable> {
        let schema = Arc::new(test_schema());
        let no_values = Int32Array::from(Vec::<i32>::new());
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(no_values)]).unwrap();
        Arc::new(DfMemTable::try_new(schema, vec![vec![batch]]).unwrap())
    }

    #[test]
    fn test_injector_builder_from_plan_flags() {
        let reserved = ManifestSstEntry::reserved_table_name_for_inspection();

        // Reserved table only: an inspection source is needed, the Region source is not.
        let only_reserved =
            NameAwareDataSourceInjectorBuilder::from_plan(&scan_plan(reserved)).unwrap();
        assert!(!only_reserved.need_region_provider);
        assert_eq!(
            only_reserved.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );

        // Normal table only: just the Region source is needed.
        let only_normal =
            NameAwareDataSourceInjectorBuilder::from_plan(&scan_plan("normal_table")).unwrap();
        assert!(only_normal.need_region_provider);
        assert!(only_normal.reserved_table_needed.is_empty());

        // Reserved and normal tables combined through UNION: both are needed.
        let union_plan = LogicalPlanBuilder::from(scan_plan(reserved))
            .union(scan_plan("normal_table"))
            .unwrap()
            .build()
            .unwrap();
        let mixed = NameAwareDataSourceInjectorBuilder::from_plan(&union_plan).unwrap();
        assert!(mixed.need_region_provider);
        assert_eq!(
            mixed.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );
    }

    #[test]
    fn test_rewriter_replaces_with_reserved_source() {
        let plan = scan_plan(ManifestSstEntry::reserved_table_name_for_inspection());
        let source = provider_as_source(empty_mem_table());

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::from([(
                InternalTableKind::InspectSstManifest,
                Arc::clone(&source),
            )]),
            region_source: None,
        };

        let rewritten = plan.rewrite(&mut injector).unwrap().data;
        let LogicalPlan::TableScan(scan) = rewritten else {
            panic!("expected TableScan after rewrite");
        };
        // Same allocation means the reserved source was injected.
        assert!(Arc::ptr_eq(&scan.source, &source));
    }

    #[test]
    fn test_rewriter_replaces_with_region_source_for_normal() {
        let plan = scan_plan("normal_table");
        let region_source = provider_as_source(empty_mem_table());

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::new(),
            region_source: Some(Arc::clone(&region_source)),
        };

        let rewritten = plan.rewrite(&mut injector).unwrap().data;
        let LogicalPlan::TableScan(scan) = rewritten else {
            panic!("expected TableScan after rewrite");
        };
        // Same allocation means the Region source was injected.
        assert!(Arc::ptr_eq(&scan.source, &region_source));
    }
}