1use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
19use common_error::ext::BoxedError;
20use common_meta::datanode::RegionStat;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
23use common_telemetry::error;
24use datafusion::execution::TaskContext;
25use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
29use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
30use datatypes::value::Value;
31use datatypes::vectors::{
32 StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
33};
34use futures::TryStreamExt;
35use snafu::{OptionExt, ResultExt};
36use store_api::storage::{RegionId, ScanRequest, TableId};
37use table::metadata::{TableInfo, TableType};
38
39use crate::CatalogManager;
40use crate::error::{
41 CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
42};
43use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
44use crate::system_schema::utils;
45
// Column names of `information_schema.tables`. The set (and most of the
// filler values pushed in `add_table`) mirrors MySQL's
// `information_schema.TABLES` for client compatibility.
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
// Not part of MySQL's TABLES; GreptimeDB extension, kept module-private.
const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
// Initial capacity for every per-column vector builder; a rough guess at a
// typical table count, not a hard limit.
const INIT_CAPACITY: usize = 42;
71
/// The `information_schema.tables` virtual table.
///
/// Rows are produced lazily at scan time by enumerating every schema and
/// table reachable through the catalog manager.
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
    schema: SchemaRef,
    // The catalog this instance enumerates; one instance per catalog.
    catalog_name: String,
    // Weak reference: the catalog manager (transitively) owns this table,
    // so a strong Arc here would create a reference cycle.
    catalog_manager: Weak<dyn CatalogManager>,
}
78
impl InformationSchemaTables {
    /// Creates the virtual table for `catalog_name`, holding a weak handle to
    /// the catalog manager that will be upgraded at scan time.
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

    /// Builds the schema of `information_schema.tables`.
    ///
    /// NOTE: the column order here is positional — it must match the order of
    /// the vectors assembled in `InformationSchemaTablesBuilder::finish`.
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(
                CREATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                UPDATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                CHECK_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
        ]))
    }

    /// Creates a fresh row builder sharing this table's schema and catalog
    /// handles; one builder is created per scan.
    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}
137
138impl InformationTable for InformationSchemaTables {
139 fn table_id(&self) -> TableId {
140 INFORMATION_SCHEMA_TABLES_TABLE_ID
141 }
142
143 fn table_name(&self) -> &'static str {
144 TABLES
145 }
146
147 fn schema(&self) -> SchemaRef {
148 self.schema.clone()
149 }
150
151 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
152 let schema = self.schema.arrow_schema().clone();
153 let mut builder = self.builder();
154 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
155 schema,
156 futures::stream::once(async move {
157 builder
158 .make_tables(Some(request))
159 .await
160 .map(|x| x.into_df_record_batch())
161 .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
162 }),
163 ));
164 Ok(Box::pin(
165 RecordBatchStreamAdapter::try_new(stream)
166 .map_err(BoxedError::new)
167 .context(InternalSnafu)?,
168 ))
169 }
170}
171
/// Accumulates `information_schema.tables` rows column-by-column.
///
/// One vector builder per output column; `add_table` pushes exactly one value
/// into every builder so all columns stay the same length.
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    table_types: StringVectorBuilder,
    table_ids: UInt32VectorBuilder,
    version: UInt64VectorBuilder,
    row_format: StringVectorBuilder,
    table_rows: UInt64VectorBuilder,
    data_length: UInt64VectorBuilder,
    max_data_length: UInt64VectorBuilder,
    index_length: UInt64VectorBuilder,
    avg_row_length: UInt64VectorBuilder,
    max_index_length: UInt64VectorBuilder,
    data_free: UInt64VectorBuilder,
    auto_increment: UInt64VectorBuilder,
    create_time: TimestampSecondVectorBuilder,
    update_time: TimestampSecondVectorBuilder,
    check_time: TimestampSecondVectorBuilder,
    table_collation: StringVectorBuilder,
    checksum: UInt64VectorBuilder,
    create_options: StringVectorBuilder,
    table_comment: StringVectorBuilder,
    engines: StringVectorBuilder,
    temporary: StringVectorBuilder,
}
205
impl InformationSchemaTablesBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            update_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            check_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Walks every schema and table in the catalog and builds the full
    /// `information_schema.tables` record batch.
    ///
    /// `request` is `None` when driven directly by DataFusion (full scan);
    /// when present, its predicates are used to skip non-matching rows.
    async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        let information_extension = utils::information_extension(&self.catalog_manager)?;

        // Fetch region stats once up front. A failure degrades to empty
        // stats (all size/row columns become 0) rather than failing the
        // whole query.
        let region_stats = {
            let mut x = information_extension
                .region_stats()
                .await
                .unwrap_or_else(|e| {
                    error!(e; "Failed to find region stats in information_schema, fallback to all empty");
                    vec![]
                });
            // Sort by region id so per-table lookups below can use
            // binary_search instead of a linear scan.
            x.sort_unstable_by_key(|x| x.id);
            x
        };

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let table_info = table.table_info();

                // Only mito / physical tables have region stats of their own;
                // everything else reports empty stats.
                let table_region_stats =
                    if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
                        table_info
                            .meta
                            .region_numbers
                            .iter()
                            .map(|n| RegionId::new(table_info.ident.table_id, *n))
                            // binary_search returns Result; flat_map keeps only
                            // the regions actually present in the stats.
                            .flat_map(|region_id| {
                                region_stats
                                    .binary_search_by_key(&region_id, |x| x.id)
                                    .map(|i| &region_stats[i])
                            })
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    };

                self.add_table(
                    &predicates,
                    &catalog_name,
                    &schema_name,
                    table_info,
                    table.table_type(),
                    &table_region_stats,
                );
            }
        }

        self.finish()
    }

    /// Appends one row for `table_info`, unless the scan predicates reject it.
    ///
    /// Pushes exactly one value into every column builder to keep all columns
    /// aligned. Size/row statistics are aggregated from `region_stats`.
    #[allow(clippy::too_many_arguments)]
    fn add_table(
        &mut self,
        predicates: &Predicates,
        catalog_name: &str,
        schema_name: &str,
        table_info: Arc<TableInfo>,
        table_type: TableType,
        region_stats: &[&RegionStat],
    ) {
        let table_name = table_info.name.as_ref();
        let table_id = table_info.table_id();
        let engine = table_info.meta.engine.as_ref();

        // MySQL-compatible textual table types.
        let table_type_text = match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        };

        // Only the cheap identity columns participate in predicate
        // evaluation; if they don't match, skip the row entirely.
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_ID, &Value::from(table_id)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (ENGINE, &Value::from(engine)),
            (TABLE_NAME, &Value::from(table_name)),
            (TABLE_TYPE, &Value::from(table_type_text)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.table_types.push(Some(table_type_text));
        self.table_ids.push(Some(table_id));

        // Aggregate sizes/rows across all of the table's regions.
        let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
        let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
        let index_length = region_stats.iter().map(|stat| stat.index_size).sum();

        // Average row length includes memtable (unflushed) data on top of
        // SST size; guard against division by zero for empty tables.
        let avg_row_length = if table_rows > 0 {
            let total_data_length = data_length
                + region_stats
                    .iter()
                    .map(|stat| stat.memtable_size)
                    .sum::<u64>();

            total_data_length / table_rows
        } else {
            0
        };

        self.data_length.push(Some(data_length));
        self.index_length.push(Some(index_length));
        self.table_rows.push(Some(table_rows));
        self.avg_row_length.push(Some(avg_row_length));

        // The remaining columns are fillers kept for MySQL compatibility;
        // GreptimeDB does not track these per-table.
        self.max_data_length.push(Some(0));
        self.checksum.push(Some(0));
        self.max_index_length.push(Some(0));
        self.data_free.push(Some(0));
        self.auto_increment.push(Some(0));
        self.row_format.push(Some("Fixed"));
        self.table_collation.push(Some("utf8_bin"));
        self.update_time
            .push(Some(table_info.meta.updated_on.timestamp().into()));
        self.check_time.push(None);
        // Fixed "version" value, mirroring MySQL's constant .frm version.
        self.version.push(Some(11));
        self.table_comment.push(table_info.desc.as_deref());
        self.create_options
            .push(Some(table_info.meta.options.to_string().as_ref()));
        self.create_time
            .push(Some(table_info.meta.created_on.timestamp().into()));

        self.temporary
            .push(if matches!(table_type, TableType::Temporary) {
                Some("Y")
            } else {
                Some("N")
            });
        self.engines.push(Some(engine));
    }

    /// Finalizes all column builders into a single record batch.
    ///
    /// NOTE: the vector order here is positional — it must match the column
    /// order declared in `InformationSchemaTables::schema`.
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.table_types.finish()),
            Arc::new(self.table_ids.finish()),
            Arc::new(self.data_length.finish()),
            Arc::new(self.max_data_length.finish()),
            Arc::new(self.index_length.finish()),
            Arc::new(self.max_index_length.finish()),
            Arc::new(self.avg_row_length.finish()),
            Arc::new(self.engines.finish()),
            Arc::new(self.version.finish()),
            Arc::new(self.row_format.finish()),
            Arc::new(self.table_rows.finish()),
            Arc::new(self.data_free.finish()),
            Arc::new(self.auto_increment.finish()),
            Arc::new(self.create_time.finish()),
            Arc::new(self.update_time.finish()),
            Arc::new(self.check_time.finish()),
            Arc::new(self.table_collation.finish()),
            Arc::new(self.checksum.finish()),
            Arc::new(self.create_options.finish()),
            Arc::new(self.table_comment.finish()),
            Arc::new(self.temporary.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}
426
427impl DfPartitionStream for InformationSchemaTables {
428 fn schema(&self) -> &ArrowSchemaRef {
429 self.schema.arrow_schema()
430 }
431
432 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
433 let schema = self.schema.arrow_schema().clone();
434 let mut builder = self.builder();
435 Box::pin(DfRecordBatchStreamAdapter::new(
436 schema,
437 futures::stream::once(async move {
438 builder
439 .make_tables(None)
440 .await
441 .map(|x| x.into_df_record_batch())
442 .map_err(Into::into)
443 }),
444 ))
445 }
446}