mito2/read/
flat_projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection on flat format.
16
17use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::error::ExternalSnafu;
22use common_recordbatch::RecordBatch;
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, Result};
32use crate::sst::internal_fields;
33use crate::sst::parquet::flat_format::sst_column_id_indices;
34use crate::sst::parquet::format::FormatProjection;
35
36/// Handles projection and converts batches in flat format with correct schema.
37///
38/// This mapper support duplicate and unsorted projection indices.
39/// The output schema is determined by the projection indices.
40#[allow(dead_code)]
41pub struct FlatProjectionMapper {
42    /// Metadata of the region.
43    metadata: RegionMetadataRef,
44    /// Schema for converted [RecordBatch] to return.
45    output_schema: SchemaRef,
46    /// Ids of columns to project. It keeps ids in the same order as the `projection`
47    /// indices to build the mapper.
48    /// The mapper won't deduplicate the column ids.
49    column_ids: Vec<ColumnId>,
50    /// Ids and DataTypes of columns of the expected batch.
51    /// We can use this to check if the batch is compatible with the expected schema.
52    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
53    /// `true` If the original projection is empty.
54    is_empty_projection: bool,
55    /// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
56    batch_indices: Vec<usize>,
57    /// Precomputed Arrow schema for input batches.
58    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
59}
60
61impl FlatProjectionMapper {
62    /// Returns a new mapper with projection.
63    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
64    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
65    /// empty `RecordBatch` and only use its row count in this query.
66    pub fn new(
67        metadata: &RegionMetadataRef,
68        projection: impl Iterator<Item = usize>,
69    ) -> Result<Self> {
70        let mut projection: Vec<_> = projection.collect();
71        // If the original projection is empty.
72        let is_empty_projection = projection.is_empty();
73        if is_empty_projection {
74            // If the projection is empty, we still read the time index column.
75            projection.push(metadata.time_index_column_pos());
76        }
77
78        // Output column schemas for the projection.
79        let mut column_schemas = Vec::with_capacity(projection.len());
80        // Column ids of the projection without deduplication.
81        let mut column_ids = Vec::with_capacity(projection.len());
82        for idx in &projection {
83            // For each projection index, we get the column id for projection.
84            let column = metadata
85                .column_metadatas
86                .get(*idx)
87                .context(InvalidRequestSnafu {
88                    region_id: metadata.region_id,
89                    reason: format!("projection index {} is out of bound", idx),
90                })?;
91
92            column_ids.push(column.column_id);
93            // Safety: idx is valid.
94            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
95        }
96
97        // Creates a map to lookup index.
98        let id_to_index = sst_column_id_indices(metadata);
99        // TODO(yingwen): Support different flat schema options.
100        let format_projection = FormatProjection::compute_format_projection(
101            &id_to_index,
102            // All columns with internal columns.
103            metadata.column_metadatas.len() + 3,
104            column_ids.iter().copied(),
105        );
106
107        let batch_schema = flat_projected_columns(metadata, &format_projection);
108
109        // Safety: We get the column id from the metadata.
110        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
111
112        if is_empty_projection {
113            // If projection is empty, we don't output any column.
114            return Ok(FlatProjectionMapper {
115                metadata: metadata.clone(),
116                output_schema: Arc::new(Schema::new(vec![])),
117                column_ids,
118                batch_schema: vec![],
119                is_empty_projection,
120                batch_indices: vec![],
121                input_arrow_schema,
122            });
123        }
124
125        // Safety: Columns come from existing schema.
126        let output_schema = Arc::new(Schema::new(column_schemas));
127
128        let batch_indices: Vec<_> = column_ids
129            .iter()
130            .map(|id| {
131                // Safety: The map is computed from `projection` itself.
132                format_projection
133                    .column_id_to_projected_index
134                    .get(id)
135                    .copied()
136                    .unwrap()
137            })
138            .collect();
139
140        Ok(FlatProjectionMapper {
141            metadata: metadata.clone(),
142            output_schema,
143            column_ids,
144            batch_schema,
145            is_empty_projection,
146            batch_indices,
147            input_arrow_schema,
148        })
149    }
150
151    /// Returns a new mapper without projection.
152    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
153        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
154    }
155
156    /// Returns the metadata that created the mapper.
157    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
158        &self.metadata
159    }
160
161    /// Returns ids of projected columns that we need to read
162    /// from memtables and SSTs.
163    pub(crate) fn column_ids(&self) -> &[ColumnId] {
164        &self.column_ids
165    }
166
167    /// Returns the field column start index in output batch.
168    pub(crate) fn field_column_start(&self) -> usize {
169        for (idx, column_id) in self
170            .batch_schema
171            .iter()
172            .map(|(column_id, _)| column_id)
173            .enumerate()
174        {
175            // Safety: We get the column id from the metadata in new().
176            if self
177                .metadata
178                .column_by_id(*column_id)
179                .unwrap()
180                .semantic_type
181                == SemanticType::Field
182            {
183                return idx;
184            }
185        }
186
187        self.batch_schema.len()
188    }
189
190    /// Returns ids of columns of the batch that the mapper expects to convert.
191    #[allow(dead_code)]
192    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
193        &self.batch_schema
194    }
195
196    pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
197        self.input_arrow_schema.clone()
198    }
199
200    /// Returns the schema of converted [RecordBatch].
201    /// This is the schema that the stream will output. This schema may contain
202    /// less columns than [FlatProjectionMapper::column_ids()].
203    pub(crate) fn output_schema(&self) -> SchemaRef {
204        self.output_schema.clone()
205    }
206
207    /// Returns an empty [RecordBatch].
208    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
209        RecordBatch::new_empty(self.output_schema.clone())
210    }
211
212    /// Converts a flat format [RecordBatch] to a normal [RecordBatch].
213    ///
214    /// The batch must match the `projection` using to build the mapper.
215    #[allow(dead_code)]
216    pub(crate) fn convert(
217        &self,
218        batch: &datatypes::arrow::record_batch::RecordBatch,
219    ) -> common_recordbatch::error::Result<RecordBatch> {
220        if self.is_empty_projection {
221            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
222        }
223
224        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
225        for index in &self.batch_indices {
226            let array = batch.column(*index).clone();
227            let vector = Helper::try_into_vector(array)
228                .map_err(BoxedError::new)
229                .context(ExternalSnafu)?;
230            columns.push(vector);
231        }
232
233        RecordBatch::new(self.output_schema.clone(), columns)
234    }
235}
236
237/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
238pub(crate) fn flat_projected_columns(
239    metadata: &RegionMetadata,
240    format_projection: &FormatProjection,
241) -> Vec<(ColumnId, ConcreteDataType)> {
242    let mut schema = vec![None; format_projection.column_id_to_projected_index.len()];
243    for (column_id, index) in &format_projection.column_id_to_projected_index {
244        // Safety: FormatProjection ensures the id is valid.
245        schema[*index] = Some((
246            *column_id,
247            metadata
248                .column_by_id(*column_id)
249                .unwrap()
250                .column_schema
251                .data_type
252                .clone(),
253        ));
254    }
255
256    // Safety: FormatProjection ensures all indices can be unwrapped.
257    schema.into_iter().map(|id_type| id_type.unwrap()).collect()
258}
259
260/// Computes the Arrow schema for input batches.
261///
262/// # Panics
263/// Panics if it can't find the column by the column id in the batch_schema.
264fn compute_input_arrow_schema(
265    metadata: &RegionMetadata,
266    batch_schema: &[(ColumnId, ConcreteDataType)],
267) -> datatypes::arrow::datatypes::SchemaRef {
268    let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
269    for (column_id, _) in batch_schema {
270        let column_metadata = metadata.column_by_id(*column_id).unwrap();
271        let field = if column_metadata.semantic_type == SemanticType::Tag {
272            Field::new_dictionary(
273                &column_metadata.column_schema.name,
274                datatypes::arrow::datatypes::DataType::UInt32,
275                column_metadata.column_schema.data_type.as_arrow_type(),
276                column_metadata.column_schema.is_nullable(),
277            )
278        } else {
279            Field::new(
280                &column_metadata.column_schema.name,
281                column_metadata.column_schema.data_type.as_arrow_type(),
282                column_metadata.column_schema.is_nullable(),
283            )
284        };
285        new_fields.push(Arc::new(field));
286    }
287    new_fields.extend_from_slice(&internal_fields());
288
289    Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
290}