1use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37 Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51 ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52 NewRecordBatchSnafu, Result,
53};
54use crate::read::read_columns::ReadColumns;
55use crate::sst::parquet::format::{
56 FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
57 PrimaryKeyReadFormat, StatValues, column_null_counts, column_values,
58};
59use crate::sst::parquet::read_columns::ParquetReadColumns;
60use crate::sst::{
61 FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
62 to_flat_sst_arrow_schema, with_field_id,
63};
64
65pub(crate) struct FlatWriteFormat {
67 arrow_schema: SchemaRef,
69 override_sequence: Option<SequenceNumber>,
70}
71
72impl FlatWriteFormat {
73 pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
75 let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
76 FlatWriteFormat {
77 arrow_schema,
78 override_sequence: None,
79 }
80 }
81
82 pub(crate) fn with_override_sequence(
84 mut self,
85 override_sequence: Option<SequenceNumber>,
86 ) -> Self {
87 self.override_sequence = override_sequence;
88 self
89 }
90
91 #[cfg(test)]
93 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
94 &self.arrow_schema
95 }
96
97 pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
99 debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
100
101 let Some(override_sequence) = self.override_sequence else {
102 return Ok(batch.clone());
103 };
104
105 let mut columns = batch.columns().to_vec();
106 let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
107 columns[sequence_column_index(batch.num_columns())] = sequence_array;
108
109 RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
110 }
111}
112
/// Returns the position of the sequence column in a flat batch that has
/// `num_columns` columns.
///
/// The sequence column is the second-to-last column, right before the op
/// type column. `num_columns` must be at least 2.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 2;
    num_columns - OFFSET_FROM_END
}
117
/// Returns the position of the time index column in a flat batch that has
/// `num_columns` columns.
///
/// The time index is the fourth-to-last column, right before the primary
/// key column. `num_columns` must be at least 4.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 4;
    num_columns - OFFSET_FROM_END
}
122
/// Returns the position of the encoded primary key column in a flat batch
/// that has `num_columns` columns.
///
/// The primary key is the third-to-last column, between the time index and
/// the sequence column. `num_columns` must be at least 3.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 3;
    num_columns - OFFSET_FROM_END
}
127
/// Returns the position of the op type column in a flat batch that has
/// `num_columns` columns.
///
/// The op type is always the last column. `num_columns` must be at least 1.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 1;
    num_columns - OFFSET_FROM_END
}
132
133pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize {
142 let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
145 num_columns - FIXED_POS_COLUMN_NUM - field_column_count
146}
147
148pub struct FlatReadFormat {
155 override_sequence: Option<SequenceNumber>,
157 parquet_adapter: ParquetAdapter,
159}
160
161impl FlatReadFormat {
162 pub fn new(
166 metadata: RegionMetadataRef,
167 read_cols: ReadColumns,
168 file_schema: Option<SchemaRef>,
169 file_path: &str,
170 skip_auto_convert: bool,
171 ) -> Result<FlatReadFormat> {
172 let num_columns = file_schema.as_ref().map(|x| x.fields().len());
173 let is_legacy = match num_columns {
174 Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
175 None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
176 };
177
178 let parquet_adapter = if is_legacy {
179 if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
181 ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
183 metadata,
184 read_cols,
185 skip_auto_convert,
186 ))
187 } else {
188 ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
189 metadata, read_cols, false,
190 ))
191 }
192 } else {
193 let file_schema = file_schema
194 .unwrap_or_else(|| to_flat_sst_arrow_schema(&metadata, &Default::default()));
195 ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols, file_schema))
196 };
197
198 Ok(FlatReadFormat {
199 override_sequence: None,
200 parquet_adapter,
201 })
202 }
203
204 pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
206 self.override_sequence = sequence;
207 }
208
209 pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
211 self.format_projection()
212 .column_id_to_projected_index
213 .get(&column_id)
214 .copied()
215 }
216
217 pub fn min_values(
219 &self,
220 row_groups: &[impl Borrow<RowGroupMetaData>],
221 column_id: ColumnId,
222 ) -> StatValues {
223 match &self.parquet_adapter {
224 ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
225 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
226 }
227 }
228
229 pub fn max_values(
231 &self,
232 row_groups: &[impl Borrow<RowGroupMetaData>],
233 column_id: ColumnId,
234 ) -> StatValues {
235 match &self.parquet_adapter {
236 ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
237 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
238 }
239 }
240
241 pub fn null_counts(
243 &self,
244 row_groups: &[impl Borrow<RowGroupMetaData>],
245 column_id: ColumnId,
246 ) -> StatValues {
247 match &self.parquet_adapter {
248 ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
249 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
250 }
251 }
252
253 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
255 match &self.parquet_adapter {
256 ParquetAdapter::Flat(p) => &p.arrow_schema,
257 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
258 }
259 }
260
261 pub(crate) fn output_arrow_schema(&self) -> Result<SchemaRef> {
263 let projection = self.parquet_read_columns().root_indices();
264 let schema = self
265 .arrow_schema()
266 .project(projection)
267 .context(ComputeArrowSnafu)?;
268 Ok(Arc::new(schema))
269 }
270
271 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
273 match &self.parquet_adapter {
274 ParquetAdapter::Flat(p) => &p.metadata,
275 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
276 }
277 }
278
279 pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
281 match &self.parquet_adapter {
282 ParquetAdapter::Flat(p) => &p.format_projection.parquet_read_cols,
283 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.parquet_read_columns(),
284 }
285 }
286
287 pub(crate) fn format_projection(&self) -> &FormatProjection {
292 match &self.parquet_adapter {
293 ParquetAdapter::Flat(p) => &p.format_projection,
294 ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
295 }
296 }
297
298 pub(crate) fn batch_has_raw_pk_columns(&self) -> bool {
302 matches!(&self.parquet_adapter, ParquetAdapter::Flat(_))
303 }
304
305 pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
307 self.override_sequence
308 .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
309 }
310
311 pub(crate) fn convert_batch(
316 &self,
317 record_batch: RecordBatch,
318 override_sequence_array: Option<&ArrayRef>,
319 ) -> Result<RecordBatch> {
320 let batch = match &self.parquet_adapter {
322 ParquetAdapter::Flat(_) => record_batch,
323 ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
324 };
325
326 let Some(override_array) = override_sequence_array else {
328 return Ok(batch);
329 };
330
331 let mut columns = batch.columns().to_vec();
332 let sequence_column_idx = sequence_column_index(batch.num_columns());
333
334 let sequence_array = if override_array.len() > batch.num_rows() {
336 override_array.slice(0, batch.num_rows())
337 } else {
338 override_array.clone()
339 };
340
341 columns[sequence_column_idx] = sequence_array;
342
343 RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
344 }
345
346 pub(crate) fn is_legacy_format(
352 metadata: &RegionMetadata,
353 num_columns: usize,
354 file_path: &str,
355 ) -> Result<bool> {
356 if metadata.primary_key.is_empty() {
357 return Ok(false);
358 }
359
360 let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
363
364 if expected_columns == num_columns {
365 Ok(false)
367 } else {
368 ensure!(
369 expected_columns >= num_columns,
370 InvalidParquetSnafu {
371 file: file_path,
372 reason: format!(
373 "Expected columns {} should be >= actual columns {}",
374 expected_columns, num_columns
375 )
376 }
377 );
378
379 let column_diff = expected_columns - num_columns;
381
382 ensure!(
383 column_diff == metadata.primary_key.len(),
384 InvalidParquetSnafu {
385 file: file_path,
386 reason: format!(
387 "Column number difference {} does not match primary key count {}",
388 column_diff,
389 metadata.primary_key.len()
390 )
391 }
392 );
393
394 Ok(true)
395 }
396 }
397}
398
399enum ParquetAdapter {
401 Flat(ParquetFlat),
402 PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
403}
404
405struct ParquetPrimaryKeyToFlat {
407 format: PrimaryKeyReadFormat,
409 convert_format: Option<FlatConvertFormat>,
411 format_projection: FormatProjection,
413}
414
415impl ParquetPrimaryKeyToFlat {
416 fn new(
418 metadata: RegionMetadataRef,
419 read_cols: ReadColumns,
420 skip_auto_convert: bool,
421 ) -> ParquetPrimaryKeyToFlat {
422 assert!(if skip_auto_convert {
423 metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
424 } else {
425 true
426 });
427
428 let id_to_index = sst_column_id_indices(&metadata);
430 let sst_column_num =
431 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
432
433 let codec = build_primary_key_codec(&metadata);
434 let format = PrimaryKeyReadFormat::new(metadata.clone(), read_cols.clone());
435 let (convert_format, format_projection) = if skip_auto_convert {
436 (
437 None,
438 FormatProjection {
439 parquet_read_cols: format.parquet_read_columns().clone(),
440 column_id_to_projected_index: format.field_id_to_projected_index().clone(),
441 },
442 )
443 } else {
444 let format_projection = FormatProjection::compute_format_projection(
446 &id_to_index,
447 sst_column_num,
448 read_cols.clone(),
449 );
450 (
451 FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
452 format_projection,
453 )
454 };
455
456 Self {
457 format,
458 convert_format,
459 format_projection,
460 }
461 }
462
463 fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
464 if let Some(convert_format) = &self.convert_format {
465 convert_format.convert(record_batch)
466 } else {
467 Ok(record_batch)
468 }
469 }
470}
471
472struct ParquetFlat {
474 metadata: RegionMetadataRef,
476 arrow_schema: SchemaRef,
478 format_projection: FormatProjection,
480 column_id_to_sst_index: HashMap<ColumnId, usize>,
482}
483
484impl ParquetFlat {
485 fn new(
487 metadata: RegionMetadataRef,
488 read_cols: ReadColumns,
489 arrow_schema: SchemaRef,
490 ) -> ParquetFlat {
491 let id_to_index = sst_column_id_indices(&metadata);
493 let sst_column_num =
494 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
495 let format_projection =
496 FormatProjection::compute_format_projection(&id_to_index, sst_column_num, read_cols);
497
498 Self {
499 metadata,
500 arrow_schema,
501 format_projection,
502 column_id_to_sst_index: id_to_index,
503 }
504 }
505
506 fn min_values(
508 &self,
509 row_groups: &[impl Borrow<RowGroupMetaData>],
510 column_id: ColumnId,
511 ) -> StatValues {
512 self.get_stat_values(row_groups, column_id, true)
513 }
514
515 fn max_values(
517 &self,
518 row_groups: &[impl Borrow<RowGroupMetaData>],
519 column_id: ColumnId,
520 ) -> StatValues {
521 self.get_stat_values(row_groups, column_id, false)
522 }
523
524 fn null_counts(
526 &self,
527 row_groups: &[impl Borrow<RowGroupMetaData>],
528 column_id: ColumnId,
529 ) -> StatValues {
530 let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
531 return StatValues::NoColumn;
533 };
534
535 let stats = column_null_counts(row_groups, *index);
536 StatValues::from_stats_opt(stats)
537 }
538
539 fn get_stat_values(
540 &self,
541 row_groups: &[impl Borrow<RowGroupMetaData>],
542 column_id: ColumnId,
543 is_min: bool,
544 ) -> StatValues {
545 let Some(column) = self.metadata.column_by_id(column_id) else {
546 return StatValues::NoColumn;
548 };
549 let index = self.column_id_to_sst_index.get(&column_id).unwrap();
551
552 let stats = column_values(row_groups, column, *index, is_min);
553 StatValues::from_stats_opt(stats)
554 }
555}
556
557pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
561 let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
562 let mut column_index = 0;
563 for pk_id in &metadata.primary_key {
565 id_to_index.insert(*pk_id, column_index);
566 column_index += 1;
567 }
568 for column in &metadata.column_metadatas {
570 if column.semantic_type == SemanticType::Field {
571 id_to_index.insert(column.column_id, column_index);
572 column_index += 1;
573 }
574 }
575 id_to_index.insert(metadata.time_index_column().column_id, column_index);
577
578 id_to_index
579}
580
581pub(crate) fn decode_primary_keys(
585 codec: &dyn PrimaryKeyCodec,
586 batch: &RecordBatch,
587) -> Result<DecodedPrimaryKeys> {
588 let primary_key_index = primary_key_column_index(batch.num_columns());
589 let pk_dict_array = batch
590 .column(primary_key_index)
591 .as_any()
592 .downcast_ref::<PrimaryKeyArray>()
593 .with_context(|| InvalidRecordBatchSnafu {
594 reason: "Primary key column is not a dictionary array".to_string(),
595 })?;
596 let pk_values_array = pk_dict_array
597 .values()
598 .as_any()
599 .downcast_ref::<BinaryArray>()
600 .with_context(|| InvalidRecordBatchSnafu {
601 reason: "Primary key values are not binary array".to_string(),
602 })?;
603
604 let keys = pk_dict_array.keys();
605
606 let mut key_to_decoded_index = Vec::with_capacity(keys.len());
609 let mut decoded_pk_values = Vec::new();
610 let mut prev_key: Option<u32> = None;
611
612 let pk_indices = keys.values();
615 for ¤t_key in pk_indices.iter().take(keys.len()) {
616 if let Some(prev) = prev_key
618 && prev == current_key
619 {
620 key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
622 continue;
623 }
624
625 let pk_bytes = pk_values_array.value(current_key as usize);
627 let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;
628
629 decoded_pk_values.push(decoded_value);
630 key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
631 prev_key = Some(current_key);
632 }
633
634 let keys_array = UInt32Array::from(key_to_decoded_index);
636
637 Ok(DecodedPrimaryKeys {
638 decoded_pk_values,
639 keys_array,
640 })
641}
642
643pub(crate) struct DecodedPrimaryKeys {
645 decoded_pk_values: Vec<CompositeValues>,
647 keys_array: UInt32Array,
649}
650
651impl DecodedPrimaryKeys {
652 pub(crate) fn get_tag_column(
657 &self,
658 column_id: ColumnId,
659 pk_index: Option<usize>,
660 column_type: &ConcreteDataType,
661 ) -> Result<ArrayRef> {
662 let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
664 for decoded in &self.decoded_pk_values {
665 match decoded {
666 CompositeValues::Dense(dense) => {
667 let pk_idx = pk_index.expect("pk_index required for dense encoding");
668 if pk_idx < dense.len() {
669 builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
670 } else {
671 builder.push_null();
672 }
673 }
674 CompositeValues::Sparse(sparse) => {
675 let value = sparse.get_or_null(column_id);
676 builder.push_value_ref(&value.as_value_ref());
677 }
678 };
679 }
680
681 let values_vector = builder.to_vector();
682 let values_array = values_vector.to_arrow_array();
683
684 if column_type.is_string() {
686 let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
689 Ok(Arc::new(dict_array))
690 } else {
691 let taken_array =
693 take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
694 Ok(taken_array)
695 }
696 }
697}
698
699pub(crate) struct FlatConvertFormat {
702 metadata: RegionMetadataRef,
704 codec: Arc<dyn PrimaryKeyCodec>,
706 projected_primary_keys: Vec<(ColumnId, usize, usize)>,
708}
709
710impl FlatConvertFormat {
711 pub(crate) fn new(
718 metadata: RegionMetadataRef,
719 format_projection: &FormatProjection,
720 codec: Arc<dyn PrimaryKeyCodec>,
721 ) -> Option<Self> {
722 if metadata.primary_key.is_empty() {
723 return None;
724 }
725
726 let mut projected_primary_keys = Vec::new();
728 for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
729 if format_projection
730 .column_id_to_projected_index
731 .contains_key(&column_id)
732 {
733 let column_index = metadata.column_index_by_id(column_id).unwrap();
735 projected_primary_keys.push((column_id, pk_index, column_index));
736 }
737 }
738
739 Some(Self {
740 metadata,
741 codec,
742 projected_primary_keys,
743 })
744 }
745
746 pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
750 if self.projected_primary_keys.is_empty() {
751 return Ok(batch);
752 }
753
754 let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;
755
756 let mut decoded_columns = Vec::new();
758 for (column_id, pk_index, column_index) in &self.projected_primary_keys {
759 let column_metadata = &self.metadata.column_metadatas[*column_index];
760 let tag_column = decoded_pks.get_tag_column(
761 *column_id,
762 Some(*pk_index),
763 &column_metadata.column_schema.data_type,
764 )?;
765 decoded_columns.push(tag_column);
766 }
767
768 let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
770 new_columns.extend(decoded_columns);
771 new_columns.extend_from_slice(batch.columns());
772
773 let mut new_fields =
775 Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
776 for (column_id, _, column_index) in &self.projected_primary_keys {
777 let column_metadata = &self.metadata.column_metadatas[*column_index];
778 let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
779 let field =
780 tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
781 new_fields.push(Arc::new(with_field_id((*field).clone(), *column_id)));
782 }
783 new_fields.extend(batch.schema().fields().iter().cloned());
784
785 let new_schema = Arc::new(Schema::new(new_fields));
786 RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
787 }
788}
789
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a read format that reads every column of `metadata`.
    ///
    /// No file schema is given, `"test"` is the file path, and automatic
    /// conversion is not skipped. Panics if construction fails.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_column_ids = metadata.column_metadatas.iter().map(|c| c.column_id);
        let read_cols = ReadColumns::from_deduped_column_ids(all_column_ids);
        Self::new(Arc::clone(&metadata), read_cols, None, "test", false).unwrap()
    }
}
806
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::{FlatReadFormat, field_column_start};
    use crate::read::read_columns::ReadColumns;
    use crate::sst::{
        FlatSchemaOptions, flat_sst_arrow_schema_column_num, to_flat_sst_arrow_schema,
    };

    /// Builds region metadata for tests.
    ///
    /// Columns get consecutive ids starting at 0:
    /// - `num_tags` nullable string tags `tag_{i}`, all in the primary key;
    /// - `num_fields` nullable `u64` fields `field_{i}`;
    /// - a non-null millisecond time index `ts`.
    ///
    /// `encoding` is used as the primary key encoding.
    fn build_metadata(
        num_tags: usize,
        num_fields: usize,
        encoding: PrimaryKeyEncoding,
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        let mut col_id = 0u32;

        for i in 0..num_tags {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: col_id,
            });
            col_id += 1;
        }

        for i in 0..num_fields {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: col_id,
            });
            col_id += 1;
        }

        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: col_id,
        });

        // Tags were assigned ids 0..num_tags above.
        let primary_key: Vec<u32> = (0..num_tags as u32).collect();
        builder.primary_key(primary_key);
        builder.primary_key_encoding(encoding);
        builder.build().unwrap()
    }

    /// Checks `field_column_start` against the column count of the flat schema
    /// built for each encoding.
    #[test]
    fn test_field_column_start() {
        // (num_tags, num_fields, encoding, expected index of the first field).
        // Dense layouts keep tag columns before the fields. The sparse layout
        // expects fields to start at index 0 even with tags present.
        let cases = [
            (1, 1, PrimaryKeyEncoding::Dense, 1),
            (2, 2, PrimaryKeyEncoding::Dense, 2),
            (0, 2, PrimaryKeyEncoding::Dense, 0),
            (2, 2, PrimaryKeyEncoding::Sparse, 0),
        ];

        for (num_tags, num_fields, encoding, expected) in cases {
            let metadata = build_metadata(num_tags, num_fields, encoding);
            let options = FlatSchemaOptions::from_encoding(encoding);
            let num_columns = flat_sst_arrow_schema_column_num(&metadata, &options);
            let result = field_column_start(&metadata, num_columns);
            assert_eq!(
                result, expected,
                "num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}"
            );
        }
    }

    /// Checks that `output_arrow_schema` equals the default flat SST schema
    /// projected to the parquet read columns.
    #[test]
    fn test_output_arrow_schema_uses_projection() {
        // Ids: 0 = tag_0, 1 = field_0, 2 = field_1, 3 = ts. Reads tag_0 and field_1.
        let metadata = Arc::new(build_metadata(1, 2, PrimaryKeyEncoding::Dense));
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids([0_u32, 2_u32]),
            None,
            "test",
            false,
        )
        .unwrap();

        let output_schema = read_format.output_arrow_schema().unwrap();
        let projection = read_format.parquet_read_columns().root_indices();
        let expected = Arc::new(
            to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default())
                .project(projection)
                .unwrap(),
        );

        assert_eq!(expected, output_schema);
    }
}