use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt8Vector, UInt64Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
    MemtableStats, RangesOptions,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

/// Initial capacity for the vector builders of a new series.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Capacity of an active builder; a series freezes its active builder when it
/// approaches this size.
const BUILDER_CAPACITY: usize = 512;

/// Builder to create [TimeSeriesMemtable]s.
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with the given write buffer manager.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            // Without a primary key every row belongs to the same series, so the
            // simpler memtable that skips primary key encoding is sufficient.
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        false
    }
}

/// Memtable implementation that groups rows by their primary key, i.e. by time series.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total number of rows written to this memtable.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates the memtable stats with the metrics of a finished write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // Safety: writes ensure the timestamp of a key value is present and valid.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self.series_set.iter_series(
            projection,
            filters,
            self.dedup,
            self.merge_mode,
            sequence,
            None,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing has been written to this memtable yet.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a row to the series identified by `primary_key`, creating the series
    /// on demand. Returns the number of bytes allocated for the key and the values.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // Another writer may have created the series after we released the read lock.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in the set, applying predicate pruning per series.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}

/// Builds an arrow schema that contains only the primary key columns of the region.
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

/// Metrics collected while scanning a time series memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total number of series visited.
    total_series: usize,
    /// Number of series pruned by primary key predicates.
    num_pruned_series: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Duration spent scanning the memtable.
    scan_cost: Duration,
}

struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            merge_mode,
            sequence,
            metrics: Metrics::default(),
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: self.metrics.total_series,
                num_rows: self.metrics.num_rows,
                num_batches: self.metrics.num_batches,
                scan_cost: self.metrics.scan_cost,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // Series is filtered out by the predicates.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        drop(map);
        self.metrics.scan_cost += start.elapsed();

        self.report_mem_scan_metrics();

        None
    }
}

fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key, nothing to prune.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Decodes the primary key once and caches the values in the series.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // Keep the series unless a predicate provably filters it out.
    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that do not reference a primary key column.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: the arrow schema and the datatypes are built from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

/// A `Series` holds the rows of one primary key: an active builder that accepts
/// writes plus a list of frozen, immutable value parts.
pub struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    capacity: usize,
}

impl Series {
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the series. Returns the size of the pushed values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active builder if it is about to exceed its capacity.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active builder and pushes its contents to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active builder and compacts all frozen parts into a single [Values].
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // A compacted series must contain at least one row.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // Timestamp, sequence and op_type columns plus all field columns.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}

/// `ValueBuilder` holds the mutable builders for the timestamp, sequence,
/// op_type and field columns of a series.
pub(crate) struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        // Field builders are created lazily on the first non-null value.
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of field builders that have been created so far.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row into the builder. Primary keys are not stored here since
    /// they are already encoded as the series key. Returns the total size of the
    /// pushed field values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    // First non-null value for this field: create the builder and
                    // backfill all previous rows with nulls.
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Checks whether the active string builders can accommodate the given field
    /// vectors without overflowing their i32 value offsets.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the number of rows in the builder.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

/// [Values] holds an immutable snapshot of a series: timestamp, sequence and
/// op_type vectors plus one vector per field column.
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    pub(crate) fields: Vec<VectorRef>,
}

impl Values {
    /// Converts the values to a [Batch], sorts the batch by `(timestamp, sequence)`
    /// and optionally dedups rows with the same timestamp according to `merge_mode`.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        sequence: Option<SequenceRange>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        // Filter rows by sequence first so that rows invisible to the reader
        // never participate in dedup or merge.
        batch.filter_by_sequence(sequence)?;

        match (dedup, merge_mode) {
            // Keep all rows, just sort them.
            (false, _) => batch.sort(false)?,
            // Sort and keep only the last row per timestamp.
            (true, MergeMode::LastRow) => batch.sort(true)?,
            (true, MergeMode::LastNonNull) => {
                // Keep duplicate timestamps and let merge_last_non_null combine them.
                batch.sort(false)?;
                batch.merge_last_non_null()?;
            }
        }
        Ok(batch)
    }

    /// Returns all columns as arrow arrays in `[timestamp, sequence, op_type, fields...]` order.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a [Values] from columns in the same order as [Values::columns] returns.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceRange>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.merge_mode,
            self.sequence,
            metrics,
        )?;
        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use mito_codec::row_converter::SortField;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
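
    // A minimal sketch of the freeze path, assuming the current heuristic in
    // `Series::push` (freeze the active builder once `active.len() + 10` exceeds
    // the configured capacity). The exact row counts below are tied to that
    // implementation detail and would need adjusting if the heuristic changes.
    #[test]
    fn test_series_freeze_on_capacity() {
        let region_metadata = schema_for_test();
        // Small capacity so that a handful of rows triggers a freeze.
        let mut series = Series::with_capacity(&region_metadata, 4, 16);
        for i in 0..8i64 {
            series.push(
                ts_value_ref(i),
                i as u64,
                OpType::Put,
                field_value_ref(i, i as f64),
            );
        }
        // The 8th push sees `7 + 10 > 16` and freezes the first 7 rows.
        assert_eq!(1, series.frozen.len());
        assert_eq!(1, series.active.timestamp.len());
        // `read_to_values` returns the frozen part plus the active builder.
        assert_eq!(2, series.read_to_values().len());

        // Compacting merges the frozen part and the remaining active row.
        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(8, values.timestamp.len());
    }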

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
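
    // A small sanity check for `primary_key_schema`: with the test schema above,
    // the derived arrow schema should contain exactly the two tag columns in
    // primary key order. This is a sketch of the expected mapping rather than an
    // exhaustive schema test.
    #[test]
    fn test_primary_key_schema() {
        let region_metadata = schema_for_test();
        let pk_schema = primary_key_schema(&region_metadata);
        assert_eq!(2, pk_schema.fields().len());
        assert_eq!("k0", pk_schema.field(0).name());
        assert_eq!("k1", pk_schema.field(1).name());
        assert_eq!(
            &arrow::datatypes::DataType::Utf8,
            pk_schema.field(0).data_type()
        );
        assert_eq!(
            &arrow::datatypes::DataType::Int64,
            pk_schema.field(1).data_type()
        );
    }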

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                None,
                true,
                MergeMode::LastRow,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
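
    // A sketch of the `columns`/`from_columns` roundtrip that `Series::compact`
    // relies on when concatenating frozen parts: converting `Values` to arrow
    // arrays and back should preserve every row and column.
    #[test]
    fn test_values_columns_roundtrip() {
        let values = Values {
            timestamp: Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3])) as Arc<_>,
            sequence: Arc::new(UInt64Vector::from_vec(vec![1, 2, 3])),
            op_type: Arc::new(UInt8Vector::from_vec(vec![1, 1, 1])),
            fields: vec![
                Arc::new(Int64Vector::from_vec(vec![4, 5, 6])) as Arc<_>,
                Arc::new(Float64Vector::from_vec(vec![1.1, 2.2, 3.3])) as Arc<_>,
            ],
        };

        let restored = Values::from_columns(&values.columns()).unwrap();
        check_values(
            &restored,
            &[(1, 1, 1, 4, 1.1), (2, 2, 1, 5, 2.2), (3, 3, 1, 6, 3.3)],
        );
    }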

    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Three rows share timestamp 1 with sequences 1..=3:
        //   seq 1: v0 = null, v1 = 1.5
        //   seq 2: v0 = 10,   v1 = null
        //   seq 3: v0 = null, v1 = null
        // Only rows with sequence <= 2 are visible, so the merged row must not
        // read values from the row with sequence 3.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::LtEq { max: 2 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Int64(10),
                Value::Float64(OrderedFloat(1.5)),
            ]],
        );
    }

    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Two rows share timestamp 1:
        //   seq 1: v0 = 10,   v1 = 1.0
        //   seq 2: v0 = null, v1 = 1.0
        // Only the row with sequence 2 is visible; its null v0 must not be
        // backfilled from the filtered-out row with sequence 1.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::Gt { min: 1 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Null,
                Value::Float64(OrderedFloat(1.0)),
            ]],
        );
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![row(vec![
                                    ValueData::StringValue(format!("{}", j)),
                                    ValueData::I64Value(j as i64),
                                    ValueData::TimestampMillisecondValue(j as i64),
                                    ValueData::I64Value(j as i64),
                                    ValueData::F64Value(j as f64),
                                ])],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs.iter().map(|kv| {
            kv.timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value()
        }) {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
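
    // A sketch of `is_empty` and `freeze` semantics: a fresh memtable reports
    // empty until the first write, and `freeze` only stops allocation tracking,
    // so buffered rows must stay readable afterwards.
    #[test]
    fn test_memtable_is_empty_and_freeze() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 42, None, true, MergeMode::LastRow);
        assert!(memtable.is_empty());

        let kvs = build_key_values(&schema, "hello".to_string(), 42, 10);
        memtable.write(&kvs).unwrap();
        assert!(!memtable.is_empty());

        memtable.freeze().unwrap();
        let iter = memtable.iter(None, None, None).unwrap();
        let total_rows: usize = iter.map(|batch| batch.unwrap().num_rows()).sum();
        assert_eq!(10, total_rows);
    }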

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }
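
    // A sketch of the `write_one` path: a single `KeyValue` write should be
    // visible through `iter` just like a batched `write`. Assumes the test
    // helper `build_key_values` produces at least one row.
    #[test]
    fn test_memtable_write_one() {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 1);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let total_rows: usize = iter.map(|batch| batch.unwrap().num_rows()).sum();
        assert_eq!(1, total_rows);
    }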

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Writer and reader thread counts, plus rows written per series.
        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        // A barrier so all threads start at roughly the same time.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads; each writer writes to its own set of series.
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads that repeatedly scan the memtable while writes are ongoing.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // After all threads join, every written row must be visible.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}