catalog/system_schema/information_schema/
ssts.rs1use std::sync::{Arc, Weak};
16
17use common_catalog::consts::{
18 INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID, INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID,
19 INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
20};
21use common_error::ext::BoxedError;
22use common_recordbatch::SendableRecordBatchStream;
23use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
24use datatypes::schema::SchemaRef;
25use snafu::ResultExt;
26use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
27use store_api::storage::{ScanRequest, TableId};
28
29use crate::CatalogManager;
30use crate::error::{ProjectSchemaSnafu, Result};
31use crate::information_schema::{
32 DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_INDEX_META, SSTS_MANIFEST,
33 SSTS_STORAGE,
34};
35use crate::system_schema::utils;
36
37pub struct InformationSchemaSstsManifest {
39 schema: SchemaRef,
40 catalog_manager: Weak<dyn CatalogManager>,
41}
42
43impl InformationSchemaSstsManifest {
44 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
45 Self {
46 schema: ManifestSstEntry::schema(),
47 catalog_manager,
48 }
49 }
50}
51
52impl InformationTable for InformationSchemaSstsManifest {
53 fn table_id(&self) -> TableId {
54 INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID
55 }
56
57 fn table_name(&self) -> &'static str {
58 SSTS_MANIFEST
59 }
60
61 fn schema(&self) -> SchemaRef {
62 self.schema.clone()
63 }
64
65 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
66 let schema = if let Some(p) = &request.projection {
67 Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
68 } else {
69 self.schema.clone()
70 };
71 let info_ext = utils::information_extension(&self.catalog_manager)?;
72 let req = DatanodeInspectRequest {
73 kind: DatanodeInspectKind::SstManifest,
74 scan: request,
75 };
76
77 let future = async move {
78 info_ext
79 .inspect_datanode(req)
80 .await
81 .map_err(BoxedError::new)
82 .context(common_recordbatch::error::ExternalSnafu)
83 };
84 Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
85 schema,
86 Box::pin(future),
87 )))
88 }
89}
90
91pub struct InformationSchemaSstsStorage {
93 schema: SchemaRef,
94 catalog_manager: Weak<dyn CatalogManager>,
95}
96
97impl InformationSchemaSstsStorage {
98 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
99 Self {
100 schema: StorageSstEntry::schema(),
101 catalog_manager,
102 }
103 }
104}
105
106impl InformationTable for InformationSchemaSstsStorage {
107 fn table_id(&self) -> TableId {
108 INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID
109 }
110
111 fn table_name(&self) -> &'static str {
112 SSTS_STORAGE
113 }
114
115 fn schema(&self) -> SchemaRef {
116 self.schema.clone()
117 }
118
119 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
120 let schema = if let Some(p) = &request.projection {
121 Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
122 } else {
123 self.schema.clone()
124 };
125
126 let info_ext = utils::information_extension(&self.catalog_manager)?;
127 let req = DatanodeInspectRequest {
128 kind: DatanodeInspectKind::SstStorage,
129 scan: request,
130 };
131
132 let future = async move {
133 info_ext
134 .inspect_datanode(req)
135 .await
136 .map_err(BoxedError::new)
137 .context(common_recordbatch::error::ExternalSnafu)
138 };
139 Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
140 schema,
141 Box::pin(future),
142 )))
143 }
144}
145
146pub struct InformationSchemaSstsIndexMeta {
148 schema: SchemaRef,
149 catalog_manager: Weak<dyn CatalogManager>,
150}
151
152impl InformationSchemaSstsIndexMeta {
153 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
154 Self {
155 schema: PuffinIndexMetaEntry::schema(),
156 catalog_manager,
157 }
158 }
159}
160
161impl InformationTable for InformationSchemaSstsIndexMeta {
162 fn table_id(&self) -> TableId {
163 INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID
164 }
165
166 fn table_name(&self) -> &'static str {
167 SSTS_INDEX_META
168 }
169
170 fn schema(&self) -> SchemaRef {
171 self.schema.clone()
172 }
173
174 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
175 let schema = if let Some(p) = &request.projection {
176 Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
177 } else {
178 self.schema.clone()
179 };
180
181 let info_ext = utils::information_extension(&self.catalog_manager)?;
182 let req = DatanodeInspectRequest {
183 kind: DatanodeInspectKind::SstIndexMeta,
184 scan: request,
185 };
186
187 let future = async move {
188 info_ext
189 .inspect_datanode(req)
190 .await
191 .map_err(BoxedError::new)
192 .context(common_recordbatch::error::ExternalSnafu)
193 };
194 Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
195 schema,
196 Box::pin(future),
197 )))
198 }
199}