catalog/system_schema/information_schema/schemata.rs
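//! Implements the `information_schema.SCHEMATA` table, which exposes one row
//! per schema (database) reachable through the catalog manager.
//!
//! A typical query against it (columns as defined in
//! [`InformationSchemaSchemata::schema`]) looks like:
//!
//! ```sql
//! SELECT catalog_name, schema_name, options FROM information_schema.schemata;
//! ```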
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_SCHEMATA_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::schema_name::SchemaNameKey;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::StringVectorBuilder;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, TableMetadataManagerSnafu,
    UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, SCHEMATA};
use crate::system_schema::utils;
use crate::CatalogManager;

// Column names of the `SCHEMATA` table.
pub const CATALOG_NAME: &str = "catalog_name";
pub const SCHEMA_NAME: &str = "schema_name";
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
pub const SCHEMA_OPTS: &str = "options";

/// Initial capacity of the per-column vector builders.
const INIT_CAPACITY: usize = 42;

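/// The `information_schema.SCHEMATA` table: lists the schemas (databases) in
/// a catalog.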
#[derive(Debug)]
pub(super) struct InformationSchemaSchemata {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaSchemata {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

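    /// Builds the table schema: six string columns, of which `sql_path` and
    /// `options` are nullable.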
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(CATALOG_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(SCHEMA_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                DEFAULT_CHARACTER_SET_NAME,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                DEFAULT_COLLATION_NAME,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new("sql_path", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(SCHEMA_OPTS, ConcreteDataType::string_datatype(), true),
        ]))
    }

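    /// Creates a row builder that borrows this table's schema and catalog handle.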
    fn builder(&self) -> InformationSchemaSchemataBuilder {
        InformationSchemaSchemataBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaSchemata {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_SCHEMATA_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        SCHEMATA
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        // The stream lazily yields a single record batch, built by
        // `make_schemata` when the stream is first polled.
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_schemata(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        // Adapt the DataFusion stream back into this crate's stream type.
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

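/// Builds the `SCHEMATA` rows column by column.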
struct InformationSchemaSchemataBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per output column.
    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    charset_names: StringVectorBuilder,
    collation_names: StringVectorBuilder,
    sql_paths: StringVectorBuilder,
    schema_options: StringVectorBuilder,
}

impl InformationSchemaSchemataBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            charset_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            collation_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sql_paths: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

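    /// Assembles the `SCHEMATA` record batch, applying any predicates pushed
    /// down from the scan request.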
    async fn make_schemata(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        // The catalog manager is held as a `Weak` reference; fail the scan if
        // it has already been dropped.
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let table_metadata_manager = utils::table_meta_manager(&self.catalog_manager)?;
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            // Render the schema's options when a table metadata manager is
            // available; otherwise the `options` column is left empty.
            let opts = if let Some(table_metadata_manager) = &table_metadata_manager {
                table_metadata_manager
                    .schema_manager()
                    .get(SchemaNameKey::new(&catalog_name, &schema_name))
                    .await
                    .context(TableMetadataManagerSnafu)?
                    .map(|schema_opts| format!("{}", schema_opts.into_inner()))
            } else {
                None
            };

            self.add_schema(
                &predicates,
                &catalog_name,
                &schema_name,
                opts.as_deref().unwrap_or(""),
            );
        }

        self.finish()
    }

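    /// Appends one row, skipping it when it does not match the predicates.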
    fn add_schema(
        &mut self,
        predicates: &Predicates,
        catalog_name: &str,
        schema_name: &str,
        schema_options: &str,
    ) {
        let row = [
            (CATALOG_NAME, &Value::from(catalog_name)),
            (SCHEMA_NAME, &Value::from(schema_name)),
            (DEFAULT_CHARACTER_SET_NAME, &Value::from("utf8")),
            (DEFAULT_COLLATION_NAME, &Value::from("utf8_bin")),
            (SCHEMA_OPTS, &Value::from(schema_options)),
        ];

        // Skip rows filtered out by the pushed-down predicates.
        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.charset_names.push(Some("utf8"));
        self.collation_names.push(Some("utf8_bin"));
        self.sql_paths.push(None);
        self.schema_options.push(Some(schema_options));
    }

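    /// Finalizes the collected columns into a single record batch.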
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.charset_names.finish()),
            Arc::new(self.collation_names.finish()),
            Arc::new(self.sql_paths.finish()),
            Arc::new(self.schema_options.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

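// `DfPartitionStream` lets DataFusion scan this table directly as a streaming
// partition; unlike `to_stream`, no scan request (and thus no predicate
// pushdown) is available here, so `make_schemata` is called with `None`.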
impl DfPartitionStream for InformationSchemaSchemata {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_schemata(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}