Skip to main content

catalog/system_schema/information_schema/
region_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use common_catalog::consts::INFORMATION_SCHEMA_REGION_INFO_TABLE_ID;
18use common_error::ext::BoxedError;
19use common_recordbatch::SendableRecordBatchStream;
20use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
21use datatypes::schema::SchemaRef;
22use snafu::ResultExt;
23use store_api::region_info::RegionInfoEntry;
24use store_api::storage::{ScanRequest, TableId};
25
26use crate::CatalogManager;
27use crate::error::{ProjectSchemaSnafu, Result};
28use crate::information_schema::{
29    DatanodeInspectKind, DatanodeInspectRequest, InformationTable, REGION_INFO,
30};
31use crate::system_schema::utils;
32
33/// Information schema table for region info.
34pub struct InformationSchemaRegionInfo {
35    schema: SchemaRef,
36    catalog_manager: Weak<dyn CatalogManager>,
37}
38
39impl InformationSchemaRegionInfo {
40    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
41        Self {
42            schema: RegionInfoEntry::schema(),
43            catalog_manager,
44        }
45    }
46}
47
48impl InformationTable for InformationSchemaRegionInfo {
49    fn table_id(&self) -> TableId {
50        INFORMATION_SCHEMA_REGION_INFO_TABLE_ID
51    }
52
53    fn table_name(&self) -> &'static str {
54        REGION_INFO
55    }
56
57    fn schema(&self) -> SchemaRef {
58        self.schema.clone()
59    }
60
61    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
62        let schema = if let Some(p) = request.projection_indices() {
63            Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
64        } else {
65            self.schema.clone()
66        };
67
68        let info_ext = utils::information_extension(&self.catalog_manager)?;
69        let req = DatanodeInspectRequest {
70            kind: DatanodeInspectKind::RegionInfo,
71            scan: request,
72        };
73
74        let future = async move {
75            info_ext
76                .inspect_datanode(req)
77                .await
78                .map_err(BoxedError::new)
79                .context(common_recordbatch::error::ExternalSnafu)
80        };
81        Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
82            schema,
83            Box::pin(future),
84        )))
85    }
86}