//! Flat format to store in parquet.
//!
//! The flat format stores the primary key (tag) columns, field columns, the
//! time index and then the internal columns (encoded primary key, sequence,
//! op type), so the last four columns of a batch sit at fixed positions.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::compute::kernels::take::take;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
use parquet::file::metadata::RowGroupMetaData;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
    FormatProjection, PrimaryKeyArray, ReadFormat, StatValues, INTERNAL_COLUMN_NUM,
};
use crate::sst::{tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema, FlatSchemaOptions};

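/// Helper for writing the SST in the flat format.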
#[allow(dead_code)]
pub(crate) struct FlatWriteFormat {
    metadata: RegionMetadataRef,
    arrow_schema: SchemaRef,
    /// Sequence number to override the sequence in the batch, if set.
    override_sequence: Option<SequenceNumber>,
}

impl FlatWriteFormat {
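    /// Creates a new `FlatWriteFormat` from the region metadata and schema `options`.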
    #[allow(dead_code)]
    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
        FlatWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

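    /// Sets the sequence number to override when converting batches.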
    #[allow(dead_code)]
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

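    /// Gets the arrow schema to store in parquet.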
    #[allow(dead_code)]
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

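    /// Converts a batch to the SST format, replacing the sequence column if an
    /// override sequence is set.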
    #[allow(dead_code)]
    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());

        let Some(override_sequence) = self.override_sequence else {
            return Ok(batch.clone());
        };

        let mut columns = batch.columns().to_vec();
        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
        columns[sequence_column_index(batch.num_columns())] = sequence_array;

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

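/// Index of the sequence column in a flat format batch with `num_columns` columns.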
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    num_columns - 2
}

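/// Index of the time index column in a flat format batch with `num_columns` columns.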
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    num_columns - 4
}

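/// Index of the encoded primary key column in a flat format batch with `num_columns` columns.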
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    num_columns - 3
}

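/// Index of the op type column in a flat format batch with `num_columns` columns.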
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    num_columns - 1
}

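/// Helper to read the SST in the flat format.
///
/// It can also convert a batch in the primary key format to the flat format
/// by decoding the encoded primary key into individual tag columns.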
pub struct FlatReadFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Projection computed from the column ids to read.
    format_projection: FormatProjection,
    /// Column id to its index in the SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Converter to the flat format, set when the SST is in the primary key format.
    convert_format: Option<FlatConvertFormat>,
}

impl FlatReadFormat {
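    /// Creates a helper to read the SST with projected `column_ids`.
    ///
    /// If `convert_to_flat` is true, batches are converted to the flat format
    /// during reads.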
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        convert_to_flat: bool,
    ) -> FlatReadFormat {
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let id_to_index = sst_column_id_indices(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        let convert_format = if convert_to_flat {
            let codec = build_primary_key_codec(&metadata);
            FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
        } else {
            None
        };

        FlatReadFormat {
            metadata,
            arrow_schema,
            format_projection,
            column_id_to_sst_index: id_to_index,
            override_sequence: None,
            convert_format,
        }
    }

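    /// Sets the sequence number to override the sequence read from the SST.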
    #[allow(dead_code)]
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

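    /// Returns the index of a column in the projected batch by its column id.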
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

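    /// Returns min values of specific column in row groups.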
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, true)
    }

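    /// Returns max values of specific column in row groups.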
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, false)
    }

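    /// Returns null counts of specific column in row groups.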
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
            return StatValues::NoColumn;
        };

        let stats = ReadFormat::column_null_counts(row_groups, *index);
        StatValues::from_stats_opt(stats)
    }

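    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata and may be different
    /// from the actual schema of the SST file.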
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

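    /// Gets the metadata of the region.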
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

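    /// Gets the projection indices to read from the SST.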
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.format_projection.projection_indices
    }

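    /// Gets the format projection.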
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        &self.format_projection
    }

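    /// Creates an array of `length` sequence numbers to override the sequence
    /// in batches. Returns `None` if no override sequence is set.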
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

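    /// Converts a record batch, applying the flat format conversion and the
    /// sequence override if they are set.
    ///
    /// The `override_sequence_array` must be at least as long as the batch.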
    #[allow(dead_code)]
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        // Converts the batch to the flat format first if necessary.
        let batch = if let Some(ref convert_format) = self.convert_format {
            convert_format.convert(record_batch)?
        } else {
            record_batch
        };

        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        // Reuses the override array, slicing it when it is longer than the batch.
        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

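    /// Returns whether a parquet file with `num_columns` columns has to be
    /// converted to the flat format.
    ///
    /// Files written in the primary key format don't store the decoded primary
    /// key columns, so they have fewer columns than the flat format expects.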
    #[allow(dead_code)]
    pub(crate) fn need_convert_to_flat(
        file_path: &str,
        num_columns: usize,
        metadata: &RegionMetadata,
    ) -> Result<bool> {
        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            // The file is already in the flat format.
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            // The missing columns must be the primary key columns.
            let column_diff = expected_columns - num_columns;

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }

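    /// Returns min or max values of a column in row groups.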
    fn get_stat_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
        is_min: bool,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        // Safety: `sst_column_id_indices` indexes every column in the metadata.
        let index = self.column_id_to_sst_index.get(&column_id).unwrap();

        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
        StatValues::from_stats_opt(stats)
    }
}

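/// Builds the map from column id to the column's index in the SST.
///
/// SST columns are ordered as: primary key columns, field columns, time
/// index, internal columns.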
pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
    let mut column_index = 0;
    // Primary key (tag) columns come first.
    for pk_id in &metadata.primary_key {
        id_to_index.insert(*pk_id, column_index);
        column_index += 1;
    }
    // Then field columns.
    for column in &metadata.column_metadatas {
        if column.semantic_type == SemanticType::Field {
            id_to_index.insert(column.column_id, column_index);
            column_index += 1;
        }
    }
    // The time index column is last.
    id_to_index.insert(metadata.time_index_column().column_id, column_index);

    id_to_index
}

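/// Helper to convert batches in the primary key format to the flat format.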
pub(crate) struct FlatConvertFormat {
    metadata: RegionMetadataRef,
    /// Codec to decode the encoded primary key.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key columns to decode: `(column id, index in the
    /// primary key, index in the region metadata)`.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}

impl FlatConvertFormat {
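    /// Creates a converter from the region `metadata` and `format_projection`.
    ///
    /// Only primary key columns in the projection are decoded. Returns `None`
    /// if the region has no primary key, since there is nothing to decode.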
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Collects projected primary key columns in primary key order.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                // Safety: ids in the primary key always exist in the metadata.
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

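    /// Converts a `batch` to the flat format.
    ///
    /// It decodes the encoded primary key column and prepends the decoded
    /// primary key columns to the batch.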
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        let primary_key_index = primary_key_column_index(batch.num_columns());
        let pk_dict_array = batch
            .column(primary_key_index)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: "Primary key column is not a dictionary array".to_string(),
            })?;

        let pk_values_array = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: "Primary key values are not binary array".to_string(),
            })?;

        // Decodes each distinct primary key in the dictionary values once.
        let mut decoded_pk_values = Vec::with_capacity(pk_values_array.len());
        for i in 0..pk_values_array.len() {
            if pk_values_array.is_null(i) {
                decoded_pk_values.push(None);
            } else {
                let pk_bytes = pk_values_array.value(i);
                let decoded = self.codec.decode(pk_bytes).context(DecodeSnafu)?;
                decoded_pk_values.push(Some(decoded));
            }
        }

        // Builds an array for each projected primary key column.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = self.build_primary_key_column(
                *column_id,
                *pk_index,
                &column_metadata.column_schema.data_type,
                pk_dict_array.keys(),
                &decoded_pk_values,
            )?;
            decoded_columns.push(tag_column);
        }

        // Prepends the decoded tag columns to the existing columns.
        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());

        // Builds the new schema with the tag fields prepended.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }

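    /// Builds the array of a primary key column.
    ///
    /// The decoded values are aligned with the dictionary values of the
    /// primary key array, so the dictionary `keys` map them back to rows.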
    fn build_primary_key_column(
        &self,
        column_id: ColumnId,
        pk_index: usize,
        column_type: &ConcreteDataType,
        keys: &UInt32Array,
        decoded_pk_values: &[Option<CompositeValues>],
    ) -> Result<ArrayRef> {
        // Builds the values array from the decoded primary keys. Its length
        // matches the dictionary values, not the number of rows.
        let mut builder = column_type.create_mutable_vector(decoded_pk_values.len());
        for decoded_opt in decoded_pk_values {
            match decoded_opt {
                Some(decoded) => match decoded {
                    CompositeValues::Dense(dense) => {
                        if pk_index < dense.len() {
                            builder.push_value_ref(dense[pk_index].1.as_value_ref());
                        } else {
                            builder.push_null();
                        }
                    }
                    CompositeValues::Sparse(sparse) => {
                        let value = sparse.get_or_null(column_id);
                        builder.push_value_ref(value.as_value_ref());
                    }
                },
                None => builder.push_null(),
            }
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        if matches!(column_type, ConcreteDataType::String(_)) {
            // String tags stay dictionary encoded: reuse the dictionary keys
            // with the decoded values as the dictionary values.
            let dict_array = DictionaryArray::new(keys.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // Other types materialize one value per row by `take`.
            let taken_array = take(&values_array, keys, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}

#[cfg(test)]
impl FlatReadFormat {
    /// Creates a helper that reads all columns in the metadata.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
            false,
        )
    }
}
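
// A minimal sanity-check sketch (not from the original file) of the index
// helpers above, assuming the flat layout described in the module doc: with
// 7 columns, e.g. 2 tags, 1 field, the time index and 3 internal columns,
// the last four columns are the time index, the encoded primary key, the
// sequence and the op type.
#[cfg(test)]
mod flat_index_tests {
    use super::*;

    #[test]
    fn test_internal_column_indices() {
        let num_columns = 7;
        assert_eq!(3, time_index_column_index(num_columns));
        assert_eq!(4, primary_key_column_index(num_columns));
        assert_eq!(5, sequence_column_index(num_columns));
        assert_eq!(6, op_type_column_index(num_columns));
    }
}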