1use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35 Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36 TimestampSecondVector, UInt8Vector, UInt64Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{OptionExt, ResultExt, ensure};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceRange};
43use table::predicate::Predicate;
44
45use crate::error::{
46 self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54 AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, BoxedRecordBatchIterator,
55 IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
56 MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
57 read_column_ids_from_projection,
58};
59use crate::metrics::{
60 MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
61 READ_STAGE_ELAPSED,
62};
63use crate::read::dedup::LastNonNullIter;
64use crate::read::prune::PruneTimeIterator;
65use crate::read::scan_region::PredicateGroup;
66use crate::read::{Batch, BatchBuilder, BatchColumn};
67use crate::region::options::MergeMode;
68
/// Initial capacity for a newly created series' value builder.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Row-count threshold at which a series' active builder is frozen
/// into an immutable block (see `Series::push`).
const BUILDER_CAPACITY: usize = 512;
74
/// Builder that creates time-series memtables; regions without a primary key
/// get a `SimpleBulkMemtable` instead (see `MemtableBuilder::build`).
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    // Manager notified about memory allocated by built memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    // Whether rows with duplicated keys are deduplicated on read.
    dedup: bool,
    // Strategy used when merging duplicated rows.
    merge_mode: MergeMode,
}
82
83impl TimeSeriesMemtableBuilder {
84 pub fn new(
86 write_buffer_manager: Option<WriteBufferManagerRef>,
87 dedup: bool,
88 merge_mode: MergeMode,
89 ) -> Self {
90 Self {
91 write_buffer_manager,
92 dedup,
93 merge_mode,
94 }
95 }
96}
97
98impl MemtableBuilder for TimeSeriesMemtableBuilder {
99 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
100 if metadata.primary_key.is_empty() {
101 Arc::new(SimpleBulkMemtable::new(
102 id,
103 metadata.clone(),
104 self.write_buffer_manager.clone(),
105 self.dedup,
106 self.merge_mode,
107 ))
108 } else {
109 Arc::new(TimeSeriesMemtable::new(
110 metadata.clone(),
111 id,
112 self.write_buffer_manager.clone(),
113 self.dedup,
114 self.merge_mode,
115 ))
116 }
117 }
118
119 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
120 false
123 }
124}
125
/// Memtable that organizes rows by time series (encoded primary key).
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    // Codec used to encode primary-key columns into series key bytes.
    row_codec: Arc<DensePrimaryKeyCodec>,
    // All series of this memtable, keyed by encoded primary key.
    series_set: SeriesSet,
    // Tracks bytes allocated by this memtable.
    alloc_tracker: AllocTracker,
    // Max/min timestamps seen so far; start at i64::MIN / i64::MAX so the
    // first write always updates them.
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    // Largest sequence number written so far.
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    // Total rows written to this memtable.
    num_rows: AtomicUsize,
}
141
142impl TimeSeriesMemtable {
143 pub fn new(
144 region_metadata: RegionMetadataRef,
145 id: MemtableId,
146 write_buffer_manager: Option<WriteBufferManagerRef>,
147 dedup: bool,
148 merge_mode: MergeMode,
149 ) -> Self {
150 let row_codec = Arc::new(DensePrimaryKeyCodec::new(®ion_metadata));
151 let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
152 Self {
153 id,
154 region_metadata,
155 series_set,
156 row_codec,
157 alloc_tracker: AllocTracker::new(write_buffer_manager),
158 max_timestamp: AtomicI64::new(i64::MIN),
159 min_timestamp: AtomicI64::new(i64::MAX),
160 max_sequence: AtomicU64::new(0),
161 dedup,
162 merge_mode,
163 num_rows: Default::default(),
164 }
165 }
166
167 fn update_stats(&self, stats: WriteMetrics) {
169 self.alloc_tracker
170 .on_allocation(stats.key_bytes + stats.value_bytes);
171 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
172 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
173 self.max_sequence
174 .fetch_max(stats.max_sequence, Ordering::SeqCst);
175 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
176 }
177
178 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
179 ensure!(
180 self.row_codec.num_fields() == kv.num_primary_keys(),
181 PrimaryKeyLengthMismatchSnafu {
182 expect: self.row_codec.num_fields(),
183 actual: kv.num_primary_keys(),
184 }
185 );
186
187 let primary_key_encoded = self
188 .row_codec
189 .encode(kv.primary_keys())
190 .context(EncodeSnafu)?;
191
192 let (key_allocated, value_allocated) =
193 self.series_set.push_to_series(primary_key_encoded, &kv);
194 stats.key_bytes += key_allocated;
195 stats.value_bytes += value_allocated;
196
197 let ts = kv
199 .timestamp()
200 .try_into_timestamp()
201 .unwrap()
202 .unwrap()
203 .value();
204 stats.min_ts = stats.min_ts.min(ts);
205 stats.max_ts = stats.max_ts.max(ts);
206 Ok(())
207 }
208}
209
210impl Debug for TimeSeriesMemtable {
211 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
212 f.debug_struct("TimeSeriesMemtable").finish()
213 }
214}
215
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows of `kvs`, then publishes the accumulated stats once.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // Accumulate locally and publish once at the end to reduce
        // contention on the shared atomic counters.
        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Every row also stores a timestamp and an op type.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row; stats are published only if the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Converts a bulk part into a mutation and inserts it row by row.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // The part already carries its sequence/timestamp bounds and row count,
        // so use them directly instead of the per-row accumulated values.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Exposes the whole memtable as a single scannable range.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
        // Default projection: all field columns.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new(
            self.region_metadata.clone(),
            read_column_ids,
        ));
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.clone(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
            batch_to_record_batch,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    /// Stops further memory reservation; buffered data stays readable.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing has been written: report empty stats.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates a new, empty memtable sharing this one's write buffer manager
    /// and read options.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
365
/// Map from encoded primary key to its [Series]. The newtype exists so that
/// `Drop` can roll back the active-series/field-builder gauges.
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
368
369impl Drop for SeriesMap {
370 fn drop(&mut self) {
371 let num_series = self.0.len();
372 let num_field_builders = self
373 .0
374 .values()
375 .map(|v| v.read().unwrap().active.num_field_builders())
376 .sum::<usize>();
377 MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
378 MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
379 }
380}
381
/// Shared, thread-safe collection of all series in a memtable.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    // Series keyed by encoded primary key.
    series: Arc<RwLock<SeriesMap>>,
    // Codec used to decode primary keys during pruning.
    codec: Arc<DensePrimaryKeyCodec>,
}
388
389impl SeriesSet {
390 fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
391 Self {
392 region_metadata,
393 series: Default::default(),
394 codec,
395 }
396 }
397}
398
impl SeriesSet {
    /// Pushes a row into the series identified by `primary_key`, creating the
    /// series if needed. Returns `(key_bytes, value_bytes)` newly allocated.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, only a read lock on the map
        // is needed. No key bytes are allocated in this case.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the write lock and go through the entry API, since
        // another writer may have inserted the series between the two lock
        // scopes above.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                // The key was newly stored, so its length counts as allocated.
                (key_len, value_allocated)
            }
            Entry::Occupied(v) => {
                // Lost the race to another writer: push into its series.
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Creates an iterator over all series, pruned by `predicate` and
    /// filtered by `sequence`.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: PredicateGroup,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate.predicate().cloned(),
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}
472
473pub(crate) fn primary_key_schema(
476 region_metadata: &RegionMetadataRef,
477) -> arrow::datatypes::SchemaRef {
478 let fields = region_metadata
479 .primary_key_columns()
480 .map(|pk| {
481 arrow::datatypes::Field::new(
482 pk.column_schema.name.clone(),
483 pk.column_schema.data_type.as_arrow_type(),
484 pk.column_schema.is_nullable(),
485 )
486 })
487 .collect::<Vec<_>>();
488 Arc::new(arrow::datatypes::Schema::new(fields))
489}
490
/// Metrics collected while scanning a time-series memtable.
#[derive(Debug, Default)]
struct Metrics {
    // Total series visited during the scan.
    total_series: usize,
    // Series skipped by primary-key pruning.
    num_pruned_series: usize,
    // Rows returned by the iterator.
    num_rows: usize,
    // Batches returned by the iterator.
    num_batches: usize,
    // Wall time spent inside `next()`.
    scan_cost: Duration,
}
505
/// Iterator over a series map that yields one batch per matching series,
/// in encoded-primary-key order.
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    // Field column ids to include in output batches.
    projection: HashSet<ColumnId>,
    // Encoded key of the last series returned; the scan resumes after it.
    last_key: Option<Vec<u8>>,
    // Simple filters used to prune series by primary-key values.
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    // Sequence range used to filter rows in each batch.
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    // Taken (set to `None`) once scan metrics have been reported.
    mem_scan_metrics: Option<MemScanMetrics>,
}
521
522impl Iter {
523 #[allow(clippy::too_many_arguments)]
524 pub(crate) fn try_new(
525 metadata: RegionMetadataRef,
526 series: Arc<RwLock<SeriesMap>>,
527 projection: HashSet<ColumnId>,
528 predicate: Option<Predicate>,
529 pk_schema: arrow::datatypes::SchemaRef,
530 pk_datatypes: Vec<ConcreteDataType>,
531 codec: Arc<DensePrimaryKeyCodec>,
532 dedup: bool,
533 merge_mode: MergeMode,
534 sequence: Option<SequenceRange>,
535 mem_scan_metrics: Option<MemScanMetrics>,
536 ) -> Result<Self> {
537 let predicate = predicate
538 .map(|predicate| {
539 predicate
540 .exprs()
541 .iter()
542 .filter_map(SimpleFilterEvaluator::try_new)
543 .collect::<Vec<_>>()
544 })
545 .unwrap_or_default();
546 Ok(Self {
547 metadata,
548 series,
549 projection,
550 last_key: None,
551 predicate,
552 pk_schema,
553 pk_datatypes,
554 codec,
555 dedup,
556 merge_mode,
557 sequence,
558 metrics: Metrics::default(),
559 mem_scan_metrics,
560 })
561 }
562
563 fn report_mem_scan_metrics(&mut self) {
564 if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
565 let inner = crate::memtable::MemScanMetricsData {
566 total_series: self.metrics.total_series,
567 num_rows: self.metrics.num_rows,
568 num_batches: self.metrics.num_batches,
569 scan_cost: self.metrics.scan_cost,
570 ..Default::default()
571 };
572 mem_scan_metrics.merge_inner(&inner);
573 }
574 }
575}
576
impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // No-op if the metrics were already reported when the scan finished.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
595
impl Iterator for Iter {
    type Item = Result<Batch>;

    /// Returns the next series' batch, or `None` when all series are done.
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        // Resume the scan right after the last key returned by the previous
        // call; new series inserted before it are intentionally skipped.
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            // Prune series by primary-key predicates before building a batch.
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            // Compact the series' buffered blocks into one and convert it
            // into a batch.
            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        // Scan finished: release the read lock before reporting.
        drop(map);
        self.metrics.scan_cost += start.elapsed();

        // Flush scan metrics now instead of waiting for `Drop`.
        self.report_mem_scan_metrics();

        None
    }
}
658
659fn prune_primary_key(
660 codec: &Arc<DensePrimaryKeyCodec>,
661 pk: &[u8],
662 series: &mut Series,
663 datatypes: &[ConcreteDataType],
664 pk_schema: arrow::datatypes::SchemaRef,
665 predicates: &[SimpleFilterEvaluator],
666) -> bool {
667 if pk_schema.fields().is_empty() {
669 return true;
670 }
671
672 let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
674 pk_values
675 } else {
676 let pk_values = codec.decode_dense_without_column_id(pk);
677 if let Err(e) = pk_values {
678 error!(e; "Failed to decode primary key");
679 return true;
680 }
681 series.update_pk_cache(pk_values.unwrap());
682 series.pk_cache.as_ref().unwrap()
683 };
684
685 let mut result = true;
687 for predicate in predicates {
688 let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
690 continue;
691 };
692 let scalar_value = pk_values[index]
694 .try_to_scalar_value(&datatypes[index])
695 .unwrap();
696 result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
697 }
698
699 result
700}
701
/// Buffered data of a single time series: an active (mutable) builder plus
/// zero or more frozen (immutable) value blocks.
pub struct Series {
    // Decoded primary-key values, cached on first predicate evaluation.
    pk_cache: Option<Vec<Value>>,
    // Builder currently receiving new rows.
    active: ValueBuilder,
    // Immutable blocks produced by freezing the active builder.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    // Row-count threshold that triggers freezing the active builder.
    capacity: usize,
}
710
impl Series {
    /// Creates a series whose builder starts at `init_capacity` and is
    /// frozen once it approaches `capacity` rows.
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes one row and returns the approximate bytes allocated for its
    /// field values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze before the builder reaches capacity; the constant 10 is
        // presumably headroom for in-flight rows — TODO confirm.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Moves the active builder (if non-empty) into the frozen list and
    /// replaces it with a fresh builder.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Appends whole vectors that share one op type and sequence value.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        // Freeze first if any string builder would overflow its i32 offsets.
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active builder and concatenates all frozen blocks into a
    /// single [Values], which is returned.
    ///
    /// NOTE(review): this indexes `frozen[0]` at the end, so it assumes the
    /// series is non-empty; callers appear to only invoke it after pushes —
    /// confirm before calling on an empty series.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // 3 extra columns: timestamp, sequence and op type.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    /// Returns clones of all frozen blocks plus a snapshot of the active
    /// builder, without mutating the series.
    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}
826
/// Column-oriented buffer for one series: timestamps, sequences, op types
/// and lazily created field builders.
pub(crate) struct ValueBuilder {
    // Raw timestamp values; their unit is given by `timestamp_type`.
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    // One optional builder per field column; `None` until the first
    // non-null value for that column arrives.
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}
836
impl ValueBuilder {
    /// Creates a builder for the region's columns with the given initial
    /// capacity for timestamp/sequence/op-type buffers.
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        // Field builders are created lazily on first non-null value; see `push`.
        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of field builders that have been materialized.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes one row and returns the approximate data size of its field
    /// values. `fields` must yield exactly one value per field column
    /// (checked by a debug assertion). Panics if `ts` is not a non-null
    /// timestamp value.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Null values for a column without a builder are skipped; the
            // builder is backfilled with nulls once a real value arrives.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    field
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("Failed to push field value: {e:?}"));
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for all previously pushed rows.
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Returns whether all string builders can absorb `fields` without
    /// overflowing their `i32` value offsets. Non-string builders and
    /// not-yet-materialized builders are always considered accommodating.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    /// Appends whole vectors, repeating `op_type` and `sequence` for every row.
    ///
    /// NOTE(review): unlike `push`, the builder created here does not call
    /// `MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc()` — confirm whether that is
    /// intentional.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        // Store raw i64 values; the unit is tracked by `timestamp_type`.
        // Panics if `ts_v` is not of the expected vector type or contains nulls.
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            // Materialize the builder on demand, backfilling earlier rows
            // with nulls.
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the number of rows buffered so far.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    /// Builds a [Values] snapshot without consuming the builder; columns with
    /// no materialized builder become all-null vectors.
    ///
    /// NOTE(review): this decrements MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT even
    /// though the builders stay alive in `self`, and `SeriesMap::drop` will
    /// subtract them again — looks like a possible double decrement; confirm.
    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        // Reinterpret the raw i64 timestamps with the region's time unit.
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        // All columns must end up with the same length.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1103
/// Immutable, column-oriented snapshot of a series' buffered rows.
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    // Field column vectors, in field-column order.
    pub(crate) fields: Vec<VectorRef>,
}
1112
1113impl Values {
1114 pub fn to_batch(
1117 &self,
1118 primary_key: &[u8],
1119 metadata: &RegionMetadataRef,
1120 projection: &HashSet<ColumnId>,
1121 sequence: Option<SequenceRange>,
1122 dedup: bool,
1123 merge_mode: MergeMode,
1124 ) -> Result<Batch> {
1125 let builder = BatchBuilder::with_required_columns(
1126 primary_key.to_vec(),
1127 self.timestamp.clone(),
1128 self.sequence.clone(),
1129 self.op_type.clone(),
1130 );
1131
1132 let fields = metadata
1133 .field_columns()
1134 .zip(self.fields.iter())
1135 .filter_map(|(c, f)| {
1136 projection.get(&c.column_id).map(|c| BatchColumn {
1137 column_id: *c,
1138 data: f.clone(),
1139 })
1140 })
1141 .collect();
1142
1143 let mut batch = builder.with_fields(fields).build()?;
1144 batch.filter_by_sequence(sequence)?;
1148
1149 match (dedup, merge_mode) {
1150 (false, _) => batch.sort(false)?,
1152 (true, MergeMode::LastRow) => batch.sort(true)?,
1154 (true, MergeMode::LastNonNull) => {
1156 batch.sort(false)?;
1157 batch.merge_last_non_null()?;
1158 }
1159 }
1160 Ok(batch)
1161 }
1162
1163 fn columns(&self) -> Vec<ArrayRef> {
1165 let mut res = Vec::with_capacity(3 + self.fields.len());
1166 res.push(self.timestamp.to_arrow_array());
1167 res.push(self.sequence.to_arrow_array());
1168 res.push(self.op_type.to_arrow_array());
1169 res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
1170 res
1171 }
1172
1173 fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
1175 debug_assert!(cols.len() >= 3);
1176 let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
1177 let sequence =
1178 Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
1179 let op_type =
1180 Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
1181 let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
1182
1183 Ok(Self {
1184 timestamp,
1185 sequence,
1186 op_type,
1187 fields,
1188 })
1189 }
1190}
1191
impl From<ValueBuilder> for Values {
    /// Consumes the builder and finishes every column; columns with no
    /// materialized builder become all-null vectors.
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    // The builder is consumed here, so the gauge goes down.
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Reinterpret the raw i64 timestamps with the region's time unit.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        // All columns must end up with the same length.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1245
/// Builds (record-)batch iterators over a snapshot of the series set.
struct TimeSeriesIterBuilder {
    /// Series to scan.
    series_set: SeriesSet,
    /// Column ids passed to `iter_series` as the projection.
    projection: HashSet<ColumnId>,
    /// Predicates used when iterating series (includes time filters).
    predicate: PredicateGroup,
    /// Whether rows with duplicate keys/timestamps are deduplicated.
    dedup: bool,
    /// Optional sequence range filter applied to rows.
    sequence: Option<SequenceRange>,
    /// Strategy used when merging duplicate rows.
    merge_mode: MergeMode,
    /// Context to adapt internal `Batch`es into Arrow record batches.
    batch_to_record_batch: Arc<BatchToRecordBatchContext>,
}
1255
1256impl IterBuilder for TimeSeriesIterBuilder {
1257 fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1258 let iter = self.series_set.iter_series(
1259 self.projection.clone(),
1260 self.predicate.clone(),
1261 self.dedup,
1262 self.merge_mode,
1263 self.sequence,
1264 metrics,
1265 )?;
1266 if self.merge_mode == MergeMode::LastNonNull {
1267 let iter = LastNonNullIter::new(iter);
1268 Ok(Box::new(iter))
1269 } else {
1270 Ok(Box::new(iter))
1271 }
1272 }
1273
1274 fn is_record_batch(&self) -> bool {
1275 true
1276 }
1277
1278 fn build_record_batch(
1279 &self,
1280 time_range: Option<(Timestamp, Timestamp)>,
1281 metrics: Option<MemScanMetrics>,
1282 ) -> Result<BoxedRecordBatchIterator> {
1283 let iter = self.build(metrics)?;
1284 let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
1285 let time_filters = self.predicate.time_filters();
1286 Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
1287 } else {
1288 iter
1289 };
1290 Ok(self.batch_to_record_batch.adapt_iter(iter))
1291 }
1292}
1293
1294#[cfg(test)]
1295mod tests {
1296 use std::collections::{HashMap, HashSet};
1297
1298 use api::helper::ColumnDataTypeWrapper;
1299 use api::v1::helper::row;
1300 use api::v1::value::ValueData;
1301 use api::v1::{Mutation, Rows, SemanticType};
1302 use common_time::Timestamp;
1303 use datatypes::prelude::{ConcreteDataType, ScalarVector};
1304 use datatypes::schema::ColumnSchema;
1305 use datatypes::value::{OrderedFloat, Value};
1306 use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1307 use mito_codec::row_converter::SortField;
1308 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1309 use store_api::storage::RegionId;
1310
1311 use super::*;
1312 use crate::test_util::column_metadata_to_column_schema;
1313
    /// Builds the region metadata used by these tests: tags `k0` (string) and
    /// `k1` (i64), millisecond timestamp `ts`, nullable fields `v0` (i64) and
    /// `v1` (f64), with primary key `(k0, k1)`.
    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
1350
    /// Shorthand for a millisecond-timestamp `ValueRef` used as a row timestamp.
    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }
1354
1355 fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1356 vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1357 }
1358
    /// Asserts that `values` holds exactly the rows in `expect`, where each
    /// tuple is `(ts_millis, sequence, op_type, v0, v1)`.
    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        // Zip all columns row-wise and flatten each row into a comparable tuple.
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }
1392
1393 #[test]
1394 fn test_series() {
1395 let region_metadata = schema_for_test();
1396 let mut series = Series::new(®ion_metadata);
1397 series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1398 series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1399 assert_eq!(2, series.active.timestamp.len());
1400 assert_eq!(0, series.frozen.len());
1401
1402 let values = series.compact(®ion_metadata).unwrap();
1403 check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1404 assert_eq!(0, series.active.timestamp.len());
1405 assert_eq!(1, series.frozen.len());
1406 }
1407
1408 #[test]
1409 fn test_series_with_nulls() {
1410 let region_metadata = schema_for_test();
1411 let mut series = Series::new(®ion_metadata);
1412 series.push(
1415 ts_value_ref(1),
1416 0,
1417 OpType::Put,
1418 vec![ValueRef::Null, ValueRef::Null].into_iter(),
1419 );
1420 series.push(
1421 ts_value_ref(1),
1422 0,
1423 OpType::Put,
1424 vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1425 );
1426 series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1427 series.push(
1428 ts_value_ref(1),
1429 3,
1430 OpType::Put,
1431 vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1432 );
1433 assert_eq!(4, series.active.timestamp.len());
1434 assert_eq!(0, series.frozen.len());
1435
1436 let values = series.compact(®ion_metadata).unwrap();
1437 assert_eq!(values.fields[0].null_count(), 1);
1438 assert_eq!(values.fields[1].null_count(), 3);
1439 assert_eq!(0, series.active.timestamp.len());
1440 assert_eq!(1, series.frozen.len());
1441 }
1442
1443 fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1444 let ts_len = batch.timestamps().len();
1445 assert_eq!(batch.sequences().len(), ts_len);
1446 assert_eq!(batch.op_types().len(), ts_len);
1447 for f in batch.fields() {
1448 assert_eq!(f.data.len(), ts_len);
1449 }
1450
1451 let mut rows = vec![];
1452 for idx in 0..ts_len {
1453 let mut row = Vec::with_capacity(batch.fields().len() + 3);
1454 row.push(batch.timestamps().get(idx));
1455 row.push(batch.sequences().get(idx));
1456 row.push(batch.op_types().get(idx));
1457 row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1458 rows.push(row);
1459 }
1460
1461 assert_eq!(expect.len(), rows.len());
1462 for (idx, row) in rows.iter().enumerate() {
1463 assert_eq!(&expect[idx], row);
1464 }
1465 }
1466
    /// `to_batch` with dedup + LastRow sorts by timestamp and keeps only the
    /// highest-sequence row for the duplicated timestamp (ts=3 appears twice;
    /// the seq=2 row wins).
    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        // Out-of-order timestamps with one duplicate (ts=3 at seq 1 and seq 2).
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                None,
                true,
                MergeMode::LastRow,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
1529
    /// With LastNonNull, the sequence filter (`<= 2`) must be applied before
    /// merging: the seq=3 row is dropped, so the merged row combines only the
    /// surviving seq=1 and seq=2 rows.
    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Three rows share ts=1; only the first two pass the sequence filter.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::LtEq { max: 2 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        // v0 comes from seq=2, v1 back-filled from seq=1.
        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Int64(10),
                Value::Float64(OrderedFloat(1.5)),
            ]],
        );
    }
1577
    /// A row filtered out by the sequence range (`> 1` drops seq=1) must not
    /// donate its non-null values during the LastNonNull merge: v0 stays Null.
    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Two rows at ts=1; only seq=2 passes the `Gt { min: 1 }` filter.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::Gt { min: 1 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        // v0 is Null: the seq=1 row's value 10 must not leak into the result.
        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Null,
                Value::Float64(OrderedFloat(1.0)),
            ]],
        );
    }
1624
    /// Builds a `KeyValues` Put mutation with `len` rows sharing the tag pair
    /// `(k0, k1)`; for row `i`: ts = i (ms), v0 = i, v1 = i as f64.
    /// The mutation's base sequence is 0.
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        // Derive the wire-format column schemas from the region metadata.
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }
1661
1662 #[test]
1663 fn test_series_set_concurrency() {
1664 let schema = schema_for_test();
1665 let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1666 schema
1667 .primary_key_columns()
1668 .map(|c| {
1669 (
1670 c.column_id,
1671 SortField::new(c.column_schema.data_type.clone()),
1672 )
1673 })
1674 .collect(),
1675 ));
1676 let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1677
1678 let concurrency = 32;
1679 let pk_num = concurrency * 2;
1680 let mut handles = Vec::with_capacity(concurrency);
1681 for i in 0..concurrency {
1682 let set = set.clone();
1683 let schema = schema.clone();
1684 let column_schemas = schema
1685 .column_metadatas
1686 .iter()
1687 .map(column_metadata_to_column_schema)
1688 .collect::<Vec<_>>();
1689 let handle = std::thread::spawn(move || {
1690 for j in i * 100..(i + 1) * 100 {
1691 let pk = j % pk_num;
1692 let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1693
1694 let kvs = KeyValues::new(
1695 &schema,
1696 Mutation {
1697 op_type: OpType::Put as i32,
1698 sequence: j as u64,
1699 rows: Some(Rows {
1700 schema: column_schemas.clone(),
1701 rows: vec![row(vec![
1702 ValueData::StringValue(format!("{}", j)),
1703 ValueData::I64Value(j as i64),
1704 ValueData::TimestampMillisecondValue(j as i64),
1705 ValueData::I64Value(j as i64),
1706 ValueData::F64Value(j as f64),
1707 ])],
1708 }),
1709 write_hint: None,
1710 },
1711 )
1712 .unwrap();
1713 set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1714 }
1715 });
1716 handles.push(handle);
1717 }
1718 for h in handles {
1719 h.join().unwrap();
1720 }
1721
1722 let mut timestamps = Vec::with_capacity(concurrency * 100);
1723 let mut sequences = Vec::with_capacity(concurrency * 100);
1724 let mut op_types = Vec::with_capacity(concurrency * 100);
1725 let mut v0 = Vec::with_capacity(concurrency * 100);
1726
1727 for i in 0..pk_num {
1728 let pk = format!("pk-{}", i).as_bytes().to_vec();
1729 let series = set.get_series(&pk).unwrap();
1730 let mut guard = series.write().unwrap();
1731 let values = guard.compact(&schema).unwrap();
1732 timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1733 sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1734 op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1735 v0.extend(
1736 values
1737 .fields
1738 .first()
1739 .unwrap()
1740 .as_any()
1741 .downcast_ref::<Int64Vector>()
1742 .unwrap()
1743 .iter_data()
1744 .map(|v| v.unwrap()),
1745 );
1746 }
1747
1748 let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1749 assert_eq!(
1750 expected_sequence,
1751 sequences.iter().copied().collect::<HashSet<_>>()
1752 );
1753
1754 op_types.iter().all(|op| *op == OpType::Put as u8);
1755 assert_eq!(
1756 expected_sequence,
1757 timestamps.iter().copied().collect::<HashSet<_>>()
1758 );
1759
1760 assert_eq!(timestamps, sequences);
1761 assert_eq!(v0, timestamps);
1762 }
1763
1764 #[test]
1765 fn test_memtable() {
1766 common_telemetry::init_default_ut_logging();
1767 check_memtable_dedup(true);
1768 check_memtable_dedup(false);
1769 }
1770
    /// Writes the same 100-row batch twice and verifies each timestamp is read
    /// back once (dedup on) or twice (dedup off), plus basic stats.
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        // Expected occurrence count per timestamp depends on dedup mode.
        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs.iter().map(|kv| {
            kv.timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value()
        }) {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let range = ranges.ranges.into_values().next().unwrap();
        let iter = range.build_iter().unwrap();
        let mut read = HashMap::new();

        // Tally every timestamp seen across all batches.
        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        // Rows carry timestamps 0..=99 milliseconds.
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
1822
    /// Projecting only column id 3 (`v0`) yields batches with a single field
    /// containing 0..100.
    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable
            .ranges(Some(&[3]), RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            // Only the projected field should be present.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }
1854
    /// Concurrent writers (each producing distinct series) and readers run
    /// against one memtable; afterwards every series and row must be present.
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        // +1 slot for the main thread, which releases everyone at once.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Series keys are unique per (writer, series), so no overlap.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Readers only check that scanning concurrently never errors.
                for _ in 0..10 {
                    let iter = memtable
                        .ranges(None, RangesOptions::default())
                        .unwrap()
                        .build(None)
                        .unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // Final scan: one batch per series, all rows accounted for.
        let iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
1953
    /// A full-projection record-batch scan yields one batch of 10 rows with the
    /// expected column order (tags, fields, ts, then internal columns).
    #[test]
    fn test_build_record_batch_iter_from_memtable() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = schema
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert_eq!(1, ranges.ranges.len());

        let range = ranges.ranges.into_values().next().unwrap();
        let mut iter = range.build_record_batch_iter(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(10, rb.num_rows());
        let schema = rb.schema();
        let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            column_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type",
            ]
        );
        assert!(iter.next().is_none());
    }
1994
    /// A record-batch scan with an inclusive time range [3, 7] keeps exactly
    /// the five rows with those timestamps.
    #[test]
    fn test_build_record_batch_iter_with_time_range() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = schema
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert_eq!(1, ranges.ranges.len());

        let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7));

        let range = ranges.ranges.into_values().next().unwrap();
        let mut iter = range
            .build_record_batch_iter(Some(time_range), None)
            .unwrap();

        // Collect all returned timestamps across batches.
        let mut total_rows = 0;
        let mut all_timestamps = Vec::new();
        while let Some(rb) = iter.next().transpose().unwrap() {
            total_rows += rb.num_rows();
            let ts_col = rb
                .column_by_name("ts")
                .unwrap()
                .as_any()
                .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_col.len() {
                all_timestamps.push(ts_col.value(i));
            }
        }
        assert_eq!(5, total_rows);
        all_timestamps.sort();
        assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps);
    }
2038
    /// Assembles a `TimeSeriesIterBuilder` over `memtable`'s series set with an
    /// empty predicate group.
    ///
    /// NOTE(review): when `projection` is Some, all given ids (tags/ts included)
    /// are stored in the builder's projection; when None, only field column ids
    /// are — the `field_projection` name suggests fields-only, confirm intent.
    fn build_iter_builder(
        schema: &RegionMetadataRef,
        memtable: &TimeSeriesMemtable,
        projection: Option<&[ColumnId]>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
    ) -> TimeSeriesIterBuilder {
        let read_column_ids = read_column_ids_from_projection(schema, projection);
        let field_projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            schema.field_columns().map(|c| c.column_id).collect()
        };
        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
            schema.clone(),
            read_column_ids,
        ));
        TimeSeriesIterBuilder {
            series_set: memtable.series_set.clone(),
            projection: field_projection,
            predicate: PredicateGroup::default(),
            dedup,
            merge_mode,
            sequence,
            batch_to_record_batch: adapter_context,
        }
    }
2068
    /// The builder's record-batch path returns all 10 rows with the full
    /// column layout in the expected order.
    #[test]
    fn test_iter_builder_build_record_batch_basic() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "hello".to_string(), 42, 10);
        memtable.write(&kvs).unwrap();

        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);

        let mut iter = builder.build_record_batch(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(10, rb.num_rows());

        let rb_schema = rb.schema();
        let col_names: Vec<_> = rb_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(
            col_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type",
            ]
        );

        assert!(iter.next().is_none());
    }
2105
    /// Projecting ids [2, 3] (ts and v0) yields record batches with only `v0`,
    /// `ts`, and the internal columns.
    #[test]
    fn test_iter_builder_build_record_batch_with_projection() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 5);
        memtable.write(&kvs).unwrap();

        let projection = vec![2, 3];
        let builder = build_iter_builder(
            &schema,
            &memtable,
            Some(&projection),
            true,
            MergeMode::LastRow,
            None,
        );

        let mut iter = builder.build_record_batch(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(5, rb.num_rows());

        let rb_schema = rb.schema();
        let col_names: Vec<_> = rb_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(
            col_names,
            vec!["v0", "ts", "__primary_key", "__sequence", "__op_type",]
        );

        assert!(iter.next().is_none());
    }
2143
2144 #[test]
2145 fn test_iter_builder_build_record_batch_multiple_series() {
2146 let schema = schema_for_test();
2147 let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2148
2149 let kvs_a = build_key_values(&schema, "aaa".to_string(), 1, 3);
2150 let kvs_b = build_key_values(&schema, "bbb".to_string(), 2, 4);
2151 memtable.write(&kvs_a).unwrap();
2152 memtable.write(&kvs_b).unwrap();
2153
2154 let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2155
2156 let iter = builder.build_record_batch(None, None).unwrap();
2157 let mut total_rows = 0;
2158 for rb in iter {
2159 let rb = rb.unwrap();
2160 total_rows += rb.num_rows();
2161 assert_eq!(8, rb.num_columns());
2162 }
2163 assert_eq!(7, total_rows);
2164 }
2165
2166 #[test]
2167 fn test_iter_builder_build_record_batch_dedup() {
2168 let schema = schema_for_test();
2169 let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2170
2171 let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2173 memtable.write(&kvs).unwrap();
2174 memtable.write(&kvs).unwrap();
2175
2176 let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2177
2178 let iter = builder.build_record_batch(None, None).unwrap();
2179 let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2180 assert_eq!(5, total_rows);
2181 }
2182
2183 #[test]
2184 fn test_iter_builder_build_record_batch_no_dedup() {
2185 let schema = schema_for_test();
2186 let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, false, MergeMode::LastRow);
2187
2188 let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2189 memtable.write(&kvs).unwrap();
2190 memtable.write(&kvs).unwrap();
2191
2192 let builder = build_iter_builder(&schema, &memtable, None, false, MergeMode::LastRow, None);
2193
2194 let iter = builder.build_record_batch(None, None).unwrap();
2195 let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2196 assert_eq!(10, total_rows);
2197 }
2198
    /// Sequence-range filters prune rows in the record-batch path.
    /// The write uses base sequence 0, so `Gt { min: 4 }` removes every row
    /// while `LtEq { max: 2 }` keeps three — TODO confirm rows get sequences
    /// base + row index; the asserts below imply sequences 0..=4.
    #[test]
    fn test_iter_builder_build_record_batch_with_sequence_filter() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "seq".to_string(), 1, 5);
        memtable.write(&kvs).unwrap();

        let builder = build_iter_builder(
            &schema,
            &memtable,
            None,
            true,
            MergeMode::LastRow,
            Some(SequenceRange::Gt { min: 4 }),
        );

        let iter = builder.build_record_batch(None, None).unwrap();
        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
        assert_eq!(0, total_rows);

        let builder = build_iter_builder(
            &schema,
            &memtable,
            None,
            true,
            MergeMode::LastRow,
            Some(SequenceRange::LtEq { max: 2 }),
        );

        let iter = builder.build_record_batch(None, None).unwrap();
        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
        assert_eq!(3, total_rows);
    }
2237
    /// Verifies actual column contents of a record batch: ts / v0 / v1 follow
    /// the `build_key_values` pattern (row i -> i) and every op type is Put.
    #[test]
    fn test_iter_builder_build_record_batch_data_correctness() {
        use datatypes::arrow::array::{
            Float64Array, Int64Array, TimestampMillisecondArray, UInt8Array,
        };

        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "check".to_string(), 7, 3);
        memtable.write(&kvs).unwrap();

        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);

        let mut iter = builder.build_record_batch(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(3, rb.num_rows());

        let ts_col = rb
            .column_by_name("ts")
            .unwrap()
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let timestamps: Vec<_> = (0..ts_col.len()).map(|i| ts_col.value(i)).collect();
        assert_eq!(vec![0, 1, 2], timestamps);

        let v0_col = rb
            .column_by_name("v0")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let v0_values: Vec<_> = (0..v0_col.len()).map(|i| v0_col.value(i)).collect();
        assert_eq!(vec![0, 1, 2], v0_values);

        let v1_col = rb
            .column_by_name("v1")
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let v1_values: Vec<_> = (0..v1_col.len()).map(|i| v1_col.value(i)).collect();
        assert_eq!(vec![0.0, 1.0, 2.0], v1_values);

        let op_col = rb
            .column_by_name("__op_type")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        for i in 0..op_col.len() {
            assert_eq!(OpType::Put as u8, op_col.value(i));
        }

        assert!(iter.next().is_none());
    }
2299}