use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

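/// Initial capacity for the value builders of a series.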
const INITIAL_BUILDER_CAPACITY: usize = 4;

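/// Capacity threshold of a series' active builder; `Series::push` freezes the
/// active part as it approaches this size.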
const BUILDER_CAPACITY: usize = 512;

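/// Builder to build [TimeSeriesMemtable].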
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }
}

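/// Memtable implementation that groups rows by their primary key.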
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total rows written to this memtable, including deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

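    /// Updates memtable stats and reports new allocations to the [AllocTracker].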
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

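    /// Encodes the primary key of `kv`, pushes the row into the matching series
    /// and accumulates write metrics into `stats`.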
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

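        // Safety: the time index value is expected to be a valid, non-null timestamp.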
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

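/// Maps encoded primary keys to their series, guarded by a read-write lock.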
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

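/// A set of series, keyed by their encoded primary keys.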
#[derive(Clone)]
pub(crate) struct SeriesSet {
    pub(crate) region_metadata: RegionMetadataRef,
    pub(crate) series: Arc<SeriesRwLockMap>,
    pub(crate) codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
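    /// Pushes a row into the series identified by `primary_key`, creating the series
    /// on first write. Returns the bytes allocated for the key and for the value.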
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        if let Some(series) = self.series.read().unwrap().get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        let mut indices = self.series.write().unwrap();
        match indices.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().get(primary_key).cloned()
    }

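    /// Iterates all series in the set, applying `projection` and `predicate`
    /// and filtering rows by `sequence`.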
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}

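/// Creates an arrow schema that only contains the primary key columns of the
/// given region schema.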
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

/// Metrics of scanning the time-series memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned by the primary key predicate.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration of the scan.
    scan_cost: Duration,
}

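/// Iterator over a [SeriesSet] that yields one [Batch] per series.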
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<SeriesRwLockMap>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<SeriesRwLockMap>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
        })
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.range::<Vec<u8>, _>(..),
            Some(last_key) => {
                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
            }
        };

        // Yield the first series after `last_key` that is not pruned by the predicate.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // The primary key does not match the predicate, skip this series.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let mut batch = batch;
            batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        self.metrics.scan_cost += start.elapsed();

        None
    }
}

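/// Returns whether the series with primary key `pk` may satisfy `predicates`.
/// Series whose keys fail to decode are conservatively kept.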
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key, so all series are treated as a match.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Decode the primary key lazily and cache the decoded values on the series.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that do not reference primary key columns.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

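/// A `Series` holds the rows of one primary key: a mutable active part and a
/// list of immutable frozen parts.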
pub(crate) struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
}

impl Series {
    pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, builder_cap),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

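    /// Pushes a row into the series, freezing the active builder first if it is
    /// nearly full. Returns the estimated size of the pushed values.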
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active builder shortly before it is completely full.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

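    /// Freezes the active builder and appends its values to the frozen parts.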
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> Result<()> {
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

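    /// Freezes the active part and compacts all frozen parts into a single [Values]
    /// to reduce memory fragmentation, returning a reference to the compacted values.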
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // A series must have at least one row.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // Timestamp, sequence and op_type columns plus all field columns.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}

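/// [ValueBuilder] buffers the rows of one series: timestamps, sequences, op types
/// and lazily created field builders.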
struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();

        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

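    /// Pushes a new row into the builder; a field builder is created lazily on the
    /// first non-null value of that field, so all-null fields cost nothing.
    /// Returns the estimated data size of the pushed field values. The caller must
    /// guarantee that `fields` matches the number of field columns.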
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(256, 4096))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for rows pushed before the first value of this field.
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                }
            }
        }

        size
    }

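    /// Appends whole vectors of values, broadcasting the same `op_type` and
    /// `sequence` to every appended row.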
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> error::Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

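    /// Returns the number of rows currently buffered.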
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }
}

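/// [Values] holds the immutable vectors of a frozen series part, including the
/// sequence and op type columns.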
#[derive(Clone)]
pub(crate) struct Values {
    timestamp: VectorRef,
    sequence: Arc<UInt64Vector>,
    op_type: Arc<UInt8Vector>,
    fields: Vec<VectorRef>,
}

impl Values {
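    /// Converts the values to a [Batch] with the projected field columns, sorting
    /// rows and deduplicating them if `dedup` is set.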
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

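    /// Returns all columns as arrow arrays in `[timestamp, sequence, op_type, fields..]` order.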
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

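    /// Builds [Values] from columns in the order produced by [Values::columns].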
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    v.finish()
                } else {
                    // The field builder was never created, so every row of this field is null.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

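/// [IterBuilder] that recreates the series iterator for a memtable range.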
struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use mito_codec::row_converter::SortField;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

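    // A minimal additional check (not in the original source): an empty series
    // reports empty, and pushing a single row makes it non-empty.
    #[test]
    fn test_series_is_empty() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        assert!(series.is_empty());
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        assert!(!series.is_empty());
    }
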
    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![Row {
                                    values: vec![
                                        api::v1::Value {
                                            value_data: Some(ValueData::StringValue(format!(
                                                "{}",
                                                j
                                            ))),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::TimestampMillisecondValue(
                                                j as i64,
                                            )),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::F64Value(j as f64)),
                                        },
                                    ],
                                }],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        // Only project the `v0` field column (column id 3).
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads, each writing its own set of series.
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads that repeatedly scan the memtable while writes are in flight.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // After all threads finish, a full scan must observe every series and row.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}