1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
22use common_recordbatch::{DfRecordBatch, RecordBatch};
23use datatypes::arrow::array::Array;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::Helper;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::{RegionMetadata, RegionMetadataRef};
31use store_api::storage::ColumnId;
32
33use crate::cache::CacheStrategy;
34use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
35use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
36use crate::sst::parquet::flat_format::sst_column_id_indices;
37use crate::sst::parquet::format::FormatProjection;
38use crate::sst::{
39 FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
40};
41
/// Maps projected record batches in the flat format to the output schema
/// expected by readers of this region.
pub struct FlatProjectionMapper {
    /// Metadata of the region to read from.
    metadata: RegionMetadataRef,
    /// Schema of the output [RecordBatch]es (the user-facing projection).
    output_schema: SchemaRef,
    /// Ids of the columns to read from the source; may be a superset of the
    /// output columns.
    read_column_ids: Vec<ColumnId>,
    /// Column id and data type of each column in the projected batches, in
    /// batch order.
    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
    /// Whether the projection is empty; if so `convert()` only carries the
    /// row count of the input batch.
    is_empty_projection: bool,
    /// For each output column, its index in the projected batch.
    batch_indices: Vec<usize>,
    /// Cached arrow schema of the batches this mapper consumes (non-compaction
    /// path of `input_arrow_schema()`).
    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
68
impl FlatProjectionMapper {
    /// Creates a mapper that outputs the columns selected by `projection`
    /// (indices into `metadata.column_metadatas`) and reads the column ids
    /// derived from that projection.
    ///
    /// # Errors
    /// Returns an error if any projection index is out of bound.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<Self> {
        let projection: Vec<_> = projection.collect();
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        Self::new_with_read_columns(metadata, projection, read_column_ids)
    }

    /// Creates a mapper whose output columns come from `projection` while the
    /// columns actually read are `read_column_ids`, which may be a superset
    /// of the output columns.
    ///
    /// # Errors
    /// Returns an error if a projection index is out of bound or an output
    /// column is not covered by the read projection.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<Self> {
        let is_empty_projection = projection.is_empty();

        // Validate every projection index while collecting the output
        // column ids and column schemas.
        let mut column_schemas = Vec::with_capacity(projection.len());
        let mut output_column_ids = Vec::with_capacity(projection.len());
        for idx in &projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;

            output_column_ids.push(column.column_id);
            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
        }

        // Map column ids to their positions in the SST layout, then compute
        // the projected positions for the columns to read.
        let id_to_index = sst_column_id_indices(metadata);
        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            // NOTE(review): the `+ 3` presumably reserves room for the three
            // internal columns appended after user columns — confirm against
            // the flat SST format definition.
            metadata.column_metadatas.len() + 3,
            read_column_ids.iter().copied(),
        );

        let batch_schema = flat_projected_columns(metadata, &format_projection);

        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);

        // An empty projection still needs a valid (zero-column) output schema.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            Arc::new(Schema::new(column_schemas))
        };

        // For each output column, locate its index inside the projected batch.
        let batch_indices = if is_empty_projection {
            vec![]
        } else {
            output_column_ids
                .iter()
                .map(|id| {
                    format_projection
                        .column_id_to_projected_index
                        .get(id)
                        .copied()
                        .with_context(|| {
                            // Prefer the column name in the error message;
                            // fall back to the raw id when unresolvable.
                            let name = metadata
                                .column_by_id(*id)
                                .map(|column| column.column_schema.name.clone())
                                .unwrap_or_else(|| id.to_string());
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "output column {} is missing in read projection",
                                    name
                                ),
                            }
                        })
                })
                .collect::<Result<Vec<_>>>()?
        };

        Ok(FlatProjectionMapper {
            metadata: metadata.clone(),
            output_schema,
            read_column_ids,
            batch_schema,
            is_empty_projection,
            batch_indices,
            input_arrow_schema,
        })
    }

    /// Creates a mapper that outputs all columns of the region.
    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata of the region to read.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns the ids of the columns to read from the source.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.read_column_ids
    }

    /// Returns the index of the first field column in the batch schema, or
    /// the batch schema length when no field column is projected.
    ///
    /// # Panics
    /// Panics if a column id in the batch schema is missing from the metadata.
    pub(crate) fn field_column_start(&self) -> usize {
        for (idx, column_id) in self
            .batch_schema
            .iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
        {
            if self
                .metadata
                .column_by_id(*column_id)
                .unwrap()
                .semantic_type
                == SemanticType::Field
            {
                return idx;
            }
        }

        self.batch_schema.len()
    }

    /// Returns the (column id, data type) pairs of the projected batches.
    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_schema
    }

    /// Returns the arrow schema of the input batches.
    ///
    /// For compaction the full flat SST schema is recomputed from the region
    /// metadata; otherwise the cached projected schema is returned.
    pub(crate) fn input_arrow_schema(
        &self,
        compaction: bool,
    ) -> datatypes::arrow::datatypes::SchemaRef {
        if !compaction {
            self.input_arrow_schema.clone()
        } else {
            to_flat_sst_arrow_schema(
                &self.metadata,
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
        }
    }

    /// Returns the schema of the output [RecordBatch]es.
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [RecordBatch] with the output schema.
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

    /// Converts a projected arrow batch into an output [RecordBatch],
    /// decoding dictionary-encoded columns back to their plain value types.
    ///
    /// For an empty projection only the row count of `batch` is preserved.
    pub(crate) fn convert(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }
        let mut arrays = Vec::with_capacity(self.output_schema.num_columns());
        for (output_idx, index) in self.batch_indices.iter().enumerate() {
            let mut array = batch.column(*index).clone();
            if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() {
                // Fast path: a string dictionary holding a single value can be
                // expanded through the repeated-vector cache instead of a cast.
                if let Some(dict_array) = single_value_string_dictionary(
                    &array,
                    &self.output_schema.column_schemas()[output_idx].data_type,
                    value_type.as_ref(),
                ) {
                    let dict_values = dict_array.values();
                    let value = if dict_values.is_null(0) {
                        Value::Null
                    } else {
                        Value::from(datatypes::arrow_array::string_array_value(dict_values, 0))
                    };

                    let repeated = repeated_vector_with_cache(
                        &self.output_schema.column_schemas()[output_idx].data_type,
                        &value,
                        batch.num_rows(),
                        cache_strategy,
                    )?;
                    array = repeated.to_arrow_array();
                } else {
                    // General path: cast the dictionary array to its value type.
                    let casted = datatypes::arrow::compute::cast(&array, value_type)
                        .context(ArrowComputeSnafu)?;
                    array = casted;
                }
            }
            arrays.push(array);
        }

        let df_record_batch =
            DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
                .context(NewDfRecordBatchSnafu)?;
        Ok(RecordBatch::from_df_record_batch(
            self.output_schema.clone(),
            df_record_batch,
        ))
    }

    /// Projects the output columns of `batch` into vectors, casting
    /// dictionary-encoded columns to their value types first.
    pub(crate) fn project_vectors(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
    ) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        for index in &self.batch_indices {
            let mut array = batch.column(*index).clone();
            if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
                array.data_type()
            {
                let casted = datatypes::arrow::compute::cast(&array, value_type)
                    .context(ArrowComputeSnafu)?;
                array = casted;
            }
            let vector = Helper::try_into_vector(array)
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            columns.push(vector);
        }
        Ok(columns)
    }
}
329
330fn single_value_string_dictionary<'a>(
331 array: &'a Arc<dyn Array>,
332 output_type: &ConcreteDataType,
333 value_type: &ArrowDataType,
334) -> Option<&'a datatypes::arrow::array::DictionaryArray<datatypes::arrow::datatypes::UInt32Type>> {
335 if !matches!(
336 value_type,
337 ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
338 ) || !output_type.is_string()
339 {
340 return None;
341 }
342
343 let dict_array = array
344 .as_any()
345 .downcast_ref::<datatypes::arrow::array::DictionaryArray<
346 datatypes::arrow::datatypes::UInt32Type,
347 >>()?;
348
349 (dict_array.values().len() == 1 && dict_array.null_count() == 0).then_some(dict_array)
350}
351
352pub(crate) fn flat_projected_columns(
356 metadata: &RegionMetadata,
357 format_projection: &FormatProjection,
358) -> Vec<(ColumnId, ConcreteDataType)> {
359 let time_index = metadata.time_index_column();
360 let num_columns = if format_projection
361 .column_id_to_projected_index
362 .contains_key(&time_index.column_id)
363 {
364 format_projection.column_id_to_projected_index.len()
365 } else {
366 format_projection.column_id_to_projected_index.len() + 1
367 };
368 let mut schema = vec![None; num_columns];
369 for (column_id, index) in &format_projection.column_id_to_projected_index {
370 schema[*index] = Some((
372 *column_id,
373 metadata
374 .column_by_id(*column_id)
375 .unwrap()
376 .column_schema
377 .data_type
378 .clone(),
379 ));
380 }
381 if num_columns != format_projection.column_id_to_projected_index.len() {
382 schema[num_columns - 1] = Some((
383 time_index.column_id,
384 time_index.column_schema.data_type.clone(),
385 ));
386 }
387
388 schema.into_iter().map(|id_type| id_type.unwrap()).collect()
390}
391
392pub(crate) fn compute_input_arrow_schema(
397 metadata: &RegionMetadata,
398 batch_schema: &[(ColumnId, ConcreteDataType)],
399) -> datatypes::arrow::datatypes::SchemaRef {
400 let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
401 for (column_id, _) in batch_schema {
402 let column_metadata = metadata.column_by_id(*column_id).unwrap();
403 let field = Arc::new(Field::new(
404 &column_metadata.column_schema.name,
405 column_metadata.column_schema.data_type.as_arrow_type(),
406 column_metadata.column_schema.is_nullable(),
407 ));
408 let field = if column_metadata.semantic_type == SemanticType::Tag {
409 tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
410 } else {
411 field
412 };
413 new_fields.push(field);
414 }
415 new_fields.extend_from_slice(&internal_fields());
416
417 Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
418}
419
/// Projection mapper used by compaction: projects all field columns plus the
/// time index, and re-appends the internal columns to the produced batches.
pub(crate) struct CompactionProjectionMapper {
    /// Underlying mapper that projects the field columns and the time index.
    mapper: FlatProjectionMapper,
    /// Assembler that appends the internal columns back to output batches.
    assembler: DfBatchAssembler,
}
426
427impl CompactionProjectionMapper {
428 pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
429 let projection = metadata
430 .column_metadatas
431 .iter()
432 .enumerate()
433 .filter_map(|(idx, col)| {
434 if matches!(col.semantic_type, SemanticType::Field) {
435 Some(idx)
436 } else {
437 None
438 }
439 })
440 .chain([metadata.time_index_column_pos()])
441 .collect::<Vec<_>>();
442
443 let mapper = FlatProjectionMapper::new_with_read_columns(
444 metadata,
445 projection,
446 metadata
447 .column_metadatas
448 .iter()
449 .map(|col| col.column_id)
450 .collect(),
451 )?;
452 let assembler = DfBatchAssembler::new(mapper.output_schema());
453
454 Ok(Self { mapper, assembler })
455 }
456
457 pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
461 let columns = self
462 .mapper
463 .project_vectors(&batch)
464 .context(RecordBatchSnafu)?;
465 self.assembler
466 .build_df_record_batch_with_internal(&batch, columns)
467 .context(RecordBatchSnafu)
468 }
469}
470
/// Assembles [DfRecordBatch]es whose schema is an output schema with the
/// internal fields appended at the end.
pub(crate) struct DfBatchAssembler {
    /// Output arrow schema with the internal fields appended.
    output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
}
475
476impl DfBatchAssembler {
477 pub(crate) fn new(output_schema: SchemaRef) -> Self {
479 let fields = output_schema
480 .arrow_schema()
481 .fields()
482 .into_iter()
483 .chain(internal_fields().iter())
484 .cloned()
485 .collect::<Vec<_>>();
486 let output_arrow_schema_with_internal =
487 Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
488 Self {
489 output_arrow_schema_with_internal,
490 }
491 }
492
493 pub(crate) fn build_df_record_batch_with_internal(
498 &self,
499 batch: &datatypes::arrow::record_batch::RecordBatch,
500 mut columns: Vec<datatypes::vectors::VectorRef>,
501 ) -> common_recordbatch::error::Result<DfRecordBatch> {
502 let num_columns = batch.columns().len();
503 let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1];
505 for index in internal_indices.iter() {
506 let array = batch.column(*index).clone();
507 let vector = Helper::try_into_vector(array)
508 .map_err(BoxedError::new)
509 .context(ExternalSnafu)?;
510 columns.push(vector);
511 }
512 RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
513 }
514}