use std::collections::VecDeque;
use std::ops::BitAnd;
use std::sync::Arc;

use bytes::Bytes;
use datatypes::arrow::array::{BooleanArray, Scalar, UInt64Array};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::SequenceNumber;

use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::{MaybeFilter, RowGroupReaderContext};

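/// Iterator that decodes an encoded bulk part (parquet bytes in memory) into
/// [`RecordBatch`]es, reading one row group at a time and applying predicate
/// and sequence filters along the way.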
pub struct EncodedBulkPartIter {
    /// Context shared by readers of the same bulk part.
    context: BulkIterContextRef,
    /// Indices of the remaining row groups to read.
    row_groups_to_read: VecDeque<usize>,
    /// Reader of the current row group, if any.
    current_reader: Option<ParquetRecordBatchReader>,
    /// Builder that creates a reader for each row group.
    builder: MemtableRowGroupReaderBuilder,
    /// Upper bound (inclusive) of sequence numbers visible to this iterator.
    sequence: Option<Scalar<UInt64Array>>,
}

impl EncodedBulkPartIter {
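    /// Creates a new [`EncodedBulkPartIter`] that reads the given row groups
    /// from the parquet `data`, initializing the reader of the first row group
    /// eagerly. Rows with sequence numbers greater than `sequence` are
    /// filtered out. The context must use the flat read format.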
    pub(crate) fn try_new(
        context: BulkIterContextRef,
        mut row_groups_to_read: VecDeque<usize>,
        parquet_meta: Arc<ParquetMetaData>,
        data: Bytes,
        sequence: Option<SequenceNumber>,
    ) -> error::Result<Self> {
        assert!(context.read_format().as_flat().is_some());

        let sequence = sequence.map(UInt64Array::new_scalar);

        let projection_mask = ProjectionMask::roots(
            parquet_meta.file_metadata().schema_descr(),
            context.read_format().projection_indices().iter().copied(),
        );
        let builder =
            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;

        let init_reader = row_groups_to_read
            .pop_front()
            .map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
            .transpose()?;
        Ok(Self {
            context,
            row_groups_to_read,
            current_reader: init_reader,
            builder,
            sequence,
        })
    }

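    /// Returns the next filtered [`RecordBatch`], advancing to the next row
    /// group when the current reader is exhausted.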
    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
        let Some(current) = &mut self.current_reader else {
            return Ok(None);
        };

        // Polls the current reader first.
        for batch in current {
            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
            if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
                return Ok(Some(batch));
            }
        }

        // The current reader is exhausted; advances to the remaining row groups.
        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
            let current = self.current_reader.insert(next_reader);

            for batch in current {
                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
                if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
                    return Ok(Some(batch));
                }
            }
        }

        Ok(None)
    }
}

impl Iterator for EncodedBulkPartIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_record_batch().transpose()
    }
}

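/// Iterator that yields a single in-memory [`RecordBatch`] from a bulk part,
/// after applying projection, predicate filters and the sequence filter.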
pub struct BulkPartRecordBatchIter {
    /// The batch to iterate. Taken on the first call to `next`.
    record_batch: Option<RecordBatch>,
    /// Context of the iteration.
    context: BulkIterContextRef,
    /// Upper bound (inclusive) of sequence numbers visible to this iterator.
    sequence: Option<Scalar<UInt64Array>>,
}

impl BulkPartRecordBatchIter {
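    /// Creates a new [`BulkPartRecordBatchIter`] from `record_batch`.
    /// Rows with sequence numbers greater than `sequence` are filtered out.
    /// The context must use the flat read format.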
    pub fn new(
        record_batch: RecordBatch,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Self {
        assert!(context.read_format().as_flat().is_some());

        let sequence = sequence.map(UInt64Array::new_scalar);

        Self {
            record_batch: Some(record_batch),
            context,
            sequence,
        }
    }

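    /// Projects the record batch to the columns in the context, returning the
    /// batch unchanged when the projection already covers all of its columns.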
    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
        let projection_indices = self.context.read_format().projection_indices();
        // Skips the projection if all columns are already selected.
        if projection_indices.len() == record_batch.num_columns() {
            return Ok(record_batch);
        }

        record_batch
            .project(projection_indices)
            .context(ComputeArrowSnafu)
    }

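    /// Applies projection and the combined filters to the batch, returning
    /// `None` when every row is filtered out.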
    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
        let projected_batch = self.apply_projection(record_batch)?;
        let Some(filtered_batch) =
            apply_combined_filters(&self.context, &self.sequence, projected_batch)?
        else {
            return Ok(None);
        };

        Ok(Some(filtered_batch))
    }
}

impl Iterator for BulkPartRecordBatchIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        // Yields the batch only once.
        let record_batch = self.record_batch.take()?;

        self.process_batch(record_batch).transpose()
    }
}

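/// Applies the predicate filters from the `context` and the optional sequence
/// filter to `record_batch`.
///
/// Returns `Ok(None)` if all rows are filtered out, and avoids copying the
/// batch when no row is filtered.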
fn apply_combined_filters(
    context: &BulkIterContext,
    sequence: &Option<Scalar<UInt64Array>>,
    record_batch: RecordBatch,
) -> error::Result<Option<RecordBatch>> {
    let num_rows = record_batch.num_rows();
    let mut combined_filter = None;

    if !context.base.filters.is_empty() {
        let mut mask = BooleanBuffer::new_set(num_rows);

        for filter_ctx in &context.base.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter matches the whole batch, skips it.
                MaybeFilter::Matched => continue,
                // The filter prunes the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Skips the filter if its column is not in the projection.
            let Some(column_index) = context
                .read_format()
                .as_flat()
                .unwrap()
                .projected_index_by_id(filter_ctx.column_id())
            else {
                continue;
            };
            let array = record_batch.column(column_index);
            let result = filter
                .evaluate_array(array)
                .context(crate::error::RecordBatchSnafu)?;

            mask = mask.bitand(&result);
        }
        combined_filter = Some(BooleanArray::from(mask));
    }

    if let Some(sequence) = sequence {
        // Keeps rows whose sequence is less than or equal to the visible sequence.
        let sequence_column =
            record_batch.column(sequence_column_index(record_batch.num_columns()));
        let sequence_filter =
            datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence)
                .context(ComputeArrowSnafu)?;
        // Combines the sequence filter with the predicate filter, if any.
        combined_filter = match combined_filter {
            None => Some(sequence_filter),
            Some(existing_filter) => {
                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
                    .context(ComputeArrowSnafu)?;
                Some(and_result)
            }
        };
    }

    // No filter to apply, returns the batch as is.
    let Some(filter_array) = combined_filter else {
        return Ok(Some(record_batch));
    };
    let select_count = filter_array.true_count();
    if select_count == 0 {
        return Ok(None);
    }
    if select_count == num_rows {
        return Ok(Some(record_batch));
    }
    let filtered_batch =
        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
            .context(ComputeArrowSnafu)?;

    Ok(Some(filtered_batch))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, UInt64Array, UInt8Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;

    #[test]
    fn test_bulk_part_record_batch_iter() {
        // Schema in the flat format: tag, field, timestamp and the internal columns.
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array};
        let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1]));
        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // No projection, no predicate and no sequence: yields the batch unchanged.
        let context = Arc::new(BulkIterContext::new(
            Arc::new(region_metadata.clone()),
            &None,
            None,
        ));
        let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // Sequence filter only: rows with sequence > 2 are filtered out.
        let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context, Some(2));
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        // Projection and predicate: only the row with key1 == "key2" remains.
        let context = Arc::new(BulkIterContext::new(
            Arc::new(region_metadata),
            &Some(&[0, 2]),
            Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
        ));
        let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }
}