use std::collections::HashSet;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
use common_error::ext::BoxedError;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
    StringVectorBuilder, TimestampMicrosecondVectorBuilder, UInt32VectorBuilder,
    UInt64VectorBuilder,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
use crate::system_schema::utils;
use crate::CatalogManager;

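// Column names of `information_schema.tables`. The set mirrors MySQL's
// `INFORMATION_SCHEMA.TABLES`, plus the GreptimeDB-specific `table_id` column.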
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;

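/// The `information_schema.tables` virtual table: lists every table visible
/// through the catalog manager, building its rows on demand at scan time.
///
/// A minimal usage sketch (internal API; `catalog_name` and
/// `weak_catalog_manager` are placeholder bindings, and the snippet assumes
/// `ScanRequest` implements `Default`):
///
/// ```ignore
/// let tables = InformationSchemaTables::new(catalog_name, weak_catalog_manager);
/// let stream = tables.to_stream(ScanRequest::default())?;
/// ```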
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaTables {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

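    /// Returns the schema of `information_schema.tables`.
    ///
    /// Columns that are not tracked (e.g. `checksum`, `data_free`) are
    /// nullable and later filled with fixed placeholder values.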
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(
                CREATE_TIME,
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new(
                UPDATE_TIME,
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new(
                CHECK_TIME,
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
        ]))
    }

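    /// Creates a row builder bound to this table's schema and catalog.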
    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaTables {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_TABLES_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        TABLES
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

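    /// Builds the table contents as a lazy stream: the record batch is
    /// materialized only when the stream is first polled, with `request`
    /// forwarded for predicate pushdown.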
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_tables(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

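/// Row-by-row builder for `information_schema.tables`, holding one vector
/// builder per output column.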
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    table_types: StringVectorBuilder,
    table_ids: UInt32VectorBuilder,
    version: UInt64VectorBuilder,
    row_format: StringVectorBuilder,
    table_rows: UInt64VectorBuilder,
    data_length: UInt64VectorBuilder,
    max_data_length: UInt64VectorBuilder,
    index_length: UInt64VectorBuilder,
    avg_row_length: UInt64VectorBuilder,
    max_index_length: UInt64VectorBuilder,
    data_free: UInt64VectorBuilder,
    auto_increment: UInt64VectorBuilder,
    create_time: TimestampMicrosecondVectorBuilder,
    update_time: TimestampMicrosecondVectorBuilder,
    check_time: TimestampMicrosecondVectorBuilder,
    table_collation: StringVectorBuilder,
    checksum: UInt64VectorBuilder,
    create_options: StringVectorBuilder,
    table_comment: StringVectorBuilder,
    engines: StringVectorBuilder,
    temporary: StringVectorBuilder,
}

impl InformationSchemaTablesBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
            update_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
            check_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
            table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

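    /// Constructs the `information_schema.tables` rows: walks every schema and
    /// table visible to the catalog manager, joins in region statistics where
    /// available, and applies the scan predicates.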
    async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        let information_extension = utils::information_extension(&self.catalog_manager)?;

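        // Fetching region stats is best effort: statements like `SHOW TABLES`
        // should not fail just because stats are unavailable, so errors are
        // logged and an empty list is used instead.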
        let region_stats = information_extension
            .region_stats()
            .await
            .map_err(|e| {
                error!(e; "Failed to call region_stats");
                e
            })
            .unwrap_or_else(|_| vec![]);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let table_info = table.table_info();

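                // Region stats are only collected for mito-engine tables and
                // physical tables; other engines contribute an empty list.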
                let table_region_stats =
                    if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
                        let region_ids = table_info
                            .meta
                            .region_numbers
                            .iter()
                            .map(|n| RegionId::new(table_info.ident.table_id, *n))
                            .collect::<HashSet<_>>();

                        region_stats
                            .iter()
                            .filter(|stat| region_ids.contains(&stat.id))
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    };

                self.add_table(
                    &predicates,
                    &catalog_name,
                    &schema_name,
                    table_info,
                    table.table_type(),
                    &table_region_stats,
                );
            }
        }

        self.finish()
    }

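    /// Appends one row for `table_info` unless the scan predicates filter it
    /// out; `region_stats` feeds the size and row-count columns.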
    #[allow(clippy::too_many_arguments)]
    fn add_table(
        &mut self,
        predicates: &Predicates,
        catalog_name: &str,
        schema_name: &str,
        table_info: Arc<TableInfo>,
        table_type: TableType,
        region_stats: &[&RegionStat],
    ) {
        let table_name = table_info.name.as_ref();
        let table_id = table_info.table_id();
        let engine = table_info.meta.engine.as_ref();

        let table_type_text = match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        };

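        // Evaluate the pushed-down predicates on the identifying columns
        // before building the row, so filtered-out tables are skipped early.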
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_ID, &Value::from(table_id)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (ENGINE, &Value::from(engine)),
            (TABLE_NAME, &Value::from(table_name)),
            (TABLE_TYPE, &Value::from(table_type_text)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.table_types.push(Some(table_type_text));
        self.table_ids.push(Some(table_id));

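        // Aggregate the per-region statistics into per-table totals.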
        let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
        let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
        let index_length = region_stats.iter().map(|stat| stat.index_size).sum();

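        // The average row length also counts in-memory memtable bytes; empty
        // tables report 0 to avoid dividing by zero.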
        let avg_row_length = if table_rows > 0 {
            let total_data_length = data_length
                + region_stats
                    .iter()
                    .map(|stat| stat.memtable_size)
                    .sum::<u64>();

            total_data_length / table_rows
        } else {
            0
        };

        self.data_length.push(Some(data_length));
        self.index_length.push(Some(index_length));
        self.table_rows.push(Some(table_rows));
        self.avg_row_length.push(Some(avg_row_length));

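        // Columns that GreptimeDB does not track; fixed values are emitted for
        // MySQL client compatibility.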
        self.max_data_length.push(Some(0));
        self.checksum.push(Some(0));
        self.max_index_length.push(Some(0));
        self.data_free.push(Some(0));
        self.auto_increment.push(Some(0));
        self.row_format.push(Some("Fixed"));
        self.table_collation.push(Some("utf8_bin"));
        self.update_time.push(None);
        self.check_time.push(None);
        self.version.push(Some(11));
        self.table_comment.push(table_info.desc.as_deref());
        self.create_options
            .push(Some(table_info.meta.options.to_string().as_ref()));
        // `create_time` is a microsecond builder, so push microseconds;
        // `timestamp_millis()` here would be off by a factor of 1000.
        self.create_time
            .push(Some(table_info.meta.created_on.timestamp_micros().into()));

        self.temporary
            .push(if matches!(table_type, TableType::Temporary) {
                Some("Y")
            } else {
                Some("N")
            });
        self.engines.push(Some(engine));
    }

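    /// Consumes the per-column builders and assembles the record batch; the
    /// column order here must match `InformationSchemaTables::schema()`.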
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.table_types.finish()),
            Arc::new(self.table_ids.finish()),
            Arc::new(self.data_length.finish()),
            Arc::new(self.max_data_length.finish()),
            Arc::new(self.index_length.finish()),
            Arc::new(self.max_index_length.finish()),
            Arc::new(self.avg_row_length.finish()),
            Arc::new(self.engines.finish()),
            Arc::new(self.version.finish()),
            Arc::new(self.row_format.finish()),
            Arc::new(self.table_rows.finish()),
            Arc::new(self.data_free.finish()),
            Arc::new(self.auto_increment.finish()),
            Arc::new(self.create_time.finish()),
            Arc::new(self.update_time.finish()),
            Arc::new(self.check_time.finish()),
            Arc::new(self.table_collation.finish()),
            Arc::new(self.checksum.finish()),
            Arc::new(self.create_options.finish()),
            Arc::new(self.table_comment.finish()),
            Arc::new(self.temporary.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

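// `DfPartitionStream` exposes this table to DataFusion as a single-partition
// stream. No `ScanRequest` is available on this path, so `make_tables` runs
// without predicate pushdown.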
impl DfPartitionStream for InformationSchemaTables {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_tables(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}