1use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::OpType;
21use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array, UInt8Array};
22use datatypes::arrow::compute::filter_record_batch;
23use datatypes::arrow::datatypes::SchemaRef;
24use datatypes::arrow::record_batch::RecordBatch;
25use snafu::{OptionExt, ResultExt};
26use store_api::metadata::{ColumnMetadata, RegionMetadata};
27use store_api::storage::{RegionId, SequenceNumber};
28
29use crate::error::{
30 ComputeArrowSnafu, CreateDefaultSnafu, InvalidRequestSnafu, NewRecordBatchSnafu, Result,
31 UnexpectedImpureDefaultSnafu,
32};
33use crate::sst::parquet::plain_format::PLAIN_FIXED_POS_COLUMN_NUM;
34
35#[derive(Debug)]
42pub struct PlainBatch {
43 record_batch: RecordBatch,
45}
46
47impl PlainBatch {
48 pub fn new(record_batch: RecordBatch) -> Self {
50 assert!(
51 record_batch.num_columns() >= 2,
52 "record batch missing internal columns, num_columns: {}",
53 record_batch.num_columns()
54 );
55
56 Self { record_batch }
57 }
58
59 pub fn with_new_columns(&self, columns: Vec<ArrayRef>) -> Result<Self> {
61 let record_batch = RecordBatch::try_new(self.record_batch.schema(), columns)
62 .context(NewRecordBatchSnafu)?;
63 Ok(Self::new(record_batch))
64 }
65
66 pub fn num_columns(&self) -> usize {
68 self.record_batch.num_columns()
69 }
70
71 pub fn num_rows(&self) -> usize {
73 self.record_batch.num_rows()
74 }
75
76 pub fn is_empty(&self) -> bool {
78 self.num_rows() == 0
79 }
80
81 pub fn columns(&self) -> &[ArrayRef] {
83 self.record_batch.columns()
84 }
85
86 pub fn column(&self, idx: usize) -> &ArrayRef {
88 self.record_batch.column(idx)
89 }
90
91 pub fn internal_columns(&self) -> &[ArrayRef] {
93 &self.record_batch.columns()[self.record_batch.num_columns() - PLAIN_FIXED_POS_COLUMN_NUM..]
94 }
95
96 pub fn as_record_batch(&self) -> &RecordBatch {
98 &self.record_batch
99 }
100
101 pub fn into_record_batch(self) -> RecordBatch {
103 self.record_batch
104 }
105
106 pub fn filter(&self, predicate: &BooleanArray) -> Result<Self> {
108 let record_batch =
109 filter_record_batch(&self.record_batch, predicate).context(ComputeArrowSnafu)?;
110 Ok(Self::new(record_batch))
111 }
112
113 #[allow(dead_code)]
115 pub(crate) fn sequence_column_index(&self) -> usize {
116 self.record_batch.num_columns() - PLAIN_FIXED_POS_COLUMN_NUM
117 }
118}
119
120pub struct ColumnFiller<'a> {
122 metadata: &'a RegionMetadata,
124 schema: SchemaRef,
126 name_to_index: HashMap<String, usize>,
128}
129
130impl<'a> ColumnFiller<'a> {
131 pub fn new(
134 metadata: &'a RegionMetadata,
135 schema: SchemaRef,
136 record_batch: &RecordBatch,
137 ) -> Self {
138 debug_assert_eq!(metadata.column_metadatas.len() + 2, schema.fields().len());
139
140 let name_to_index: HashMap<_, _> = record_batch
142 .schema()
143 .fields()
144 .iter()
145 .enumerate()
146 .map(|(i, field)| (field.name().clone(), i))
147 .collect();
148
149 Self {
150 metadata,
151 schema,
152 name_to_index,
153 }
154 }
155
156 pub fn fill_missing_columns(
158 &self,
159 record_batch: &RecordBatch,
160 sequence: SequenceNumber,
161 op_type: OpType,
162 ) -> Result<RecordBatch> {
163 let num_rows = record_batch.num_rows();
164 let mut new_columns =
165 Vec::with_capacity(record_batch.num_columns() + PLAIN_FIXED_POS_COLUMN_NUM);
166
167 for column in &self.metadata.column_metadatas {
170 let array = match self.name_to_index.get(&column.column_schema.name) {
171 Some(index) => record_batch.column(*index).clone(),
172 None => match op_type {
173 OpType::Put => {
174 fill_column_put_default(self.metadata.region_id, column, num_rows)?
176 }
177 OpType::Delete => {
178 fill_column_delete_default(column, num_rows)?
180 }
181 },
182 };
183
184 new_columns.push(array);
185 }
186
187 let sequence_array = Arc::new(UInt64Array::from(vec![sequence; num_rows]));
190 let op_type_array = Arc::new(UInt8Array::from(vec![op_type as u8; num_rows]));
192 new_columns.push(sequence_array);
193 new_columns.push(op_type_array);
194
195 RecordBatch::try_new(self.schema.clone(), new_columns).context(NewRecordBatchSnafu)
196 }
197}
198
199fn fill_column_put_default(
200 region_id: RegionId,
201 column: &ColumnMetadata,
202 num_rows: usize,
203) -> Result<ArrayRef> {
204 if column.column_schema.is_default_impure() {
205 return UnexpectedImpureDefaultSnafu {
206 region_id,
207 column: &column.column_schema.name,
208 default_value: format!("{:?}", column.column_schema.default_constraint()),
209 }
210 .fail();
211 }
212 let vector = column
213 .column_schema
214 .create_default_vector(num_rows)
215 .context(CreateDefaultSnafu {
216 region_id,
217 column: &column.column_schema.name,
218 })?
219 .with_context(|| InvalidRequestSnafu {
221 region_id,
222 reason: format!(
223 "column {} does not have default value",
224 column.column_schema.name
225 ),
226 })?;
227 Ok(vector.to_arrow_array())
228}
229
230fn fill_column_delete_default(column: &ColumnMetadata, num_rows: usize) -> Result<ArrayRef> {
231 let vector = column
233 .column_schema
234 .create_default_vector_for_padding(num_rows);
235 Ok(vector.to_arrow_array())
236}
237
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datatypes::arrow::array::{
        Float64Array, Int32Array, StringArray, TimestampMillisecondArray,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datatypes::schema::constraint::ColumnDefaultConstraint;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
    use store_api::storage::{ConcreteDataType, RegionId};

    use super::*;
    use crate::sst::to_plain_sst_arrow_schema;

    /// Builds region metadata with a tag `k0`, a time index `ts` and a
    /// nullable field `v1` whose default value is `42.0`.
    fn create_test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(100, 200));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false)
                    .with_default_constraint(None)
                    .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true)
                .with_default_constraint(None)
                .unwrap(),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true)
                    .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Float64(
                        datatypes::value::OrderedFloat::from(42.0),
                    ))))
                    .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![0]);

        builder.build().unwrap()
    }

    /// Builds a two-row input batch that only carries `k0` and `ts`, so `v1`
    /// and the internal columns must be filled in.
    ///
    /// Returns the batch together with its `k0` and `ts` arrays.
    fn create_filler_input() -> (RecordBatch, ArrayRef, ArrayRef) {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", DataType::Utf8, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let k0_values: ArrayRef = Arc::new(StringArray::from(vec!["key1", "key2"]));
        let ts_values: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_batch =
            RecordBatch::try_new(input_schema, vec![k0_values.clone(), ts_values.clone()]).unwrap();
        (input_batch, k0_values, ts_values)
    }

    #[test]
    fn test_column_filler_put() {
        let region_metadata = create_test_region_metadata();
        let output_schema = to_plain_sst_arrow_schema(&region_metadata);
        let (input_batch, k0_values, ts_values) = create_filler_input();

        let filler = ColumnFiller::new(&region_metadata, output_schema.clone(), &input_batch);
        let result = filler
            .fill_missing_columns(&input_batch, 100, OpType::Put)
            .unwrap();

        // A put fills the missing `v1` with its declared default.
        let expected_columns: Vec<ArrayRef> = vec![
            k0_values,
            ts_values,
            Arc::new(Float64Array::from(vec![42.0, 42.0])),
            Arc::new(UInt64Array::from(vec![100, 100])),
            Arc::new(UInt8Array::from(vec![OpType::Put as u8, OpType::Put as u8])),
        ];
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_column_filler_delete() {
        let region_metadata = create_test_region_metadata();
        let output_schema = to_plain_sst_arrow_schema(&region_metadata);
        let (input_batch, k0_values, ts_values) = create_filler_input();

        let filler = ColumnFiller::new(&region_metadata, output_schema.clone(), &input_batch);
        let result = filler
            .fill_missing_columns(&input_batch, 200, OpType::Delete)
            .unwrap();

        // A delete pads the missing `v1` with nulls instead of its default.
        let v1_default: ArrayRef = Arc::new(Float64Array::from(vec![None, None]));
        let expected_columns: Vec<ArrayRef> = vec![
            k0_values,
            ts_values,
            v1_default,
            Arc::new(UInt64Array::from(vec![200, 200])),
            Arc::new(UInt8Array::from(vec![
                OpType::Delete as u8,
                OpType::Delete as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
        assert_eq!(expected_batch, result);
    }

    /// Builds a three-row batch with two regular columns followed by the
    /// sequence and op-type internal columns.
    fn create_test_record_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col1", DataType::Int32, false),
            Field::new("col2", DataType::Utf8, false),
            Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
            Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
        ]));

        let col1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let col2: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let sequence: ArrayRef = Arc::new(UInt64Array::from(vec![100, 101, 102]));
        let op_type: ArrayRef = Arc::new(UInt8Array::from(vec![1, 1, 1]));

        RecordBatch::try_new(schema, vec![col1, col2, sequence, op_type]).unwrap()
    }

    #[test]
    fn test_plain_batch_basic_methods() {
        let record_batch = create_test_record_batch();
        let plain_batch = PlainBatch::new(record_batch.clone());

        assert_eq!(plain_batch.num_columns(), 4);
        assert_eq!(plain_batch.num_rows(), 3);
        assert!(!plain_batch.is_empty());
        assert_eq!(plain_batch.columns().len(), 4);

        let internal_columns = plain_batch.internal_columns();
        assert_eq!(internal_columns.len(), PLAIN_FIXED_POS_COLUMN_NUM);
        assert_eq!(internal_columns[0].len(), 3);
        assert_eq!(internal_columns[1].len(), 3);

        let col1 = plain_batch.column(0);
        assert_eq!(col1.len(), 3);
        assert_eq!(
            col1.as_any().downcast_ref::<Int32Array>().unwrap().value(0),
            1
        );

        assert_eq!(plain_batch.sequence_column_index(), 2);

        assert_eq!(record_batch, *plain_batch.as_record_batch());
        assert_eq!(record_batch, plain_batch.into_record_batch());
    }

    #[test]
    #[should_panic(expected = "record batch missing internal columns")]
    fn test_plain_batch_missing_internal_columns() {
        let schema = Arc::new(Schema::new(vec![Field::new("col1", DataType::Int32, false)]));
        let col1: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let record_batch = RecordBatch::try_new(schema, vec![col1]).unwrap();

        let _ = PlainBatch::new(record_batch);
    }

    #[test]
    fn test_with_new_columns() {
        let record_batch = create_test_record_batch();
        let plain_batch = PlainBatch::new(record_batch);

        let col1: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
        let col2: ArrayRef = Arc::new(StringArray::from(vec!["x", "y", "z"]));
        let sequence: ArrayRef = Arc::new(UInt64Array::from(vec![200, 201, 202]));
        let op_type: ArrayRef = Arc::new(UInt8Array::from(vec![0, 0, 0]));

        let new_batch = plain_batch
            .with_new_columns(vec![col1, col2, sequence, op_type])
            .unwrap();

        assert_eq!(new_batch.num_columns(), 4);
        assert_eq!(new_batch.num_rows(), 3);
        assert_eq!(
            new_batch
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap()
                .value(0),
            10
        );
        assert_eq!(
            new_batch
                .column(1)
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap()
                .value(0),
            "x"
        );
    }

    #[test]
    fn test_filter() {
        let record_batch = create_test_record_batch();
        let plain_batch = PlainBatch::new(record_batch);

        let predicate = BooleanArray::from(vec![true, false, true]);
        let filtered_batch = plain_batch.filter(&predicate).unwrap();

        assert_eq!(filtered_batch.num_rows(), 2);

        let col1 = filtered_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(col1.value(0), 1);
        assert_eq!(col1.value(1), 3);

        // Internal columns are filtered together with the regular columns.
        let sequences = filtered_batch
            .column(filtered_batch.sequence_column_index())
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(sequences.value(0), 100);
        assert_eq!(sequences.value(1), 102);
    }
}