mito2/read/
flat_projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection on flat format.
16
17use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::RecordBatch;
22use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, Result};
32use crate::sst::parquet::flat_format::sst_column_id_indices;
33use crate::sst::parquet::format::FormatProjection;
34use crate::sst::{
35    FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
36};
37
38/// Handles projection and converts batches in flat format with correct schema.
39///
40/// This mapper support duplicate and unsorted projection indices.
41/// The output schema is determined by the projection indices.
42pub struct FlatProjectionMapper {
43    /// Metadata of the region.
44    metadata: RegionMetadataRef,
45    /// Schema for converted [RecordBatch] to return.
46    output_schema: SchemaRef,
47    /// Ids of columns to project. It keeps ids in the same order as the `projection`
48    /// indices to build the mapper.
49    /// The mapper won't deduplicate the column ids.
50    ///
51    /// Note that this doesn't contain the `__table_id` and `__tsid`.
52    column_ids: Vec<ColumnId>,
53    /// Ids and DataTypes of columns of the expected batch.
54    /// We can use this to check if the batch is compatible with the expected schema.
55    ///
56    /// It doesn't contain internal columns but always contains the time index column.
57    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
58    /// `true` If the original projection is empty.
59    is_empty_projection: bool,
60    /// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
61    batch_indices: Vec<usize>,
62    /// Precomputed Arrow schema for input batches.
63    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
64}
65
66impl FlatProjectionMapper {
67    /// Returns a new mapper with projection.
68    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
69    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
70    /// empty `RecordBatch` and only use its row count in this query.
71    pub fn new(
72        metadata: &RegionMetadataRef,
73        projection: impl Iterator<Item = usize>,
74    ) -> Result<Self> {
75        let mut projection: Vec<_> = projection.collect();
76
77        // If the original projection is empty.
78        let is_empty_projection = projection.is_empty();
79        if is_empty_projection {
80            // If the projection is empty, we still read the time index column.
81            projection.push(metadata.time_index_column_pos());
82        }
83
84        // Output column schemas for the projection.
85        let mut column_schemas = Vec::with_capacity(projection.len());
86        // Column ids of the projection without deduplication.
87        let mut column_ids = Vec::with_capacity(projection.len());
88        for idx in &projection {
89            // For each projection index, we get the column id for projection.
90            let column = metadata
91                .column_metadatas
92                .get(*idx)
93                .context(InvalidRequestSnafu {
94                    region_id: metadata.region_id,
95                    reason: format!("projection index {} is out of bound", idx),
96                })?;
97
98            column_ids.push(column.column_id);
99            // Safety: idx is valid.
100            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
101        }
102
103        // Creates a map to lookup index.
104        let id_to_index = sst_column_id_indices(metadata);
105        // TODO(yingwen): Support different flat schema options.
106        let format_projection = FormatProjection::compute_format_projection(
107            &id_to_index,
108            // All columns with internal columns.
109            metadata.column_metadatas.len() + 3,
110            column_ids.iter().copied(),
111        );
112
113        let batch_schema = flat_projected_columns(metadata, &format_projection);
114
115        // Safety: We get the column id from the metadata.
116        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
117
118        if is_empty_projection {
119            // If projection is empty, we don't output any column.
120            return Ok(FlatProjectionMapper {
121                metadata: metadata.clone(),
122                output_schema: Arc::new(Schema::new(vec![])),
123                column_ids,
124                batch_schema: vec![],
125                is_empty_projection,
126                batch_indices: vec![],
127                input_arrow_schema,
128            });
129        }
130
131        // Safety: Columns come from existing schema.
132        let output_schema = Arc::new(Schema::new(column_schemas));
133
134        let batch_indices: Vec<_> = column_ids
135            .iter()
136            .map(|id| {
137                // Safety: The map is computed from `projection` itself.
138                format_projection
139                    .column_id_to_projected_index
140                    .get(id)
141                    .copied()
142                    .unwrap()
143            })
144            .collect();
145
146        Ok(FlatProjectionMapper {
147            metadata: metadata.clone(),
148            output_schema,
149            column_ids,
150            batch_schema,
151            is_empty_projection,
152            batch_indices,
153            input_arrow_schema,
154        })
155    }
156
157    /// Returns a new mapper without projection.
158    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
159        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
160    }
161
162    /// Returns the metadata that created the mapper.
163    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
164        &self.metadata
165    }
166
167    /// Returns ids of projected columns that we need to read
168    /// from memtables and SSTs.
169    pub(crate) fn column_ids(&self) -> &[ColumnId] {
170        &self.column_ids
171    }
172
173    /// Returns the field column start index in output batch.
174    pub(crate) fn field_column_start(&self) -> usize {
175        for (idx, column_id) in self
176            .batch_schema
177            .iter()
178            .map(|(column_id, _)| column_id)
179            .enumerate()
180        {
181            // Safety: We get the column id from the metadata in new().
182            if self
183                .metadata
184                .column_by_id(*column_id)
185                .unwrap()
186                .semantic_type
187                == SemanticType::Field
188            {
189                return idx;
190            }
191        }
192
193        self.batch_schema.len()
194    }
195
196    /// Returns ids of columns of the batch that the mapper expects to convert.
197    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
198        &self.batch_schema
199    }
200
201    /// Returns the input arrow schema from sources.
202    ///
203    /// The merge reader can use this schema.
204    pub(crate) fn input_arrow_schema(
205        &self,
206        compaction: bool,
207    ) -> datatypes::arrow::datatypes::SchemaRef {
208        if !compaction {
209            self.input_arrow_schema.clone()
210        } else {
211            // For compaction, we need to build a different schema from encoding.
212            to_flat_sst_arrow_schema(
213                &self.metadata,
214                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
215            )
216        }
217    }
218
219    /// Returns the schema of converted [RecordBatch].
220    /// This is the schema that the stream will output. This schema may contain
221    /// less columns than [FlatProjectionMapper::column_ids()].
222    pub(crate) fn output_schema(&self) -> SchemaRef {
223        self.output_schema.clone()
224    }
225
226    /// Returns an empty [RecordBatch].
227    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
228        RecordBatch::new_empty(self.output_schema.clone())
229    }
230
231    /// Converts a flat format [RecordBatch] to a normal [RecordBatch].
232    ///
233    /// The batch must match the `projection` using to build the mapper.
234    pub(crate) fn convert(
235        &self,
236        batch: &datatypes::arrow::record_batch::RecordBatch,
237    ) -> common_recordbatch::error::Result<RecordBatch> {
238        if self.is_empty_projection {
239            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
240        }
241
242        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
243        for index in &self.batch_indices {
244            let mut array = batch.column(*index).clone();
245            // Casts dictionary values to the target type.
246            if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
247                array.data_type()
248            {
249                let casted = datatypes::arrow::compute::cast(&array, value_type)
250                    .context(ArrowComputeSnafu)?;
251                array = casted;
252            }
253            let vector = Helper::try_into_vector(array)
254                .map_err(BoxedError::new)
255                .context(ExternalSnafu)?;
256            columns.push(vector);
257        }
258
259        RecordBatch::new(self.output_schema.clone(), columns)
260    }
261}
262
263/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
264///
265/// It adds the time index column if it doesn't present in the projection.
266pub(crate) fn flat_projected_columns(
267    metadata: &RegionMetadata,
268    format_projection: &FormatProjection,
269) -> Vec<(ColumnId, ConcreteDataType)> {
270    let time_index = metadata.time_index_column();
271    let num_columns = if format_projection
272        .column_id_to_projected_index
273        .contains_key(&time_index.column_id)
274    {
275        format_projection.column_id_to_projected_index.len()
276    } else {
277        format_projection.column_id_to_projected_index.len() + 1
278    };
279    let mut schema = vec![None; num_columns];
280    for (column_id, index) in &format_projection.column_id_to_projected_index {
281        // Safety: FormatProjection ensures the id is valid.
282        schema[*index] = Some((
283            *column_id,
284            metadata
285                .column_by_id(*column_id)
286                .unwrap()
287                .column_schema
288                .data_type
289                .clone(),
290        ));
291    }
292    if num_columns != format_projection.column_id_to_projected_index.len() {
293        schema[num_columns - 1] = Some((
294            time_index.column_id,
295            time_index.column_schema.data_type.clone(),
296        ));
297    }
298
299    // Safety: FormatProjection ensures all indices can be unwrapped.
300    schema.into_iter().map(|id_type| id_type.unwrap()).collect()
301}
302
303/// Computes the Arrow schema for input batches.
304///
305/// # Panics
306/// Panics if it can't find the column by the column id in the batch_schema.
307fn compute_input_arrow_schema(
308    metadata: &RegionMetadata,
309    batch_schema: &[(ColumnId, ConcreteDataType)],
310) -> datatypes::arrow::datatypes::SchemaRef {
311    let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
312    for (column_id, _) in batch_schema {
313        let column_metadata = metadata.column_by_id(*column_id).unwrap();
314        let field = Arc::new(Field::new(
315            &column_metadata.column_schema.name,
316            column_metadata.column_schema.data_type.as_arrow_type(),
317            column_metadata.column_schema.is_nullable(),
318        ));
319        let field = if column_metadata.semantic_type == SemanticType::Tag {
320            tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
321        } else {
322            field
323        };
324        new_fields.push(field);
325    }
326    new_fields.extend_from_slice(&internal_fields());
327
328    Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
329}