// mito2/read/flat_projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection on flat format.
16
17use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
22use common_recordbatch::{DfRecordBatch, RecordBatch};
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
32use crate::read::projection::read_column_ids_from_projection;
33use crate::sst::parquet::flat_format::sst_column_id_indices;
34use crate::sst::parquet::format::FormatProjection;
35use crate::sst::{
36    FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
37};
38
/// Handles projection and converts batches in flat format with correct schema.
///
/// This mapper supports duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
pub struct FlatProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Schema for converted [RecordBatch] to return.
    output_schema: SchemaRef,
    /// Ids of columns to read from memtables and SSTs.
    /// The mapper won't deduplicate the column ids.
    ///
    /// Note that this doesn't contain the `__table_id` and `__tsid`.
    read_column_ids: Vec<ColumnId>,
    /// Ids and DataTypes of columns of the expected batch.
    /// We can use this to check if the batch is compatible with the expected schema.
    ///
    /// It doesn't contain internal columns but always contains the time index column.
    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` if the original projection is empty.
    /// An empty projection outputs [RecordBatch]es that carry only a row count.
    is_empty_projection: bool,
    /// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
    batch_indices: Vec<usize>,
    /// Precomputed Arrow schema for input batches: the projected columns
    /// followed by the internal fields (see [compute_input_arrow_schema]).
    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
65
impl FlatProjectionMapper {
    /// Returns a new mapper with projection.
    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
    /// empty `RecordBatch` and only use its row count in this query.
    ///
    /// # Errors
    /// Returns an error if a projection index is out of bound.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<Self> {
        let projection: Vec<_> = projection.collect();
        // Derives the ids of columns to read from the output projection.
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        Self::new_with_read_columns(metadata, projection, read_column_ids)
    }

    /// Returns a new mapper with output projection and explicit read columns.
    ///
    /// `projection` determines the output schema; `read_column_ids` determines the
    /// columns read from memtables and SSTs. Every output column must be covered
    /// by the read columns, otherwise this returns an `InvalidRequest` error.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<Self> {
        // If the original projection is empty.
        let is_empty_projection = projection.is_empty();

        // Output column schemas for the projection.
        let mut column_schemas = Vec::with_capacity(projection.len());
        // Column ids of the output projection without deduplication.
        let mut output_column_ids = Vec::with_capacity(projection.len());
        for idx in &projection {
            // For each projection index, we get the column id for projection.
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;

            output_column_ids.push(column.column_id);
            // Safety: idx is valid since the bound check above passed and
            // `metadata.schema` has the same columns as `column_metadatas`.
            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
        }

        // Creates a map to lookup the SST column index by column id.
        let id_to_index = sst_column_id_indices(metadata);
        // TODO(yingwen): Support different flat schema options.
        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            // All columns with internal columns (the 3 internal columns:
            // "__primary_key", "__sequence", "__op_type").
            metadata.column_metadatas.len() + 3,
            read_column_ids.iter().copied(),
        );

        // Ids and data types of the columns in batches to convert.
        let batch_schema = flat_projected_columns(metadata, &format_projection);

        // Safety: We get the column id from the metadata.
        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);

        // If projection is empty, we don't output any column.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            // Safety: Columns come from existing schema.
            Arc::new(Schema::new(column_schemas))
        };

        // Maps each output column to its index in the projected input batch.
        let batch_indices = if is_empty_projection {
            vec![]
        } else {
            output_column_ids
                .iter()
                .map(|id| {
                    // The map is computed from the read projection; a missing
                    // output column means the request's read columns don't cover
                    // the output projection, which is an invalid request.
                    format_projection
                        .column_id_to_projected_index
                        .get(id)
                        .copied()
                        .with_context(|| {
                            let name = metadata
                                .column_by_id(*id)
                                .map(|column| column.column_schema.name.clone())
                                .unwrap_or_else(|| id.to_string());
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "output column {} is missing in read projection",
                                    name
                                ),
                            }
                        })
                })
                .collect::<Result<Vec<_>>>()?
        };

        Ok(FlatProjectionMapper {
            metadata: metadata.clone(),
            output_schema,
            read_column_ids,
            batch_schema,
            is_empty_projection,
            batch_indices,
            input_arrow_schema,
        })
    }

    /// Returns a new mapper without projection (projects all columns in order).
    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata that created the mapper.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns ids of projected columns that we need to read
    /// from memtables and SSTs.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.read_column_ids
    }

    /// Returns the field column start index in output batch.
    ///
    /// Returns the batch schema length when there is no field column.
    pub(crate) fn field_column_start(&self) -> usize {
        for (idx, column_id) in self
            .batch_schema
            .iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
        {
            // Safety: We get the column id from the metadata in new().
            if self
                .metadata
                .column_by_id(*column_id)
                .unwrap()
                .semantic_type
                == SemanticType::Field
            {
                return idx;
            }
        }

        self.batch_schema.len()
    }

    /// Returns ids of columns of the batch that the mapper expects to convert.
    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_schema
    }

    /// Returns the input arrow schema from sources.
    ///
    /// The merge reader can use this schema. For compaction, the schema is
    /// rebuilt according to the region's primary key encoding instead of the
    /// precomputed one.
    pub(crate) fn input_arrow_schema(
        &self,
        compaction: bool,
    ) -> datatypes::arrow::datatypes::SchemaRef {
        if !compaction {
            self.input_arrow_schema.clone()
        } else {
            // For compaction, we need to build a different schema from encoding.
            to_flat_sst_arrow_schema(
                &self.metadata,
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
        }
    }

    /// Returns the schema of converted [RecordBatch].
    /// This is the schema that the stream will output. This schema may contain
    /// less columns than [FlatProjectionMapper::column_ids()].
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [RecordBatch].
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

    /// Converts a flat format [RecordBatch] to a normal [RecordBatch].
    ///
    /// The batch must match the `projection` using to build the mapper.
    /// With an empty projection, only the row count is carried over.
    pub(crate) fn convert(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }
        let columns = self.project_vectors(batch)?;
        RecordBatch::new(self.output_schema.clone(), columns)
    }

    /// Projects columns from the input batch and converts them into vectors.
    ///
    /// Dictionary-encoded columns (e.g. tags, see [compute_input_arrow_schema])
    /// are cast to their value type before conversion.
    pub(crate) fn project_vectors(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
    ) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        for index in &self.batch_indices {
            let mut array = batch.column(*index).clone();
            // Casts dictionary values to the target type.
            if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
                array.data_type()
            {
                let casted = datatypes::arrow::compute::cast(&array, value_type)
                    .context(ArrowComputeSnafu)?;
                array = casted;
            }
            let vector = Helper::try_into_vector(array)
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            columns.push(vector);
        }
        Ok(columns)
    }
}
283
284/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
285///
286/// It adds the time index column if it doesn't present in the projection.
287pub(crate) fn flat_projected_columns(
288    metadata: &RegionMetadata,
289    format_projection: &FormatProjection,
290) -> Vec<(ColumnId, ConcreteDataType)> {
291    let time_index = metadata.time_index_column();
292    let num_columns = if format_projection
293        .column_id_to_projected_index
294        .contains_key(&time_index.column_id)
295    {
296        format_projection.column_id_to_projected_index.len()
297    } else {
298        format_projection.column_id_to_projected_index.len() + 1
299    };
300    let mut schema = vec![None; num_columns];
301    for (column_id, index) in &format_projection.column_id_to_projected_index {
302        // Safety: FormatProjection ensures the id is valid.
303        schema[*index] = Some((
304            *column_id,
305            metadata
306                .column_by_id(*column_id)
307                .unwrap()
308                .column_schema
309                .data_type
310                .clone(),
311        ));
312    }
313    if num_columns != format_projection.column_id_to_projected_index.len() {
314        schema[num_columns - 1] = Some((
315            time_index.column_id,
316            time_index.column_schema.data_type.clone(),
317        ));
318    }
319
320    // Safety: FormatProjection ensures all indices can be unwrapped.
321    schema.into_iter().map(|id_type| id_type.unwrap()).collect()
322}
323
324/// Computes the Arrow schema for input batches.
325///
326/// # Panics
327/// Panics if it can't find the column by the column id in the batch_schema.
328fn compute_input_arrow_schema(
329    metadata: &RegionMetadata,
330    batch_schema: &[(ColumnId, ConcreteDataType)],
331) -> datatypes::arrow::datatypes::SchemaRef {
332    let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
333    for (column_id, _) in batch_schema {
334        let column_metadata = metadata.column_by_id(*column_id).unwrap();
335        let field = Arc::new(Field::new(
336            &column_metadata.column_schema.name,
337            column_metadata.column_schema.data_type.as_arrow_type(),
338            column_metadata.column_schema.is_nullable(),
339        ));
340        let field = if column_metadata.semantic_type == SemanticType::Tag {
341            tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
342        } else {
343            field
344        };
345        new_fields.push(field);
346    }
347    new_fields.extend_from_slice(&internal_fields());
348
349    Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
350}
351
/// Helper to project compaction batches into flat format columns
/// (fields + time index + __primary_key + __sequence + __op_type).
pub(crate) struct CompactionProjectionMapper {
    /// Mapper that projects the field columns and the time index column.
    mapper: FlatProjectionMapper,
    /// Assembler that appends the internal columns to the projected output.
    assembler: DfBatchAssembler,
}
358
359impl CompactionProjectionMapper {
360    pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
361        let projection = metadata
362            .column_metadatas
363            .iter()
364            .enumerate()
365            .filter_map(|(idx, col)| {
366                if matches!(col.semantic_type, SemanticType::Field) {
367                    Some(idx)
368                } else {
369                    None
370                }
371            })
372            .chain([metadata.time_index_column_pos()])
373            .collect::<Vec<_>>();
374
375        let mapper = FlatProjectionMapper::new_with_read_columns(
376            metadata,
377            projection,
378            metadata
379                .column_metadatas
380                .iter()
381                .map(|col| col.column_id)
382                .collect(),
383        )?;
384        let assembler = DfBatchAssembler::new(mapper.output_schema());
385
386        Ok(Self { mapper, assembler })
387    }
388
389    /// Projects columns and appends internal columns for compaction output.
390    ///
391    /// The input batch is expected to be in flat format with internal columns appended.
392    pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
393        let columns = self
394            .mapper
395            .project_vectors(&batch)
396            .context(RecordBatchSnafu)?;
397        self.assembler
398            .build_df_record_batch_with_internal(&batch, columns)
399            .context(RecordBatchSnafu)
400    }
401}
402
/// Builds [DfRecordBatch] with internal columns appended.
pub(crate) struct DfBatchAssembler {
    /// Precomputed output arrow schema: the output schema fields followed by
    /// the internal fields.
    output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
}
407
408impl DfBatchAssembler {
409    /// Precomputes the output schema with internal columns.
410    pub(crate) fn new(output_schema: SchemaRef) -> Self {
411        let fields = output_schema
412            .arrow_schema()
413            .fields()
414            .into_iter()
415            .chain(internal_fields().iter())
416            .cloned()
417            .collect::<Vec<_>>();
418        let output_arrow_schema_with_internal =
419            Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
420        Self {
421            output_arrow_schema_with_internal,
422        }
423    }
424
425    /// Builds a [DfRecordBatch] from projected vectors plus internal columns.
426    ///
427    /// Assumes the input batch already contains internal columns as the last three fields
428    /// ("__primary_key", "__sequence", "__op_type").
429    pub(crate) fn build_df_record_batch_with_internal(
430        &self,
431        batch: &datatypes::arrow::record_batch::RecordBatch,
432        mut columns: Vec<datatypes::vectors::VectorRef>,
433    ) -> common_recordbatch::error::Result<DfRecordBatch> {
434        let num_columns = batch.columns().len();
435        // The last 3 columns are the internal columns.
436        let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1];
437        for index in internal_indices.iter() {
438            let array = batch.column(*index).clone();
439            let vector = Helper::try_into_vector(array)
440                .map_err(BoxedError::new)
441                .context(ExternalSnafu)?;
442            columns.push(vector);
443        }
444        RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
445    }
446}