1use std::collections::HashMap;
19use std::ops::BitAnd;
20use std::sync::Arc;
21
22use api::v1::{OpType, SemanticType};
23use common_telemetry::error;
24use datafusion::physical_plan::PhysicalExpr;
25use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
26use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
27use datatypes::arrow::buffer::BooleanBuffer;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::prelude::ConcreteDataType;
30use datatypes::schema::Schema;
31use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
32use parquet::arrow::arrow_reader::RowSelection;
33use parquet::file::metadata::ParquetMetaData;
34use snafu::{OptionExt, ResultExt};
35use store_api::codec::PrimaryKeyEncoding;
36use store_api::metadata::RegionMetadataRef;
37use store_api::storage::{ColumnId, TimeSeriesRowSelector};
38use table::predicate::Predicate;
39
40use crate::error::{
41 ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
42 EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
43 UnexpectedSnafu,
44};
45use crate::read::Batch;
46use crate::read::compat::CompatBatch;
47use crate::read::flat_projection::CompactionProjectionMapper;
48use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
49use crate::read::prune::{FlatPruneReader, PruneReader};
50use crate::sst::file::FileHandle;
51use crate::sst::parquet::flat_format::{
52 DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
53};
54use crate::sst::parquet::format::ReadFormat;
55use crate::sst::parquet::reader::{
56 FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
57};
58use crate::sst::parquet::row_group::ParquetFetchMetrics;
59use crate::sst::parquet::stats::RowGroupPruningStats;
60
61pub(crate) fn row_group_contains_delete(
66 parquet_meta: &ParquetMetaData,
67 row_group_index: usize,
68 file_path: &str,
69) -> Result<bool> {
70 let row_group_metadata = &parquet_meta.row_groups()[row_group_index];
71
72 let column_metadata = &row_group_metadata.columns().last().unwrap();
74 let stats = column_metadata
75 .statistics()
76 .context(StatsNotPresentSnafu { file_path })?;
77 stats
78 .min_bytes_opt()
79 .context(StatsNotPresentSnafu { file_path })?
80 .try_into()
81 .map(i32::from_le_bytes)
82 .map(|min_op_type| min_op_type == OpType::Delete as i32)
83 .ok()
84 .context(DecodeStatsSnafu { file_path })
85}
86
/// A range of a parquet SST to read: a single row group, optionally
/// restricted to a subset of its rows.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context of the file this range belongs to.
    context: FileRangeContextRef,
    /// Index of the row group to read.
    row_group_idx: usize,
    /// Rows of the row group to read; `None` means the whole row group.
    row_selection: Option<RowSelection>,
}
98
99impl FileRange {
100 pub(crate) fn new(
102 context: FileRangeContextRef,
103 row_group_idx: usize,
104 row_selection: Option<RowSelection>,
105 ) -> Self {
106 Self {
107 context,
108 row_group_idx,
109 row_selection,
110 }
111 }
112
113 fn select_all(&self) -> bool {
115 let rows_in_group = self
116 .context
117 .reader_builder
118 .parquet_metadata()
119 .row_group(self.row_group_idx)
120 .num_rows();
121
122 let Some(row_selection) = &self.row_selection else {
123 return true;
124 };
125 row_selection.row_count() == rows_in_group as usize
126 }
127
128 fn in_dynamic_filter_range(&self) -> bool {
133 if self.context.base.dyn_filters.is_empty() {
134 return true;
135 }
136 let curr_row_group = self
137 .context
138 .reader_builder
139 .parquet_metadata()
140 .row_group(self.row_group_idx);
141 let read_format = self.context.read_format();
142 let prune_schema = &self.context.base.prune_schema;
143 let stats = RowGroupPruningStats::new(
144 std::slice::from_ref(curr_row_group),
145 read_format,
146 self.context.base.expected_metadata.clone(),
147 self.compute_skip_fields(),
148 );
149
150 let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone());
152
153 pred.prune_with_stats(&stats, prune_schema.arrow_schema())
154 .first()
155 .cloned()
156 .unwrap_or(true) }
158
159 fn compute_skip_fields(&self) -> bool {
160 match self.context.base.pre_filter_mode {
161 PreFilterMode::All => false,
162 PreFilterMode::SkipFields => true,
163 PreFilterMode::SkipFieldsOnDelete => {
164 row_group_contains_delete(
166 self.context.reader_builder.parquet_metadata(),
167 self.row_group_idx,
168 self.context.reader_builder.file_path(),
169 )
170 .unwrap_or(true)
171 }
172 }
173 }
174
175 pub(crate) async fn reader(
177 &self,
178 selector: Option<TimeSeriesRowSelector>,
179 fetch_metrics: Option<&ParquetFetchMetrics>,
180 ) -> Result<Option<PruneReader>> {
181 if !self.in_dynamic_filter_range() {
182 return Ok(None);
183 }
184 let parquet_reader = self
185 .context
186 .reader_builder
187 .build(
188 self.row_group_idx,
189 self.row_selection.clone(),
190 fetch_metrics,
191 )
192 .await?;
193
194 let use_last_row_reader = if selector
195 .map(|s| s == TimeSeriesRowSelector::LastRow)
196 .unwrap_or(false)
197 {
198 let put_only = !self
201 .context
202 .contains_delete(self.row_group_idx)
203 .inspect_err(|e| {
204 error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
205 })
206 .unwrap_or(true);
207 put_only && self.select_all()
208 } else {
209 false
211 };
212
213 let skip_fields = self.context.should_skip_fields(self.row_group_idx);
215
216 let prune_reader = if use_last_row_reader {
217 let reader = RowGroupLastRowCachedReader::new(
219 self.file_handle().file_id().file_id(),
220 self.row_group_idx,
221 self.context.reader_builder.cache_strategy().clone(),
222 RowGroupReader::new(self.context.clone(), parquet_reader),
223 );
224 PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
225 } else {
226 PruneReader::new_with_row_group_reader(
228 self.context.clone(),
229 RowGroupReader::new(self.context.clone(), parquet_reader),
230 skip_fields,
231 )
232 };
233
234 Ok(Some(prune_reader))
235 }
236
237 pub(crate) async fn flat_reader(
239 &self,
240 selector: Option<TimeSeriesRowSelector>,
241 fetch_metrics: Option<&ParquetFetchMetrics>,
242 ) -> Result<Option<FlatPruneReader>> {
243 if !self.in_dynamic_filter_range() {
244 return Ok(None);
245 }
246 let parquet_reader = self
247 .context
248 .reader_builder
249 .build(
250 self.row_group_idx,
251 self.row_selection.clone(),
252 fetch_metrics,
253 )
254 .await?;
255
256 let use_last_row_reader = if selector
257 .map(|s| s == TimeSeriesRowSelector::LastRow)
258 .unwrap_or(false)
259 {
260 let put_only = !self
263 .context
264 .contains_delete(self.row_group_idx)
265 .inspect_err(|e| {
266 error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader");
267 })
268 .unwrap_or(true);
269 put_only && self.select_all()
270 } else {
271 false
272 };
273
274 let skip_fields = self.context.should_skip_fields(self.row_group_idx);
276
277 let flat_prune_reader = if use_last_row_reader {
278 let flat_row_group_reader =
279 FlatRowGroupReader::new(self.context.clone(), parquet_reader);
280 let reader = FlatRowGroupLastRowCachedReader::new(
281 self.file_handle().file_id().file_id(),
282 self.row_group_idx,
283 self.context.reader_builder.cache_strategy().clone(),
284 self.context.read_format().projection_indices(),
285 flat_row_group_reader,
286 );
287 FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
288 } else {
289 let flat_row_group_reader =
290 FlatRowGroupReader::new(self.context.clone(), parquet_reader);
291 FlatPruneReader::new_with_row_group_reader(
292 self.context.clone(),
293 flat_row_group_reader,
294 skip_fields,
295 )
296 };
297
298 Ok(Some(flat_prune_reader))
299 }
300
301 pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
303 self.context.compat_batch()
304 }
305
306 pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
308 self.context.compaction_projection_mapper()
309 }
310
311 pub(crate) fn file_handle(&self) -> &FileHandle {
313 self.context.reader_builder.file_handle()
314 }
315}
316
/// Context shared by all [FileRange]s of the same parquet SST file.
pub(crate) struct FileRangeContext {
    /// Builder that creates row group readers for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Common filtering and format state used while reading the file.
    base: RangeBase,
}

/// Shared pointer to a [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
326
impl FileRangeContext {
    /// Creates a new [FileRangeContext] from a reader builder and the shared
    /// range state.
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns the pushed-down filters evaluated against batches.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns true when a partition filter is configured for this scan.
    pub(crate) fn has_partition_filter(&self) -> bool {
        self.base.partition_filter.is_some()
    }

    /// Returns the format helper for reading the SST.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the builder that creates row group readers.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to adapt batches to the expected schema, if any.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Returns the projection mapper used by compaction reads, if any.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.base.compaction_projection_mapper.as_ref()
    }

    /// Sets (or clears) the batch compatibility helper.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Precisely filters a primary-key format batch by the pushed-down
    /// predicates. See [RangeBase::precise_filter].
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Precisely filters a flat-format record batch by the pushed-down
    /// predicates. See [RangeBase::precise_filter_flat].
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Decides whether field-column filters should be skipped for the given
    /// row group, according to the configured [PreFilterMode].
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Errs on the safe side: also skip field filters when the
                // op_type stats are missing or cannot be decoded.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Returns true when the row group may contain delete ops, decoded from
    /// the min statistics of the op type column.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }

    /// Estimated size in bytes of the cached parquet metadata held by this
    /// context.
    pub(crate) fn memory_size(&self) -> usize {
        crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
    }
}
417
/// Controls whether filters on field columns are applied while reading.
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Applies all filters, including filters on field columns.
    All,
    /// Skips field-column filters only for row groups that may contain
    /// delete ops (decided per row group from the op type statistics).
    SkipFieldsOnDelete,
    /// Always skips filters on field columns.
    SkipFields,
}
429
/// Context for evaluating a region partition filter during a scan.
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region's partition rule.
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema of the columns the partition expression is evaluated against.
    pub(crate) partition_schema: Arc<Schema>,
}
437
/// Common state shared by ranges reading the same file.
pub(crate) struct RangeBase {
    /// Pushed-down filters evaluated precisely against batches.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filters used to prune row groups at read time.
    pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
    /// Format helper for reading the SST.
    pub(crate) read_format: ReadFormat,
    /// Optional region metadata passed to row-group pruning statistics.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used when pruning row groups with the dynamic filters.
    pub(crate) prune_schema: Arc<Schema>,
    /// Codec to decode primary keys into tag values.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to adapt batches to the expected schema.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Optional projection mapper for compaction reads.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Controls whether field-column filters are applied.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Optional partition filter applied after the predicate mask.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}
460
/// Lazily-populated state for decoding tag columns out of encoded primary
/// keys, shared across filters of one record batch.
pub(crate) struct TagDecodeState {
    /// Primary keys decoded from the batch, populated on first use.
    decoded_pks: Option<DecodedPrimaryKeys>,
    /// Cache of already-decoded tag arrays, keyed by column id.
    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
}
465
466impl TagDecodeState {
467 pub(crate) fn new() -> Self {
468 Self {
469 decoded_pks: None,
470 decoded_tag_cache: HashMap::new(),
471 }
472 }
473}
474
impl RangeBase {
    /// Precisely filters a primary-key format [Batch] by the pushed-down
    /// predicates and the optional partition filter.
    ///
    /// Tag filters are evaluated once against the batch's decoded primary
    /// key (the whole batch is kept or dropped, since tag values are
    /// constant within a batch here); field and timestamp filters produce a
    /// per-row mask. Returns `Ok(None)` when every row is filtered out.
    /// When `skip_fields` is true, filters on field columns are skipped.
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        // Start with all rows selected; each filter ANDs its result in.
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Already satisfied by metadata: nothing to evaluate.
                MaybeFilter::Matched => continue,
                // Proven unsatisfiable: the whole batch is filtered out.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    // Decode the primary key lazily and cache it on the batch.
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // NOTE(review): unwrap assumes a tag filter only
                            // references primary key columns — confirm where
                            // filters are constructed.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    // One scalar comparison decides the fate of the batch.
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    if skip_fields {
                        continue;
                    }
                    // The field may be absent from the projection; then the
                    // filter is simply not applicable here.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        // Short-circuit before the (possibly expensive) partition filter.
        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self
                .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            Ok(None)
        } else {
            input.filter(&BooleanArray::from(mask).into())?;
            Ok(Some(input))
        }
    }

    /// Precisely filters a flat-format [RecordBatch] by the pushed-down
    /// predicates and the optional partition filter.
    ///
    /// Returns `Ok(None)` when every row is filtered out. When
    /// `skip_fields` is true, filters on field columns are skipped.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        // Shared so primary keys decoded for filtering are reused by the
        // partition-filter projection below.
        let mut tag_decode_state = TagDecodeState::new();
        let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;

        let Some(mut mask) = mask else {
            return Ok(None);
        };

        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self.project_record_batch_for_pruning_flat(
                &input,
                &partition_filter.partition_schema,
                &mut tag_decode_state,
            )?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

    /// Computes the row mask produced by the pushed-down predicates for a
    /// flat-format batch.
    ///
    /// Returns `Ok(None)` when a filter proves the whole batch is pruned.
    /// Tag columns that are not materialized in the batch are decoded from
    /// the primary key on demand via `tag_decode_state`.
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<BooleanBuffer>> {
        // Start with all rows selected; each filter ANDs its result in.
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Already satisfied by metadata: nothing to evaluate.
                MaybeFilter::Matched => continue,
                // Proven unsatisfiable: the whole batch is pruned.
                MaybeFilter::Pruned => return Ok(None),
            };

            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Prefer a column that is materialized in the projected batch.
            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = &input.columns().get(idx).unwrap();
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                let column_id = filter_ctx.column_id();

                // Tag columns may exist only inside the encoded primary
                // key; decode them lazily. `None` means the filter is not
                // applicable and is skipped.
                if let Some(tag_column) =
                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
                {
                    let result = filter
                        .evaluate_array(&tag_column)
                        .context(RecordBatchSnafu)?;
                    mask = mask.bitand(&result);
                }
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
                // The time index lives at a fixed position derived from the
                // batch width in the flat format.
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = &input.columns()[time_index_pos];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
        }

        Ok(Some(mask))
    }

    /// Decodes the tag column `column_id` from the primary key of `input`,
    /// caching the decoded primary keys and the produced array in
    /// `tag_decode_state`.
    ///
    /// Returns `Ok(None)` when the column is not part of the primary key or
    /// cannot be located in the region metadata.
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        // Fast path: the column was already decoded for a previous filter.
        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

        // Sparse encoding looks tags up by column id, so no positional
        // index is passed in that case.
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }

    /// Evaluates the partition physical expression against `record_batch`
    /// and returns the resulting row mask.
    ///
    /// Null results are treated as "does not match" by AND-ing the validity
    /// bitmap into the mask.
    fn evaluate_partition_filter(
        &self,
        record_batch: &RecordBatch,
        partition_filter: &PartitionFilterContext,
    ) -> Result<BooleanBuffer> {
        let columnar_value = partition_filter
            .region_partition_physical_expr
            .evaluate(record_batch)
            .context(EvalPartitionFilterSnafu)?;
        let array = columnar_value
            .into_array(record_batch.num_rows())
            .context(EvalPartitionFilterSnafu)?;
        let boolean_array =
            array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .context(UnexpectedSnafu {
                    reason: "Failed to downcast to BooleanArray".to_string(),
                })?;

        // A row passes only when the value is true AND valid (non-null).
        let mut mask = boolean_array.values().clone();
        if let Some(nulls) = boolean_array.nulls() {
            mask = mask.bitand(nulls.inner());
        }

        Ok(mask)
    }

    /// Builds a [RecordBatch] matching `schema` from a primary-key format
    /// [Batch] so the partition filter can be evaluated on it.
    ///
    /// Tag columns are materialized by repeating the decoded primary-key
    /// value over all rows; the time index and projected field columns are
    /// converted from the batch's vectors. `input` is mutable only so the
    /// decoded primary key can be cached on it.
    fn build_record_batch_for_pruning(
        &self,
        input: &mut Batch,
        schema: &Arc<Schema>,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        // Ensure the primary key is decoded before reading tag values.
        if input.pk_values().is_none() {
            input.set_pk_values(
                self.codec
                    .decode(input.primary_key())
                    .context(DecodeSnafu)?,
            );
        }

        for field in arrow_schema.fields() {
            let metadata = self.read_format.metadata();
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                        region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            if let Some(pk_index) = metadata.primary_key_index(column_id) {
                // Tag column: repeat the single decoded value for all rows.
                let pk_values = input.pk_values().unwrap();
                let value = match pk_values {
                    CompositeValues::Dense(v) => &v[pk_index].1,
                    CompositeValues::Sparse(v) => v.get_or_null(column_id),
                };
                let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
                let arrow_scalar = value
                    .try_to_scalar_value(&concrete_type)
                    .context(DataTypeMismatchSnafu)?;
                let array = arrow_scalar
                    .to_array_of_size(input.num_rows())
                    .context(EvalPartitionFilterSnafu)?;
                columns.push(array);
            } else if metadata.time_index_column().column_id == column_id {
                columns.push(input.timestamps().to_arrow_array());
            } else if let Some(field_index) = self
                .read_format
                .as_primary_key()
                .and_then(|f| f.field_index_by_id(column_id))
            {
                columns.push(input.fields()[field_index].data.to_arrow_array());
            } else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' (id {}) but it is not \
                        present in input batch",
                        field.name(),
                        column_id
                    ),
                }
                .fail();
            }
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }

    /// Projects a flat-format [RecordBatch] to `schema` so the partition
    /// filter can be evaluated on it.
    ///
    /// Columns already materialized in the batch are reused; tag columns
    /// that only exist inside the encoded primary key are decoded on demand
    /// through `tag_decode_state`.
    fn project_record_batch_for_pruning_flat(
        &self,
        input: &RecordBatch,
        schema: &Arc<Schema>,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        for field in arrow_schema.fields() {
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                        region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            // Case 1: the column is materialized in the projected batch.
            if let Some(idx) = flat_format.projected_index_by_id(column_id) {
                columns.push(input.column(idx).clone());
                continue;
            }

            // Case 2: the time index, at its fixed flat-format position.
            if metadata.time_index_column().column_id == column_id {
                let time_index_pos = time_index_column_index(input.num_columns());
                columns.push(input.column(time_index_pos).clone());
                continue;
            }

            // Case 3: a tag column stored only in the encoded primary key.
            if let Some(tag_column) =
                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
            {
                columns.push(tag_column);
                continue;
            }

            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                    present in projected record batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}