// mito2/read/flat_projection.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for projection on flat format.

use std::sync::Arc;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
    FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
    with_field_id,
};

/// Handles projection and converts batches in flat format with correct schema.
///
/// This mapper supports duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
pub struct FlatProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Schema for converted [RecordBatch] to return.
    output_schema: SchemaRef,
    /// The columns to read from memtables and SSTs.
    /// The mapper won't deduplicate the column ids.
    ///
    /// Note that this doesn't contain the `__table_id` and `__tsid`.
    read_cols: ReadColumns,
    /// Ids and DataTypes of columns of the expected batch.
    /// We can use this to check if the batch is compatible with the expected schema.
    ///
    /// It doesn't contain internal columns but always contains the time index column.
    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` if the original projection is empty.
    is_empty_projection: bool,
    /// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
    batch_indices: Vec<usize>,
    /// Precomputed Arrow schema for input batches.
    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
impl FlatProjectionMapper {
    /// Returns a new mapper with projection.
    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
    /// empty `RecordBatch` and only use its row count in this query.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl IntoIterator<Item = usize>,
    ) -> Result<Self> {
        let projection: Vec<_> = projection.into_iter().collect();
        // Derives the deduplicated set of column ids to read from the projection indices.
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids);
        Self::new_with_read_columns(metadata, projection, read_cols)
    }

    /// Returns a new mapper with output projection and explicit read columns.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_cols: ReadColumns,
    ) -> Result<Self> {
        // If the original projection is empty.
        let is_empty_projection = projection.is_empty();

        // Output column schemas for the projection.
        let mut col_schemas = Vec::with_capacity(projection.len());
        // Column ids of the output projection without deduplication.
        let mut output_col_ids = Vec::with_capacity(projection.len());
        for idx in &projection {
            let col = metadata
                .column_metadatas
                .get(*idx)
                .with_context(|| InvalidRequestSnafu {
                    region_id: metadata.region_id,
                    reason: format!("projection index {} is out of bound", idx),
                })?;
            output_col_ids.push(col.column_id);
            col_schemas.push(col.column_schema.clone());
        }

        // Creates a map to lookup index.
        let id_to_index = sst_column_id_indices(metadata);

        // TODO(yingwen): Support different flat schema options.
        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            // All columns with internal columns.
            metadata.column_metadatas.len() + 3,
            read_cols.clone(),
        );

        let batch_schema = flat_projected_columns(metadata, &format_projection);

        // Safety: We get the column id from the metadata.
        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);

        // If projection is empty, we don't output any column.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            // Safety: Columns come from existing schema.
            Arc::new(Schema::new(col_schemas))
        };

        let batch_indices = if is_empty_projection {
            vec![]
        } else {
            output_col_ids
                .iter()
                .map(|id| {
                    // Looks up the projected index for each output column. Fails with an
                    // invalid request error if the read projection doesn't cover it.
                    format_projection
                        .column_id_to_projected_index
                        .get(id)
                        .copied()
                        .with_context(|| {
                            let name = metadata
                                .column_by_id(*id)
                                .map(|column| column.column_schema.name.clone())
                                .unwrap_or_else(|| id.to_string());
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "output column {} is missing in read projection",
                                    name
                                ),
                            }
                        })
                })
                .collect::<Result<Vec<_>>>()?
        };

        Ok(FlatProjectionMapper {
            metadata: metadata.clone(),
            output_schema,
            read_cols,
            batch_schema,
            is_empty_projection,
            batch_indices,
            input_arrow_schema,
        })
    }

    /// Returns a new mapper without projection.
    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata that created the mapper.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns projected columns that we need to read from memtables and SSTs.
    pub(crate) fn read_columns(&self) -> &ReadColumns {
        &self.read_cols
    }

    /// Returns the field column start index in output batch.
    ///
    /// Returns `batch_schema.len()` when the batch schema has no field column.
    pub(crate) fn field_column_start(&self) -> usize {
        for (idx, column_id) in self
            .batch_schema
            .iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
        {
            // Safety: We get the column id from the metadata in new().
            if self
                .metadata
                .column_by_id(*column_id)
                .unwrap()
                .semantic_type
                == SemanticType::Field
            {
                return idx;
            }
        }

        // No field column found.
        self.batch_schema.len()
    }

    /// Returns ids of columns of the batch that the mapper expects to convert.
    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_schema
    }

    /// Returns the input arrow schema from sources.
    ///
    /// The merge reader can use this schema.
    pub(crate) fn input_arrow_schema(
        &self,
        compaction: bool,
    ) -> datatypes::arrow::datatypes::SchemaRef {
        if !compaction {
            self.input_arrow_schema.clone()
        } else {
            // For compaction, we need to build a different schema from encoding.
            to_flat_sst_arrow_schema(
                &self.metadata,
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
        }
    }

    /// Returns the schema of converted [RecordBatch].
    /// This is the schema that the stream will output. This schema may contain
    /// less columns than [FlatProjectionMapper::column_ids()].
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Converts a flat format [RecordBatch] to a normal [RecordBatch].
    ///
    /// The batch must match the `projection` using to build the mapper.
    pub(crate) fn convert(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            // Empty projection: only the row count matters, no columns are produced.
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }
        // Construct output record batch directly from Arrow arrays to avoid
        // Arrow -> Vector -> Arrow roundtrips in the hot path.
        let mut arrays = Vec::with_capacity(self.output_schema.num_columns());
        for (output_idx, index) in self.batch_indices.iter().enumerate() {
            let mut array = batch.column(*index).clone();
            // Cast dictionary values to the target type.
            if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() {
                // When a string dictionary column contains only a single value, reuse a cached
                // repeated vector to avoid repeatedly expanding the dictionary.
                if let Some(dict_array) = single_value_string_dictionary(
                    &array,
                    &self.output_schema.column_schemas()[output_idx].data_type,
                    value_type.as_ref(),
                ) {
                    // The dictionary values array has exactly one entry; it may still be null.
                    let dict_values = dict_array.values();
                    let value = if dict_values.is_null(0) {
                        Value::Null
                    } else {
                        Value::from(datatypes::arrow_array::string_array_value(dict_values, 0))
                    };

                    let repeated = repeated_vector_with_cache(
                        &self.output_schema.column_schemas()[output_idx].data_type,
                        &value,
                        batch.num_rows(),
                        cache_strategy,
                    )?;
                    array = repeated.to_arrow_array();
                } else {
                    // General case: expand the dictionary to its value type.
                    let casted = datatypes::arrow::compute::cast(&array, value_type)
                        .context(ArrowComputeSnafu)?;
                    array = casted;
                }
            }
            arrays.push(array);
        }

        let df_record_batch =
            DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
                .context(NewDfRecordBatchSnafu)?;
        Ok(RecordBatch::from_df_record_batch(
            self.output_schema.clone(),
            df_record_batch,
        ))
    }

    /// Projects columns from the input batch and converts them into vectors.
    pub(crate) fn project_vectors(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
    ) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        for index in &self.batch_indices {
            let mut array = batch.column(*index).clone();
            // Casts dictionary values to the target type.
            if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
                array.data_type()
            {
                let casted = datatypes::arrow::compute::cast(&array, value_type)
                    .context(ArrowComputeSnafu)?;
                array = casted;
            }
            let vector = Helper::try_into_vector(array)
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            columns.push(vector);
        }
        Ok(columns)
    }
}
323fn single_value_string_dictionary<'a>(
324    array: &'a Arc<dyn Array>,
325    output_type: &ConcreteDataType,
326    value_type: &ArrowDataType,
327) -> Option<&'a datatypes::arrow::array::DictionaryArray<datatypes::arrow::datatypes::UInt32Type>> {
328    if !matches!(
329        value_type,
330        ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
331    ) || !output_type.is_string()
332    {
333        return None;
334    }
335
336    let dict_array = array
337        .as_any()
338        .downcast_ref::<datatypes::arrow::array::DictionaryArray<
339            datatypes::arrow::datatypes::UInt32Type,
340        >>()?;
341
342    (dict_array.values().len() == 1 && dict_array.null_count() == 0).then_some(dict_array)
343}
345/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
346///
347/// It adds the time index column if it doesn't present in the projection.
348pub(crate) fn flat_projected_columns(
349    metadata: &RegionMetadata,
350    format_projection: &FormatProjection,
351) -> Vec<(ColumnId, ConcreteDataType)> {
352    let time_index = metadata.time_index_column();
353    let num_columns = if format_projection
354        .column_id_to_projected_index
355        .contains_key(&time_index.column_id)
356    {
357        format_projection.column_id_to_projected_index.len()
358    } else {
359        format_projection.column_id_to_projected_index.len() + 1
360    };
361    let mut schema = vec![None; num_columns];
362    for (column_id, index) in &format_projection.column_id_to_projected_index {
363        // Safety: FormatProjection ensures the id is valid.
364        schema[*index] = Some((
365            *column_id,
366            metadata
367                .column_by_id(*column_id)
368                .unwrap()
369                .column_schema
370                .data_type
371                .clone(),
372        ));
373    }
374    if num_columns != format_projection.column_id_to_projected_index.len() {
375        schema[num_columns - 1] = Some((
376            time_index.column_id,
377            time_index.column_schema.data_type.clone(),
378        ));
379    }
380
381    // Safety: FormatProjection ensures all indices can be unwrapped.
382    schema.into_iter().map(|id_type| id_type.unwrap()).collect()
383}
385/// Computes the Arrow schema for input batches.
386///
387/// # Panics
388/// Panics if it can't find the column by the column id in the batch_schema.
389pub(crate) fn compute_input_arrow_schema(
390    metadata: &RegionMetadata,
391    batch_schema: &[(ColumnId, ConcreteDataType)],
392) -> datatypes::arrow::datatypes::SchemaRef {
393    let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
394    for (column_id, _) in batch_schema {
395        let column_metadata = metadata.column_by_id(*column_id).unwrap();
396        let field = Field::new(
397            &column_metadata.column_schema.name,
398            column_metadata.column_schema.data_type.as_arrow_type(),
399            column_metadata.column_schema.is_nullable(),
400        );
401        let field = with_field_id(field, *column_id);
402        if column_metadata.semantic_type == SemanticType::Tag {
403            new_fields.push(tag_maybe_to_dictionary_field(
404                &column_metadata.column_schema.data_type,
405                &Arc::new(field),
406            ));
407        } else {
408            new_fields.push(Arc::new(field));
409        }
410    }
411    new_fields.extend_from_slice(&internal_fields());
412
413    Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
414}
/// Helper to project compaction batches into flat format columns
/// (fields + time index + __primary_key + __sequence + __op_type).
pub(crate) struct CompactionProjectionMapper {
    /// Projects field columns and the time index column.
    mapper: FlatProjectionMapper,
    /// Appends internal columns and builds the output batch.
    assembler: DfBatchAssembler,
}
423impl CompactionProjectionMapper {
424    pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
425        let projection = metadata
426            .column_metadatas
427            .iter()
428            .enumerate()
429            .filter_map(|(idx, col)| {
430                if matches!(col.semantic_type, SemanticType::Field) {
431                    Some(idx)
432                } else {
433                    None
434                }
435            })
436            .chain([metadata.time_index_column_pos()])
437            .collect::<Vec<_>>();
438
439        let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id);
440        let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids);
441        let mapper = FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols)?;
442        let assembler = DfBatchAssembler::new(mapper.output_schema());
443
444        Ok(Self { mapper, assembler })
445    }
446
447    /// Projects columns and appends internal columns for compaction output.
448    ///
449    /// The input batch is expected to be in flat format with internal columns appended.
450    pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
451        let columns = self
452            .mapper
453            .project_vectors(&batch)
454            .context(RecordBatchSnafu)?;
455        self.assembler
456            .build_df_record_batch_with_internal(&batch, columns)
457            .context(RecordBatchSnafu)
458    }
459}
/// Builds [DfRecordBatch] with internal columns appended.
pub(crate) struct DfBatchAssembler {
    /// Output arrow schema: projected columns followed by the internal columns.
    output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
}
466impl DfBatchAssembler {
467    /// Precomputes the output schema with internal columns.
468    pub(crate) fn new(output_schema: SchemaRef) -> Self {
469        let fields = output_schema
470            .arrow_schema()
471            .fields()
472            .into_iter()
473            .chain(internal_fields().iter())
474            .cloned()
475            .collect::<Vec<_>>();
476        let output_arrow_schema_with_internal =
477            Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
478        Self {
479            output_arrow_schema_with_internal,
480        }
481    }
482
483    /// Builds a [DfRecordBatch] from projected vectors plus internal columns.
484    ///
485    /// Assumes the input batch already contains internal columns as the last three fields
486    /// ("__primary_key", "__sequence", "__op_type").
487    pub(crate) fn build_df_record_batch_with_internal(
488        &self,
489        batch: &datatypes::arrow::record_batch::RecordBatch,
490        mut columns: Vec<datatypes::vectors::VectorRef>,
491    ) -> common_recordbatch::error::Result<DfRecordBatch> {
492        let num_columns = batch.columns().len();
493        // The last 3 columns are the internal columns.
494        let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1];
495        for index in internal_indices.iter() {
496            let array = batch.column(*index).clone();
497            let vector = Helper::try_into_vector(array)
498                .map_err(BoxedError::new)
499                .context(ExternalSnafu)?;
500            columns.push(vector);
501        }
502        RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
503    }
504}