use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampSecond;
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
    StringVectorBuilder, TimestampSecondVector, TimestampSecondVectorBuilder, UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, PARTITIONS, Predicates};

const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const PARTITION_NAME: &str = "partition_name";
const PARTITION_EXPRESSION: &str = "partition_expression";
const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
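/// Initial capacity for the per-column vector builders.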
const INIT_CAPACITY: usize = 42;

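/// The `information_schema.PARTITIONS` table, which exposes partition
/// metadata for the tables in a catalog.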
#[derive(Debug)]
pub(super) struct InformationSchemaPartitions {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaPartitions {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

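    /// Builds the table schema. The column set mirrors MySQL's
    /// `information_schema.PARTITIONS`, plus a GreptimeDB-specific
    /// `greptime_partition_id` column.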
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                "subpartition_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                PARTITION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_expression",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_description",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "create_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "update_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "check_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "partition_comment",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(
                GREPTIME_PARTITION_ID,
                ConcreteDataType::uint64_datatype(),
                true,
            ),
        ]))
    }

    fn builder(&self) -> InformationSchemaPartitionsBuilder {
        InformationSchemaPartitionsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaPartitions {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PARTITIONS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PARTITIONS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

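/// Builds the `information_schema.PARTITIONS` rows by walking the catalog
/// and collecting partition info for every table.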
struct InformationSchemaPartitionsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    partition_names: StringVectorBuilder,
    partition_ordinal_positions: Int64VectorBuilder,
    partition_expressions: StringVectorBuilder,
    create_times: TimestampSecondVectorBuilder,
    partition_ids: UInt64VectorBuilder,
}

impl InformationSchemaPartitionsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            create_times: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

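    /// Scans all schemas and tables of the catalog (skipping temporary tables),
    /// looks up their partitions in batches, and assembles the record batch.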
    async fn make_partitions(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager())
            .context(PartitionManagerNotFoundSnafu)?;

        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_info_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some(table_info))
                    }
                });

            const BATCH_SIZE: usize = 128;

            let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));

            while let Some(table_infos) = table_info_chunks.next().await {
                let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
                let table_ids: Vec<TableId> =
                    table_infos.iter().map(|info| info.ident.table_id).collect();

                let mut table_partitions = partition_manager
                    .batch_find_table_partitions(&table_ids)
                    .await
                    .context(FindPartitionsSnafu)?;

                for table_info in table_infos {
                    let partitions = table_partitions
                        .remove(&table_info.ident.table_id)
                        .unwrap_or(vec![]);

                    self.add_partitions(
                        &predicates,
                        &table_info,
                        &catalog_name,
                        &schema_name,
                        &table_info.name,
                        &partitions,
                    );
                }
            }
        }

        self.finish()
    }

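    /// Appends one row per partition of the given table. The whole table is
    /// skipped when the pushed-down predicates reject its catalog/schema/name.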
    #[allow(clippy::too_many_arguments)]
    fn add_partitions(
        &mut self,
        predicates: &Predicates,
        table_info: &TableInfo,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        partitions: &[PartitionInfo],
    ) {
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        for (index, partition) in partitions.iter().enumerate() {
            let partition_name = format!("p{index}");

            self.catalog_names.push(Some(catalog_name));
            self.schema_names.push(Some(schema_name));
            self.table_names.push(Some(table_name));
            self.partition_names.push(Some(&partition_name));
            self.partition_ordinal_positions
                .push(Some((index + 1) as i64));
            let expression = partition.partition_expr.as_ref().map(|e| e.to_string());
            self.partition_expressions.push(expression.as_deref());
            self.create_times.push(Some(TimestampSecond::from(
                table_info.meta.created_on.timestamp(),
            )));
            self.partition_ids.push(Some(partition.id.as_u64()));
        }
    }

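    /// Assembles the collected columns into a `RecordBatch`. Columns that
    /// GreptimeDB does not track are filled with constant NULLs, and
    /// `partition_method` is always reported as `RANGE`.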
    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.catalog_names.len();

        let null_string_vector = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![None as Option<&str>])),
            rows_num,
        ));
        let null_i64_vector = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));
        let null_timestamp_second_vector = Arc::new(ConstantVector::new(
            Arc::new(TimestampSecondVector::from(vec![None])),
            rows_num,
        ));
        let partition_methods = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![Some("RANGE")])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.partition_names.finish()),
            null_string_vector.clone(),
            Arc::new(self.partition_ordinal_positions.finish()),
            null_i64_vector.clone(),
            partition_methods,
            null_string_vector.clone(),
            Arc::new(self.partition_expressions.finish()),
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            Arc::new(self.create_times.finish()),
            null_timestamp_second_vector.clone(),
            null_timestamp_second_vector,
            null_i64_vector,
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_string_vector,
            Arc::new(self.partition_ids.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaPartitions {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}