// mito2/read/flat_projection.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection on flat format.
16
17use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
22use common_recordbatch::{DfRecordBatch, RecordBatch};
23use datatypes::arrow::array::Array;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::Helper;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::{RegionMetadata, RegionMetadataRef};
31use store_api::storage::ColumnId;
32
33use crate::cache::CacheStrategy;
34use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
35use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
36use crate::sst::parquet::flat_format::sst_column_id_indices;
37use crate::sst::parquet::format::FormatProjection;
38use crate::sst::{
39    FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
40};
41
/// Handles projection and converts batches in flat format with correct schema.
///
/// This mapper supports duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
pub struct FlatProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Schema for converted [RecordBatch] to return.
    output_schema: SchemaRef,
    /// Ids of columns to read from memtables and SSTs.
    /// The mapper won't deduplicate the column ids.
    ///
    /// Note that this doesn't contain the `__table_id` and `__tsid`.
    read_column_ids: Vec<ColumnId>,
    /// Ids and DataTypes of columns of the expected batch.
    /// We can use this to check if the batch is compatible with the expected schema.
    ///
    /// It doesn't contain internal columns but always contains the time index column.
    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` if the original projection is empty.
    is_empty_projection: bool,
    /// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
    /// Computed once in `new_with_read_columns` so `convert` is a plain index lookup.
    batch_indices: Vec<usize>,
    /// Precomputed Arrow schema for input batches.
    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
68
impl FlatProjectionMapper {
    /// Returns a new mapper with projection.
    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
    /// empty `RecordBatch` and only use its row count in this query.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<Self> {
        let projection: Vec<_> = projection.collect();
        // Derives the ids of the columns to read from the output projection.
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        Self::new_with_read_columns(metadata, projection, read_column_ids)
    }

    /// Returns a new mapper with output projection and explicit read columns.
    ///
    /// `projection` contains indices into `metadata.column_metadatas` for the output
    /// columns; `read_column_ids` are the ids of columns to read from memtables and SSTs.
    ///
    /// Returns an error if a projection index is out of bound, or if an output column
    /// is not covered by the read projection.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<Self> {
        // If the original projection is empty.
        let is_empty_projection = projection.is_empty();

        // Output column schemas for the projection.
        let mut column_schemas = Vec::with_capacity(projection.len());
        // Column ids of the output projection without deduplication.
        let mut output_column_ids = Vec::with_capacity(projection.len());
        for idx in &projection {
            // For each projection index, we get the column id for projection.
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;

            output_column_ids.push(column.column_id);
            // Safety: idx is valid.
            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
        }

        // Creates a map to lookup index.
        let id_to_index = sst_column_id_indices(metadata);
        // TODO(yingwen): Support different flat schema options.
        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            // All columns with internal columns.
            // The `+ 3` accounts for the three internal fields appended by the flat
            // format ("__primary_key", "__sequence", "__op_type").
            metadata.column_metadatas.len() + 3,
            read_column_ids.iter().copied(),
        );

        // Ids and data types of the columns in the batches to convert.
        let batch_schema = flat_projected_columns(metadata, &format_projection);

        // Safety: We get the column id from the metadata.
        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);

        // If projection is empty, we don't output any column.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            // Safety: Columns come from existing schema.
            Arc::new(Schema::new(column_schemas))
        };

        // For each output column, precompute its index in the projected input batch.
        let batch_indices = if is_empty_projection {
            vec![]
        } else {
            output_column_ids
                .iter()
                .map(|id| {
                    // Safety: The map is computed from the read projection.
                    format_projection
                        .column_id_to_projected_index
                        .get(id)
                        .copied()
                        .with_context(|| {
                            // Uses the column name in the error if available,
                            // otherwise falls back to the raw column id.
                            let name = metadata
                                .column_by_id(*id)
                                .map(|column| column.column_schema.name.clone())
                                .unwrap_or_else(|| id.to_string());
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "output column {} is missing in read projection",
                                    name
                                ),
                            }
                        })
                })
                .collect::<Result<Vec<_>>>()?
        };

        Ok(FlatProjectionMapper {
            metadata: metadata.clone(),
            output_schema,
            read_column_ids,
            batch_schema,
            is_empty_projection,
            batch_indices,
            input_arrow_schema,
        })
    }

    /// Returns a new mapper without projection (all columns are projected).
    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata that created the mapper.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns ids of projected columns that we need to read
    /// from memtables and SSTs.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.read_column_ids
    }

    /// Returns the field column start index in output batch.
    ///
    /// Returns the index of the first column in `batch_schema` whose semantic type is
    /// `Field`, or `batch_schema.len()` when there is no field column.
    pub(crate) fn field_column_start(&self) -> usize {
        for (idx, column_id) in self
            .batch_schema
            .iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
        {
            // Safety: We get the column id from the metadata in new().
            if self
                .metadata
                .column_by_id(*column_id)
                .unwrap()
                .semantic_type
                == SemanticType::Field
            {
                return idx;
            }
        }

        self.batch_schema.len()
    }

    /// Returns ids of columns of the batch that the mapper expects to convert.
    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_schema
    }

    /// Returns the input arrow schema from sources.
    ///
    /// The merge reader can use this schema. For scans it returns the precomputed
    /// schema; for compaction it rebuilds the schema from the primary key encoding.
    pub(crate) fn input_arrow_schema(
        &self,
        compaction: bool,
    ) -> datatypes::arrow::datatypes::SchemaRef {
        if !compaction {
            self.input_arrow_schema.clone()
        } else {
            // For compaction, we need to build a different schema from encoding.
            to_flat_sst_arrow_schema(
                &self.metadata,
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
        }
    }

    /// Returns the schema of converted [RecordBatch].
    /// This is the schema that the stream will output. This schema may contain
    /// less columns than [FlatProjectionMapper::column_ids()].
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [RecordBatch].
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

    /// Converts a flat format [RecordBatch] to a normal [RecordBatch].
    ///
    /// The batch must match the `projection` using to build the mapper.
    ///
    /// # Errors
    /// Returns an error if casting a dictionary column fails or the output
    /// record batch can't be built from the converted arrays.
    pub(crate) fn convert(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            // Empty projection: only the row count matters.
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }
        // Construct output record batch directly from Arrow arrays to avoid
        // Arrow -> Vector -> Arrow roundtrips in the hot path.
        let mut arrays = Vec::with_capacity(self.output_schema.num_columns());
        for (output_idx, index) in self.batch_indices.iter().enumerate() {
            let mut array = batch.column(*index).clone();
            // Cast dictionary values to the target type.
            if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() {
                // When a string dictionary column contains only a single value, reuse a cached
                // repeated vector to avoid repeatedly expanding the dictionary.
                if let Some(dict_array) = single_value_string_dictionary(
                    &array,
                    &self.output_schema.column_schemas()[output_idx].data_type,
                    value_type.as_ref(),
                ) {
                    let dict_values = dict_array.values();
                    // The single dictionary value may still be null.
                    let value = if dict_values.is_null(0) {
                        Value::Null
                    } else {
                        Value::from(datatypes::arrow_array::string_array_value(dict_values, 0))
                    };

                    let repeated = repeated_vector_with_cache(
                        &self.output_schema.column_schemas()[output_idx].data_type,
                        &value,
                        batch.num_rows(),
                        cache_strategy,
                    )?;
                    array = repeated.to_arrow_array();
                } else {
                    // General case: expand the dictionary to its value type.
                    let casted = datatypes::arrow::compute::cast(&array, value_type)
                        .context(ArrowComputeSnafu)?;
                    array = casted;
                }
            }
            arrays.push(array);
        }

        let df_record_batch =
            DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
                .context(NewDfRecordBatchSnafu)?;
        Ok(RecordBatch::from_df_record_batch(
            self.output_schema.clone(),
            df_record_batch,
        ))
    }

    /// Projects columns from the input batch and converts them into vectors.
    ///
    /// Dictionary columns are expanded to their value type before conversion.
    pub(crate) fn project_vectors(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
    ) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        for index in &self.batch_indices {
            let mut array = batch.column(*index).clone();
            // Casts dictionary values to the target type.
            if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
                array.data_type()
            {
                let casted = datatypes::arrow::compute::cast(&array, value_type)
                    .context(ArrowComputeSnafu)?;
                array = casted;
            }
            let vector = Helper::try_into_vector(array)
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            columns.push(vector);
        }
        Ok(columns)
    }
}
329
330fn single_value_string_dictionary<'a>(
331    array: &'a Arc<dyn Array>,
332    output_type: &ConcreteDataType,
333    value_type: &ArrowDataType,
334) -> Option<&'a datatypes::arrow::array::DictionaryArray<datatypes::arrow::datatypes::UInt32Type>> {
335    if !matches!(
336        value_type,
337        ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
338    ) || !output_type.is_string()
339    {
340        return None;
341    }
342
343    let dict_array = array
344        .as_any()
345        .downcast_ref::<datatypes::arrow::array::DictionaryArray<
346            datatypes::arrow::datatypes::UInt32Type,
347        >>()?;
348
349    (dict_array.values().len() == 1 && dict_array.null_count() == 0).then_some(dict_array)
350}
351
352/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
353///
354/// It adds the time index column if it doesn't present in the projection.
355pub(crate) fn flat_projected_columns(
356    metadata: &RegionMetadata,
357    format_projection: &FormatProjection,
358) -> Vec<(ColumnId, ConcreteDataType)> {
359    let time_index = metadata.time_index_column();
360    let num_columns = if format_projection
361        .column_id_to_projected_index
362        .contains_key(&time_index.column_id)
363    {
364        format_projection.column_id_to_projected_index.len()
365    } else {
366        format_projection.column_id_to_projected_index.len() + 1
367    };
368    let mut schema = vec![None; num_columns];
369    for (column_id, index) in &format_projection.column_id_to_projected_index {
370        // Safety: FormatProjection ensures the id is valid.
371        schema[*index] = Some((
372            *column_id,
373            metadata
374                .column_by_id(*column_id)
375                .unwrap()
376                .column_schema
377                .data_type
378                .clone(),
379        ));
380    }
381    if num_columns != format_projection.column_id_to_projected_index.len() {
382        schema[num_columns - 1] = Some((
383            time_index.column_id,
384            time_index.column_schema.data_type.clone(),
385        ));
386    }
387
388    // Safety: FormatProjection ensures all indices can be unwrapped.
389    schema.into_iter().map(|id_type| id_type.unwrap()).collect()
390}
391
392/// Computes the Arrow schema for input batches.
393///
394/// # Panics
395/// Panics if it can't find the column by the column id in the batch_schema.
396pub(crate) fn compute_input_arrow_schema(
397    metadata: &RegionMetadata,
398    batch_schema: &[(ColumnId, ConcreteDataType)],
399) -> datatypes::arrow::datatypes::SchemaRef {
400    let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
401    for (column_id, _) in batch_schema {
402        let column_metadata = metadata.column_by_id(*column_id).unwrap();
403        let field = Arc::new(Field::new(
404            &column_metadata.column_schema.name,
405            column_metadata.column_schema.data_type.as_arrow_type(),
406            column_metadata.column_schema.is_nullable(),
407        ));
408        let field = if column_metadata.semantic_type == SemanticType::Tag {
409            tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
410        } else {
411            field
412        };
413        new_fields.push(field);
414    }
415    new_fields.extend_from_slice(&internal_fields());
416
417    Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
418}
419
/// Helper to project compaction batches into flat format columns
/// (fields + time index + __primary_key + __sequence + __op_type).
pub(crate) struct CompactionProjectionMapper {
    // Mapper that projects the field columns and the time index.
    mapper: FlatProjectionMapper,
    // Assembler that appends the internal columns to the projected output.
    assembler: DfBatchAssembler,
}
426
427impl CompactionProjectionMapper {
428    pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
429        let projection = metadata
430            .column_metadatas
431            .iter()
432            .enumerate()
433            .filter_map(|(idx, col)| {
434                if matches!(col.semantic_type, SemanticType::Field) {
435                    Some(idx)
436                } else {
437                    None
438                }
439            })
440            .chain([metadata.time_index_column_pos()])
441            .collect::<Vec<_>>();
442
443        let mapper = FlatProjectionMapper::new_with_read_columns(
444            metadata,
445            projection,
446            metadata
447                .column_metadatas
448                .iter()
449                .map(|col| col.column_id)
450                .collect(),
451        )?;
452        let assembler = DfBatchAssembler::new(mapper.output_schema());
453
454        Ok(Self { mapper, assembler })
455    }
456
457    /// Projects columns and appends internal columns for compaction output.
458    ///
459    /// The input batch is expected to be in flat format with internal columns appended.
460    pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
461        let columns = self
462            .mapper
463            .project_vectors(&batch)
464            .context(RecordBatchSnafu)?;
465        self.assembler
466            .build_df_record_batch_with_internal(&batch, columns)
467            .context(RecordBatchSnafu)
468    }
469}
470
/// Builds [DfRecordBatch] with internal columns appended.
pub(crate) struct DfBatchAssembler {
    // Precomputed Arrow schema: the projected output fields followed by the
    // internal fields ("__primary_key", "__sequence", "__op_type").
    output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
}
475
476impl DfBatchAssembler {
477    /// Precomputes the output schema with internal columns.
478    pub(crate) fn new(output_schema: SchemaRef) -> Self {
479        let fields = output_schema
480            .arrow_schema()
481            .fields()
482            .into_iter()
483            .chain(internal_fields().iter())
484            .cloned()
485            .collect::<Vec<_>>();
486        let output_arrow_schema_with_internal =
487            Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
488        Self {
489            output_arrow_schema_with_internal,
490        }
491    }
492
493    /// Builds a [DfRecordBatch] from projected vectors plus internal columns.
494    ///
495    /// Assumes the input batch already contains internal columns as the last three fields
496    /// ("__primary_key", "__sequence", "__op_type").
497    pub(crate) fn build_df_record_batch_with_internal(
498        &self,
499        batch: &datatypes::arrow::record_batch::RecordBatch,
500        mut columns: Vec<datatypes::vectors::VectorRef>,
501    ) -> common_recordbatch::error::Result<DfRecordBatch> {
502        let num_columns = batch.columns().len();
503        // The last 3 columns are the internal columns.
504        let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1];
505        for index in internal_indices.iter() {
506            let array = batch.column(*index).clone();
507            let vector = Helper::try_into_vector(array)
508                .map_err(BoxedError::new)
509                .context(ExternalSnafu)?;
510            columns.push(vector);
511        }
512        RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
513    }
514}