use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampSecond;
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
    StringVectorBuilder, TimestampSecondVector, TimestampSecondVectorBuilder, UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, PARTITIONS, Predicates};

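// Column names shared between the schema definition, predicate evaluation,
// and row building below.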
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const PARTITION_NAME: &str = "partition_name";
const PARTITION_EXPRESSION: &str = "partition_expression";
const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
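/// The initial capacity of the vector builders.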
const INIT_CAPACITY: usize = 42;

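/// The `information_schema.PARTITIONS` table provides information about table
/// partitions. Its column set follows MySQL's `information_schema.partitions`;
/// columns that GreptimeDB does not track are always `NULL`.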
#[derive(Debug)]
pub(super) struct InformationSchemaPartitions {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaPartitions {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

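    /// Builds the schema of the `PARTITIONS` table, mirroring the column set
    /// of MySQL's `information_schema.partitions`.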
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                "subpartition_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                PARTITION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_expression",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_description",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "create_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "update_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "check_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "partition_comment",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(
                GREPTIME_PARTITION_ID,
                ConcreteDataType::uint64_datatype(),
                true,
            ),
        ]))
    }

    fn builder(&self) -> InformationSchemaPartitionsBuilder {
        InformationSchemaPartitionsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaPartitions {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PARTITIONS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PARTITIONS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

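    // The record batch is built lazily: `make_partitions` only runs when the
    // wrapped one-shot stream is first polled.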
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

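/// Builds the rows of `information_schema.PARTITIONS`. Only the columns that
/// GreptimeDB actually populates have builders here; the remaining columns are
/// filled with constant vectors (mostly `NULL`) in [`Self::finish`].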
struct InformationSchemaPartitionsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    partition_names: StringVectorBuilder,
    partition_ordinal_positions: Int64VectorBuilder,
    partition_expressions: StringVectorBuilder,
    partition_descriptions: StringVectorBuilder,
    create_times: TimestampSecondVectorBuilder,
    partition_ids: UInt64VectorBuilder,
}

impl InformationSchemaPartitionsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_descriptions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            create_times: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    async fn make_partitions(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

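        // The partition manager is only reachable through the KV-backend
        // catalog manager; other catalog manager implementations cannot serve
        // this table.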
        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager())
            .context(PartitionManagerNotFoundSnafu)?;

        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_info_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
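                // Exclude temporary tables from the listing.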
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some(table_info))
                    }
                });

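            // Consume the table stream in bounded chunks so that partition
            // lookups can be batched.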
            const BATCH_SIZE: usize = 128;

            let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));

            while let Some(table_infos) = table_info_chunks.next().await {
                let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
                let table_ids: Vec<TableId> =
                    table_infos.iter().map(|info| info.ident.table_id).collect();

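                // Resolve the partitions of every table in this chunk with a
                // single batch call.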
                let mut table_partitions = partition_manager
                    .batch_find_table_partitions(&table_ids)
                    .await
                    .context(FindPartitionsSnafu)?;

                for table_info in table_infos {
                    let partitions = table_partitions
                        .remove(&table_info.ident.table_id)
                        .unwrap_or(vec![]);

                    self.add_partitions(
                        &predicates,
                        &table_info,
                        &catalog_name,
                        &schema_name,
                        &table_info.name,
                        &partitions,
                    );
                }
            }
        }

        self.finish()
    }

    #[allow(clippy::too_many_arguments)]
    fn add_partitions(
        &mut self,
        predicates: &Predicates,
        table_info: &TableInfo,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        partitions: &[PartitionInfo],
    ) {
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
        ];

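        // Skip the whole table early if the pushed-down predicates rule out
        // its catalog, schema, or table name.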
        if !predicates.eval(&row) {
            return;
        }

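        // MySQL reports the partitioning expression here; GreptimeDB
        // approximates it with the comma-separated list of partition columns.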
        let partition_columns: String = table_info
            .meta
            .partition_column_names()
            .cloned()
            .collect::<Vec<_>>()
            .join(", ");

        let partition_expr_str = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };

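        // Emit one row per partition, named p0, p1, ... by ordinal position,
        // following the MySQL naming convention.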
        for (index, partition) in partitions.iter().enumerate() {
            let partition_name = format!("p{index}");

            self.catalog_names.push(Some(catalog_name));
            self.schema_names.push(Some(schema_name));
            self.table_names.push(Some(table_name));
            self.partition_names.push(Some(&partition_name));
            self.partition_ordinal_positions
                .push(Some((index + 1) as i64));
            self.partition_expressions
                .push(partition_expr_str.as_deref());
            let description = partition.partition_expr.as_ref().map(|e| e.to_string());
            self.partition_descriptions.push(description.as_deref());
            self.create_times.push(Some(TimestampSecond::from(
                table_info.meta.created_on.timestamp(),
            )));
            self.partition_ids.push(Some(partition.id.as_u64()));
        }
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.catalog_names.len();

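        // Columns that GreptimeDB doesn't track (subpartitions, row
        // statistics, checksums, ...) are filled with constant NULL vectors of
        // the appropriate type.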
        let null_string_vector = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![None as Option<&str>])),
            rows_num,
        ));
        let null_i64_vector = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));
        let null_timestamp_second_vector = Arc::new(ConstantVector::new(
            Arc::new(TimestampSecondVector::from(vec![None])),
            rows_num,
        ));
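        // GreptimeDB partitions are range-based, so `partition_method` is a
        // constant "RANGE" for every row.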
        let partition_methods = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![Some("RANGE")])),
            rows_num,
        ));

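        // The column order must exactly match the schema defined in
        // `InformationSchemaPartitions::schema()`.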
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.partition_names.finish()),
            null_string_vector.clone(),
            Arc::new(self.partition_ordinal_positions.finish()),
            null_i64_vector.clone(),
            partition_methods,
            null_string_vector.clone(),
            Arc::new(self.partition_expressions.finish()),
            null_string_vector.clone(),
            Arc::new(self.partition_descriptions.finish()),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            Arc::new(self.create_times.finish()),
            null_timestamp_second_vector.clone(),
            null_timestamp_second_vector,
            null_i64_vector,
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_string_vector,
            Arc::new(self.partition_ids.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

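// Exposes the table to DataFusion as a partition stream. No scan request is
// available here, so `make_partitions` runs without predicate pushdown.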
impl DfPartitionStream for InformationSchemaPartitions {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}