use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
    INFORMATION_SCHEMA_COLUMNS_TABLE_ID, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
    SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, DataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, StringVector, StringVectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use sql::statements;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, COLUMNS};
use crate::CatalogManager;

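/// The `information_schema.columns` table: one row per column of every table
/// reachable through the catalog manager.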
#[derive(Debug)]
pub(super) struct InformationSchemaColumns {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

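// Column names of `information_schema.columns`.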
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const ORDINAL_POSITION: &str = "ordinal_position";
const CHARACTER_MAXIMUM_LENGTH: &str = "character_maximum_length";
const CHARACTER_OCTET_LENGTH: &str = "character_octet_length";
const NUMERIC_PRECISION: &str = "numeric_precision";
const NUMERIC_SCALE: &str = "numeric_scale";
const DATETIME_PRECISION: &str = "datetime_precision";
const CHARACTER_SET_NAME: &str = "character_set_name";
pub const COLLATION_NAME: &str = "collation_name";
pub const COLUMN_KEY: &str = "column_key";
pub const EXTRA: &str = "extra";
pub const PRIVILEGES: &str = "privileges";
const GENERATION_EXPRESSION: &str = "generation_expression";
pub const GREPTIME_DATA_TYPE: &str = "greptime_data_type";
pub const DATA_TYPE: &str = "data_type";
pub const SEMANTIC_TYPE: &str = "semantic_type";
pub const COLUMN_DEFAULT: &str = "column_default";
pub const IS_NULLABLE: &str = "is_nullable";
const COLUMN_TYPE: &str = "column_type";
pub const COLUMN_COMMENT: &str = "column_comment";
const SRS_ID: &str = "srs_id";
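
/// Initial capacity of the per-column vector builders.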
const INIT_CAPACITY: usize = 42;

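/// Maximum length reported for string columns (`i32::MAX`).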
const MAX_STRING_LENGTH: i64 = 2147483647;
const UTF8_CHARSET_NAME: &str = "utf8";
const UTF8_COLLATE_NAME: &str = "utf8_bin";
const PRI_COLUMN_KEY: &str = "PRI";
const TIME_INDEX_COLUMN_KEY: &str = "TIME INDEX";
const DEFAULT_PRIVILEGES: &str = "select,insert";
const EMPTY_STR: &str = "";

impl InformationSchemaColumns {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

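    /// Builds the schema of the `columns` table; fields that may be absent
    /// for a given data type (lengths, precisions, charset, etc.) are nullable.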
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::int64_datatype(), false),
            ColumnSchema::new(
                CHARACTER_MAXIMUM_LENGTH,
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                CHARACTER_OCTET_LENGTH,
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(NUMERIC_PRECISION, ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(NUMERIC_SCALE, ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(DATETIME_PRECISION, ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                CHARACTER_SET_NAME,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(COLLATION_NAME, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(COLUMN_KEY, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(EXTRA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PRIVILEGES, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                GENERATION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                GREPTIME_DATA_TYPE,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(SEMANTIC_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_DEFAULT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(IS_NULLABLE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_COMMENT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(SRS_ID, ConcreteDataType::int64_datatype(), true),
        ]))
    }

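    /// Creates a row builder bound to this table's schema and catalog.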
    fn builder(&self) -> InformationSchemaColumnsBuilder {
        InformationSchemaColumnsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaColumns {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_COLUMNS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        COLUMNS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

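    /// Streams the table contents, forwarding the `ScanRequest` so that
    /// predicates can prune rows while they are generated.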
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_columns(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

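/// Accumulates the rows of `information_schema.columns`, one vector builder
/// per output column.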
struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    column_names: StringVectorBuilder,
    ordinal_positions: Int64VectorBuilder,
    character_maximum_lengths: Int64VectorBuilder,
    character_octet_lengths: Int64VectorBuilder,
    numeric_precisions: Int64VectorBuilder,
    numeric_scales: Int64VectorBuilder,
    datetime_precisions: Int64VectorBuilder,
    character_set_names: StringVectorBuilder,
    collation_names: StringVectorBuilder,
    column_keys: StringVectorBuilder,
    greptime_data_types: StringVectorBuilder,
    data_types: StringVectorBuilder,
    semantic_types: StringVectorBuilder,
    column_defaults: StringVectorBuilder,
    is_nullables: StringVectorBuilder,
    column_types: StringVectorBuilder,
    column_comments: StringVectorBuilder,
}

impl InformationSchemaColumnsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            character_maximum_lengths: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            character_octet_lengths: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            numeric_precisions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            numeric_scales: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            datetime_precisions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            character_set_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            collation_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_keys: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            greptime_data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            semantic_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_defaults: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            is_nullables: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

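    /// Walks every schema and table reachable through the catalog manager and
    /// collects one row per column, skipping rows rejected by the predicates.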
    async fn make_columns(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let keys = &table.table_info().meta.primary_key_indices;
                let schema = table.schema();

                for (idx, column) in schema.column_schemas().iter().enumerate() {
                    // Classify the column: time index and primary-key columns
                    // are tagged; everything else is a field.
                    let semantic_type = if column.is_time_index() {
                        SEMANTIC_TYPE_TIME_INDEX
                    } else if keys.contains(&idx) {
                        SEMANTIC_TYPE_PRIMARY_KEY
                    } else {
                        SEMANTIC_TYPE_FIELD
                    };

                    self.add_column(
                        &predicates,
                        idx,
                        &catalog_name,
                        &schema_name,
                        &table.table_info().name,
                        semantic_type,
                        column,
                    );
                }
            }
        }

        self.finish()
    }

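    /// Appends one row describing `column_schema`, unless the predicates
    /// reject it.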
    #[allow(clippy::too_many_arguments)]
    fn add_column(
        &mut self,
        predicates: &Predicates,
        index: usize,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        semantic_type: &str,
        column_schema: &ColumnSchema,
    ) {
        // Render the SQL data type name; fall back to the GreptimeDB type
        // name when there is no SQL equivalent.
        let data_type = statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
            .map(|dt| dt.to_string().to_lowercase())
            .unwrap_or_else(|_| column_schema.data_type.name());

        let column_key = match semantic_type {
            SEMANTIC_TYPE_PRIMARY_KEY => PRI_COLUMN_KEY,
            SEMANTIC_TYPE_TIME_INDEX => TIME_INDEX_COLUMN_KEY,
            _ => EMPTY_STR,
        };

        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
            (COLUMN_NAME, &Value::from(column_schema.name.as_str())),
            (DATA_TYPE, &Value::from(data_type.as_str())),
            (SEMANTIC_TYPE, &Value::from(semantic_type)),
            (ORDINAL_POSITION, &Value::from((index + 1) as i64)),
            (COLUMN_KEY, &Value::from(column_key)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.column_names.push(Some(&column_schema.name));
        // `ordinal_position` is 1-based, following the SQL standard.
        self.ordinal_positions.push(Some((index + 1) as i64));

        // Length, precision, charset and collation only apply to some types;
        // push `None` where the concept does not exist for the column.
        if column_schema.data_type.is_string() {
            self.character_maximum_lengths.push(Some(MAX_STRING_LENGTH));
            self.character_octet_lengths.push(Some(MAX_STRING_LENGTH));
            self.numeric_precisions.push(None);
            self.numeric_scales.push(None);
            self.datetime_precisions.push(None);
            self.character_set_names.push(Some(UTF8_CHARSET_NAME));
            self.collation_names.push(Some(UTF8_COLLATE_NAME));
        } else if column_schema.data_type.is_numeric() || column_schema.data_type.is_decimal() {
            self.character_maximum_lengths.push(None);
            self.character_octet_lengths.push(None);

            self.numeric_precisions.push(
                column_schema
                    .data_type
                    .numeric_precision()
                    .map(|x| x as i64),
            );
            self.numeric_scales
                .push(column_schema.data_type.numeric_scale().map(|x| x as i64));

            self.datetime_precisions.push(None);
            self.character_set_names.push(None);
            self.collation_names.push(None);
        } else {
            self.character_maximum_lengths.push(None);
            self.character_octet_lengths.push(None);
            self.numeric_precisions.push(None);
            self.numeric_scales.push(None);

            match &column_schema.data_type {
                ConcreteDataType::Timestamp(ts_type) => {
                    self.datetime_precisions
                        .push(Some(ts_type.precision() as i64));
                }
                ConcreteDataType::Time(time_type) => {
                    self.datetime_precisions
                        .push(Some(time_type.precision() as i64));
                }
                _ => self.datetime_precisions.push(None),
            }

            self.character_set_names.push(None);
            self.collation_names.push(None);
        }

        self.column_keys.push(Some(column_key));
        self.greptime_data_types
            .push(Some(&column_schema.data_type.name()));
        self.data_types.push(Some(&data_type));
        self.semantic_types.push(Some(semantic_type));
        self.column_defaults.push(
            column_schema
                .default_constraint()
                .map(|c| c.to_string())
                .as_deref(),
        );
        if column_schema.is_nullable() {
            self.is_nullables.push(Some("Yes"));
        } else {
            self.is_nullables.push(Some("No"));
        }
        self.column_types.push(Some(&data_type));
        self.column_comments
            .push(column_schema.column_comment().map(|x| x.as_ref()));
    }

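    /// Assembles the accumulated builders into a single `RecordBatch` whose
    /// column order matches [`InformationSchemaColumns::schema`].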
    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.collation_names.len();

        // `privileges`, `extra`, `generation_expression` and `srs_id` hold the
        // same value in every row, so constant vectors avoid materializing them.
        let privileges = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![DEFAULT_PRIVILEGES])),
            rows_num,
        ));
        let empty_string = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![EMPTY_STR])),
            rows_num,
        ));
        let srs_ids = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.column_names.finish()),
            Arc::new(self.ordinal_positions.finish()),
            Arc::new(self.character_maximum_lengths.finish()),
            Arc::new(self.character_octet_lengths.finish()),
            Arc::new(self.numeric_precisions.finish()),
            Arc::new(self.numeric_scales.finish()),
            Arc::new(self.datetime_precisions.finish()),
            Arc::new(self.character_set_names.finish()),
            Arc::new(self.collation_names.finish()),
            Arc::new(self.column_keys.finish()),
            empty_string.clone(), // extra
            privileges,
            empty_string, // generation_expression
            Arc::new(self.greptime_data_types.finish()),
            Arc::new(self.data_types.finish()),
            Arc::new(self.semantic_types.finish()),
            Arc::new(self.column_defaults.finish()),
            Arc::new(self.is_nullables.finish()),
            Arc::new(self.column_types.finish()),
            Arc::new(self.column_comments.finish()),
            srs_ids,
        ];

        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

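// Lets DataFusion scan the table directly as a `PartitionStream`; no scan
// request is available on this path, so no predicate pruning happens here.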
impl DfPartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_columns(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}