use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMicrosecond;
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
    StringVectorBuilder, TimestampMicrosecondVector, TimestampMicrosecondVectorBuilder,
    UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::error::{
    CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, PARTITIONS};
use crate::CatalogManager;

const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const PARTITION_NAME: &str = "partition_name";
const PARTITION_EXPRESSION: &str = "partition_expression";
const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
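/// Initial capacity of the column vector builders.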
const INIT_CAPACITY: usize = 42;

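/// The `information_schema.PARTITIONS` table.
///
/// Mirrors the column layout of MySQL's `information_schema.partitions`,
/// with an extra GreptimeDB-specific `greptime_partition_id` column.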
#[derive(Debug)]
pub(super) struct InformationSchemaPartitions {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaPartitions {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                "subpartition_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                PARTITION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_expression",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_description",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "create_time",
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new(
                "update_time",
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new(
                "check_time",
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "partition_comment",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(
                GREPTIME_PARTITION_ID,
                ConcreteDataType::uint64_datatype(),
                true,
            ),
        ]))
    }

    fn builder(&self) -> InformationSchemaPartitionsBuilder {
        InformationSchemaPartitionsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaPartitions {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PARTITIONS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PARTITIONS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

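/// Builds the rows of the `information_schema.PARTITIONS` table from catalog
/// and partition metadata.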
struct InformationSchemaPartitionsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    partition_names: StringVectorBuilder,
    partition_ordinal_positions: Int64VectorBuilder,
    partition_expressions: StringVectorBuilder,
    create_times: TimestampMicrosecondVectorBuilder,
    partition_ids: UInt64VectorBuilder,
}

impl InformationSchemaPartitionsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            create_times: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

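    /// Scans all visible schemas and tables of the catalog, collects their
    /// partitions, and assembles the result into a single `RecordBatch`.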
    async fn make_partitions(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager())
            .context(PartitionManagerNotFoundSnafu)?;

        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_info_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some(table_info))
                    }
                });

            const BATCH_SIZE: usize = 128;

            let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));

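            // Process tables chunk by chunk so that partition lookups can be batched.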
            while let Some(table_infos) = table_info_chunks.next().await {
                let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
                let table_ids: Vec<TableId> =
                    table_infos.iter().map(|info| info.ident.table_id).collect();

                let mut table_partitions = partition_manager
                    .batch_find_table_partitions(&table_ids)
                    .await
                    .context(FindPartitionsSnafu)?;

                for table_info in table_infos {
                    let partitions = table_partitions
                        .remove(&table_info.ident.table_id)
                        .unwrap_or(vec![]);

                    self.add_partitions(
                        &predicates,
                        &table_info,
                        &catalog_name,
                        &schema_name,
                        &table_info.name,
                        &partitions,
                    );
                }
            }
        }

        self.finish()
    }

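    /// Appends one row per partition of `table_info`, skipping tables that do
    /// not match the scan predicates.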
    #[allow(clippy::too_many_arguments)]
    fn add_partitions(
        &mut self,
        predicates: &Predicates,
        table_info: &TableInfo,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        partitions: &[PartitionInfo],
    ) {
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        for (index, partition) in partitions.iter().enumerate() {
            let partition_name = format!("p{index}");

            self.catalog_names.push(Some(catalog_name));
            self.schema_names.push(Some(schema_name));
            self.table_names.push(Some(table_name));
            self.partition_names.push(Some(&partition_name));
            self.partition_ordinal_positions
                .push(Some((index + 1) as i64));
            let expressions = if partition.partition.partition_columns().is_empty() {
                None
            } else {
                Some(partition.partition.to_string())
            };

            self.partition_expressions.push(expressions.as_deref());
            // Convert the creation time to microseconds to match the
            // `create_time` column type.
            self.create_times.push(Some(TimestampMicrosecond::from(
                table_info.meta.created_on.timestamp_micros(),
            )));
            self.partition_ids.push(Some(partition.id.as_u64()));
        }
    }

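    /// Finishes all builders into columns; columns that GreptimeDB does not
    /// track are filled with constant (mostly null) vectors.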
    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.catalog_names.len();

        let null_string_vector = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![None as Option<&str>])),
            rows_num,
        ));
        let null_i64_vector = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));
        let null_timestampmicrosecond_vector = Arc::new(ConstantVector::new(
            Arc::new(TimestampMicrosecondVector::from(vec![None])),
            rows_num,
        ));
        let partition_methods = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![Some("RANGE")])),
            rows_num,
        ));

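        // The column order must match the schema built in
        // `InformationSchemaPartitions::schema()`.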
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.partition_names.finish()),
            null_string_vector.clone(),
            Arc::new(self.partition_ordinal_positions.finish()),
            null_i64_vector.clone(),
            partition_methods,
            null_string_vector.clone(),
            Arc::new(self.partition_expressions.finish()),
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            Arc::new(self.create_times.finish()),
            null_timestampmicrosecond_vector.clone(),
            null_timestampmicrosecond_vector,
            null_i64_vector,
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_string_vector,
            Arc::new(self.partition_ids.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaPartitions {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}