catalog/system_schema/information_schema/
ssts.rs1use std::sync::{Arc, Weak};
16
17use common_catalog::consts::{
18 INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID, INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
19};
20use common_error::ext::BoxedError;
21use common_recordbatch::SendableRecordBatchStream;
22use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
23use datatypes::schema::SchemaRef;
24use snafu::ResultExt;
25use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
26use store_api::storage::{ScanRequest, TableId};
27
28use crate::CatalogManager;
29use crate::error::{ProjectSchemaSnafu, Result};
30use crate::information_schema::{
31 DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_MANIFEST, SSTS_STORAGE,
32};
33use crate::system_schema::utils;
34
35pub struct InformationSchemaSstsManifest {
37 schema: SchemaRef,
38 catalog_manager: Weak<dyn CatalogManager>,
39}
40
41impl InformationSchemaSstsManifest {
42 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
43 Self {
44 schema: ManifestSstEntry::schema(),
45 catalog_manager,
46 }
47 }
48}
49
50impl InformationTable for InformationSchemaSstsManifest {
51 fn table_id(&self) -> TableId {
52 INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID
53 }
54
55 fn table_name(&self) -> &'static str {
56 SSTS_MANIFEST
57 }
58
59 fn schema(&self) -> SchemaRef {
60 self.schema.clone()
61 }
62
63 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
64 let schema = if let Some(p) = &request.projection {
65 Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
66 } else {
67 self.schema.clone()
68 };
69 let info_ext = utils::information_extension(&self.catalog_manager)?;
70 let req = DatanodeInspectRequest {
71 kind: DatanodeInspectKind::SstManifest,
72 scan: request,
73 };
74
75 let future = async move {
76 info_ext
77 .inspect_datanode(req)
78 .await
79 .map_err(BoxedError::new)
80 .context(common_recordbatch::error::ExternalSnafu)
81 };
82 Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
83 schema,
84 Box::pin(future),
85 )))
86 }
87}
88
89pub struct InformationSchemaSstsStorage {
91 schema: SchemaRef,
92 catalog_manager: Weak<dyn CatalogManager>,
93}
94
95impl InformationSchemaSstsStorage {
96 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
97 Self {
98 schema: StorageSstEntry::schema(),
99 catalog_manager,
100 }
101 }
102}
103
104impl InformationTable for InformationSchemaSstsStorage {
105 fn table_id(&self) -> TableId {
106 INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID
107 }
108
109 fn table_name(&self) -> &'static str {
110 SSTS_STORAGE
111 }
112
113 fn schema(&self) -> SchemaRef {
114 self.schema.clone()
115 }
116
117 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
118 let schema = if let Some(p) = &request.projection {
119 Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
120 } else {
121 self.schema.clone()
122 };
123
124 let info_ext = utils::information_extension(&self.catalog_manager)?;
125 let req = DatanodeInspectRequest {
126 kind: DatanodeInspectKind::SstStorage,
127 scan: request,
128 };
129
130 let future = async move {
131 info_ext
132 .inspect_datanode(req)
133 .await
134 .map_err(BoxedError::new)
135 .context(common_recordbatch::error::ExternalSnafu)
136 };
137 Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
138 schema,
139 Box::pin(future),
140 )))
141 }
142}