1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
22use common_recordbatch::{DfRecordBatch, RecordBatch};
23use datatypes::arrow::datatypes::Field;
24use datatypes::prelude::{ConcreteDataType, DataType};
25use datatypes::schema::{Schema, SchemaRef};
26use datatypes::vectors::Helper;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
32use crate::read::projection::read_column_ids_from_projection;
33use crate::sst::parquet::flat_format::sst_column_id_indices;
34use crate::sst::parquet::format::FormatProjection;
35use crate::sst::{
36 FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
37};
38
/// Maps projected batches in the flat format to the output record batch schema.
pub struct FlatProjectionMapper {
    /// Metadata of the region this projection applies to.
    metadata: RegionMetadataRef,
    /// Schema of converted output batches.
    output_schema: SchemaRef,
    /// Ids of the columns to read from the source; may be a superset of the
    /// output columns.
    read_column_ids: Vec<ColumnId>,
    /// `(column id, data type)` of each column in the projected batch, in
    /// batch order.
    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
    /// Whether the projection selects no columns at all.
    is_empty_projection: bool,
    /// For each output column, its index in the projected batch.
    batch_indices: Vec<usize>,
    /// Arrow schema of input batches: the projected columns followed by the
    /// internal fields.
    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
65
66impl FlatProjectionMapper {
67 pub fn new(
72 metadata: &RegionMetadataRef,
73 projection: impl Iterator<Item = usize>,
74 ) -> Result<Self> {
75 let projection: Vec<_> = projection.collect();
76 let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
77 Self::new_with_read_columns(metadata, projection, read_column_ids)
78 }
79
80 pub fn new_with_read_columns(
82 metadata: &RegionMetadataRef,
83 projection: Vec<usize>,
84 read_column_ids: Vec<ColumnId>,
85 ) -> Result<Self> {
86 let is_empty_projection = projection.is_empty();
88
89 let mut column_schemas = Vec::with_capacity(projection.len());
91 let mut output_column_ids = Vec::with_capacity(projection.len());
93 for idx in &projection {
94 let column =
96 metadata
97 .column_metadatas
98 .get(*idx)
99 .with_context(|| InvalidRequestSnafu {
100 region_id: metadata.region_id,
101 reason: format!("projection index {} is out of bound", idx),
102 })?;
103
104 output_column_ids.push(column.column_id);
105 column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
107 }
108
109 let id_to_index = sst_column_id_indices(metadata);
111 let format_projection = FormatProjection::compute_format_projection(
113 &id_to_index,
114 metadata.column_metadatas.len() + 3,
116 read_column_ids.iter().copied(),
117 );
118
119 let batch_schema = flat_projected_columns(metadata, &format_projection);
120
121 let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
123
124 let output_schema = if is_empty_projection {
126 Arc::new(Schema::new(vec![]))
127 } else {
128 Arc::new(Schema::new(column_schemas))
130 };
131
132 let batch_indices = if is_empty_projection {
133 vec![]
134 } else {
135 output_column_ids
136 .iter()
137 .map(|id| {
138 format_projection
140 .column_id_to_projected_index
141 .get(id)
142 .copied()
143 .with_context(|| {
144 let name = metadata
145 .column_by_id(*id)
146 .map(|column| column.column_schema.name.clone())
147 .unwrap_or_else(|| id.to_string());
148 InvalidRequestSnafu {
149 region_id: metadata.region_id,
150 reason: format!(
151 "output column {} is missing in read projection",
152 name
153 ),
154 }
155 })
156 })
157 .collect::<Result<Vec<_>>>()?
158 };
159
160 Ok(FlatProjectionMapper {
161 metadata: metadata.clone(),
162 output_schema,
163 read_column_ids,
164 batch_schema,
165 is_empty_projection,
166 batch_indices,
167 input_arrow_schema,
168 })
169 }
170
171 pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
173 FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
174 }
175
176 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
178 &self.metadata
179 }
180
181 pub(crate) fn column_ids(&self) -> &[ColumnId] {
184 &self.read_column_ids
185 }
186
187 pub(crate) fn field_column_start(&self) -> usize {
189 for (idx, column_id) in self
190 .batch_schema
191 .iter()
192 .map(|(column_id, _)| column_id)
193 .enumerate()
194 {
195 if self
197 .metadata
198 .column_by_id(*column_id)
199 .unwrap()
200 .semantic_type
201 == SemanticType::Field
202 {
203 return idx;
204 }
205 }
206
207 self.batch_schema.len()
208 }
209
210 pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
212 &self.batch_schema
213 }
214
215 pub(crate) fn input_arrow_schema(
219 &self,
220 compaction: bool,
221 ) -> datatypes::arrow::datatypes::SchemaRef {
222 if !compaction {
223 self.input_arrow_schema.clone()
224 } else {
225 to_flat_sst_arrow_schema(
227 &self.metadata,
228 &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
229 )
230 }
231 }
232
233 pub(crate) fn output_schema(&self) -> SchemaRef {
237 self.output_schema.clone()
238 }
239
240 pub(crate) fn empty_record_batch(&self) -> RecordBatch {
242 RecordBatch::new_empty(self.output_schema.clone())
243 }
244
245 pub(crate) fn convert(
249 &self,
250 batch: &datatypes::arrow::record_batch::RecordBatch,
251 ) -> common_recordbatch::error::Result<RecordBatch> {
252 if self.is_empty_projection {
253 return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
254 }
255 let columns = self.project_vectors(batch)?;
256 RecordBatch::new(self.output_schema.clone(), columns)
257 }
258
259 pub(crate) fn project_vectors(
261 &self,
262 batch: &datatypes::arrow::record_batch::RecordBatch,
263 ) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
264 let mut columns = Vec::with_capacity(self.output_schema.num_columns());
265 for index in &self.batch_indices {
266 let mut array = batch.column(*index).clone();
267 if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
269 array.data_type()
270 {
271 let casted = datatypes::arrow::compute::cast(&array, value_type)
272 .context(ArrowComputeSnafu)?;
273 array = casted;
274 }
275 let vector = Helper::try_into_vector(array)
276 .map_err(BoxedError::new)
277 .context(ExternalSnafu)?;
278 columns.push(vector);
279 }
280 Ok(columns)
281 }
282}
283
284pub(crate) fn flat_projected_columns(
288 metadata: &RegionMetadata,
289 format_projection: &FormatProjection,
290) -> Vec<(ColumnId, ConcreteDataType)> {
291 let time_index = metadata.time_index_column();
292 let num_columns = if format_projection
293 .column_id_to_projected_index
294 .contains_key(&time_index.column_id)
295 {
296 format_projection.column_id_to_projected_index.len()
297 } else {
298 format_projection.column_id_to_projected_index.len() + 1
299 };
300 let mut schema = vec![None; num_columns];
301 for (column_id, index) in &format_projection.column_id_to_projected_index {
302 schema[*index] = Some((
304 *column_id,
305 metadata
306 .column_by_id(*column_id)
307 .unwrap()
308 .column_schema
309 .data_type
310 .clone(),
311 ));
312 }
313 if num_columns != format_projection.column_id_to_projected_index.len() {
314 schema[num_columns - 1] = Some((
315 time_index.column_id,
316 time_index.column_schema.data_type.clone(),
317 ));
318 }
319
320 schema.into_iter().map(|id_type| id_type.unwrap()).collect()
322}
323
324fn compute_input_arrow_schema(
329 metadata: &RegionMetadata,
330 batch_schema: &[(ColumnId, ConcreteDataType)],
331) -> datatypes::arrow::datatypes::SchemaRef {
332 let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
333 for (column_id, _) in batch_schema {
334 let column_metadata = metadata.column_by_id(*column_id).unwrap();
335 let field = Arc::new(Field::new(
336 &column_metadata.column_schema.name,
337 column_metadata.column_schema.data_type.as_arrow_type(),
338 column_metadata.column_schema.is_nullable(),
339 ));
340 let field = if column_metadata.semantic_type == SemanticType::Tag {
341 tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
342 } else {
343 field
344 };
345 new_fields.push(field);
346 }
347 new_fields.extend_from_slice(&internal_fields());
348
349 Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
350}
351
/// Projection mapper used by compaction: projects the field columns and the
/// time index, then re-attaches the trailing internal columns.
pub(crate) struct CompactionProjectionMapper {
    /// Underlying mapper projecting field columns followed by the time index.
    mapper: FlatProjectionMapper,
    /// Rebuilds DataFusion record batches with the internal columns appended.
    assembler: DfBatchAssembler,
}
358
359impl CompactionProjectionMapper {
360 pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
361 let projection = metadata
362 .column_metadatas
363 .iter()
364 .enumerate()
365 .filter_map(|(idx, col)| {
366 if matches!(col.semantic_type, SemanticType::Field) {
367 Some(idx)
368 } else {
369 None
370 }
371 })
372 .chain([metadata.time_index_column_pos()])
373 .collect::<Vec<_>>();
374
375 let mapper = FlatProjectionMapper::new_with_read_columns(
376 metadata,
377 projection,
378 metadata
379 .column_metadatas
380 .iter()
381 .map(|col| col.column_id)
382 .collect(),
383 )?;
384 let assembler = DfBatchAssembler::new(mapper.output_schema());
385
386 Ok(Self { mapper, assembler })
387 }
388
389 pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
393 let columns = self
394 .mapper
395 .project_vectors(&batch)
396 .context(RecordBatchSnafu)?;
397 self.assembler
398 .build_df_record_batch_with_internal(&batch, columns)
399 .context(RecordBatchSnafu)
400 }
401}
402
/// Assembles DataFusion record batches whose schema is an output schema with
/// the internal fields appended.
pub(crate) struct DfBatchAssembler {
    /// Output arrow schema with the internal fields appended at the end.
    output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
}
407
408impl DfBatchAssembler {
409 pub(crate) fn new(output_schema: SchemaRef) -> Self {
411 let fields = output_schema
412 .arrow_schema()
413 .fields()
414 .into_iter()
415 .chain(internal_fields().iter())
416 .cloned()
417 .collect::<Vec<_>>();
418 let output_arrow_schema_with_internal =
419 Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
420 Self {
421 output_arrow_schema_with_internal,
422 }
423 }
424
425 pub(crate) fn build_df_record_batch_with_internal(
430 &self,
431 batch: &datatypes::arrow::record_batch::RecordBatch,
432 mut columns: Vec<datatypes::vectors::VectorRef>,
433 ) -> common_recordbatch::error::Result<DfRecordBatch> {
434 let num_columns = batch.columns().len();
435 let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1];
437 for index in internal_indices.iter() {
438 let array = batch.column(*index).clone();
439 let vector = Helper::try_into_vector(array)
440 .map_err(BoxedError::new)
441 .context(ExternalSnafu)?;
442 columns.push(vector);
443 }
444 RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
445 }
446}