use std::collections::HashSet;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
use common_error::ext::BoxedError;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
    StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
use crate::system_schema::utils;

pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;

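/// The `information_schema.tables` virtual table.
///
/// Lists all tables visible in the current catalog, backing queries such as
/// `SELECT table_name, table_rows FROM information_schema.tables;`.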
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaTables {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

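    /// Builds the column schema of the `TABLES` table. The column set follows
    /// MySQL's `information_schema.TABLES`, extended with the
    /// GreptimeDB-specific `table_id` column.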
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(
                CREATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                UPDATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                CHECK_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
        ]))
    }

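    /// Returns a fresh row builder bound to this table's schema and catalog.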
    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaTables {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_TABLES_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        TABLES
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

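    /// Converts a scan request into a record batch stream. The rows are built
    /// lazily inside the stream (a single batch on first poll), and the
    /// request's predicates are pushed down to skip non-matching tables.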
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_tables(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

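/// Builds the rows of `information_schema.tables`, accumulating one vector
/// builder per output column.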
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    table_types: StringVectorBuilder,
    table_ids: UInt32VectorBuilder,
    version: UInt64VectorBuilder,
    row_format: StringVectorBuilder,
    table_rows: UInt64VectorBuilder,
    data_length: UInt64VectorBuilder,
    max_data_length: UInt64VectorBuilder,
    index_length: UInt64VectorBuilder,
    avg_row_length: UInt64VectorBuilder,
    max_index_length: UInt64VectorBuilder,
    data_free: UInt64VectorBuilder,
    auto_increment: UInt64VectorBuilder,
    create_time: TimestampSecondVectorBuilder,
    update_time: TimestampSecondVectorBuilder,
    check_time: TimestampSecondVectorBuilder,
    table_collation: StringVectorBuilder,
    checksum: UInt64VectorBuilder,
    create_options: StringVectorBuilder,
    table_comment: StringVectorBuilder,
    engines: StringVectorBuilder,
    temporary: StringVectorBuilder,
}

impl InformationSchemaTablesBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            update_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            check_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

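    /// Walks every schema of the catalog and appends one row per table,
    /// joining region statistics into the storage-related columns.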
    async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        let information_extension = utils::information_extension(&self.catalog_manager)?;

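        // Region stats are fetched best-effort: on failure, log the error and
        // fall back to empty stats so that listing tables still succeeds
        // (the size columns then simply read as zero).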
        let region_stats = information_extension
            .region_stats()
            .await
            .map_err(|e| {
                error!(e; "Failed to call region_stats");
                e
            })
            .unwrap_or_else(|_| vec![]);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let table_info = table.table_info();

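                // Only tables backed by physical regions (mito engine tables
                // and physical tables) carry region stats; other engines get
                // an empty slice.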
                let table_region_stats =
                    if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
                        let region_ids = table_info
                            .meta
                            .region_numbers
                            .iter()
                            .map(|n| RegionId::new(table_info.ident.table_id, *n))
                            .collect::<HashSet<_>>();

                        region_stats
                            .iter()
                            .filter(|stat| region_ids.contains(&stat.id))
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    };

                self.add_table(
                    &predicates,
                    &catalog_name,
                    &schema_name,
                    table_info,
                    table.table_type(),
                    &table_region_stats,
                );
            }
        }

        self.finish()
    }

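    /// Appends one table row, or returns early when the pushed-down
    /// predicates reject the row's key columns.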
    #[allow(clippy::too_many_arguments)]
    fn add_table(
        &mut self,
        predicates: &Predicates,
        catalog_name: &str,
        schema_name: &str,
        table_info: Arc<TableInfo>,
        table_type: TableType,
        region_stats: &[&RegionStat],
    ) {
        let table_name = table_info.name.as_ref();
        let table_id = table_info.table_id();
        let engine = table_info.meta.engine.as_ref();

        let table_type_text = match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        };

        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_ID, &Value::from(table_id)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (ENGINE, &Value::from(engine)),
            (TABLE_NAME, &Value::from(table_name)),
            (TABLE_TYPE, &Value::from(table_type_text)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.table_types.push(Some(table_type_text));
        self.table_ids.push(Some(table_id));

        let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
        let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
        let index_length = region_stats.iter().map(|stat| stat.index_size).sum();

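        // Approximate the average row length over all data, counting what is
        // still buffered in memtables on top of the persisted SST size.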
        let avg_row_length = if table_rows > 0 {
            let total_data_length = data_length
                + region_stats
                    .iter()
                    .map(|stat| stat.memtable_size)
                    .sum::<u64>();

            total_data_length / table_rows
        } else {
            0
        };

        self.data_length.push(Some(data_length));
        self.index_length.push(Some(index_length));
        self.table_rows.push(Some(table_rows));
        self.avg_row_length.push(Some(avg_row_length));

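        // Columns that GreptimeDB does not track are filled with
        // MySQL-compatible placeholder values.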
        self.max_data_length.push(Some(0));
        self.checksum.push(Some(0));
        self.max_index_length.push(Some(0));
        self.data_free.push(Some(0));
        self.auto_increment.push(Some(0));
        self.row_format.push(Some("Fixed"));
        self.table_collation.push(Some("utf8_bin"));
        self.update_time.push(None);
        self.check_time.push(None);
        self.version.push(Some(11));
        self.table_comment.push(table_info.desc.as_deref());
        self.create_options
            .push(Some(table_info.meta.options.to_string().as_ref()));
        self.create_time
            .push(Some(table_info.meta.created_on.timestamp().into()));

        self.temporary
            .push(if matches!(table_type, TableType::Temporary) {
                Some("Y")
            } else {
                Some("N")
            });
        self.engines.push(Some(engine));
    }

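    /// Finishes all column builders into a `RecordBatch`. The vector order
    /// must match the column order in [`InformationSchemaTables::schema`].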
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.table_types.finish()),
            Arc::new(self.table_ids.finish()),
            Arc::new(self.data_length.finish()),
            Arc::new(self.max_data_length.finish()),
            Arc::new(self.index_length.finish()),
            Arc::new(self.max_index_length.finish()),
            Arc::new(self.avg_row_length.finish()),
            Arc::new(self.engines.finish()),
            Arc::new(self.version.finish()),
            Arc::new(self.row_format.finish()),
            Arc::new(self.table_rows.finish()),
            Arc::new(self.data_free.finish()),
            Arc::new(self.auto_increment.finish()),
            Arc::new(self.create_time.finish()),
            Arc::new(self.update_time.finish()),
            Arc::new(self.check_time.finish()),
            Arc::new(self.table_collation.finish()),
            Arc::new(self.checksum.finish()),
            Arc::new(self.create_options.finish()),
            Arc::new(self.table_comment.finish()),
            Arc::new(self.temporary.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

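// Lets DataFusion execute this table directly as a partition stream. Unlike
// `to_stream`, there is no scan request here, so no predicates are pushed
// down and all rows are built.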
impl DfPartitionStream for InformationSchemaTables {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_tables(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}