mito2/read/
flat_projection.rs1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::RecordBatch;
22use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, Result};
32use crate::sst::parquet::flat_format::sst_column_id_indices;
33use crate::sst::parquet::format::FormatProjection;
34use crate::sst::{
35 FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
36};
37
38#[allow(dead_code)]
43pub struct FlatProjectionMapper {
44 metadata: RegionMetadataRef,
46 output_schema: SchemaRef,
48 column_ids: Vec<ColumnId>,
52 batch_schema: Vec<(ColumnId, ConcreteDataType)>,
57 is_empty_projection: bool,
59 batch_indices: Vec<usize>,
61 input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
63}
64
65impl FlatProjectionMapper {
66 pub fn new(
71 metadata: &RegionMetadataRef,
72 projection: impl Iterator<Item = usize>,
73 ) -> Result<Self> {
74 let mut projection: Vec<_> = projection.collect();
75
76 let is_empty_projection = projection.is_empty();
78 if is_empty_projection {
79 projection.push(metadata.time_index_column_pos());
81 }
82
83 let mut column_schemas = Vec::with_capacity(projection.len());
85 let mut column_ids = Vec::with_capacity(projection.len());
87 for idx in &projection {
88 let column = metadata
90 .column_metadatas
91 .get(*idx)
92 .context(InvalidRequestSnafu {
93 region_id: metadata.region_id,
94 reason: format!("projection index {} is out of bound", idx),
95 })?;
96
97 column_ids.push(column.column_id);
98 column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
100 }
101
102 let id_to_index = sst_column_id_indices(metadata);
104 let format_projection = FormatProjection::compute_format_projection(
106 &id_to_index,
107 metadata.column_metadatas.len() + 3,
109 column_ids.iter().copied(),
110 );
111
112 let batch_schema = flat_projected_columns(metadata, &format_projection);
113
114 let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
116
117 if is_empty_projection {
118 return Ok(FlatProjectionMapper {
120 metadata: metadata.clone(),
121 output_schema: Arc::new(Schema::new(vec![])),
122 column_ids,
123 batch_schema: vec![],
124 is_empty_projection,
125 batch_indices: vec![],
126 input_arrow_schema,
127 });
128 }
129
130 let output_schema = Arc::new(Schema::new(column_schemas));
132
133 let batch_indices: Vec<_> = column_ids
134 .iter()
135 .map(|id| {
136 format_projection
138 .column_id_to_projected_index
139 .get(id)
140 .copied()
141 .unwrap()
142 })
143 .collect();
144
145 Ok(FlatProjectionMapper {
146 metadata: metadata.clone(),
147 output_schema,
148 column_ids,
149 batch_schema,
150 is_empty_projection,
151 batch_indices,
152 input_arrow_schema,
153 })
154 }
155
156 pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
158 FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
159 }
160
161 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
163 &self.metadata
164 }
165
166 pub(crate) fn column_ids(&self) -> &[ColumnId] {
169 &self.column_ids
170 }
171
172 pub(crate) fn field_column_start(&self) -> usize {
174 for (idx, column_id) in self
175 .batch_schema
176 .iter()
177 .map(|(column_id, _)| column_id)
178 .enumerate()
179 {
180 if self
182 .metadata
183 .column_by_id(*column_id)
184 .unwrap()
185 .semantic_type
186 == SemanticType::Field
187 {
188 return idx;
189 }
190 }
191
192 self.batch_schema.len()
193 }
194
195 pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
197 &self.batch_schema
198 }
199
200 pub(crate) fn input_arrow_schema(
204 &self,
205 compaction: bool,
206 ) -> datatypes::arrow::datatypes::SchemaRef {
207 if !compaction {
208 self.input_arrow_schema.clone()
209 } else {
210 to_flat_sst_arrow_schema(
212 &self.metadata,
213 &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
214 )
215 }
216 }
217
218 pub(crate) fn output_schema(&self) -> SchemaRef {
222 self.output_schema.clone()
223 }
224
225 pub(crate) fn empty_record_batch(&self) -> RecordBatch {
227 RecordBatch::new_empty(self.output_schema.clone())
228 }
229
230 pub(crate) fn convert(
234 &self,
235 batch: &datatypes::arrow::record_batch::RecordBatch,
236 ) -> common_recordbatch::error::Result<RecordBatch> {
237 if self.is_empty_projection {
238 return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
239 }
240
241 let mut columns = Vec::with_capacity(self.output_schema.num_columns());
242 for index in &self.batch_indices {
243 let mut array = batch.column(*index).clone();
244 if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
246 array.data_type()
247 {
248 let casted = datatypes::arrow::compute::cast(&array, value_type)
249 .context(ArrowComputeSnafu)?;
250 array = casted;
251 }
252 let vector = Helper::try_into_vector(array)
253 .map_err(BoxedError::new)
254 .context(ExternalSnafu)?;
255 columns.push(vector);
256 }
257
258 RecordBatch::new(self.output_schema.clone(), columns)
259 }
260}
261
262pub(crate) fn flat_projected_columns(
266 metadata: &RegionMetadata,
267 format_projection: &FormatProjection,
268) -> Vec<(ColumnId, ConcreteDataType)> {
269 let time_index = metadata.time_index_column();
270 let num_columns = if format_projection
271 .column_id_to_projected_index
272 .contains_key(&time_index.column_id)
273 {
274 format_projection.column_id_to_projected_index.len()
275 } else {
276 format_projection.column_id_to_projected_index.len() + 1
277 };
278 let mut schema = vec![None; num_columns];
279 for (column_id, index) in &format_projection.column_id_to_projected_index {
280 schema[*index] = Some((
282 *column_id,
283 metadata
284 .column_by_id(*column_id)
285 .unwrap()
286 .column_schema
287 .data_type
288 .clone(),
289 ));
290 }
291 if num_columns != format_projection.column_id_to_projected_index.len() {
292 schema[num_columns - 1] = Some((
293 time_index.column_id,
294 time_index.column_schema.data_type.clone(),
295 ));
296 }
297
298 schema.into_iter().map(|id_type| id_type.unwrap()).collect()
300}
301
302fn compute_input_arrow_schema(
307 metadata: &RegionMetadata,
308 batch_schema: &[(ColumnId, ConcreteDataType)],
309) -> datatypes::arrow::datatypes::SchemaRef {
310 let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
311 for (column_id, _) in batch_schema {
312 let column_metadata = metadata.column_by_id(*column_id).unwrap();
313 let field = Arc::new(Field::new(
314 &column_metadata.column_schema.name,
315 column_metadata.column_schema.data_type.as_arrow_type(),
316 column_metadata.column_schema.is_nullable(),
317 ));
318 let field = if column_metadata.semantic_type == SemanticType::Tag {
319 tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
320 } else {
321 field
322 };
323 new_fields.push(field);
324 }
325 new_fields.extend_from_slice(&internal_fields());
326
327 Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
328}