mito2/read/
flat_projection.rs1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::error::ExternalSnafu;
22use common_recordbatch::RecordBatch;
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, Result};
32use crate::sst::internal_fields;
33use crate::sst::parquet::flat_format::sst_column_id_indices;
34use crate::sst::parquet::format::FormatProjection;
35
36#[allow(dead_code)]
41pub struct FlatProjectionMapper {
42 metadata: RegionMetadataRef,
44 output_schema: SchemaRef,
46 column_ids: Vec<ColumnId>,
50 batch_schema: Vec<(ColumnId, ConcreteDataType)>,
53 is_empty_projection: bool,
55 batch_indices: Vec<usize>,
57 input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
59}
60
61impl FlatProjectionMapper {
62 pub fn new(
67 metadata: &RegionMetadataRef,
68 projection: impl Iterator<Item = usize>,
69 ) -> Result<Self> {
70 let mut projection: Vec<_> = projection.collect();
71 let is_empty_projection = projection.is_empty();
73 if is_empty_projection {
74 projection.push(metadata.time_index_column_pos());
76 }
77
78 let mut column_schemas = Vec::with_capacity(projection.len());
80 let mut column_ids = Vec::with_capacity(projection.len());
82 for idx in &projection {
83 let column = metadata
85 .column_metadatas
86 .get(*idx)
87 .context(InvalidRequestSnafu {
88 region_id: metadata.region_id,
89 reason: format!("projection index {} is out of bound", idx),
90 })?;
91
92 column_ids.push(column.column_id);
93 column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
95 }
96
97 let id_to_index = sst_column_id_indices(metadata);
99 let format_projection = FormatProjection::compute_format_projection(
101 &id_to_index,
102 metadata.column_metadatas.len() + 3,
104 column_ids.iter().copied(),
105 );
106
107 let batch_schema = flat_projected_columns(metadata, &format_projection);
108
109 let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
111
112 if is_empty_projection {
113 return Ok(FlatProjectionMapper {
115 metadata: metadata.clone(),
116 output_schema: Arc::new(Schema::new(vec![])),
117 column_ids,
118 batch_schema: vec![],
119 is_empty_projection,
120 batch_indices: vec![],
121 input_arrow_schema,
122 });
123 }
124
125 let output_schema = Arc::new(Schema::new(column_schemas));
127
128 let batch_indices: Vec<_> = column_ids
129 .iter()
130 .map(|id| {
131 format_projection
133 .column_id_to_projected_index
134 .get(id)
135 .copied()
136 .unwrap()
137 })
138 .collect();
139
140 Ok(FlatProjectionMapper {
141 metadata: metadata.clone(),
142 output_schema,
143 column_ids,
144 batch_schema,
145 is_empty_projection,
146 batch_indices,
147 input_arrow_schema,
148 })
149 }
150
151 pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
153 FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
154 }
155
156 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
158 &self.metadata
159 }
160
161 pub(crate) fn column_ids(&self) -> &[ColumnId] {
164 &self.column_ids
165 }
166
167 pub(crate) fn field_column_start(&self) -> usize {
169 for (idx, column_id) in self
170 .batch_schema
171 .iter()
172 .map(|(column_id, _)| column_id)
173 .enumerate()
174 {
175 if self
177 .metadata
178 .column_by_id(*column_id)
179 .unwrap()
180 .semantic_type
181 == SemanticType::Field
182 {
183 return idx;
184 }
185 }
186
187 self.batch_schema.len()
188 }
189
190 #[allow(dead_code)]
192 pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
193 &self.batch_schema
194 }
195
196 pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
197 self.input_arrow_schema.clone()
198 }
199
200 pub(crate) fn output_schema(&self) -> SchemaRef {
204 self.output_schema.clone()
205 }
206
207 pub(crate) fn empty_record_batch(&self) -> RecordBatch {
209 RecordBatch::new_empty(self.output_schema.clone())
210 }
211
212 #[allow(dead_code)]
216 pub(crate) fn convert(
217 &self,
218 batch: &datatypes::arrow::record_batch::RecordBatch,
219 ) -> common_recordbatch::error::Result<RecordBatch> {
220 if self.is_empty_projection {
221 return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
222 }
223
224 let mut columns = Vec::with_capacity(self.output_schema.num_columns());
225 for index in &self.batch_indices {
226 let array = batch.column(*index).clone();
227 let vector = Helper::try_into_vector(array)
228 .map_err(BoxedError::new)
229 .context(ExternalSnafu)?;
230 columns.push(vector);
231 }
232
233 RecordBatch::new(self.output_schema.clone(), columns)
234 }
235}
236
237pub(crate) fn flat_projected_columns(
239 metadata: &RegionMetadata,
240 format_projection: &FormatProjection,
241) -> Vec<(ColumnId, ConcreteDataType)> {
242 let mut schema = vec![None; format_projection.column_id_to_projected_index.len()];
243 for (column_id, index) in &format_projection.column_id_to_projected_index {
244 schema[*index] = Some((
246 *column_id,
247 metadata
248 .column_by_id(*column_id)
249 .unwrap()
250 .column_schema
251 .data_type
252 .clone(),
253 ));
254 }
255
256 schema.into_iter().map(|id_type| id_type.unwrap()).collect()
258}
259
260fn compute_input_arrow_schema(
265 metadata: &RegionMetadata,
266 batch_schema: &[(ColumnId, ConcreteDataType)],
267) -> datatypes::arrow::datatypes::SchemaRef {
268 let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
269 for (column_id, _) in batch_schema {
270 let column_metadata = metadata.column_by_id(*column_id).unwrap();
271 let field = if column_metadata.semantic_type == SemanticType::Tag {
272 Field::new_dictionary(
273 &column_metadata.column_schema.name,
274 datatypes::arrow::datatypes::DataType::UInt32,
275 column_metadata.column_schema.data_type.as_arrow_type(),
276 column_metadata.column_schema.is_nullable(),
277 )
278 } else {
279 Field::new(
280 &column_metadata.column_schema.name,
281 column_metadata.column_schema.data_type.as_arrow_type(),
282 column_metadata.column_schema.is_nullable(),
283 )
284 };
285 new_fields.push(Arc::new(field));
286 }
287 new_fields.extend_from_slice(&internal_fields());
288
289 Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
290}