1use std::mem;
18use std::sync::Arc;
19
20use async_stream::try_stream;
21use common_time::range::TimestampRange;
22use datatypes::arrow::array::{Array, AsArray, DictionaryArray};
23use datatypes::arrow::datatypes::UInt32Type;
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use futures::TryStreamExt;
27use store_api::region_engine::PartitionRange;
28use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
29
30use crate::cache::CacheStrategy;
31use crate::read::BoxedRecordBatchStream;
32use crate::read::scan_region::StreamContext;
33use crate::read::scan_util::PartitionMetrics;
34use crate::region::options::MergeMode;
35use crate::sst::file::FileTimeRange;
36use crate::sst::parquet::flat_format::primary_key_column_index;
37
/// Fingerprint of a scan request, used as part of a range-result cache key.
///
/// Two scans with equal fingerprints are expected to read the same columns
/// with the same filters and merge semantics, so their results for the same
/// row groups are interchangeable.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
    /// Parts shared by all clones of this fingerprint (column ids/types and
    /// non-time filters); behind an `Arc` so cloning stays cheap.
    inner: Arc<SharedScanRequestFingerprint>,
    /// Stringified time filters; `None` when the scan has no time filters
    /// (an empty list is normalized to `None` by the builder).
    time_filters: Option<Arc<Vec<String>>>,
    /// Optional per-time-series row selection (e.g. last row).
    series_row_selector: Option<TimeSeriesRowSelector>,
    /// Whether the scan runs in append mode.
    append_mode: bool,
    /// Whether deleted rows are filtered out.
    filter_deleted: bool,
    /// Merge mode applied when combining rows.
    merge_mode: MergeMode,
    /// Partition expression version; a different version yields a different
    /// cache key, so cached results are not reused across versions.
    partition_expr_version: u64,
}
58
/// Builder collecting the raw pieces of a [`ScanRequestFingerprint`].
///
/// All fields are plain owned values; `build` wraps the shared parts in an
/// `Arc` and normalizes an empty time-filter list to `None`.
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
    /// Ids of the columns the scan reads.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Concrete types of the read columns.
    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
    /// Stringified non-time filter expressions.
    pub(crate) filters: Vec<String>,
    /// Stringified time filter expressions.
    pub(crate) time_filters: Vec<String>,
    /// Optional per-time-series row selection.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Whether the scan runs in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered out.
    pub(crate) filter_deleted: bool,
    /// Merge mode applied when combining rows.
    pub(crate) merge_mode: MergeMode,
    /// Partition expression version.
    pub(crate) partition_expr_version: u64,
}
71
72impl ScanRequestFingerprintBuilder {
73 pub(crate) fn build(self) -> ScanRequestFingerprint {
74 let Self {
75 read_column_ids,
76 read_column_types,
77 filters,
78 time_filters,
79 series_row_selector,
80 append_mode,
81 filter_deleted,
82 merge_mode,
83 partition_expr_version,
84 } = self;
85
86 ScanRequestFingerprint {
87 inner: Arc::new(SharedScanRequestFingerprint {
88 read_column_ids,
89 read_column_types,
90 filters,
91 }),
92 time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
93 series_row_selector,
94 append_mode,
95 filter_deleted,
96 merge_mode,
97 partition_expr_version,
98 }
99 }
100}
101
/// Fingerprint parts shared behind an `Arc` by all clones of a
/// [`ScanRequestFingerprint`], including the copy produced by
/// `without_time_filters`.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
    /// Ids of the columns the scan reads.
    read_column_ids: Vec<ColumnId>,
    /// Concrete types of the read columns.
    read_column_types: Vec<Option<ConcreteDataType>>,
    /// Stringified non-time filter expressions.
    filters: Vec<String>,
}
113
114impl ScanRequestFingerprint {
115 #[cfg(test)]
116 pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
117 &self.inner.read_column_ids
118 }
119
120 #[cfg(test)]
121 pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
122 &self.inner.read_column_types
123 }
124
125 #[cfg(test)]
126 pub(crate) fn filters(&self) -> &[String] {
127 &self.inner.filters
128 }
129
130 #[cfg(test)]
131 pub(crate) fn time_filters(&self) -> &[String] {
132 self.time_filters
133 .as_deref()
134 .map(Vec::as_slice)
135 .unwrap_or(&[])
136 }
137
138 pub(crate) fn without_time_filters(&self) -> Self {
139 Self {
140 inner: Arc::clone(&self.inner),
141 time_filters: None,
142 series_row_selector: self.series_row_selector,
143 append_mode: self.append_mode,
144 filter_deleted: self.filter_deleted,
145 merge_mode: self.merge_mode,
146 partition_expr_version: self.partition_expr_version,
147 }
148 }
149
150 pub(crate) fn estimated_size(&self) -> usize {
151 mem::size_of::<SharedScanRequestFingerprint>()
152 + self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
153 + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
154 + self.inner.filters.capacity() * mem::size_of::<String>()
155 + self
156 .inner
157 .filters
158 .iter()
159 .map(|filter| filter.capacity())
160 .sum::<usize>()
161 + self.time_filters.as_ref().map_or(0, |filters| {
162 mem::size_of::<Vec<String>>()
163 + filters.capacity() * mem::size_of::<String>()
164 + filters
165 .iter()
166 .map(|filter| filter.capacity())
167 .sum::<usize>()
168 })
169 }
170}
171
/// Cache key identifying the result of scanning a fixed set of row groups
/// in a region under a particular scan fingerprint.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RangeScanCacheKey {
    /// Region the row groups belong to.
    pub(crate) region_id: RegionId,
    /// `(file id, row group index)` pairs covered by the partition range,
    /// sorted so equivalent ranges hash identically.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// Fingerprint of the scan request.
    pub(crate) scan: ScanRequestFingerprint,
}
180
181impl RangeScanCacheKey {
182 pub(crate) fn estimated_size(&self) -> usize {
183 mem::size_of::<Self>()
184 + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
185 + self.scan.estimated_size()
186 }
187}
188
/// Cached record batches for one partition range, together with a
/// precomputed estimate of their memory footprint.
pub(crate) struct RangeScanCacheValue {
    /// Batches produced by the range scan, in stream order.
    pub(crate) batches: Vec<RecordBatch>,
    /// Estimated payload size of `batches` in bytes (precomputed by the
    /// producer so lookups don't re-walk the arrays).
    estimated_batches_size: usize,
}
195
196impl RangeScanCacheValue {
197 pub(crate) fn new(batches: Vec<RecordBatch>, estimated_batches_size: usize) -> Self {
198 Self {
199 batches,
200 estimated_batches_size,
201 }
202 }
203
204 pub(crate) fn estimated_size(&self) -> usize {
205 mem::size_of::<Self>()
206 + self.batches.capacity() * mem::size_of::<RecordBatch>()
207 + self.estimated_batches_size
208 }
209}
210
/// Row groups collected from one partition range.
#[allow(dead_code)]
pub(crate) struct PartitionRangeRowGroups {
    /// `(file id, row group index)` pairs for file-backed sources, sorted
    /// by file id bytes then row group index.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// True when every row group index in the range refers to a file range;
    /// required for the range's result to be cacheable.
    pub(crate) only_file_sources: bool,
}
218
219#[allow(dead_code)]
221pub(crate) fn collect_partition_range_row_groups(
222 stream_ctx: &StreamContext,
223 part_range: &PartitionRange,
224) -> PartitionRangeRowGroups {
225 let range_meta = &stream_ctx.ranges[part_range.identifier];
226 let mut row_groups = Vec::new();
227 let mut only_file_sources = true;
228
229 for index in &range_meta.row_group_indices {
230 if stream_ctx.is_file_range_index(*index) {
231 let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id();
232 row_groups.push((file_id, index.row_group_index));
233 } else {
234 only_file_sources = false;
235 }
236 }
237
238 row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1)));
239
240 PartitionRangeRowGroups {
241 row_groups,
242 only_file_sources,
243 }
244}
245
246#[allow(dead_code)]
248pub(crate) fn build_range_cache_key(
249 stream_ctx: &StreamContext,
250 part_range: &PartitionRange,
251) -> Option<RangeScanCacheKey> {
252 let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;
253
254 let has_dyn_filters = stream_ctx
256 .input
257 .predicate_group()
258 .predicate_without_region()
259 .is_some_and(|p| !p.dyn_filters().is_empty());
260 if has_dyn_filters {
261 return None;
262 }
263
264 let rg = collect_partition_range_row_groups(stream_ctx, part_range);
265 if !rg.only_file_sources || rg.row_groups.is_empty() {
266 return None;
267 }
268
269 let range_meta = &stream_ctx.ranges[part_range.identifier];
270 let scan = if query_time_range_covers_partition_range(
271 stream_ctx.input.time_range.as_ref(),
272 range_meta.time_range,
273 ) {
274 fingerprint.without_time_filters()
275 } else {
276 fingerprint.clone()
277 };
278
279 Some(RangeScanCacheKey {
280 region_id: stream_ctx.input.region_metadata().region_id,
281 row_groups: rg.row_groups,
282 scan,
283 })
284}
285
286#[allow(dead_code)]
287fn query_time_range_covers_partition_range(
288 query_time_range: Option<&TimestampRange>,
289 partition_time_range: FileTimeRange,
290) -> bool {
291 let Some(query_time_range) = query_time_range else {
292 return true;
293 };
294
295 let (part_start, part_end) = partition_time_range;
296 query_time_range.contains(&part_start) && query_time_range.contains(&part_end)
297}
298
299#[allow(dead_code)]
301pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
302 Box::pin(futures::stream::iter(
303 value.batches.clone().into_iter().map(Ok),
304 ))
305}
306
307fn pk_values_ptr_eq(a: &DictionaryArray<UInt32Type>, b: &DictionaryArray<UInt32Type>) -> bool {
312 let a = a.values().as_binary::<i32>();
313 let b = b.values().as_binary::<i32>();
314 let values_eq = a.values().ptr_eq(b.values()) && a.offsets().ptr_eq(b.offsets());
315 match (a.nulls(), b.nulls()) {
316 (Some(a), Some(b)) => values_eq && a.inner().ptr_eq(b.inner()),
317 (None, None) => values_eq,
318 _ => false,
319 }
320}
321
/// Accumulates batches for caching while estimating their total memory,
/// counting shared primary-key dictionary values only once when consecutive
/// batches alias the first batch's dictionary buffers.
struct CacheBatchBuffer {
    /// Batches collected so far.
    batches: Vec<RecordBatch>,
    /// Running size estimate of all batches, in bytes.
    total_size: usize,
    /// Primary-key dictionary of the first batch, kept for pointer-equality
    /// checks against subsequent batches.
    first_pk_dict: Option<DictionaryArray<UInt32Type>>,
    /// Total size of the dictionary values arrays of the first batch
    /// (summed over every dictionary column, not only the primary key).
    total_dict_values_size: usize,
    /// Whether every batch seen so far shares the first batch's primary-key
    /// dictionary buffers; once false, sharing is no longer checked.
    shared: bool,
}
340
341impl CacheBatchBuffer {
342 fn new() -> Self {
343 Self {
344 batches: Vec::new(),
345 total_size: 0,
346 first_pk_dict: None,
347 total_dict_values_size: 0,
348 shared: true,
349 }
350 }
351
352 fn push(&mut self, batch: RecordBatch) {
353 if self.batches.is_empty() {
354 self.init_first_batch(&batch);
355 } else {
356 self.add_subsequent_batch(&batch);
357 }
358 self.batches.push(batch);
359 }
360
361 fn init_first_batch(&mut self, batch: &RecordBatch) {
362 self.total_size += batch.get_array_memory_size();
363
364 let pk_col_idx = primary_key_column_index(batch.num_columns());
365 let mut total_dict_values_size = 0;
366 for col_idx in 0..batch.num_columns() {
367 let col = batch.column(col_idx);
368 if let Some(dict) = col.as_any().downcast_ref::<DictionaryArray<UInt32Type>>() {
369 total_dict_values_size += dict.values().get_array_memory_size();
370 if col_idx == pk_col_idx {
371 self.first_pk_dict = Some(dict.clone());
372 }
373 }
374 }
375 self.total_dict_values_size = total_dict_values_size;
376 }
377
378 fn add_subsequent_batch(&mut self, batch: &RecordBatch) {
379 let batch_size = batch.get_array_memory_size();
380
381 if self.shared
382 && let Some(first_pk_dict) = &self.first_pk_dict
383 {
384 let pk_col_idx = primary_key_column_index(batch.num_columns());
385 let col = batch.column(pk_col_idx);
386 if let Some(dict) = col.as_any().downcast_ref::<DictionaryArray<UInt32Type>>()
387 && pk_values_ptr_eq(first_pk_dict, dict)
388 {
389 self.total_size += batch_size - self.total_dict_values_size;
391 return;
392 }
393 self.shared = false;
395 }
396
397 self.total_size += batch_size;
398 }
399
400 fn estimated_batches_size(&self) -> usize {
401 self.total_size
402 }
403
404 fn into_batches(self) -> Vec<RecordBatch> {
405 self.batches
406 }
407}
408
409#[allow(dead_code)]
411pub(crate) fn cache_flat_range_stream(
412 mut stream: BoxedRecordBatchStream,
413 cache_strategy: CacheStrategy,
414 key: RangeScanCacheKey,
415 part_metrics: PartitionMetrics,
416) -> BoxedRecordBatchStream {
417 Box::pin(try_stream! {
418 let mut buffer = CacheBatchBuffer::new();
419 while let Some(batch) = stream.try_next().await? {
420 buffer.push(batch.clone());
421 yield batch;
422 }
423
424 let estimated_size = buffer.estimated_batches_size();
425 let batches = buffer.into_batches();
426 let value = Arc::new(RangeScanCacheValue::new(batches, estimated_size));
427 part_metrics.inc_range_cache_size(key.estimated_size() + value.estimated_size());
428 cache_strategy.put_range_result(key, value);
429 })
430}
431
/// Benchmark helper: wraps `stream` with range-result caching using a fresh
/// cache of `cache_size_bytes` and a minimal, empty scan fingerprint.
#[cfg(feature = "test")]
pub fn bench_cache_flat_range_stream(
    stream: BoxedRecordBatchStream,
    cache_size_bytes: u64,
    region_id: RegionId,
) -> BoxedRecordBatchStream {
    use std::time::Instant;

    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    use crate::region::options::MergeMode;

    // Fresh cache sized for this benchmark run.
    let manager = Arc::new(
        crate::cache::CacheManager::builder()
            .range_result_cache_size(cache_size_bytes)
            .build(),
    );
    let strategy = CacheStrategy::EnableAll(manager);

    // Minimal fingerprint: no columns, no filters, default modes.
    let scan = ScanRequestFingerprintBuilder {
        read_column_ids: Vec::new(),
        read_column_types: Vec::new(),
        filters: Vec::new(),
        time_filters: Vec::new(),
        series_row_selector: None,
        append_mode: false,
        filter_deleted: false,
        merge_mode: MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();

    let key = RangeScanCacheKey {
        region_id,
        row_groups: Vec::new(),
        scan,
    };

    let metrics_set = ExecutionPlanMetricsSet::new();
    let part_metrics =
        PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics_set);

    cache_flat_range_stream(stream, strategy, key, part_metrics)
}
480
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use common_time::range::TimestampRange;
    use common_time::timestamp::TimeUnit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};
    use smallvec::smallvec;
    use store_api::storage::FileId;

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::projection::ProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{PredicateGroup, ScanInput};
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    // Cache strategy backed by a small (1 KiB) range-result cache.
    fn test_cache_strategy() -> CacheStrategy {
        CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024)
                .build(),
        ))
    }

    // Builds a StreamContext with one single-row-group, file-backed
    // partition range spanning `partition_time_range`, applying `filters`
    // and an optional `query_time_range`.
    async fn new_stream_context(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
    ) -> (StreamContext, PartitionRange) {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file_id = FileId::random();
        let file = sst_file_handle_with_file_id(
            file_id,
            partition_time_range.0.value(),
            partition_time_range.1.value(),
        );
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_time_range(query_time_range)
            .with_files(vec![file])
            .with_cache(test_cache_strategy())
            .with_flat_format(true);
        let range_meta = RangeMeta {
            time_range: partition_time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 10,
        };
        let partition_range = range_meta.new_partition_range(0);
        let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
        let stream_ctx = StreamContext {
            input,
            ranges: vec![range_meta],
            scan_fingerprint,
            query_start: Instant::now(),
        };

        (stream_ctx, partition_range)
    }

    // Millisecond-timestamp literal for building filter expressions.
    fn ts_lit(val: i64) -> Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    // Query time range [1000, 2002) covers the partition [1000, 2000], so
    // time-only filters are stripped from the cache key; non-time filters
    // (including the non-range `is_not_null` on ts) remain.
    #[tokio::test]
    async fn strips_time_only_filters_when_query_covers_partition_range() {
        let (stream_ctx, part_range) = new_stream_context(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").lt(ts_lit(2001)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert!(key.scan.time_filters().is_empty());
        let mut expected_filters = [
            col("k0").eq(lit("foo")).to_string(),
            col("ts").is_not_null().to_string(),
        ];
        expected_filters.sort_unstable();
        assert_eq!(key.scan.filters(), expected_filters.as_slice());
    }

    // Query time range [1000, 1500) does NOT cover the partition
    // [1000, 2000], so time filters stay in the cache key.
    #[tokio::test]
    async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
        let (stream_ctx, part_range) = new_stream_context(
            vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
            TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert_eq!(
            key.scan.time_filters(),
            [col("ts").gt_eq(ts_lit(1000)).to_string()].as_slice()
        );
        assert_eq!(
            key.scan.filters(),
            [col("k0").eq(lit("foo")).to_string()].as_slice()
        );
    }

    // With no query time range at all, the query trivially covers the
    // partition, so time-only filters are stripped.
    #[tokio::test]
    async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
        let (stream_ctx, part_range) = new_stream_context(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            None,
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert!(key.scan.time_filters().is_empty());
        let mut expected_filters = [
            col("k0").eq(lit("foo")).to_string(),
            col("ts").is_not_null().to_string(),
        ];
        expected_filters.sort_unstable();
        assert_eq!(key.scan.filters(), expected_filters.as_slice());
    }

    // Builder normalizes empty time filters to None, and
    // without_time_filters clears time filters while preserving every
    // other fingerprint field.
    #[test]
    fn normalizes_and_clears_time_filters() {
        let normalized = ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: 0,
        }
        .build();

        assert!(normalized.time_filters().is_empty());

        let fingerprint = ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters: vec!["ts >= 1000".to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: 7,
        }
        .build();

        let reset = fingerprint.without_time_filters();

        assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }

    // Schema mimicking the flat format: the "pk" column is a
    // Dictionary(UInt32, Binary) at the position reported by
    // primary_key_column_index for a 5-column batch.
    fn dict_test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
        Arc::new(Schema::new(vec![
            Field::new("field0", ArrowDataType::Int64, false),
            Field::new("field1", ArrowDataType::Int64, false),
            Field::new(
                "pk",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("ts", ArrowDataType::Int64, false),
            Field::new("seq", ArrowDataType::Int64, false),
        ]))
    }

    // Builds a batch whose pk dictionary shares `dict_values` — reusing
    // the same BinaryArray across calls makes the dictionaries alias the
    // same buffers, which is what pk_values_ptr_eq detects.
    fn make_dict_batch(
        schema: Arc<datatypes::arrow::datatypes::Schema>,
        dict_values: &datatypes::arrow::array::BinaryArray,
        keys: &[u32],
        int_values: &[i64],
    ) -> RecordBatch {
        use datatypes::arrow::array::{Int64Array, UInt32Array};

        let key_array = UInt32Array::from(keys.to_vec());
        let dict_array: DictionaryArray<UInt32Type> =
            DictionaryArray::new(key_array, Arc::new(dict_values.clone()));
        let int_array = Int64Array::from(int_values.to_vec());
        let zeros = Int64Array::from(vec![0i64; int_values.len()]);
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(zeros.clone()),
                Arc::new(int_array),
                Arc::new(dict_array),
                Arc::new(zeros.clone()),
                Arc::new(zeros),
            ],
        )
        .unwrap()
    }

    // Mirrors CacheBatchBuffer's dictionary-values accounting: sums the
    // values arrays of every dictionary column in the batch.
    fn compute_total_dict_values_size(batch: &RecordBatch) -> usize {
        batch
            .columns()
            .iter()
            .filter_map(|col| {
                col.as_any()
                    .downcast_ref::<DictionaryArray<UInt32Type>>()
                    .map(|dict| dict.values().get_array_memory_size())
            })
            .sum()
    }

    #[test]
    fn cache_batch_buffer_empty() {
        let buffer = CacheBatchBuffer::new();
        assert_eq!(buffer.estimated_batches_size(), 0);
        assert!(buffer.into_batches().is_empty());
    }

    // A single batch is always counted at its full array memory size.
    #[test]
    fn cache_batch_buffer_single_batch() {
        use datatypes::arrow::array::BinaryArray;

        let schema = dict_test_schema();
        let dict_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]);
        let batch = make_dict_batch(schema, &dict_values, &[0, 1, 2], &[10, 20, 30]);

        let full_size = batch.get_array_memory_size();

        let mut buffer = CacheBatchBuffer::new();
        buffer.push(batch);
        assert_eq!(buffer.estimated_batches_size(), full_size);
        assert_eq!(buffer.into_batches().len(), 1);
    }

    // Two batches sharing the same dictionary buffers: the dictionary
    // values are only counted once in the estimate.
    #[test]
    fn cache_batch_buffer_shared_dictionary() {
        use datatypes::arrow::array::BinaryArray;

        let schema = dict_test_schema();
        let dict_values = BinaryArray::from_vec(vec![b"alpha", b"beta", b"gamma"]);

        let batch1 = make_dict_batch(schema.clone(), &dict_values, &[0, 1], &[10, 20]);
        let batch2 = make_dict_batch(schema, &dict_values, &[1, 2], &[30, 40]);

        let batch1_full = batch1.get_array_memory_size();
        let batch2_full = batch2.get_array_memory_size();

        let dict_values_size = compute_total_dict_values_size(&batch2);

        let mut buffer = CacheBatchBuffer::new();
        buffer.push(batch1);
        buffer.push(batch2);

        assert_eq!(
            buffer.estimated_batches_size(),
            batch1_full + batch2_full - dict_values_size
        );
        assert_eq!(buffer.into_batches().len(), 2);
    }

    // Distinct dictionary buffers: both batches counted in full.
    #[test]
    fn cache_batch_buffer_non_shared_dictionary() {
        use datatypes::arrow::array::BinaryArray;

        let schema = dict_test_schema();
        let dict_values1 = BinaryArray::from_vec(vec![b"a", b"b"]);
        let dict_values2 = BinaryArray::from_vec(vec![b"x", b"y"]);

        let batch1 = make_dict_batch(schema.clone(), &dict_values1, &[0, 1], &[10, 20]);
        let batch2 = make_dict_batch(schema, &dict_values2, &[0, 1], &[30, 40]);

        let batch1_full = batch1.get_array_memory_size();
        let batch2_full = batch2.get_array_memory_size();

        let mut buffer = CacheBatchBuffer::new();
        buffer.push(batch1);
        buffer.push(batch2);

        assert_eq!(buffer.estimated_batches_size(), batch1_full + batch2_full);
    }

    // Sharing applies batch-by-batch until the first divergence: batch2
    // shares batch1's dictionary (dedup), batch3 does not (full size).
    #[test]
    fn cache_batch_buffer_shared_then_diverged() {
        use datatypes::arrow::array::BinaryArray;

        let schema = dict_test_schema();
        let shared_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]);
        let different_values = BinaryArray::from_vec(vec![b"x", b"y"]);

        let batch1 = make_dict_batch(schema.clone(), &shared_values, &[0], &[1]);
        let batch2 = make_dict_batch(schema.clone(), &shared_values, &[1], &[2]);
        let batch3 = make_dict_batch(schema, &different_values, &[0], &[3]);

        let size1 = batch1.get_array_memory_size();
        let size2 = batch2.get_array_memory_size();
        let size3 = batch3.get_array_memory_size();

        let dict_values_size = compute_total_dict_values_size(&batch2);

        let mut buffer = CacheBatchBuffer::new();
        buffer.push(batch1);
        buffer.push(batch2);
        buffer.push(batch3);

        assert_eq!(
            buffer.estimated_batches_size(),
            size1 + (size2 - dict_values_size) + size3
        );
    }
}