1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_error::ext::BoxedError;
21use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
22use common_recordbatch::{DfRecordBatch, RecordBatch};
23use datatypes::arrow::array::Array;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::Helper;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::{RegionMetadata, RegionMetadataRef};
31use store_api::storage::ColumnId;
32
33use crate::cache::CacheStrategy;
34use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
35use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
36use crate::read::read_columns::ReadColumns;
37use crate::sst::parquet::flat_format::sst_column_id_indices;
38use crate::sst::parquet::format::FormatProjection;
39use crate::sst::{
40 FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
41 with_field_id,
42};
43
/// Maps projected columns of a flat-format read to the output record batch schema.
///
/// Holds the precomputed schemas and index mappings needed to convert arrow
/// record batches produced by the reader into output [RecordBatch]es.
pub struct FlatProjectionMapper {
    /// Metadata of the region to read.
    metadata: RegionMetadataRef,
    /// Schema of output [RecordBatch]es (the projected columns only).
    output_schema: SchemaRef,
    /// Columns to read from the source; may be a superset of the output columns.
    read_cols: ReadColumns,
    /// `(column id, data type)` of each column in batches the reader produces,
    /// computed by [flat_projected_columns].
    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
    /// Whether the output projection is empty (only row counts are produced then).
    is_empty_projection: bool,
    /// For each output column, its index into the projected batch columns.
    batch_indices: Vec<usize>,
    /// Arrow schema of input batches for non-compaction reads,
    /// computed by [compute_input_arrow_schema].
    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
70
impl FlatProjectionMapper {
    /// Creates a mapper from indices (`projection`) into `metadata.column_metadatas`.
    ///
    /// Derives the columns to read from the projection itself.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl IntoIterator<Item = usize>,
    ) -> Result<Self> {
        let projection: Vec<_> = projection.into_iter().collect();
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids);
        Self::new_with_read_columns(metadata, projection, read_cols)
    }

    /// Creates a mapper with an explicit set of columns to read (`read_cols`),
    /// which may contain more columns than the output `projection`.
    ///
    /// Returns an error if a projection index is out of bound or an output
    /// column is missing from the read projection.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_cols: ReadColumns,
    ) -> Result<Self> {
        let is_empty_projection = projection.is_empty();

        // Collect the schema and column id of every output column, validating
        // each projection index against the region metadata.
        let mut col_schemas = Vec::with_capacity(projection.len());
        let mut output_col_ids = Vec::with_capacity(projection.len());
        for idx in &projection {
            let col = metadata
                .column_metadatas
                .get(*idx)
                .with_context(|| InvalidRequestSnafu {
                    region_id: metadata.region_id,
                    reason: format!("projection index {} is out of bound", idx),
                })?;
            output_col_ids.push(col.column_id);
            col_schemas.push(col.column_schema.clone());
        }

        let id_to_index = sst_column_id_indices(metadata);

        // `+ 3` leaves room for the internal columns appended after user columns
        // (see `internal_fields()` in `compute_input_arrow_schema`).
        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            metadata.column_metadatas.len() + 3,
            read_cols.clone(),
        );

        let batch_schema = flat_projected_columns(metadata, &format_projection);

        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);

        // An empty projection still yields a valid (zero-column) output schema.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            Arc::new(Schema::new(col_schemas))
        };

        // Map each output column id to its index in the projected batch. Every
        // output column must be present in the read projection.
        let batch_indices = if is_empty_projection {
            vec![]
        } else {
            output_col_ids
                .iter()
                .map(|id| {
                    format_projection
                        .column_id_to_projected_index
                        .get(id)
                        .copied()
                        .with_context(|| {
                            // Prefer the column name in the error; fall back to the id.
                            let name = metadata
                                .column_by_id(*id)
                                .map(|column| column.column_schema.name.clone())
                                .unwrap_or_else(|| id.to_string());
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "output column {} is missing in read projection",
                                    name
                                ),
                            }
                        })
                })
                .collect::<Result<Vec<_>>>()?
        };

        Ok(FlatProjectionMapper {
            metadata: metadata.clone(),
            output_schema,
            read_cols,
            batch_schema,
            is_empty_projection,
            batch_indices,
            input_arrow_schema,
        })
    }

    /// Creates a mapper that projects all columns of the region.
    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata of the region to read.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns the columns to read.
    pub(crate) fn read_columns(&self) -> &ReadColumns {
        &self.read_cols
    }

    /// Returns the index of the first field column in the batch schema, or the
    /// batch schema length when no field column is projected.
    pub(crate) fn field_column_start(&self) -> usize {
        for (idx, column_id) in self
            .batch_schema
            .iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
        {
            // `unwrap()` is safe here: every id in `batch_schema` came from
            // this region's metadata (see `flat_projected_columns`).
            if self
                .metadata
                .column_by_id(*column_id)
                .unwrap()
                .semantic_type
                == SemanticType::Field
            {
                return idx;
            }
        }

        self.batch_schema.len()
    }

    /// Returns the `(column id, data type)` pairs of batches the reader produces.
    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_schema
    }

    /// Returns the arrow schema of input batches.
    ///
    /// For compaction the full flat SST schema is rebuilt from the region
    /// metadata instead of using the precomputed projected schema.
    pub(crate) fn input_arrow_schema(
        &self,
        compaction: bool,
    ) -> datatypes::arrow::datatypes::SchemaRef {
        if !compaction {
            self.input_arrow_schema.clone()
        } else {
            to_flat_sst_arrow_schema(
                &self.metadata,
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
        }
    }

    /// Returns the schema of converted [RecordBatch]es.
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Converts an arrow record batch into a [RecordBatch] with the output schema.
    ///
    /// Dictionary-encoded columns are handled specially: a string dictionary
    /// whose values array holds exactly one value is expanded into a repeated
    /// vector (served from `cache_strategy` when possible); any other
    /// dictionary column is cast to its value type.
    pub(crate) fn convert(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            // No columns to convert; only the row count is preserved.
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }
        let mut arrays = Vec::with_capacity(self.output_schema.num_columns());
        for (output_idx, index) in self.batch_indices.iter().enumerate() {
            let mut array = batch.column(*index).clone();
            if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() {
                if let Some(dict_array) = single_value_string_dictionary(
                    &array,
                    &self.output_schema.column_schemas()[output_idx].data_type,
                    value_type.as_ref(),
                ) {
                    // Single-valued dictionary: materialize one value and repeat it.
                    let dict_values = dict_array.values();
                    let value = if dict_values.is_null(0) {
                        Value::Null
                    } else {
                        Value::from(datatypes::arrow_array::string_array_value(dict_values, 0))
                    };

                    let repeated = repeated_vector_with_cache(
                        &self.output_schema.column_schemas()[output_idx].data_type,
                        &value,
                        batch.num_rows(),
                        cache_strategy,
                    )?;
                    array = repeated.to_arrow_array();
                } else {
                    // General dictionary: decode by casting to the value type.
                    let casted = datatypes::arrow::compute::cast(&array, value_type)
                        .context(ArrowComputeSnafu)?;
                    array = casted;
                }
            }
            arrays.push(array);
        }

        let df_record_batch =
            DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
                .context(NewDfRecordBatchSnafu)?;
        Ok(RecordBatch::from_df_record_batch(
            self.output_schema.clone(),
            df_record_batch,
        ))
    }

    /// Projects batch columns into vectors, decoding dictionary columns by
    /// casting them to their value type. Unlike [Self::convert], no
    /// single-value repeat optimization or cache is used.
    pub(crate) fn project_vectors(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
    ) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        for index in &self.batch_indices {
            let mut array = batch.column(*index).clone();
            if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
                array.data_type()
            {
                let casted = datatypes::arrow::compute::cast(&array, value_type)
                    .context(ArrowComputeSnafu)?;
                array = casted;
            }
            let vector = Helper::try_into_vector(array)
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            columns.push(vector);
        }
        Ok(columns)
    }
}
322
323fn single_value_string_dictionary<'a>(
324 array: &'a Arc<dyn Array>,
325 output_type: &ConcreteDataType,
326 value_type: &ArrowDataType,
327) -> Option<&'a datatypes::arrow::array::DictionaryArray<datatypes::arrow::datatypes::UInt32Type>> {
328 if !matches!(
329 value_type,
330 ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
331 ) || !output_type.is_string()
332 {
333 return None;
334 }
335
336 let dict_array = array
337 .as_any()
338 .downcast_ref::<datatypes::arrow::array::DictionaryArray<
339 datatypes::arrow::datatypes::UInt32Type,
340 >>()?;
341
342 (dict_array.values().len() == 1 && dict_array.null_count() == 0).then_some(dict_array)
343}
344
345pub(crate) fn flat_projected_columns(
349 metadata: &RegionMetadata,
350 format_projection: &FormatProjection,
351) -> Vec<(ColumnId, ConcreteDataType)> {
352 let time_index = metadata.time_index_column();
353 let num_columns = if format_projection
354 .column_id_to_projected_index
355 .contains_key(&time_index.column_id)
356 {
357 format_projection.column_id_to_projected_index.len()
358 } else {
359 format_projection.column_id_to_projected_index.len() + 1
360 };
361 let mut schema = vec![None; num_columns];
362 for (column_id, index) in &format_projection.column_id_to_projected_index {
363 schema[*index] = Some((
365 *column_id,
366 metadata
367 .column_by_id(*column_id)
368 .unwrap()
369 .column_schema
370 .data_type
371 .clone(),
372 ));
373 }
374 if num_columns != format_projection.column_id_to_projected_index.len() {
375 schema[num_columns - 1] = Some((
376 time_index.column_id,
377 time_index.column_schema.data_type.clone(),
378 ));
379 }
380
381 schema.into_iter().map(|id_type| id_type.unwrap()).collect()
383}
384
385pub(crate) fn compute_input_arrow_schema(
390 metadata: &RegionMetadata,
391 batch_schema: &[(ColumnId, ConcreteDataType)],
392) -> datatypes::arrow::datatypes::SchemaRef {
393 let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
394 for (column_id, _) in batch_schema {
395 let column_metadata = metadata.column_by_id(*column_id).unwrap();
396 let field = Field::new(
397 &column_metadata.column_schema.name,
398 column_metadata.column_schema.data_type.as_arrow_type(),
399 column_metadata.column_schema.is_nullable(),
400 );
401 let field = with_field_id(field, *column_id);
402 if column_metadata.semantic_type == SemanticType::Tag {
403 new_fields.push(tag_maybe_to_dictionary_field(
404 &column_metadata.column_schema.data_type,
405 &Arc::new(field),
406 ));
407 } else {
408 new_fields.push(Arc::new(field));
409 }
410 }
411 new_fields.extend_from_slice(&internal_fields());
412
413 Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
414}
415
/// Projection mapper used for compaction: projects field columns plus the time
/// index and re-attaches the internal columns to the output batch.
pub(crate) struct CompactionProjectionMapper {
    /// Underlying mapper over field columns and the time index.
    mapper: FlatProjectionMapper,
    /// Assembles output batches with the internal columns appended.
    assembler: DfBatchAssembler,
}
422
423impl CompactionProjectionMapper {
424 pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
425 let projection = metadata
426 .column_metadatas
427 .iter()
428 .enumerate()
429 .filter_map(|(idx, col)| {
430 if matches!(col.semantic_type, SemanticType::Field) {
431 Some(idx)
432 } else {
433 None
434 }
435 })
436 .chain([metadata.time_index_column_pos()])
437 .collect::<Vec<_>>();
438
439 let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id);
440 let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids);
441 let mapper = FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols)?;
442 let assembler = DfBatchAssembler::new(mapper.output_schema());
443
444 Ok(Self { mapper, assembler })
445 }
446
447 pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
451 let columns = self
452 .mapper
453 .project_vectors(&batch)
454 .context(RecordBatchSnafu)?;
455 self.assembler
456 .build_df_record_batch_with_internal(&batch, columns)
457 .context(RecordBatchSnafu)
458 }
459}
460
/// Builds output [DfRecordBatch]es whose schema is the output schema extended
/// with the internal fields.
pub(crate) struct DfBatchAssembler {
    /// Output arrow schema with the internal fields appended at the end.
    output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
}
465
466impl DfBatchAssembler {
467 pub(crate) fn new(output_schema: SchemaRef) -> Self {
469 let fields = output_schema
470 .arrow_schema()
471 .fields()
472 .into_iter()
473 .chain(internal_fields().iter())
474 .cloned()
475 .collect::<Vec<_>>();
476 let output_arrow_schema_with_internal =
477 Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
478 Self {
479 output_arrow_schema_with_internal,
480 }
481 }
482
483 pub(crate) fn build_df_record_batch_with_internal(
488 &self,
489 batch: &datatypes::arrow::record_batch::RecordBatch,
490 mut columns: Vec<datatypes::vectors::VectorRef>,
491 ) -> common_recordbatch::error::Result<DfRecordBatch> {
492 let num_columns = batch.columns().len();
493 let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1];
495 for index in internal_indices.iter() {
496 let array = batch.column(*index).clone();
497 let vector = Helper::try_into_vector(array)
498 .map_err(BoxedError::new)
499 .context(ExternalSnafu)?;
500 columns.push(vector);
501 }
502 RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
503 }
504}