mito2/read/
flat_projection.rs1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::RecordBatch;
22use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, Result};
32use crate::sst::parquet::flat_format::sst_column_id_indices;
33use crate::sst::parquet::format::FormatProjection;
34use crate::sst::{
35 FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
36};
37
38pub struct FlatProjectionMapper {
43 metadata: RegionMetadataRef,
45 output_schema: SchemaRef,
47 column_ids: Vec<ColumnId>,
53 batch_schema: Vec<(ColumnId, ConcreteDataType)>,
58 is_empty_projection: bool,
60 batch_indices: Vec<usize>,
62 input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
64}
65
66impl FlatProjectionMapper {
67 pub fn new(
72 metadata: &RegionMetadataRef,
73 projection: impl Iterator<Item = usize>,
74 ) -> Result<Self> {
75 let mut projection: Vec<_> = projection.collect();
76
77 let is_empty_projection = projection.is_empty();
79 if is_empty_projection {
80 projection.push(metadata.time_index_column_pos());
82 }
83
84 let mut column_schemas = Vec::with_capacity(projection.len());
86 let mut column_ids = Vec::with_capacity(projection.len());
88 for idx in &projection {
89 let column = metadata
91 .column_metadatas
92 .get(*idx)
93 .context(InvalidRequestSnafu {
94 region_id: metadata.region_id,
95 reason: format!("projection index {} is out of bound", idx),
96 })?;
97
98 column_ids.push(column.column_id);
99 column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
101 }
102
103 let id_to_index = sst_column_id_indices(metadata);
105 let format_projection = FormatProjection::compute_format_projection(
107 &id_to_index,
108 metadata.column_metadatas.len() + 3,
110 column_ids.iter().copied(),
111 );
112
113 let batch_schema = flat_projected_columns(metadata, &format_projection);
114
115 let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
117
118 if is_empty_projection {
119 return Ok(FlatProjectionMapper {
121 metadata: metadata.clone(),
122 output_schema: Arc::new(Schema::new(vec![])),
123 column_ids,
124 batch_schema: vec![],
125 is_empty_projection,
126 batch_indices: vec![],
127 input_arrow_schema,
128 });
129 }
130
131 let output_schema = Arc::new(Schema::new(column_schemas));
133
134 let batch_indices: Vec<_> = column_ids
135 .iter()
136 .map(|id| {
137 format_projection
139 .column_id_to_projected_index
140 .get(id)
141 .copied()
142 .unwrap()
143 })
144 .collect();
145
146 Ok(FlatProjectionMapper {
147 metadata: metadata.clone(),
148 output_schema,
149 column_ids,
150 batch_schema,
151 is_empty_projection,
152 batch_indices,
153 input_arrow_schema,
154 })
155 }
156
157 pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
159 FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
160 }
161
162 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
164 &self.metadata
165 }
166
167 pub(crate) fn column_ids(&self) -> &[ColumnId] {
170 &self.column_ids
171 }
172
173 pub(crate) fn field_column_start(&self) -> usize {
175 for (idx, column_id) in self
176 .batch_schema
177 .iter()
178 .map(|(column_id, _)| column_id)
179 .enumerate()
180 {
181 if self
183 .metadata
184 .column_by_id(*column_id)
185 .unwrap()
186 .semantic_type
187 == SemanticType::Field
188 {
189 return idx;
190 }
191 }
192
193 self.batch_schema.len()
194 }
195
196 pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
198 &self.batch_schema
199 }
200
201 pub(crate) fn input_arrow_schema(
205 &self,
206 compaction: bool,
207 ) -> datatypes::arrow::datatypes::SchemaRef {
208 if !compaction {
209 self.input_arrow_schema.clone()
210 } else {
211 to_flat_sst_arrow_schema(
213 &self.metadata,
214 &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
215 )
216 }
217 }
218
219 pub(crate) fn output_schema(&self) -> SchemaRef {
223 self.output_schema.clone()
224 }
225
226 pub(crate) fn empty_record_batch(&self) -> RecordBatch {
228 RecordBatch::new_empty(self.output_schema.clone())
229 }
230
231 pub(crate) fn convert(
235 &self,
236 batch: &datatypes::arrow::record_batch::RecordBatch,
237 ) -> common_recordbatch::error::Result<RecordBatch> {
238 if self.is_empty_projection {
239 return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
240 }
241
242 let mut columns = Vec::with_capacity(self.output_schema.num_columns());
243 for index in &self.batch_indices {
244 let mut array = batch.column(*index).clone();
245 if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
247 array.data_type()
248 {
249 let casted = datatypes::arrow::compute::cast(&array, value_type)
250 .context(ArrowComputeSnafu)?;
251 array = casted;
252 }
253 let vector = Helper::try_into_vector(array)
254 .map_err(BoxedError::new)
255 .context(ExternalSnafu)?;
256 columns.push(vector);
257 }
258
259 RecordBatch::new(self.output_schema.clone(), columns)
260 }
261}
262
263pub(crate) fn flat_projected_columns(
267 metadata: &RegionMetadata,
268 format_projection: &FormatProjection,
269) -> Vec<(ColumnId, ConcreteDataType)> {
270 let time_index = metadata.time_index_column();
271 let num_columns = if format_projection
272 .column_id_to_projected_index
273 .contains_key(&time_index.column_id)
274 {
275 format_projection.column_id_to_projected_index.len()
276 } else {
277 format_projection.column_id_to_projected_index.len() + 1
278 };
279 let mut schema = vec![None; num_columns];
280 for (column_id, index) in &format_projection.column_id_to_projected_index {
281 schema[*index] = Some((
283 *column_id,
284 metadata
285 .column_by_id(*column_id)
286 .unwrap()
287 .column_schema
288 .data_type
289 .clone(),
290 ));
291 }
292 if num_columns != format_projection.column_id_to_projected_index.len() {
293 schema[num_columns - 1] = Some((
294 time_index.column_id,
295 time_index.column_schema.data_type.clone(),
296 ));
297 }
298
299 schema.into_iter().map(|id_type| id_type.unwrap()).collect()
301}
302
303fn compute_input_arrow_schema(
308 metadata: &RegionMetadata,
309 batch_schema: &[(ColumnId, ConcreteDataType)],
310) -> datatypes::arrow::datatypes::SchemaRef {
311 let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
312 for (column_id, _) in batch_schema {
313 let column_metadata = metadata.column_by_id(*column_id).unwrap();
314 let field = Arc::new(Field::new(
315 &column_metadata.column_schema.name,
316 column_metadata.column_schema.data_type.as_arrow_type(),
317 column_metadata.column_schema.is_nullable(),
318 ));
319 let field = if column_metadata.semantic_type == SemanticType::Tag {
320 tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
321 } else {
322 field
323 };
324 new_fields.push(field);
325 }
326 new_fields.extend_from_slice(&internal_fields());
327
328 Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
329}