use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

/// Initial capacity for the builders in a series.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Max rows in an active builder before it is frozen into an immutable part.
const BUILDER_CAPACITY: usize = 512;

/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }
}

/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in the memtable, including deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // Safety: the timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // No rows ever written.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a [KeyValue] to the series identified by `primary_key` and
    /// returns the bytes allocated for the key and for the values.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // A series with the same primary key was created concurrently
            // between the read check above and taking the write lock.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in the set, applying the projection and predicates.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}

/// Creates an arrow schema that only contains the primary key columns of the
/// given region metadata.
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned by primary key predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration spent scanning the memtable.
    scan_cost: Duration,
}

struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
        })
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // Take the first series that is not pruned by the predicates.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // Series is pruned out by the primary key predicates.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let mut batch = batch;
            batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        self.metrics.scan_cost += start.elapsed();

        None
    }
}

fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key, nothing to prune.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Retrieve primary key values from the cache or decode them from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // Evaluate predicates against primary key values.
    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that do not reference primary key columns.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

/// A `Series` holds the rows of one primary key, with an active (mutable)
/// builder and a list of frozen (immutable) parts.
pub(crate) struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
}

impl Series {
    pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, builder_cap),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the series and returns the size of the values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active builder and pushes it to the `frozen` list.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> Result<()> {
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active part and compacts all frozen parts into one to
    /// reduce memory fragmentation. Returns the compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // 3 extra columns for timestamp, sequence and op_type.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}

/// `ValueBuilder` holds the mutable vectors of the timestamp, sequence,
/// op_type and field columns of a series.
struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of field builders that have been created lazily.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row to the builder. Primary keys are not needed here since
    /// they have already been encoded. Returns the size of the field values.
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Create a field builder lazily, on the first non-null value.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for the rows pushed before this field appeared.
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> error::Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the number of rows in the builder.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }
}

/// `Values` holds the immutable, frozen vectors of a series, including the
/// timestamp, sequence and op_type columns.
#[derive(Clone)]
pub(crate) struct Values {
    timestamp: VectorRef,
    sequence: Arc<UInt64Vector>,
    op_type: Arc<UInt8Vector>,
    fields: Vec<VectorRef>,
}

impl Values {
    /// Converts the values to a `Batch`, sorts the batch and optionally
    /// dedups rows with the same timestamp.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

    /// Returns all columns as arrow arrays.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a new `Values` instance from columns laid out as
    /// [Values::columns] produces them.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use mito_codec::row_converter::SortField;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
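
    // An added sketch exercising the builder path: for a region whose primary key
    // is non-empty (like the test schema above), `build` should return an empty
    // memtable that carries the given id. The concrete memtable type is hidden
    // behind `MemtableRef`, so we only assert observable behavior.
    #[test]
    fn test_builder_build() {
        let schema = schema_for_test();
        let builder = TimeSeriesMemtableBuilder::new(None, true, MergeMode::LastRow);
        let memtable = builder.build(7, &schema);
        assert_eq!(7, memtable.id());
        assert!(memtable.is_empty());
    }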

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }
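
    // Added sanity check (sketch) for `primary_key_schema`: with the test region
    // metadata above, the resulting arrow schema should contain exactly the two
    // tag columns, in primary key order, with their arrow data types.
    #[test]
    fn test_primary_key_schema() {
        let region_metadata = schema_for_test();
        let pk_schema = primary_key_schema(&region_metadata);
        assert_eq!(2, pk_schema.fields().len());
        assert_eq!("k0", pk_schema.field(0).name());
        assert_eq!(
            &arrow::datatypes::DataType::Utf8,
            pk_schema.field(0).data_type()
        );
        assert_eq!("k1", pk_schema.field(1).name());
        assert_eq!(
            &arrow::datatypes::DataType::Int64,
            pk_schema.field(1).data_type()
        );
    }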

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
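
    // Added sketch: freezing moves the rows of the active builder into the frozen
    // list; the series still reports itself as non-empty afterwards.
    #[test]
    fn test_series_freeze() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        assert!(series.is_empty());

        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.freeze(&region_metadata);

        assert_eq!(0, series.active.len());
        assert_eq!(1, series.frozen.len());
        assert!(!series.is_empty());
    }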

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
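
    // Added sketch: `Values::columns` and `Values::from_columns` should round-trip,
    // with the first three columns being timestamp, sequence and op_type followed
    // by the field columns.
    #[test]
    fn test_values_columns_roundtrip() {
        let values = Values {
            timestamp: Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3])) as Arc<_>,
            sequence: Arc::new(UInt64Vector::from_vec(vec![1, 2, 3])),
            op_type: Arc::new(UInt8Vector::from_vec(vec![1, 1, 1])),
            fields: vec![
                Arc::new(Int64Vector::from_vec(vec![4, 5, 6])) as Arc<_>,
                Arc::new(Float64Vector::from_vec(vec![1.1, 2.2, 3.3])) as Arc<_>,
            ],
        };

        let columns = values.columns();
        assert_eq!(5, columns.len());

        let restored = Values::from_columns(&columns).unwrap();
        assert_eq!(3, restored.timestamp.len());
        assert_eq!(3, restored.sequence.len());
        assert_eq!(3, restored.op_type.len());
        assert_eq!(2, restored.fields.len());
    }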

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![Row {
                                    values: vec![
                                        api::v1::Value {
                                            value_data: Some(ValueData::StringValue(format!(
                                                "{}",
                                                j
                                            ))),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::TimestampMillisecondValue(
                                                j as i64,
                                            )),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::F64Value(j as f64)),
                                        },
                                    ],
                                }],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }
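
    // Added sketch: the single-row `write_one` path should update stats the same
    // way `write` does; we only assert through accessors the other tests already
    // use (`bytes_allocated`, `time_range`).
    #[test]
    fn test_memtable_write_one() {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 1);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);

        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        assert!(!memtable.is_empty());
        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(0))),
            stats.time_range()
        );
    }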

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads.
        let num_writers = 10;
        // Number of reader threads.
        let num_readers = 5;
        // Number of series each writer writes.
        let series_per_writer = 100;
        // Number of rows per series.
        let rows_per_series = 10;
        // Total series expected after all writers finish.
        let total_series = num_writers * series_per_writer;

        // Barrier to let writers and readers start together.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads.
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads that iterate the memtable while writes are ongoing.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // A final read should see every series and every row.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}