catalog/system_schema/information_schema/
ssts.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use common_catalog::consts::{
18    INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID, INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
19};
20use common_error::ext::BoxedError;
21use common_recordbatch::SendableRecordBatchStream;
22use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
23use datatypes::schema::SchemaRef;
24use snafu::ResultExt;
25use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
26use store_api::storage::{ScanRequest, TableId};
27
28use crate::CatalogManager;
29use crate::error::{ProjectSchemaSnafu, Result};
30use crate::information_schema::{
31    DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_MANIFEST, SSTS_STORAGE,
32};
33use crate::system_schema::utils;
34
35/// Information schema table for sst manifest.
36pub struct InformationSchemaSstsManifest {
37    schema: SchemaRef,
38    catalog_manager: Weak<dyn CatalogManager>,
39}
40
41impl InformationSchemaSstsManifest {
42    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
43        Self {
44            schema: ManifestSstEntry::schema(),
45            catalog_manager,
46        }
47    }
48}
49
50impl InformationTable for InformationSchemaSstsManifest {
51    fn table_id(&self) -> TableId {
52        INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID
53    }
54
55    fn table_name(&self) -> &'static str {
56        SSTS_MANIFEST
57    }
58
59    fn schema(&self) -> SchemaRef {
60        self.schema.clone()
61    }
62
63    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
64        let schema = if let Some(p) = &request.projection {
65            Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
66        } else {
67            self.schema.clone()
68        };
69        let info_ext = utils::information_extension(&self.catalog_manager)?;
70        let req = DatanodeInspectRequest {
71            kind: DatanodeInspectKind::SstManifest,
72            scan: request,
73        };
74
75        let future = async move {
76            info_ext
77                .inspect_datanode(req)
78                .await
79                .map_err(BoxedError::new)
80                .context(common_recordbatch::error::ExternalSnafu)
81        };
82        Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
83            schema,
84            Box::pin(future),
85        )))
86    }
87}
88
89/// Information schema table for sst storage.
90pub struct InformationSchemaSstsStorage {
91    schema: SchemaRef,
92    catalog_manager: Weak<dyn CatalogManager>,
93}
94
95impl InformationSchemaSstsStorage {
96    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
97        Self {
98            schema: StorageSstEntry::schema(),
99            catalog_manager,
100        }
101    }
102}
103
104impl InformationTable for InformationSchemaSstsStorage {
105    fn table_id(&self) -> TableId {
106        INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID
107    }
108
109    fn table_name(&self) -> &'static str {
110        SSTS_STORAGE
111    }
112
113    fn schema(&self) -> SchemaRef {
114        self.schema.clone()
115    }
116
117    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
118        let schema = if let Some(p) = &request.projection {
119            Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
120        } else {
121            self.schema.clone()
122        };
123
124        let info_ext = utils::information_extension(&self.catalog_manager)?;
125        let req = DatanodeInspectRequest {
126            kind: DatanodeInspectKind::SstStorage,
127            scan: request,
128        };
129
130        let future = async move {
131            info_ext
132                .inspect_datanode(req)
133                .await
134                .map_err(BoxedError::new)
135                .context(common_recordbatch::error::ExternalSnafu)
136        };
137        Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
138            schema,
139            Box::pin(future),
140        )))
141    }
142}