1use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35 Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36 TimestampSecondVector, UInt8Vector, UInt64Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{OptionExt, ResultExt, ensure};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceRange};
43use table::predicate::Predicate;
44
45use crate::error::{
46 self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54 AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, BoxedRecordBatchIterator,
55 IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
56 MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
57 read_column_ids_from_projection,
58};
59use crate::metrics::{
60 MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
61 READ_STAGE_ELAPSED,
62};
63use crate::read::dedup::LastNonNullIter;
64use crate::read::prune::PruneTimeIterator;
65use crate::read::scan_region::PredicateGroup;
66use crate::read::{Batch, BatchBuilder, BatchColumn};
67use crate::region::options::MergeMode;
68
/// Initial row capacity for a newly created value builder.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Soft row capacity of a series' active builder; `Series::push` freezes the
/// active builder before it grows past this size.
const BUILDER_CAPACITY: usize = 512;
/// Builder that creates time-series memtables for regions.
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Manager that tracks write buffer memory usage; `None` disables tracking.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether reads deduplicate rows with identical key and timestamp.
    dedup: bool,
    /// Strategy used when merging duplicate rows (last-row vs. last-non-null).
    merge_mode: MergeMode,
}
82
impl TimeSeriesMemtableBuilder {
    /// Creates a builder with the given write buffer manager, dedup flag and
    /// merge mode.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}
97
98impl MemtableBuilder for TimeSeriesMemtableBuilder {
99 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
100 if metadata.primary_key.is_empty() {
101 Arc::new(SimpleBulkMemtable::new(
102 id,
103 metadata.clone(),
104 self.write_buffer_manager.clone(),
105 self.dedup,
106 self.merge_mode,
107 ))
108 } else {
109 Arc::new(TimeSeriesMemtable::new(
110 metadata.clone(),
111 id,
112 self.write_buffer_manager.clone(),
113 self.dedup,
114 self.merge_mode,
115 ))
116 }
117 }
118
119 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
120 false
123 }
124}
125
/// Memtable that groups rows by encoded primary key into per-series builders.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    /// Codec that encodes primary key columns into the series map key.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// All series of this memtable, keyed by encoded primary key.
    series_set: SeriesSet,
    /// Tracks bytes allocated by this memtable (used by flush scheduling).
    alloc_tracker: AllocTracker,
    /// Largest timestamp value written so far (time index unit).
    max_timestamp: AtomicI64,
    /// Smallest timestamp value written so far (time index unit).
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether reads deduplicate rows with identical key and timestamp.
    dedup: bool,
    merge_mode: MergeMode,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
141
142impl TimeSeriesMemtable {
143 pub fn new(
144 region_metadata: RegionMetadataRef,
145 id: MemtableId,
146 write_buffer_manager: Option<WriteBufferManagerRef>,
147 dedup: bool,
148 merge_mode: MergeMode,
149 ) -> Self {
150 let row_codec = Arc::new(DensePrimaryKeyCodec::new(®ion_metadata));
151 let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
152 Self {
153 id,
154 region_metadata,
155 series_set,
156 row_codec,
157 alloc_tracker: AllocTracker::new(write_buffer_manager),
158 max_timestamp: AtomicI64::new(i64::MIN),
159 min_timestamp: AtomicI64::new(i64::MAX),
160 max_sequence: AtomicU64::new(0),
161 dedup,
162 merge_mode,
163 num_rows: Default::default(),
164 }
165 }
166
167 fn update_stats(&self, stats: WriteMetrics) {
169 self.alloc_tracker
170 .on_allocation(stats.key_bytes + stats.value_bytes);
171 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
172 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
173 self.max_sequence
174 .fetch_max(stats.max_sequence, Ordering::SeqCst);
175 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
176 }
177
178 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
179 ensure!(
180 self.row_codec.num_fields() == kv.num_primary_keys(),
181 PrimaryKeyLengthMismatchSnafu {
182 expect: self.row_codec.num_fields(),
183 actual: kv.num_primary_keys(),
184 }
185 );
186
187 let primary_key_encoded = self
188 .row_codec
189 .encode(kv.primary_keys())
190 .context(EncodeSnafu)?;
191
192 let (key_allocated, value_allocated) =
193 self.series_set.push_to_series(primary_key_encoded, &kv);
194 stats.key_bytes += key_allocated;
195 stats.value_bytes += value_allocated;
196
197 let ts = kv
199 .timestamp()
200 .try_into_timestamp()
201 .unwrap()
202 .unwrap()
203 .value();
204 stats.min_ts = stats.min_ts.min(ts);
205 stats.max_ts = stats.max_ts.max(ts);
206 Ok(())
207 }
208}
209
210impl Debug for TimeSeriesMemtable {
211 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
212 f.debug_struct("TimeSeriesMemtable").finish()
213 }
214}
215
impl Memtable for TimeSeriesMemtable {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes every row of `kvs` into its series, then updates the memtable
    /// statistics once.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            // NOTE(review): on error, rows pushed by earlier iterations stay
            // in the memtable but are never reflected in stats — confirm this
            // partial-write accounting is intended.
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Timestamps and op types are stored per row; `write_key_value` only
        // counts key and field bytes, so account for them here.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row; stats are only updated when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Writes a bulk part by converting it to a mutation and replaying it
    /// through the row-based write path.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // Prefer the part's own bounds/counters over the per-row metrics.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Creates scan ranges for this memtable. The whole memtable is always
    /// exposed as a single range with id 0.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
        // Default projection is every field column of the region.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new(
            self.region_metadata.clone(),
            read_column_ids,
        ));
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.clone(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
            batch_to_record_batch,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    /// Marks the memtable as done allocating; no data is moved here.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Reports statistics. A zero allocation estimate is treated as an empty
    /// memtable and returns zeroed stats with no time range.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        // Convert the raw i64 bounds back to timestamps of the region's
        // time index type.
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates a fresh, empty memtable inheriting this one's configuration.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
365
/// Primary-key-ordered map from encoded primary key to its [Series].
///
/// A newtype so that dropping the map can decrement the global series and
/// field-builder gauges (see the `Drop` impl).
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
368
369impl Drop for SeriesMap {
370 fn drop(&mut self) {
371 let num_series = self.0.len();
372 let num_field_builders = self
373 .0
374 .values()
375 .map(|v| v.read().unwrap().active.num_field_builders())
376 .sum::<usize>();
377 MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
378 MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
379 }
380}
381
/// Shared, primary-key-ordered collection of series plus the metadata and
/// codec needed to write to and scan them.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    /// All series keyed by encoded primary key.
    series: Arc<RwLock<SeriesMap>>,
    /// Codec used to decode primary keys during predicate pruning.
    codec: Arc<DensePrimaryKeyCodec>,
}
388
impl SeriesSet {
    /// Creates an empty series set for the region.
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}
398
impl SeriesSet {
    /// Pushes a row into the series identified by `primary_key`, creating the
    /// series if needed. Returns `(key_bytes, value_bytes)` newly allocated;
    /// key bytes are only charged when a new series is created.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so only a read lock on the
        // map is needed.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the write lock and re-check via the entry API —
        // another writer may have inserted the series between the two lock
        // acquisitions.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    /// Returns the series for `primary_key`, if any (test helper).
    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Creates an iterator over all series, applying the projection and
    /// best-effort primary key pruning from `predicate`.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: PredicateGroup,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate.predicate().cloned(),
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}
472
473pub(crate) fn primary_key_schema(
476 region_metadata: &RegionMetadataRef,
477) -> arrow::datatypes::SchemaRef {
478 let fields = region_metadata
479 .primary_key_columns()
480 .map(|pk| {
481 arrow::datatypes::Field::new(
482 pk.column_schema.name.clone(),
483 pk.column_schema.data_type.as_arrow_type(),
484 pk.column_schema.is_nullable(),
485 )
486 })
487 .collect::<Vec<_>>();
488 Arc::new(arrow::datatypes::Schema::new(fields))
489}
490
/// Scan metrics collected by [Iter].
#[derive(Debug, Default)]
struct Metrics {
    /// Number of series visited.
    total_series: usize,
    /// Number of series skipped by primary key pruning.
    num_pruned_series: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Total time spent scanning.
    scan_cost: Duration,
}
505
/// Iterator over the series of a memtable, yielding one [Batch] per series in
/// primary-key order.
struct Iter {
    metadata: RegionMetadataRef,
    /// Shared series map being scanned.
    series: Arc<RwLock<SeriesMap>>,
    /// Field columns to project into each batch.
    projection: HashSet<ColumnId>,
    /// Last primary key returned; the scan resumes strictly after it.
    last_key: Option<Vec<u8>>,
    /// Simple filters used to prune whole series by primary key.
    predicate: Vec<SimpleFilterEvaluator>,
    /// Arrow schema of the primary key columns.
    pk_schema: arrow::datatypes::SchemaRef,
    /// Data types of the primary key columns, aligned with `pk_schema`.
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    /// Optional sequence range used to filter rows.
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    /// Sink for scan metrics; taken (consumed) on first report.
    mem_scan_metrics: Option<MemScanMetrics>,
}
521
impl Iter {
    /// Creates an iterator over `series`.
    ///
    /// Predicate expressions that cannot be converted into simple filters are
    /// silently dropped (`filter_map`), so primary key pruning is best-effort.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            merge_mode,
            sequence,
            metrics: Metrics::default(),
            mem_scan_metrics,
        })
    }

    /// Merges the collected metrics into the external sink exactly once;
    /// `take()` makes later calls (e.g. from `Drop`) no-ops.
    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: self.metrics.total_series,
                num_rows: self.metrics.num_rows,
                num_batches: self.metrics.num_batches,
                scan_cost: self.metrics.scan_cost,
                ..Default::default()
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}
576
impl Drop for Iter {
    /// Logs the scan metrics and flushes them to the global counters.
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Safe even if `next` already reported: the sink was taken then.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
595
impl Iterator for Iter {
    type Item = Result<Batch>;

    /// Returns a batch for the next series whose primary key passes the
    /// predicates, or `None` when all series have been visited.
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        // Resume strictly after the last returned key, so keys inserted
        // before it by concurrent writers are not revisited.
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            // Skip series whose decoded primary key fails any simple filter.
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            // Compact the series into a single frozen batch and convert it.
            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        // Exhausted: release the map read lock, then report metrics.
        drop(map);
        self.metrics.scan_cost += start.elapsed();

        self.report_mem_scan_metrics();

        None
    }
}
658
659fn prune_primary_key(
660 codec: &Arc<DensePrimaryKeyCodec>,
661 pk: &[u8],
662 series: &mut Series,
663 datatypes: &[ConcreteDataType],
664 pk_schema: arrow::datatypes::SchemaRef,
665 predicates: &[SimpleFilterEvaluator],
666) -> bool {
667 if pk_schema.fields().is_empty() {
669 return true;
670 }
671
672 let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
674 pk_values
675 } else {
676 let pk_values = codec.decode_dense_without_column_id(pk);
677 if let Err(e) = pk_values {
678 error!(e; "Failed to decode primary key");
679 return true;
680 }
681 series.update_pk_cache(pk_values.unwrap());
682 series.pk_cache.as_ref().unwrap()
683 };
684
685 let mut result = true;
687 for predicate in predicates {
688 let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
690 continue;
691 };
692 let scalar_value = pk_values[index]
694 .try_to_scalar_value(&datatypes[index])
695 .unwrap();
696 result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
697 }
698
699 result
700}
701
/// All buffered rows belonging to a single primary key.
pub struct Series {
    /// Decoded primary key values, memoized for predicate pruning.
    pk_cache: Option<Vec<Value>>,
    /// Builder currently receiving new rows.
    active: ValueBuilder,
    /// Immutable batches produced by freezing the active builder.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    /// Soft row capacity of the active builder before it is frozen.
    capacity: usize,
}
710
711impl Series {
712 pub(crate) fn with_capacity(
713 region_metadata: &RegionMetadataRef,
714 init_capacity: usize,
715 capacity: usize,
716 ) -> Self {
717 MEMTABLE_ACTIVE_SERIES_COUNT.inc();
718 Self {
719 pk_cache: None,
720 active: ValueBuilder::new(region_metadata, init_capacity),
721 frozen: vec![],
722 region_metadata: region_metadata.clone(),
723 capacity,
724 }
725 }
726
727 pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
728 Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
729 }
730
731 pub fn is_empty(&self) -> bool {
732 self.active.len() == 0 && self.frozen.is_empty()
733 }
734
735 pub(crate) fn push<'a>(
737 &mut self,
738 ts: ValueRef<'a>,
739 sequence: u64,
740 op_type: OpType,
741 values: impl Iterator<Item = ValueRef<'a>>,
742 ) -> usize {
743 if self.active.len() + 10 > self.capacity {
745 let region_metadata = self.region_metadata.clone();
746 self.freeze(®ion_metadata);
747 }
748 self.active.push(ts, sequence, op_type as u8, values)
749 }
750
751 fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
752 self.pk_cache = Some(pk_values);
753 }
754
755 pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
757 if self.active.len() != 0 {
758 let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
759 std::mem::swap(&mut self.active, &mut builder);
760 self.frozen.push(Values::from(builder));
761 }
762 }
763
764 pub(crate) fn extend(
765 &mut self,
766 ts_v: VectorRef,
767 op_type_v: u8,
768 sequence_v: u64,
769 fields: Vec<VectorRef>,
770 ) -> Result<()> {
771 if !self.active.can_accommodate(&fields)? {
772 let region_metadata = self.region_metadata.clone();
773 self.freeze(®ion_metadata);
774 }
775 self.active.extend(ts_v, op_type_v, sequence_v, fields)
776 }
777
778 pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
781 self.freeze(region_metadata);
782
783 let frozen = &self.frozen;
784
785 debug_assert!(!frozen.is_empty());
787
788 if frozen.len() > 1 {
789 let column_size = frozen[0].fields.len() + 3;
793
794 if cfg!(debug_assertions) {
795 debug_assert!(
796 frozen
797 .iter()
798 .zip(frozen.iter().skip(1))
799 .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
800 );
801 }
802
803 let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
804 let concatenated = (0..column_size)
805 .map(|i| {
806 let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
807 arrow::compute::concat(&to_concat)
808 })
809 .collect::<std::result::Result<Vec<_>, _>>()
810 .context(ComputeArrowSnafu)?;
811
812 debug_assert_eq!(concatenated.len(), column_size);
813 let values = Values::from_columns(&concatenated)?;
814 self.frozen = vec![values];
815 };
816 Ok(&self.frozen[0])
817 }
818
819 pub fn read_to_values(&self) -> Vec<Values> {
820 let mut res = Vec::with_capacity(self.frozen.len() + 1);
821 res.extend(self.frozen.iter().cloned());
822 res.push(self.active.finish_cloned());
823 res
824 }
825}
826
/// Mutable column builders for one series: timestamps, sequences, op types
/// and the field columns.
pub(crate) struct ValueBuilder {
    /// Raw timestamp values in the unit of `timestamp_type`.
    timestamp: Vec<i64>,
    /// Concrete type of the region's time index column.
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    /// Lazily created field builders; `None` until a column sees a value.
    fields: Vec<Option<FieldBuilder>>,
    /// Types of the field columns, aligned with `fields`.
    field_types: Vec<ConcreteDataType>,
}
836
impl ValueBuilder {
    /// Creates a builder for the region's time index, sequence, op type and
    /// field columns, preallocating `capacity` rows for the fixed columns.
    /// Field builders are created lazily on the first non-null value.
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Number of field builders that have been materialized.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes one row; returns the estimated data size of the field values.
    ///
    /// A field builder is created lazily when a column sees its first
    /// non-null value; earlier rows are backfilled with nulls so all columns
    /// stay aligned with the timestamp column.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, verify the caller provides exactly one value per
        // field column.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Null values are skipped until the column has a builder, so a
            // fully-null column never allocates one.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for rows pushed before this column's
                    // first value.
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Returns whether appending `fields` keeps every materialized string
    /// builder's i32 offsets in range; `false` means the caller must freeze
    /// this builder first.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            // Only materialized string builders can overflow their offsets.
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    /// Appends columnar data where every row shares `op_type` and `sequence`.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        // Copy the raw i64 values out of the concrete timestamp vector type.
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            // The time index must be one of the four timestamp types.
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            // Materialize the builder on demand, backfilling nulls for the
            // rows that preceded this column's first data.
            // NOTE(review): unlike `push`, no gauge increment here — confirm
            // `FieldBuilder::create` accounts for it.
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Number of rows currently buffered.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    /// Snapshots the builder into immutable [Values] without consuming it.
    ///
    /// NOTE(review): this decrements the field-builder gauge although the
    /// builders stay alive in `self` — verify the accounting is intended.
    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    // Absent builder means the column is entirely null.
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        // Wrap the raw i64 timestamps back into the concrete vector type.
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        // All columns must agree on length.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1101
/// Immutable snapshot of a series: one vector per column, all equal length.
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    /// Field columns in the region's field column order.
    pub(crate) fields: Vec<VectorRef>,
}
1110
impl Values {
    /// Converts the snapshot to a [Batch] for the given primary key, applying
    /// projection, sequence filtering, sorting and optional dedup/merge.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        sequence: Option<SequenceRange>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        // Keep only projected field columns; `self.fields` is aligned with
        // the region's field column iteration order.
        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.filter_by_sequence(sequence)?;

        // The boolean passed to `sort` presumably enables dedup — for
        // LastNonNull we sort without dedup and merge rows explicitly.
        match (dedup, merge_mode) {
            (false, _) => batch.sort(false)?,
            (true, MergeMode::LastRow) => batch.sort(true)?,
            (true, MergeMode::LastNonNull) => {
                batch.sort(false)?;
                batch.merge_last_non_null()?;
            }
        }
        Ok(batch)
    }

    /// Returns all columns as arrow arrays in the fixed layout:
    /// timestamp, sequence, op type, then the fields.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Rebuilds a snapshot from arrays laid out as produced by [Self::columns].
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}
1189
impl From<ValueBuilder> for Values {
    /// Consumes the builder, finishing each column into an immutable vector.
    /// Columns that never materialized a builder become all-null vectors.
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    // The builder is consumed here, so release its gauge slot.
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Wrap the raw i64 timestamps back into the concrete vector type.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        // All columns must agree on length.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1243
/// Captures everything needed to build a scan over a [`SeriesSet`] so ranges
/// can construct (record-batch) iterators lazily.
struct TimeSeriesIterBuilder {
    /// The series set to iterate.
    series_set: SeriesSet,
    /// Column ids passed to `iter_series` as the projection.
    projection: HashSet<ColumnId>,
    /// Predicates forwarded to the series iterator; also supplies the time
    /// filters used by `PruneTimeIterator`.
    predicate: PredicateGroup,
    /// Whether the series iterator should deduplicate rows.
    dedup: bool,
    /// Optional sequence range filter forwarded to the series iterator.
    sequence: Option<SequenceRange>,
    /// Merge strategy; `LastNonNull` additionally wraps the iterator in a
    /// `LastNonNullIter`.
    merge_mode: MergeMode,
    /// Context that adapts `Batch` iterators into Arrow record batches.
    batch_to_record_batch: Arc<BatchToRecordBatchContext>,
}
1253
1254impl IterBuilder for TimeSeriesIterBuilder {
1255 fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1256 let iter = self.series_set.iter_series(
1257 self.projection.clone(),
1258 self.predicate.clone(),
1259 self.dedup,
1260 self.merge_mode,
1261 self.sequence,
1262 metrics,
1263 )?;
1264 if self.merge_mode == MergeMode::LastNonNull {
1265 let iter = LastNonNullIter::new(iter);
1266 Ok(Box::new(iter))
1267 } else {
1268 Ok(Box::new(iter))
1269 }
1270 }
1271
1272 fn is_record_batch(&self) -> bool {
1273 true
1274 }
1275
1276 fn build_record_batch(
1277 &self,
1278 time_range: Option<(Timestamp, Timestamp)>,
1279 metrics: Option<MemScanMetrics>,
1280 ) -> Result<BoxedRecordBatchIterator> {
1281 let iter = self.build(metrics)?;
1282 let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
1283 let time_filters = self.predicate.time_filters();
1284 Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
1285 } else {
1286 iter
1287 };
1288 Ok(self.batch_to_record_batch.adapt_iter(iter))
1289 }
1290}
1291
1292#[cfg(test)]
1293mod tests {
1294 use std::collections::{HashMap, HashSet};
1295
1296 use api::helper::ColumnDataTypeWrapper;
1297 use api::v1::helper::row;
1298 use api::v1::value::ValueData;
1299 use api::v1::{Mutation, Rows, SemanticType};
1300 use common_time::Timestamp;
1301 use datatypes::prelude::{ConcreteDataType, ScalarVector};
1302 use datatypes::schema::ColumnSchema;
1303 use datatypes::value::{OrderedFloat, Value};
1304 use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1305 use mito_codec::row_converter::SortField;
1306 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1307 use store_api::storage::RegionId;
1308
1309 use super::*;
1310 use crate::test_util::column_metadata_to_column_schema;
1311
    /// Builds test region metadata: tags `k0` (string, id 0) and `k1` (i64,
    /// id 1), millisecond timestamp `ts` (id 2), and nullable fields `v0`
    /// (i64, id 3) and `v1` (f64, id 4); the primary key is `[k0, k1]`.
    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
1348
1349 fn ts_value_ref(val: i64) -> ValueRef<'static> {
1350 ValueRef::Timestamp(Timestamp::new_millisecond(val))
1351 }
1352
1353 fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1354 vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1355 }
1356
1357 fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1358 let ts = values
1359 .timestamp
1360 .as_any()
1361 .downcast_ref::<TimestampMillisecondVector>()
1362 .unwrap();
1363
1364 let v0 = values.fields[0]
1365 .as_any()
1366 .downcast_ref::<Int64Vector>()
1367 .unwrap();
1368 let v1 = values.fields[1]
1369 .as_any()
1370 .downcast_ref::<Float64Vector>()
1371 .unwrap();
1372 let read = ts
1373 .iter_data()
1374 .zip(values.sequence.iter_data())
1375 .zip(values.op_type.iter_data())
1376 .zip(v0.iter_data())
1377 .zip(v1.iter_data())
1378 .map(|((((ts, sequence), op_type), v0), v1)| {
1379 (
1380 ts.unwrap().0.value(),
1381 sequence.unwrap(),
1382 op_type.unwrap(),
1383 v0.unwrap(),
1384 v1.unwrap(),
1385 )
1386 })
1387 .collect::<Vec<_>>();
1388 assert_eq!(expect, &read);
1389 }
1390
1391 #[test]
1392 fn test_series() {
1393 let region_metadata = schema_for_test();
1394 let mut series = Series::new(®ion_metadata);
1395 series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1396 series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1397 assert_eq!(2, series.active.timestamp.len());
1398 assert_eq!(0, series.frozen.len());
1399
1400 let values = series.compact(®ion_metadata).unwrap();
1401 check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1402 assert_eq!(0, series.active.timestamp.len());
1403 assert_eq!(1, series.frozen.len());
1404 }
1405
1406 #[test]
1407 fn test_series_with_nulls() {
1408 let region_metadata = schema_for_test();
1409 let mut series = Series::new(®ion_metadata);
1410 series.push(
1413 ts_value_ref(1),
1414 0,
1415 OpType::Put,
1416 vec![ValueRef::Null, ValueRef::Null].into_iter(),
1417 );
1418 series.push(
1419 ts_value_ref(1),
1420 0,
1421 OpType::Put,
1422 vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1423 );
1424 series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1425 series.push(
1426 ts_value_ref(1),
1427 3,
1428 OpType::Put,
1429 vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1430 );
1431 assert_eq!(4, series.active.timestamp.len());
1432 assert_eq!(0, series.frozen.len());
1433
1434 let values = series.compact(®ion_metadata).unwrap();
1435 assert_eq!(values.fields[0].null_count(), 1);
1436 assert_eq!(values.fields[1].null_count(), 3);
1437 assert_eq!(0, series.active.timestamp.len());
1438 assert_eq!(1, series.frozen.len());
1439 }
1440
1441 fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1442 let ts_len = batch.timestamps().len();
1443 assert_eq!(batch.sequences().len(), ts_len);
1444 assert_eq!(batch.op_types().len(), ts_len);
1445 for f in batch.fields() {
1446 assert_eq!(f.data.len(), ts_len);
1447 }
1448
1449 let mut rows = vec![];
1450 for idx in 0..ts_len {
1451 let mut row = Vec::with_capacity(batch.fields().len() + 3);
1452 row.push(batch.timestamps().get(idx));
1453 row.push(batch.sequences().get(idx));
1454 row.push(batch.op_types().get(idx));
1455 row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1456 rows.push(row);
1457 }
1458
1459 assert_eq!(expect.len(), rows.len());
1460 for (idx, row) in rows.iter().enumerate() {
1461 assert_eq!(&expect[idx], row);
1462 }
1463 }
1464
    /// Sorts and dedups `Values` via `to_batch`: rows are ordered by
    /// timestamp, and for the duplicated ts=3 the row with the higher
    /// sequence (2) wins under `MergeMode::LastRow`.
    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        // ts=3 appears twice; the second occurrence has a higher sequence.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                None,
                true,
                MergeMode::LastRow,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
1527
    /// With `MergeMode::LastNonNull`, the sequence filter is applied BEFORE
    /// merging: the seq=3 row is dropped by `LtEq { max: 2 }`, and the two
    /// surviving rows at ts=1 merge by taking the last non-null per field.
    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Three rows at the same timestamp with sequences 1..=3.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::LtEq { max: 2 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        // v0 comes from seq=2, v1 from seq=1; the seq=3 row never participates.
        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Int64(10),
                Value::Float64(OrderedFloat(1.5)),
            ]],
        );
    }
1575
    /// A row excluded by the sequence filter (seq=1 under `Gt { min: 1 }`)
    /// must not back-fill fields of surviving rows: v0 stays `Null` even
    /// though the filtered row had a value for it.
    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Two rows at ts=1; only the second (seq=2) passes the filter.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::Gt { min: 1 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Null,
                Value::Float64(OrderedFloat(1.0)),
            ]],
        );
    }
1622
    /// Builds a Put `KeyValues` of `len` rows for the series `(k0, k1)`:
    /// row `i` carries ts = i ms, v0 = i, v1 = i as f64. The mutation-level
    /// sequence is 0.
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        // Convert the region's column metadata into the wire-format schema.
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1, // OpType::Put
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }
1659
1660 #[test]
1661 fn test_series_set_concurrency() {
1662 let schema = schema_for_test();
1663 let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1664 schema
1665 .primary_key_columns()
1666 .map(|c| {
1667 (
1668 c.column_id,
1669 SortField::new(c.column_schema.data_type.clone()),
1670 )
1671 })
1672 .collect(),
1673 ));
1674 let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1675
1676 let concurrency = 32;
1677 let pk_num = concurrency * 2;
1678 let mut handles = Vec::with_capacity(concurrency);
1679 for i in 0..concurrency {
1680 let set = set.clone();
1681 let schema = schema.clone();
1682 let column_schemas = schema
1683 .column_metadatas
1684 .iter()
1685 .map(column_metadata_to_column_schema)
1686 .collect::<Vec<_>>();
1687 let handle = std::thread::spawn(move || {
1688 for j in i * 100..(i + 1) * 100 {
1689 let pk = j % pk_num;
1690 let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1691
1692 let kvs = KeyValues::new(
1693 &schema,
1694 Mutation {
1695 op_type: OpType::Put as i32,
1696 sequence: j as u64,
1697 rows: Some(Rows {
1698 schema: column_schemas.clone(),
1699 rows: vec![row(vec![
1700 ValueData::StringValue(format!("{}", j)),
1701 ValueData::I64Value(j as i64),
1702 ValueData::TimestampMillisecondValue(j as i64),
1703 ValueData::I64Value(j as i64),
1704 ValueData::F64Value(j as f64),
1705 ])],
1706 }),
1707 write_hint: None,
1708 },
1709 )
1710 .unwrap();
1711 set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1712 }
1713 });
1714 handles.push(handle);
1715 }
1716 for h in handles {
1717 h.join().unwrap();
1718 }
1719
1720 let mut timestamps = Vec::with_capacity(concurrency * 100);
1721 let mut sequences = Vec::with_capacity(concurrency * 100);
1722 let mut op_types = Vec::with_capacity(concurrency * 100);
1723 let mut v0 = Vec::with_capacity(concurrency * 100);
1724
1725 for i in 0..pk_num {
1726 let pk = format!("pk-{}", i).as_bytes().to_vec();
1727 let series = set.get_series(&pk).unwrap();
1728 let mut guard = series.write().unwrap();
1729 let values = guard.compact(&schema).unwrap();
1730 timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1731 sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1732 op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1733 v0.extend(
1734 values
1735 .fields
1736 .first()
1737 .unwrap()
1738 .as_any()
1739 .downcast_ref::<Int64Vector>()
1740 .unwrap()
1741 .iter_data()
1742 .map(|v| v.unwrap()),
1743 );
1744 }
1745
1746 let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1747 assert_eq!(
1748 expected_sequence,
1749 sequences.iter().copied().collect::<HashSet<_>>()
1750 );
1751
1752 op_types.iter().all(|op| *op == OpType::Put as u8);
1753 assert_eq!(
1754 expected_sequence,
1755 timestamps.iter().copied().collect::<HashSet<_>>()
1756 );
1757
1758 assert_eq!(timestamps, sequences);
1759 assert_eq!(v0, timestamps);
1760 }
1761
1762 #[test]
1763 fn test_memtable() {
1764 common_telemetry::init_default_ut_logging();
1765 check_memtable_dedup(true);
1766 check_memtable_dedup(false);
1767 }
1768
    /// Writes the same 100-row mutation twice and verifies each timestamp is
    /// read back once when `dedup` is on and twice when it is off; also
    /// checks the memtable's allocation and time-range stats.
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        // Expected multiplicity per timestamp depends on dedup.
        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs.iter().map(|kv| {
            kv.timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value()
        }) {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let range = ranges.ranges.into_values().next().unwrap();
        let iter = range.build_iter().unwrap();
        let mut read = HashMap::new();

        // Count how many times each timestamp comes back from the scan.
        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        // Rows span ts 0..=99 milliseconds.
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
1820
    /// Scanning with a projection of column id 3 (`v0`) must yield batches
    /// with exactly one field column carrying the expected values 0..100.
    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable
            .ranges(Some(&[3]), RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            // Only the projected field should be present.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }
1852
    /// Exercises concurrent writers (each producing distinct series) and
    /// readers on one memtable; after all threads join, every series and
    /// every row must be visible in a final scan.
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        // +1 slot for the main thread, which releases everyone at once.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Series keys embed the writer id, so writers never collide.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            // Readers repeatedly scan while writes are in flight; batches
            // must always be well-formed.
            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable
                        .ranges(None, RangesOptions::default())
                        .unwrap()
                        .build(None)
                        .unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // Final scan: every series contributes one batch with all its rows.
        let iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
1951
    /// Builds a record-batch iterator over one memtable range and checks the
    /// row count and the exact output column order.
    #[test]
    fn test_build_record_batch_iter_from_memtable() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = schema
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert_eq!(1, ranges.ranges.len());

        let range = ranges.ranges.into_values().next().unwrap();
        let mut iter = range.build_record_batch_iter(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(10, rb.num_rows());
        let schema = rb.schema();
        // Tags, fields, timestamp, then the internal columns.
        let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            column_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type",
            ]
        );
        assert!(iter.next().is_none());
    }
1992
    /// A record-batch iterator built with a time range must keep only rows
    /// whose timestamps fall within `[3, 7]` (bounds inclusive).
    #[test]
    fn test_build_record_batch_iter_with_time_range() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = schema
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert_eq!(1, ranges.ranges.len());

        let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7));

        let range = ranges.ranges.into_values().next().unwrap();
        let mut iter = range
            .build_record_batch_iter(Some(time_range), None)
            .unwrap();

        let mut total_rows = 0;
        let mut all_timestamps = Vec::new();
        while let Some(rb) = iter.next().transpose().unwrap() {
            total_rows += rb.num_rows();
            let ts_col = rb
                .column_by_name("ts")
                .unwrap()
                .as_any()
                .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_col.len() {
                all_timestamps.push(ts_col.value(i));
            }
        }
        assert_eq!(5, total_rows);
        all_timestamps.sort();
        assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps);
    }
2036
    /// Constructs a `TimeSeriesIterBuilder` directly for tests, mirroring how
    /// `ranges` would set up the projection and the record-batch adapter.
    fn build_iter_builder(
        schema: &RegionMetadataRef,
        memtable: &TimeSeriesMemtable,
        projection: Option<&[ColumnId]>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
    ) -> TimeSeriesIterBuilder {
        let read_column_ids = read_column_ids_from_projection(schema, projection);
        // Without an explicit projection, project all field columns.
        let field_projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            schema.field_columns().map(|c| c.column_id).collect()
        };
        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
            schema.clone(),
            read_column_ids,
        ));
        TimeSeriesIterBuilder {
            series_set: memtable.series_set.clone(),
            projection: field_projection,
            predicate: PredicateGroup::default(),
            dedup,
            merge_mode,
            sequence,
            batch_to_record_batch: adapter_context,
        }
    }
2066
    /// `build_record_batch` without a time range yields all rows with the
    /// full output column order.
    #[test]
    fn test_iter_builder_build_record_batch_basic() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "hello".to_string(), 42, 10);
        memtable.write(&kvs).unwrap();

        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);

        let mut iter = builder.build_record_batch(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(10, rb.num_rows());

        let rb_schema = rb.schema();
        let col_names: Vec<_> = rb_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(
            col_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type",
            ]
        );

        assert!(iter.next().is_none());
    }
2103
    /// Projecting column ids 2 (`ts`) and 3 (`v0`) keeps only `v0` among the
    /// fields; `ts` and the internal columns are always emitted.
    #[test]
    fn test_iter_builder_build_record_batch_with_projection() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 5);
        memtable.write(&kvs).unwrap();

        let projection = vec![2, 3];
        let builder = build_iter_builder(
            &schema,
            &memtable,
            Some(&projection),
            true,
            MergeMode::LastRow,
            None,
        );

        let mut iter = builder.build_record_batch(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(5, rb.num_rows());

        let rb_schema = rb.schema();
        let col_names: Vec<_> = rb_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(
            col_names,
            vec!["v0", "ts", "__primary_key", "__sequence", "__op_type",]
        );

        assert!(iter.next().is_none());
    }
2141
    /// Two series (3 + 4 rows) scanned together must produce 7 rows total,
    /// every batch carrying the full 8-column layout.
    #[test]
    fn test_iter_builder_build_record_batch_multiple_series() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs_a = build_key_values(&schema, "aaa".to_string(), 1, 3);
        let kvs_b = build_key_values(&schema, "bbb".to_string(), 2, 4);
        memtable.write(&kvs_a).unwrap();
        memtable.write(&kvs_b).unwrap();

        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);

        let iter = builder.build_record_batch(None, None).unwrap();
        let mut total_rows = 0;
        for rb in iter {
            let rb = rb.unwrap();
            total_rows += rb.num_rows();
            assert_eq!(8, rb.num_columns());
        }
        assert_eq!(7, total_rows);
    }
2163
2164 #[test]
2165 fn test_iter_builder_build_record_batch_dedup() {
2166 let schema = schema_for_test();
2167 let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2168
2169 let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2171 memtable.write(&kvs).unwrap();
2172 memtable.write(&kvs).unwrap();
2173
2174 let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2175
2176 let iter = builder.build_record_batch(None, None).unwrap();
2177 let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2178 assert_eq!(5, total_rows);
2179 }
2180
2181 #[test]
2182 fn test_iter_builder_build_record_batch_no_dedup() {
2183 let schema = schema_for_test();
2184 let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, false, MergeMode::LastRow);
2185
2186 let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2187 memtable.write(&kvs).unwrap();
2188 memtable.write(&kvs).unwrap();
2189
2190 let builder = build_iter_builder(&schema, &memtable, None, false, MergeMode::LastRow, None);
2191
2192 let iter = builder.build_record_batch(None, None).unwrap();
2193 let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2194 assert_eq!(10, total_rows);
2195 }
2196
    /// Sequence filters restrict visible rows: with per-row sequences 0..=4,
    /// `Gt { min: 4 }` matches nothing and `LtEq { max: 2 }` matches 3 rows.
    #[test]
    fn test_iter_builder_build_record_batch_with_sequence_filter() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "seq".to_string(), 1, 5);
        memtable.write(&kvs).unwrap();

        let builder = build_iter_builder(
            &schema,
            &memtable,
            None,
            true,
            MergeMode::LastRow,
            Some(SequenceRange::Gt { min: 4 }),
        );

        let iter = builder.build_record_batch(None, None).unwrap();
        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
        assert_eq!(0, total_rows);

        let builder = build_iter_builder(
            &schema,
            &memtable,
            None,
            true,
            MergeMode::LastRow,
            Some(SequenceRange::LtEq { max: 2 }),
        );

        let iter = builder.build_record_batch(None, None).unwrap();
        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
        assert_eq!(3, total_rows);
    }
2235
    /// Verifies the actual cell values of an emitted record batch: ts, v0
    /// and v1 carry 0, 1, 2 (as written) and every op type is `Put`.
    #[test]
    fn test_iter_builder_build_record_batch_data_correctness() {
        use datatypes::arrow::array::{
            Float64Array, Int64Array, TimestampMillisecondArray, UInt8Array,
        };

        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "check".to_string(), 7, 3);
        memtable.write(&kvs).unwrap();

        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);

        let mut iter = builder.build_record_batch(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(3, rb.num_rows());

        let ts_col = rb
            .column_by_name("ts")
            .unwrap()
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let timestamps: Vec<_> = (0..ts_col.len()).map(|i| ts_col.value(i)).collect();
        assert_eq!(vec![0, 1, 2], timestamps);

        let v0_col = rb
            .column_by_name("v0")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let v0_values: Vec<_> = (0..v0_col.len()).map(|i| v0_col.value(i)).collect();
        assert_eq!(vec![0, 1, 2], v0_values);

        let v1_col = rb
            .column_by_name("v1")
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let v1_values: Vec<_> = (0..v1_col.len()).map(|i| v1_col.value(i)).collect();
        assert_eq!(vec![0.0, 1.0, 2.0], v1_values);

        let op_col = rb
            .column_by_name("__op_type")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        for i in 0..op_col.len() {
            assert_eq!(OpType::Put as u8, op_col.value(i));
        }

        assert!(iter.next().is_none());
    }
2297}