1use core::pin::pin;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
20use common_error::ext::BoxedError;
21use common_meta::datanode::RegionStat;
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use common_telemetry::error;
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
27use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
28use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
29use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::value::Value;
32use datatypes::vectors::{
33 StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
34};
35use futures::StreamExt;
36use snafu::{OptionExt, ResultExt};
37use store_api::storage::{ScanRequest, TableId};
38use table::metadata::{TableInfo, TableType};
39
40use crate::CatalogManager;
41use crate::error::{
42 CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
43 UpgradeWeakCatalogManagerRefSnafu,
44};
45use crate::kvbackend::KvBackendCatalogManager;
46use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
47use crate::system_schema::utils;
48
// Column names of the table served by `InformationSchemaTables`. Each name is
// paired with a data type in `InformationSchemaTables::schema`.
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
/// Name of the table-id column. Unlike the other column names it is private
/// to this module.
const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
/// Initial capacity handed to every column builder created in
/// `InformationSchemaTablesBuilder::new`.
const INIT_CAPACITY: usize = 42;
74
/// Provider of the `TABLES` information-schema table for a single catalog.
///
/// The struct itself holds no row data: every scan (`to_stream` / `execute`)
/// creates a fresh `InformationSchemaTablesBuilder` that materializes the rows.
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
    /// Schema shared by every record batch this table produces.
    schema: SchemaRef,
    /// Name of the catalog whose tables are listed.
    catalog_name: String,
    /// Weak handle to the catalog manager; it is upgraded when a scan runs.
    catalog_manager: Weak<dyn CatalogManager>,
}
81
82impl InformationSchemaTables {
83 pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
84 Self {
85 schema: Self::schema(),
86 catalog_name,
87 catalog_manager,
88 }
89 }
90
91 pub(crate) fn schema() -> SchemaRef {
92 Arc::new(Schema::new(vec![
93 ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
94 ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
95 ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
96 ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
97 ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
98 ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
99 ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
100 ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
101 ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
102 ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
103 ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
104 ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
105 ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
106 ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
107 ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
108 ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
109 ColumnSchema::new(
110 CREATE_TIME,
111 ConcreteDataType::timestamp_second_datatype(),
112 true,
113 ),
114 ColumnSchema::new(
115 UPDATE_TIME,
116 ConcreteDataType::timestamp_second_datatype(),
117 true,
118 ),
119 ColumnSchema::new(
120 CHECK_TIME,
121 ConcreteDataType::timestamp_second_datatype(),
122 true,
123 ),
124 ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
125 ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
126 ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
127 ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
128 ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
129 ]))
130 }
131
132 fn builder(&self) -> InformationSchemaTablesBuilder {
133 InformationSchemaTablesBuilder::new(
134 self.schema.clone(),
135 self.catalog_name.clone(),
136 self.catalog_manager.clone(),
137 )
138 }
139}
140
141impl InformationTable for InformationSchemaTables {
142 fn table_id(&self) -> TableId {
143 INFORMATION_SCHEMA_TABLES_TABLE_ID
144 }
145
146 fn table_name(&self) -> &'static str {
147 TABLES
148 }
149
150 fn schema(&self) -> SchemaRef {
151 self.schema.clone()
152 }
153
154 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
155 let schema = self.schema.arrow_schema().clone();
156 let mut builder = self.builder();
157 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
158 schema,
159 futures::stream::once(async move {
160 builder
161 .make_tables(Some(request))
162 .await
163 .map(|x| x.into_df_record_batch())
164 .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
165 }),
166 ));
167 Ok(Box::pin(
168 RecordBatchStreamAdapter::try_new(stream)
169 .map_err(BoxedError::new)
170 .context(InternalSnafu)?,
171 ))
172 }
173}
174
/// Accumulates the rows of the `TABLES` information-schema table column by
/// column and turns them into a single record batch.
///
/// Apart from the three context fields at the top, each field is the builder
/// for one output column; `add_table` pushes exactly one value into every one
/// of them per accepted table, and `finish` drains them in schema order.
struct InformationSchemaTablesBuilder {
    /// Schema of the record batch produced by `finish`.
    schema: SchemaRef,
    /// Catalog whose schemas and tables are enumerated.
    catalog_name: String,
    /// Weak handle to the catalog manager, upgraded in `make_tables`.
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    table_types: StringVectorBuilder,
    table_ids: UInt32VectorBuilder,
    version: UInt64VectorBuilder,
    row_format: StringVectorBuilder,
    table_rows: UInt64VectorBuilder,
    data_length: UInt64VectorBuilder,
    max_data_length: UInt64VectorBuilder,
    index_length: UInt64VectorBuilder,
    avg_row_length: UInt64VectorBuilder,
    max_index_length: UInt64VectorBuilder,
    data_free: UInt64VectorBuilder,
    auto_increment: UInt64VectorBuilder,
    create_time: TimestampSecondVectorBuilder,
    update_time: TimestampSecondVectorBuilder,
    check_time: TimestampSecondVectorBuilder,
    table_collation: StringVectorBuilder,
    checksum: UInt64VectorBuilder,
    create_options: StringVectorBuilder,
    table_comment: StringVectorBuilder,
    engines: StringVectorBuilder,
    temporary: StringVectorBuilder,
}
208
209impl InformationSchemaTablesBuilder {
210 fn new(
211 schema: SchemaRef,
212 catalog_name: String,
213 catalog_manager: Weak<dyn CatalogManager>,
214 ) -> Self {
215 Self {
216 schema,
217 catalog_name,
218 catalog_manager,
219 catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
220 schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
221 table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
222 table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
223 table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
224 data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
225 max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
226 index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
227 avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
228 engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
229 version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
230 row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
231 table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
232 max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
233 data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
234 auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
235 create_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
236 update_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
237 check_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
238 table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
239 checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
240 create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
241 table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
242 temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
243 }
244 }
245
246 async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
248 let catalog_name = self.catalog_name.clone();
249 let catalog_manager = self
250 .catalog_manager
251 .upgrade()
252 .context(UpgradeWeakCatalogManagerRefSnafu)?;
253 let partition_manager = catalog_manager
254 .as_any()
255 .downcast_ref::<KvBackendCatalogManager>()
256 .map(|catalog_manager| catalog_manager.partition_manager());
257 let predicates = Predicates::from_scan_request(&request);
258
259 let information_extension = utils::information_extension(&self.catalog_manager)?;
260
261 let region_stats = {
265 let mut x = information_extension
266 .region_stats()
267 .await
268 .unwrap_or_else(|e| {
269 error!(e; "Failed to find region stats in information_schema, fallback to all empty");
270 vec![]
271 });
272 x.sort_unstable_by_key(|x| x.id);
273 x
274 };
275
276 for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
277 let table_stream = catalog_manager.tables(&catalog_name, &schema_name, None);
278
279 const BATCH_SIZE: usize = 128;
280 let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
282
283 while let Some(tables) = table_chunks.next().await {
284 let tables = tables.into_iter().collect::<Result<Vec<_>>>()?;
285 let mito_or_physical_table_ids = tables
286 .iter()
287 .filter(|table| {
288 table.table_info().meta.engine == MITO_ENGINE
289 || table.table_info().is_physical_table()
290 })
291 .map(|table| table.table_info().ident.table_id)
292 .collect::<Vec<_>>();
293
294 let table_routes = if let Some(partition_manager) = &partition_manager {
295 partition_manager
296 .batch_find_region_routes(&mito_or_physical_table_ids)
297 .await
298 .context(FindRegionRoutesSnafu)?
299 } else {
300 mito_or_physical_table_ids
301 .into_iter()
302 .map(|id| (id, vec![]))
303 .collect()
304 };
305
306 for table in tables {
307 let table_region_stats =
308 match table_routes.get(&table.table_info().ident.table_id) {
309 Some(routes) => routes
310 .iter()
311 .flat_map(|route| {
312 let region_id = route.region.id;
313 region_stats
314 .binary_search_by_key(®ion_id, |x| x.id)
315 .map(|i| ®ion_stats[i])
316 })
317 .collect::<Vec<_>>(),
318 None => vec![],
319 };
320
321 self.add_table(
322 &predicates,
323 &catalog_name,
324 &schema_name,
325 table.table_info(),
326 table.table_type(),
327 &table_region_stats,
328 );
329 }
330 }
331 }
332
333 self.finish()
334 }
335
336 #[allow(clippy::too_many_arguments)]
337 fn add_table(
338 &mut self,
339 predicates: &Predicates,
340 catalog_name: &str,
341 schema_name: &str,
342 table_info: Arc<TableInfo>,
343 table_type: TableType,
344 region_stats: &[&RegionStat],
345 ) {
346 let table_name = table_info.name.as_ref();
347 let table_id = table_info.table_id();
348 let engine = table_info.meta.engine.as_ref();
349
350 let table_type_text = match table_type {
351 TableType::Base => "BASE TABLE",
352 TableType::View => "VIEW",
353 TableType::Temporary => "LOCAL TEMPORARY",
354 };
355
356 let row = [
357 (TABLE_CATALOG, &Value::from(catalog_name)),
358 (TABLE_ID, &Value::from(table_id)),
359 (TABLE_SCHEMA, &Value::from(schema_name)),
360 (ENGINE, &Value::from(engine)),
361 (TABLE_NAME, &Value::from(table_name)),
362 (TABLE_TYPE, &Value::from(table_type_text)),
363 ];
364
365 if !predicates.eval(&row) {
366 return;
367 }
368
369 self.catalog_names.push(Some(catalog_name));
370 self.schema_names.push(Some(schema_name));
371 self.table_names.push(Some(table_name));
372 self.table_types.push(Some(table_type_text));
373 self.table_ids.push(Some(table_id));
374
375 let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
376 let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
377 let index_length = region_stats.iter().map(|stat| stat.index_size).sum();
378
379 let avg_row_length = if table_rows > 0 {
381 let total_data_length = data_length
382 + region_stats
383 .iter()
384 .map(|stat| stat.memtable_size)
385 .sum::<u64>();
386
387 total_data_length / table_rows
388 } else {
389 0
390 };
391
392 self.data_length.push(Some(data_length));
393 self.index_length.push(Some(index_length));
394 self.table_rows.push(Some(table_rows));
395 self.avg_row_length.push(Some(avg_row_length));
396
397 self.max_data_length.push(Some(0));
399 self.checksum.push(Some(0));
400 self.max_index_length.push(Some(0));
401 self.data_free.push(Some(0));
402 self.auto_increment.push(Some(0));
403 self.row_format.push(Some("Fixed"));
404 self.table_collation.push(Some("utf8_bin"));
405 self.update_time
406 .push(Some(table_info.meta.updated_on.timestamp().into()));
407 self.check_time.push(None);
408 self.version.push(Some(11));
410 self.table_comment.push(table_info.desc.as_deref());
411 self.create_options
412 .push(Some(table_info.meta.options.to_string().as_ref()));
413 self.create_time
414 .push(Some(table_info.meta.created_on.timestamp().into()));
415
416 self.temporary
417 .push(if matches!(table_type, TableType::Temporary) {
418 Some("Y")
419 } else {
420 Some("N")
421 });
422 self.engines.push(Some(engine));
423 }
424
425 fn finish(&mut self) -> Result<RecordBatch> {
426 let columns: Vec<VectorRef> = vec![
427 Arc::new(self.catalog_names.finish()),
428 Arc::new(self.schema_names.finish()),
429 Arc::new(self.table_names.finish()),
430 Arc::new(self.table_types.finish()),
431 Arc::new(self.table_ids.finish()),
432 Arc::new(self.data_length.finish()),
433 Arc::new(self.max_data_length.finish()),
434 Arc::new(self.index_length.finish()),
435 Arc::new(self.max_index_length.finish()),
436 Arc::new(self.avg_row_length.finish()),
437 Arc::new(self.engines.finish()),
438 Arc::new(self.version.finish()),
439 Arc::new(self.row_format.finish()),
440 Arc::new(self.table_rows.finish()),
441 Arc::new(self.data_free.finish()),
442 Arc::new(self.auto_increment.finish()),
443 Arc::new(self.create_time.finish()),
444 Arc::new(self.update_time.finish()),
445 Arc::new(self.check_time.finish()),
446 Arc::new(self.table_collation.finish()),
447 Arc::new(self.checksum.finish()),
448 Arc::new(self.create_options.finish()),
449 Arc::new(self.table_comment.finish()),
450 Arc::new(self.temporary.finish()),
451 ];
452 RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
453 }
454}
455
456impl DfPartitionStream for InformationSchemaTables {
457 fn schema(&self) -> &ArrowSchemaRef {
458 self.schema.arrow_schema()
459 }
460
461 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
462 let schema = self.schema.arrow_schema().clone();
463 let mut builder = self.builder();
464 Box::pin(DfRecordBatchStreamAdapter::new(
465 schema,
466 futures::stream::once(async move {
467 builder
468 .make_tables(None)
469 .await
470 .map(|x| x.into_df_record_batch())
471 .map_err(Into::into)
472 }),
473 ))
474 }
475}