1use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use common_error::ext::BoxedError;
22use common_recordbatch::error::{
23 ArrowComputeSnafu, DataTypesSnafu, ExternalSnafu, NewDfRecordBatchSnafu,
24};
25use common_recordbatch::{DfRecordBatch, RecordBatch};
26use datatypes::arrow::array::Array;
27use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
28use datatypes::extension::json::is_structured_json_field;
29use datatypes::prelude::{ConcreteDataType, DataType};
30use datatypes::schema::{Schema, SchemaRef};
31use datatypes::types::json_type::JsonNativeType;
32use datatypes::value::Value;
33use datatypes::vectors::Helper;
34use datatypes::vectors::json::array::JsonArray;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::{RegionMetadata, RegionMetadataRef};
37use store_api::storage::ColumnId;
38
39use crate::cache::CacheStrategy;
40use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
41use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
42use crate::read::read_columns::ReadColumns;
43use crate::sst::parquet::flat_format::sst_column_id_indices;
44use crate::sst::parquet::format::FormatProjection;
45use crate::sst::{
46 FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
47 with_field_id,
48};
49
50pub struct FlatProjectionMapper {
55 metadata: RegionMetadataRef,
57 output_schema: SchemaRef,
59 read_cols: ReadColumns,
64 batch_schema: Vec<(ColumnId, ConcreteDataType)>,
69 is_empty_projection: bool,
71 batch_indices: Vec<usize>,
73 input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
75}
76
77impl FlatProjectionMapper {
78 pub fn new(
83 metadata: &RegionMetadataRef,
84 projection: impl IntoIterator<Item = usize>,
85 ) -> Result<Self> {
86 let projection: Vec<_> = projection.into_iter().collect();
87 let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
88 let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids);
89 Self::new_with_read_columns(metadata, projection, read_cols, None)
90 }
91
92 pub fn new_with_read_columns(
94 metadata: &RegionMetadataRef,
95 projection: Vec<usize>,
96 read_cols: ReadColumns,
97 json_type_hint: Option<&HashMap<String, JsonNativeType>>,
98 ) -> Result<Self> {
99 let is_empty_projection = projection.is_empty();
101
102 let mut col_schemas = Vec::with_capacity(projection.len());
104 let mut output_col_ids = Vec::with_capacity(projection.len());
106 for idx in &projection {
107 let col = metadata
108 .column_metadatas
109 .get(*idx)
110 .with_context(|| InvalidRequestSnafu {
111 region_id: metadata.region_id,
112 reason: format!("projection index {} is out of bound", idx),
113 })?;
114 output_col_ids.push(col.column_id);
115
116 let mut schema = col.column_schema.clone();
117 if let Some(concretized) = json_type_hint
118 .and_then(|x| x.get(&schema.name))
119 .cloned()
120 .map(ConcreteDataType::json2)
121 && schema
122 .data_type
123 .as_json()
124 .is_some_and(|json_type| json_type.is_json2())
125 {
126 schema.data_type = concretized;
127 }
128 col_schemas.push(schema);
129 }
130
131 let id_to_index = sst_column_id_indices(metadata);
133
134 let format_projection = FormatProjection::compute_format_projection(
136 &id_to_index,
137 metadata.column_metadatas.len() + 3,
139 read_cols.clone(),
140 );
141
142 let mut batch_schema = flat_projected_columns(metadata, &format_projection);
143
144 if let Some(json_type_hint) = json_type_hint
145 && !json_type_hint.is_empty()
146 {
147 for (column_id, data_type) in batch_schema.iter_mut() {
148 if data_type
149 .as_json()
150 .is_some_and(|json_type| json_type.is_json2())
151 && let Some(concretized) = metadata
152 .column_by_id(*column_id)
153 .and_then(|x| json_type_hint.get(&x.column_schema.name).cloned())
154 .map(ConcreteDataType::json2)
155 {
156 *data_type = concretized;
157 }
158 }
159 }
160
161 let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
163
164 let output_schema = if is_empty_projection {
166 Arc::new(Schema::new(vec![]))
167 } else {
168 Arc::new(Schema::new(col_schemas))
170 };
171
172 let batch_indices = if is_empty_projection {
173 vec![]
174 } else {
175 output_col_ids
176 .iter()
177 .map(|id| {
178 format_projection
180 .column_id_to_projected_index
181 .get(id)
182 .copied()
183 .with_context(|| {
184 let name = metadata
185 .column_by_id(*id)
186 .map(|column| column.column_schema.name.clone())
187 .unwrap_or_else(|| id.to_string());
188 InvalidRequestSnafu {
189 region_id: metadata.region_id,
190 reason: format!(
191 "output column {} is missing in read projection",
192 name
193 ),
194 }
195 })
196 })
197 .collect::<Result<Vec<_>>>()?
198 };
199
200 Ok(FlatProjectionMapper {
201 metadata: metadata.clone(),
202 output_schema,
203 read_cols,
204 batch_schema,
205 is_empty_projection,
206 batch_indices,
207 input_arrow_schema,
208 })
209 }
210
211 pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
213 FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
214 }
215
216 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
218 &self.metadata
219 }
220 pub(crate) fn read_columns(&self) -> &ReadColumns {
222 &self.read_cols
223 }
224
225 pub(crate) fn field_column_start(&self) -> usize {
227 for (idx, column_id) in self
228 .batch_schema
229 .iter()
230 .map(|(column_id, _)| column_id)
231 .enumerate()
232 {
233 if self
235 .metadata
236 .column_by_id(*column_id)
237 .unwrap()
238 .semantic_type
239 == SemanticType::Field
240 {
241 return idx;
242 }
243 }
244
245 self.batch_schema.len()
246 }
247
248 pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
250 &self.batch_schema
251 }
252
253 pub(crate) fn input_arrow_schema(
257 &self,
258 compaction: bool,
259 ) -> datatypes::arrow::datatypes::SchemaRef {
260 if !compaction {
261 self.input_arrow_schema.clone()
262 } else {
263 let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
265 options.concretized_json_types = self
266 .input_arrow_schema
267 .fields()
268 .iter()
269 .filter(|&field| is_structured_json_field(field))
270 .map(|field| (field.name().clone(), field.data_type().clone()))
271 .collect();
272 to_flat_sst_arrow_schema(&self.metadata, &options)
273 }
274 }
275
276 pub(crate) fn output_schema(&self) -> SchemaRef {
280 self.output_schema.clone()
281 }
282
283 pub(crate) fn convert(
287 &self,
288 batch: &datatypes::arrow::record_batch::RecordBatch,
289 cache_strategy: &CacheStrategy,
290 ) -> common_recordbatch::error::Result<RecordBatch> {
291 if self.is_empty_projection {
292 return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
293 }
294 let mut arrays = Vec::with_capacity(self.output_schema.num_columns());
297 for (output_idx, index) in self.batch_indices.iter().enumerate() {
298 let mut array = batch.column(*index).clone();
299 if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() {
301 if let Some(dict_array) = single_value_string_dictionary(
304 &array,
305 &self.output_schema.column_schemas()[output_idx].data_type,
306 value_type.as_ref(),
307 ) {
308 let dict_values = dict_array.values();
309 let value = if dict_values.is_null(0) {
310 Value::Null
311 } else {
312 Value::from(datatypes::arrow_array::string_array_value(dict_values, 0))
313 };
314
315 let repeated = repeated_vector_with_cache(
316 &self.output_schema.column_schemas()[output_idx].data_type,
317 &value,
318 batch.num_rows(),
319 cache_strategy,
320 )?;
321 array = repeated.to_arrow_array();
322 } else {
323 let casted = datatypes::arrow::compute::cast(&array, value_type)
324 .context(ArrowComputeSnafu)?;
325 array = casted;
326 }
327 }
328
329 let field = &self.output_schema.arrow_schema().fields()[output_idx];
330 if is_structured_json_field(field) {
331 array = JsonArray::from(&array)
332 .try_align(field.data_type())
333 .context(DataTypesSnafu)?;
334 }
335
336 arrays.push(array);
337 }
338
339 let df_record_batch =
340 DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
341 .context(NewDfRecordBatchSnafu)?;
342 Ok(RecordBatch::from_df_record_batch(
343 self.output_schema.clone(),
344 df_record_batch,
345 ))
346 }
347
348 pub(crate) fn project_vectors(
350 &self,
351 batch: &datatypes::arrow::record_batch::RecordBatch,
352 ) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
353 let mut columns = Vec::with_capacity(self.output_schema.num_columns());
354 for index in &self.batch_indices {
355 let mut array = batch.column(*index).clone();
356 if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
358 array.data_type()
359 {
360 let casted = datatypes::arrow::compute::cast(&array, value_type)
361 .context(ArrowComputeSnafu)?;
362 array = casted;
363 }
364 let vector = Helper::try_into_vector(array)
365 .map_err(BoxedError::new)
366 .context(ExternalSnafu)?;
367 columns.push(vector);
368 }
369 Ok(columns)
370 }
371}
372
373fn single_value_string_dictionary<'a>(
374 array: &'a Arc<dyn Array>,
375 output_type: &ConcreteDataType,
376 value_type: &ArrowDataType,
377) -> Option<&'a datatypes::arrow::array::DictionaryArray<datatypes::arrow::datatypes::UInt32Type>> {
378 if !matches!(
379 value_type,
380 ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
381 ) || !output_type.is_string()
382 {
383 return None;
384 }
385
386 let dict_array = array
387 .as_any()
388 .downcast_ref::<datatypes::arrow::array::DictionaryArray<
389 datatypes::arrow::datatypes::UInt32Type,
390 >>()?;
391
392 (dict_array.values().len() == 1 && dict_array.null_count() == 0).then_some(dict_array)
393}
394
395pub(crate) fn flat_projected_columns(
399 metadata: &RegionMetadata,
400 format_projection: &FormatProjection,
401) -> Vec<(ColumnId, ConcreteDataType)> {
402 let time_index = metadata.time_index_column();
403 let num_columns = if format_projection
404 .column_id_to_projected_index
405 .contains_key(&time_index.column_id)
406 {
407 format_projection.column_id_to_projected_index.len()
408 } else {
409 format_projection.column_id_to_projected_index.len() + 1
410 };
411 let mut schema = vec![None; num_columns];
412 for (column_id, index) in &format_projection.column_id_to_projected_index {
413 schema[*index] = Some((
415 *column_id,
416 metadata
417 .column_by_id(*column_id)
418 .unwrap()
419 .column_schema
420 .data_type
421 .clone(),
422 ));
423 }
424 if num_columns != format_projection.column_id_to_projected_index.len() {
425 schema[num_columns - 1] = Some((
426 time_index.column_id,
427 time_index.column_schema.data_type.clone(),
428 ));
429 }
430
431 schema.into_iter().map(|id_type| id_type.unwrap()).collect()
433}
434
435pub(crate) fn compute_input_arrow_schema(
440 metadata: &RegionMetadata,
441 batch_schema: &[(ColumnId, ConcreteDataType)],
442) -> datatypes::arrow::datatypes::SchemaRef {
443 let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
444 for (column_id, data_type) in batch_schema {
445 let column_metadata = metadata.column_by_id(*column_id).unwrap();
446 let field = Field::new(
447 &column_metadata.column_schema.name,
448 data_type.as_arrow_type(),
449 column_metadata.column_schema.is_nullable(),
450 )
451 .with_metadata(column_metadata.column_schema.metadata().clone());
452 let field = with_field_id(field, *column_id);
453 if column_metadata.semantic_type == SemanticType::Tag {
454 new_fields.push(tag_maybe_to_dictionary_field(
455 &column_metadata.column_schema.data_type,
456 &Arc::new(field),
457 ));
458 } else {
459 new_fields.push(Arc::new(field));
460 }
461 }
462 new_fields.extend_from_slice(&internal_fields());
463
464 Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
465}
466
467pub(crate) struct CompactionProjectionMapper {
470 mapper: FlatProjectionMapper,
471 assembler: DfBatchAssembler,
472}
473
474impl CompactionProjectionMapper {
475 pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
476 let projection = metadata
477 .column_metadatas
478 .iter()
479 .enumerate()
480 .filter_map(|(idx, col)| {
481 if matches!(col.semantic_type, SemanticType::Field) {
482 Some(idx)
483 } else {
484 None
485 }
486 })
487 .chain([metadata.time_index_column_pos()])
488 .collect::<Vec<_>>();
489
490 let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id);
491 let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids);
492 let mapper =
493 FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols, None)?;
494 let assembler = DfBatchAssembler::new(mapper.output_schema());
495
496 Ok(Self { mapper, assembler })
497 }
498
499 pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
503 let columns = self
504 .mapper
505 .project_vectors(&batch)
506 .context(RecordBatchSnafu)?;
507 self.assembler
508 .build_df_record_batch_with_internal(&batch, columns)
509 .context(RecordBatchSnafu)
510 }
511}
512
513pub(crate) struct DfBatchAssembler {
515 output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
516}
517
518impl DfBatchAssembler {
519 pub(crate) fn new(output_schema: SchemaRef) -> Self {
521 let fields = output_schema
522 .arrow_schema()
523 .fields()
524 .into_iter()
525 .chain(internal_fields().iter())
526 .cloned()
527 .collect::<Vec<_>>();
528 let output_arrow_schema_with_internal =
529 Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
530 Self {
531 output_arrow_schema_with_internal,
532 }
533 }
534
535 pub(crate) fn build_df_record_batch_with_internal(
540 &self,
541 batch: &datatypes::arrow::record_batch::RecordBatch,
542 mut columns: Vec<datatypes::vectors::VectorRef>,
543 ) -> common_recordbatch::error::Result<DfRecordBatch> {
544 let num_columns = batch.columns().len();
545 let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1];
547 for index in internal_indices.iter() {
548 let array = batch.column(*index).clone();
549 let vector = Helper::try_into_vector(array)
550 .map_err(BoxedError::new)
551 .context(ExternalSnafu)?;
552 columns.push(vector);
553 }
554 RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
555 }
556}
557
#[cfg(test)]
mod tests {
    use datatypes::schema::ColumnSchema;
    use datatypes::types::json_type::JsonObjectType;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Builds region metadata with a nullable field column `j` (id 0) typed by
    /// `ConcreteDataType::json_datatype()` and a timestamp column `ts` (id 1).
    fn metadata_with_legacy_json() -> RegionMetadataRef {
        let json_column = ColumnMetadata {
            column_schema: ColumnSchema::new("j", ConcreteDataType::json_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 0,
        };
        let ts_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
        builder.push_column_metadata(json_column);
        builder.push_column_metadata(ts_column);
        Arc::new(builder.build().unwrap())
    }

    /// A type hint for column `j` must leave the batch schema type of that column
    /// unchanged.
    #[test]
    fn test_json_type_hint_does_not_concretize_legacy_json() {
        let metadata = metadata_with_legacy_json();

        let object_type = JsonObjectType::from([("a".to_string(), JsonNativeType::i64())]);
        let mut hint = HashMap::new();
        hint.insert("j".to_string(), JsonNativeType::Object(object_type));

        let read_cols = ReadColumns::from_deduped_column_ids([0, 1]);
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &metadata,
            vec![0, 1],
            read_cols,
            Some(&hint),
        )
        .unwrap();

        let expected = (0, ConcreteDataType::json_datatype());
        assert_eq!(mapper.batch_schema()[0], expected);
    }
}
613}