// mito2/read/flat_projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection on flat format.
16
17use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::RecordBatch;
22use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, Result};
32use crate::sst::parquet::flat_format::sst_column_id_indices;
33use crate::sst::parquet::format::FormatProjection;
34use crate::sst::{
35    FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
36};
37
/// Handles projection and converts batches in flat format with correct schema.
///
/// This mapper supports duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
#[allow(dead_code)]
pub struct FlatProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Schema for converted [RecordBatch] to return.
    output_schema: SchemaRef,
    /// Ids of columns to project. It keeps ids in the same order as the `projection`
    /// indices used to build the mapper.
    /// The mapper won't deduplicate the column ids.
    column_ids: Vec<ColumnId>,
    /// Ids and DataTypes of columns of the expected batch.
    /// We can use this to check if the batch is compatible with the expected schema.
    ///
    /// It doesn't contain internal columns but always contains the time index column.
    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` if the original projection is empty.
    /// An empty projection outputs batches with only a row count (e.g. `COUNT(*)`).
    is_empty_projection: bool,
    /// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
    /// Parallel to `column_ids`; empty when `is_empty_projection` is `true`.
    batch_indices: Vec<usize>,
    /// Precomputed Arrow schema for input batches.
    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
64
65impl FlatProjectionMapper {
66    /// Returns a new mapper with projection.
67    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
68    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
69    /// empty `RecordBatch` and only use its row count in this query.
70    pub fn new(
71        metadata: &RegionMetadataRef,
72        projection: impl Iterator<Item = usize>,
73    ) -> Result<Self> {
74        let mut projection: Vec<_> = projection.collect();
75
76        // If the original projection is empty.
77        let is_empty_projection = projection.is_empty();
78        if is_empty_projection {
79            // If the projection is empty, we still read the time index column.
80            projection.push(metadata.time_index_column_pos());
81        }
82
83        // Output column schemas for the projection.
84        let mut column_schemas = Vec::with_capacity(projection.len());
85        // Column ids of the projection without deduplication.
86        let mut column_ids = Vec::with_capacity(projection.len());
87        for idx in &projection {
88            // For each projection index, we get the column id for projection.
89            let column = metadata
90                .column_metadatas
91                .get(*idx)
92                .context(InvalidRequestSnafu {
93                    region_id: metadata.region_id,
94                    reason: format!("projection index {} is out of bound", idx),
95                })?;
96
97            column_ids.push(column.column_id);
98            // Safety: idx is valid.
99            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
100        }
101
102        // Creates a map to lookup index.
103        let id_to_index = sst_column_id_indices(metadata);
104        // TODO(yingwen): Support different flat schema options.
105        let format_projection = FormatProjection::compute_format_projection(
106            &id_to_index,
107            // All columns with internal columns.
108            metadata.column_metadatas.len() + 3,
109            column_ids.iter().copied(),
110        );
111
112        let batch_schema = flat_projected_columns(metadata, &format_projection);
113
114        // Safety: We get the column id from the metadata.
115        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
116
117        if is_empty_projection {
118            // If projection is empty, we don't output any column.
119            return Ok(FlatProjectionMapper {
120                metadata: metadata.clone(),
121                output_schema: Arc::new(Schema::new(vec![])),
122                column_ids,
123                batch_schema: vec![],
124                is_empty_projection,
125                batch_indices: vec![],
126                input_arrow_schema,
127            });
128        }
129
130        // Safety: Columns come from existing schema.
131        let output_schema = Arc::new(Schema::new(column_schemas));
132
133        let batch_indices: Vec<_> = column_ids
134            .iter()
135            .map(|id| {
136                // Safety: The map is computed from `projection` itself.
137                format_projection
138                    .column_id_to_projected_index
139                    .get(id)
140                    .copied()
141                    .unwrap()
142            })
143            .collect();
144
145        Ok(FlatProjectionMapper {
146            metadata: metadata.clone(),
147            output_schema,
148            column_ids,
149            batch_schema,
150            is_empty_projection,
151            batch_indices,
152            input_arrow_schema,
153        })
154    }
155
156    /// Returns a new mapper without projection.
157    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
158        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
159    }
160
161    /// Returns the metadata that created the mapper.
162    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
163        &self.metadata
164    }
165
166    /// Returns ids of projected columns that we need to read
167    /// from memtables and SSTs.
168    pub(crate) fn column_ids(&self) -> &[ColumnId] {
169        &self.column_ids
170    }
171
172    /// Returns the field column start index in output batch.
173    pub(crate) fn field_column_start(&self) -> usize {
174        for (idx, column_id) in self
175            .batch_schema
176            .iter()
177            .map(|(column_id, _)| column_id)
178            .enumerate()
179        {
180            // Safety: We get the column id from the metadata in new().
181            if self
182                .metadata
183                .column_by_id(*column_id)
184                .unwrap()
185                .semantic_type
186                == SemanticType::Field
187            {
188                return idx;
189            }
190        }
191
192        self.batch_schema.len()
193    }
194
195    /// Returns ids of columns of the batch that the mapper expects to convert.
196    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
197        &self.batch_schema
198    }
199
200    /// Returns the input arrow schema from sources.
201    ///
202    /// The merge reader can use this schema.
203    pub(crate) fn input_arrow_schema(
204        &self,
205        compaction: bool,
206    ) -> datatypes::arrow::datatypes::SchemaRef {
207        if !compaction {
208            self.input_arrow_schema.clone()
209        } else {
210            // For compaction, we need to build a different schema from encoding.
211            to_flat_sst_arrow_schema(
212                &self.metadata,
213                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
214            )
215        }
216    }
217
218    /// Returns the schema of converted [RecordBatch].
219    /// This is the schema that the stream will output. This schema may contain
220    /// less columns than [FlatProjectionMapper::column_ids()].
221    pub(crate) fn output_schema(&self) -> SchemaRef {
222        self.output_schema.clone()
223    }
224
225    /// Returns an empty [RecordBatch].
226    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
227        RecordBatch::new_empty(self.output_schema.clone())
228    }
229
230    /// Converts a flat format [RecordBatch] to a normal [RecordBatch].
231    ///
232    /// The batch must match the `projection` using to build the mapper.
233    pub(crate) fn convert(
234        &self,
235        batch: &datatypes::arrow::record_batch::RecordBatch,
236    ) -> common_recordbatch::error::Result<RecordBatch> {
237        if self.is_empty_projection {
238            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
239        }
240
241        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
242        for index in &self.batch_indices {
243            let mut array = batch.column(*index).clone();
244            // Casts dictionary values to the target type.
245            if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
246                array.data_type()
247            {
248                let casted = datatypes::arrow::compute::cast(&array, value_type)
249                    .context(ArrowComputeSnafu)?;
250                array = casted;
251            }
252            let vector = Helper::try_into_vector(array)
253                .map_err(BoxedError::new)
254                .context(ExternalSnafu)?;
255            columns.push(vector);
256        }
257
258        RecordBatch::new(self.output_schema.clone(), columns)
259    }
260}
261
262/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
263///
264/// It adds the time index column if it doesn't present in the projection.
265pub(crate) fn flat_projected_columns(
266    metadata: &RegionMetadata,
267    format_projection: &FormatProjection,
268) -> Vec<(ColumnId, ConcreteDataType)> {
269    let time_index = metadata.time_index_column();
270    let num_columns = if format_projection
271        .column_id_to_projected_index
272        .contains_key(&time_index.column_id)
273    {
274        format_projection.column_id_to_projected_index.len()
275    } else {
276        format_projection.column_id_to_projected_index.len() + 1
277    };
278    let mut schema = vec![None; num_columns];
279    for (column_id, index) in &format_projection.column_id_to_projected_index {
280        // Safety: FormatProjection ensures the id is valid.
281        schema[*index] = Some((
282            *column_id,
283            metadata
284                .column_by_id(*column_id)
285                .unwrap()
286                .column_schema
287                .data_type
288                .clone(),
289        ));
290    }
291    if num_columns != format_projection.column_id_to_projected_index.len() {
292        schema[num_columns - 1] = Some((
293            time_index.column_id,
294            time_index.column_schema.data_type.clone(),
295        ));
296    }
297
298    // Safety: FormatProjection ensures all indices can be unwrapped.
299    schema.into_iter().map(|id_type| id_type.unwrap()).collect()
300}
301
302/// Computes the Arrow schema for input batches.
303///
304/// # Panics
305/// Panics if it can't find the column by the column id in the batch_schema.
306fn compute_input_arrow_schema(
307    metadata: &RegionMetadata,
308    batch_schema: &[(ColumnId, ConcreteDataType)],
309) -> datatypes::arrow::datatypes::SchemaRef {
310    let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
311    for (column_id, _) in batch_schema {
312        let column_metadata = metadata.column_by_id(*column_id).unwrap();
313        let field = Arc::new(Field::new(
314            &column_metadata.column_schema.name,
315            column_metadata.column_schema.data_type.as_arrow_type(),
316            column_metadata.column_schema.is_nullable(),
317        ));
318        let field = if column_metadata.semantic_type == SemanticType::Tag {
319            tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
320        } else {
321            field
322        };
323        new_fields.push(field);
324    }
325    new_fields.extend_from_slice(&internal_fields());
326
327    Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
328}