catalog/system_schema/information_schema/
views.rs1use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_VIEW_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_recordbatch::adapter::RecordBatchStreamAdapter;
21use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
22use datafusion::execution::TaskContext;
23use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
24use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
25use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
26use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::StringVectorBuilder;
30use futures::TryStreamExt;
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::{ScanRequest, TableId};
33use table::metadata::TableType;
34
35use crate::error::{
36 CastManagerSnafu, CreateRecordBatchSnafu, GetViewCacheSnafu, InternalSnafu, Result,
37 UpgradeWeakCatalogManagerRefSnafu, ViewInfoNotFoundSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40use crate::system_schema::information_schema::{InformationTable, Predicates, VIEWS};
41use crate::CatalogManager;
/// Initial capacity for the per-column vector builders. This is only a
/// starting hint — builders grow as rows are appended.
const INIT_CAPACITY: usize = 42;

/// Column names of `information_schema.views`. The set mirrors MySQL's
/// `INFORMATION_SCHEMA.VIEWS` columns.
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const VIEW_DEFINITION: &str = "view_definition";
pub const CHECK_OPTION: &str = "check_option";
pub const IS_UPDATABLE: &str = "is_updatable";
pub const DEFINER: &str = "definer";
pub const SECURITY_TYPE: &str = "security_type";
pub const CHARACTER_SET_CLIENT: &str = "character_set_client";
pub const COLLATION_CONNECTION: &str = "collation_connection";
54
/// The `information_schema.views` virtual table.
///
/// Produces one row per view in the bound catalog; the column layout mirrors
/// MySQL's `INFORMATION_SCHEMA.VIEWS`.
#[derive(Debug)]
pub(super) struct InformationSchemaViews {
    // Schema of this virtual table, built once in `Self::schema()`.
    schema: SchemaRef,
    // Catalog whose views are reported.
    catalog_name: String,
    // Held weakly to avoid a reference cycle; upgraded on each scan.
    catalog_manager: Weak<dyn CatalogManager>,
}
62
63impl InformationSchemaViews {
64 pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
65 Self {
66 schema: Self::schema(),
67 catalog_name,
68 catalog_manager,
69 }
70 }
71
72 pub(crate) fn schema() -> SchemaRef {
73 Arc::new(Schema::new(vec![
74 ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
75 ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
76 ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
77 ColumnSchema::new(VIEW_DEFINITION, ConcreteDataType::string_datatype(), false),
78 ColumnSchema::new(CHECK_OPTION, ConcreteDataType::string_datatype(), true),
79 ColumnSchema::new(IS_UPDATABLE, ConcreteDataType::string_datatype(), true),
80 ColumnSchema::new(DEFINER, ConcreteDataType::string_datatype(), true),
81 ColumnSchema::new(SECURITY_TYPE, ConcreteDataType::string_datatype(), true),
82 ColumnSchema::new(
83 CHARACTER_SET_CLIENT,
84 ConcreteDataType::string_datatype(),
85 true,
86 ),
87 ColumnSchema::new(
88 COLLATION_CONNECTION,
89 ConcreteDataType::string_datatype(),
90 true,
91 ),
92 ]))
93 }
94
95 fn builder(&self) -> InformationSchemaViewsBuilder {
96 InformationSchemaViewsBuilder::new(
97 self.schema.clone(),
98 self.catalog_name.clone(),
99 self.catalog_manager.clone(),
100 )
101 }
102}
103
104impl InformationTable for InformationSchemaViews {
105 fn table_id(&self) -> TableId {
106 INFORMATION_SCHEMA_VIEW_TABLE_ID
107 }
108
109 fn table_name(&self) -> &'static str {
110 VIEWS
111 }
112
113 fn schema(&self) -> SchemaRef {
114 self.schema.clone()
115 }
116
117 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
118 let schema = self.schema.arrow_schema().clone();
119 let mut builder = self.builder();
120 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
121 schema,
122 futures::stream::once(async move {
123 builder
124 .make_views(Some(request))
125 .await
126 .map(|x| x.into_df_record_batch())
127 .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
128 }),
129 ));
130 Ok(Box::pin(
131 RecordBatchStreamAdapter::try_new(stream)
132 .map_err(BoxedError::new)
133 .context(InternalSnafu)?,
134 ))
135 }
136}
137
/// Accumulates rows of `information_schema.views` column-by-column before
/// assembling them into a single `RecordBatch`.
struct InformationSchemaViewsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    // One string builder per output column; `add_view` pushes exactly one
    // value into each of them per accepted row.
    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    view_definitions: StringVectorBuilder,
    check_options: StringVectorBuilder,
    is_updatable: StringVectorBuilder,
    definer: StringVectorBuilder,
    security_type: StringVectorBuilder,
    character_set_client: StringVectorBuilder,
    collation_connection: StringVectorBuilder,
}
157
158impl InformationSchemaViewsBuilder {
159 fn new(
160 schema: SchemaRef,
161 catalog_name: String,
162 catalog_manager: Weak<dyn CatalogManager>,
163 ) -> Self {
164 Self {
165 schema,
166 catalog_name,
167 catalog_manager,
168 catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
169 schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
170 table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
171 view_definitions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
172 check_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
173 is_updatable: StringVectorBuilder::with_capacity(INIT_CAPACITY),
174 definer: StringVectorBuilder::with_capacity(INIT_CAPACITY),
175 security_type: StringVectorBuilder::with_capacity(INIT_CAPACITY),
176 character_set_client: StringVectorBuilder::with_capacity(INIT_CAPACITY),
177 collation_connection: StringVectorBuilder::with_capacity(INIT_CAPACITY),
178 }
179 }
180
181 async fn make_views(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
183 let catalog_name = self.catalog_name.clone();
184 let catalog_manager = self
185 .catalog_manager
186 .upgrade()
187 .context(UpgradeWeakCatalogManagerRefSnafu)?;
188 let predicates = Predicates::from_scan_request(&request);
189 let view_info_cache = catalog_manager
190 .as_any()
191 .downcast_ref::<KvBackendCatalogManager>()
192 .context(CastManagerSnafu)?
193 .view_info_cache()?;
194
195 for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
196 let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
197
198 while let Some(table) = stream.try_next().await? {
199 let table_info = table.table_info();
200 if table_info.table_type == TableType::View {
201 let view_info = view_info_cache
202 .get(table_info.ident.table_id)
203 .await
204 .context(GetViewCacheSnafu)?
205 .context(ViewInfoNotFoundSnafu {
206 name: &table_info.name,
207 })?;
208 self.add_view(
209 &predicates,
210 &catalog_name,
211 &schema_name,
212 &table_info.name,
213 &view_info.definition,
214 )
215 }
216 }
217 }
218
219 self.finish()
220 }
221
222 fn add_view(
223 &mut self,
224 predicates: &Predicates,
225 catalog_name: &str,
226 schema_name: &str,
227 table_name: &str,
228 definition: &str,
229 ) {
230 let row = [
231 (TABLE_CATALOG, &Value::from(catalog_name)),
232 (TABLE_SCHEMA, &Value::from(schema_name)),
233 (TABLE_NAME, &Value::from(table_name)),
234 ];
235
236 if !predicates.eval(&row) {
237 return;
238 }
239 self.catalog_names.push(Some(catalog_name));
240 self.schema_names.push(Some(schema_name));
241 self.table_names.push(Some(table_name));
242 self.view_definitions.push(Some(definition));
243 self.check_options.push(None);
244 self.is_updatable.push(Some("NO"));
246 self.definer.push(None);
247 self.security_type.push(None);
248 self.character_set_client.push(Some("utf8"));
249 self.collation_connection.push(Some("utf8_bin"));
250 }
251
252 fn finish(&mut self) -> Result<RecordBatch> {
253 let columns: Vec<VectorRef> = vec![
254 Arc::new(self.catalog_names.finish()),
255 Arc::new(self.schema_names.finish()),
256 Arc::new(self.table_names.finish()),
257 Arc::new(self.view_definitions.finish()),
258 Arc::new(self.check_options.finish()),
259 Arc::new(self.is_updatable.finish()),
260 Arc::new(self.definer.finish()),
261 Arc::new(self.security_type.finish()),
262 Arc::new(self.character_set_client.finish()),
263 Arc::new(self.collation_connection.finish()),
264 ];
265 RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
266 }
267}
268
269impl DfPartitionStream for InformationSchemaViews {
270 fn schema(&self) -> &ArrowSchemaRef {
271 self.schema.arrow_schema()
272 }
273
274 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
275 let schema = self.schema.arrow_schema().clone();
276 let mut builder = self.builder();
277 Box::pin(DfRecordBatchStreamAdapter::new(
278 schema,
279 futures::stream::once(async move {
280 builder
281 .make_views(None)
282 .await
283 .map(|x| x.into_df_record_batch())
284 .map_err(Into::into)
285 }),
286 ))
287 }
288}