catalog/system_schema/information_schema/
ssts.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use common_catalog::consts::{
18    INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID, INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID,
19    INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
20};
21use common_error::ext::BoxedError;
22use common_recordbatch::SendableRecordBatchStream;
23use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
24use datatypes::schema::SchemaRef;
25use snafu::ResultExt;
26use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
27use store_api::storage::{ScanRequest, TableId};
28
29use crate::CatalogManager;
30use crate::error::{ProjectSchemaSnafu, Result};
31use crate::information_schema::{
32    DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_INDEX_META, SSTS_MANIFEST,
33    SSTS_STORAGE,
34};
35use crate::system_schema::utils;
36
37/// Information schema table for sst manifest.
38pub struct InformationSchemaSstsManifest {
39    schema: SchemaRef,
40    catalog_manager: Weak<dyn CatalogManager>,
41}
42
43impl InformationSchemaSstsManifest {
44    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
45        Self {
46            schema: ManifestSstEntry::schema(),
47            catalog_manager,
48        }
49    }
50}
51
52impl InformationTable for InformationSchemaSstsManifest {
53    fn table_id(&self) -> TableId {
54        INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID
55    }
56
57    fn table_name(&self) -> &'static str {
58        SSTS_MANIFEST
59    }
60
61    fn schema(&self) -> SchemaRef {
62        self.schema.clone()
63    }
64
65    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
66        let schema = if let Some(p) = &request.projection {
67            Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
68        } else {
69            self.schema.clone()
70        };
71        let info_ext = utils::information_extension(&self.catalog_manager)?;
72        let req = DatanodeInspectRequest {
73            kind: DatanodeInspectKind::SstManifest,
74            scan: request,
75        };
76
77        let future = async move {
78            info_ext
79                .inspect_datanode(req)
80                .await
81                .map_err(BoxedError::new)
82                .context(common_recordbatch::error::ExternalSnafu)
83        };
84        Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
85            schema,
86            Box::pin(future),
87        )))
88    }
89}
90
91/// Information schema table for sst storage.
92pub struct InformationSchemaSstsStorage {
93    schema: SchemaRef,
94    catalog_manager: Weak<dyn CatalogManager>,
95}
96
97impl InformationSchemaSstsStorage {
98    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
99        Self {
100            schema: StorageSstEntry::schema(),
101            catalog_manager,
102        }
103    }
104}
105
106impl InformationTable for InformationSchemaSstsStorage {
107    fn table_id(&self) -> TableId {
108        INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID
109    }
110
111    fn table_name(&self) -> &'static str {
112        SSTS_STORAGE
113    }
114
115    fn schema(&self) -> SchemaRef {
116        self.schema.clone()
117    }
118
119    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
120        let schema = if let Some(p) = &request.projection {
121            Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
122        } else {
123            self.schema.clone()
124        };
125
126        let info_ext = utils::information_extension(&self.catalog_manager)?;
127        let req = DatanodeInspectRequest {
128            kind: DatanodeInspectKind::SstStorage,
129            scan: request,
130        };
131
132        let future = async move {
133            info_ext
134                .inspect_datanode(req)
135                .await
136                .map_err(BoxedError::new)
137                .context(common_recordbatch::error::ExternalSnafu)
138        };
139        Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
140            schema,
141            Box::pin(future),
142        )))
143    }
144}
145
146/// Information schema table for index metadata.
147pub struct InformationSchemaSstsIndexMeta {
148    schema: SchemaRef,
149    catalog_manager: Weak<dyn CatalogManager>,
150}
151
152impl InformationSchemaSstsIndexMeta {
153    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
154        Self {
155            schema: PuffinIndexMetaEntry::schema(),
156            catalog_manager,
157        }
158    }
159}
160
161impl InformationTable for InformationSchemaSstsIndexMeta {
162    fn table_id(&self) -> TableId {
163        INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID
164    }
165
166    fn table_name(&self) -> &'static str {
167        SSTS_INDEX_META
168    }
169
170    fn schema(&self) -> SchemaRef {
171        self.schema.clone()
172    }
173
174    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
175        let schema = if let Some(p) = &request.projection {
176            Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
177        } else {
178            self.schema.clone()
179        };
180
181        let info_ext = utils::information_extension(&self.catalog_manager)?;
182        let req = DatanodeInspectRequest {
183            kind: DatanodeInspectKind::SstIndexMeta,
184            scan: request,
185        };
186
187        let future = async move {
188            info_ext
189                .inspect_datanode(req)
190                .await
191                .map_err(BoxedError::new)
192                .context(common_recordbatch::error::ExternalSnafu)
193        };
194        Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
195            schema,
196            Box::pin(future),
197        )))
198    }
199}