use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};

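/// Initial capacity for the value builders of a series.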
const INITIAL_BUILDER_CAPACITY: usize = 1024 * 8;

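/// Row-count threshold of the active builder of a series; once it is close to this
/// capacity, the active part is frozen into an immutable part.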
const BUILDER_CAPACITY: usize = 512;

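/// Builder to build [TimeSeriesMemtable].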
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }
}

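/// Memtable implementation that groups rows by their primary key.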
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

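    /// Updates memtable stats.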
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

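    /// Encodes the primary key of `kv`, pushes the row into its series, and accumulates
    /// the allocated bytes and timestamp range into `stats`.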
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Account for the timestamp and op type stored with every row.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?;
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows;
        self.update_stats(metrics);
        Ok(())
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

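/// Map from encoded primary key to its [Series], guarded by a read-write lock.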
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

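/// A set of [Series] of a region, keyed by the encoded primary key.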
#[derive(Clone)]
pub(crate) struct SeriesSet {
    pub(crate) region_metadata: RegionMetadataRef,
    pub(crate) series: Arc<SeriesRwLockMap>,
    pub(crate) codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
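    /// Pushes a [KeyValue] into the series identified by `primary_key`, creating the
    /// series on first write. Returns the allocated key and value byte sizes.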
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, push under the map's read lock.
        if let Some(series) = self.series.read().unwrap().get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        let mut indices = self.series.write().unwrap();
        match indices.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // Another writer created the series before we acquired the write lock.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().get(primary_key).cloned()
    }

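    /// Iterates all series in the [SeriesSet] with the given projection, predicates,
    /// and sequence visibility filter.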
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}

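/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains the primary key
/// columns of given region schema.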
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

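/// Metrics for scanning the time-series memtable.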
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned by the predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}

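/// Iterator over a [SeriesSet] that yields one [Batch] per series, in primary key order.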
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<SeriesRwLockMap>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<SeriesRwLockMap>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
        })
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.range::<Vec<u8>, _>(..),
            Some(last_key) => {
                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
            }
        };

        // Scan series in primary key order until one passes the predicates.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        self.metrics.scan_cost += start.elapsed();

        None
    }
}

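/// Returns whether a series may match `predicates` according to its decoded primary key
/// values. Decoded values are cached on the series to avoid repeated decoding.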
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // no primary key, we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Reuse the cached decoded primary key values, decoding and caching them on first use.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // ignore predicates that are not referencing primary key columns
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

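/// A `Series` holds a list of field values of some given primary key.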
pub(crate) struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
}

impl Series {
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

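    /// Pushes a row of values into the series. Returns the size of the pushed values.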
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active part when it is close to capacity; the slack of 10 rows
        // helps avoid reallocation of the builders.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

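    /// Freezes the active part and pushes it to `frozen`.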
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> Result<()> {
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

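    /// Freezes the active part and compacts multiple frozen parts into one to reduce
    /// memory fragmentation. Returns the compacted values.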
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // Multiple frozen parts: concatenate corresponding columns of all parts
            // (the field columns plus timestamp, sequence and op type) into one part.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}

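/// `ValueBuilder` holds all the vector builders for field columns.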
struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();

        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

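    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already been encoded.
    /// Returns the estimated size of the pushed field values.
    ///
    /// Field builders are created lazily: a column only gets a builder when its first
    /// non-null value arrives, and the builder is back-filled with nulls for earlier rows.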
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(256, 4096))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Back-fill nulls for rows pushed before this column's first value.
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                }
            }
        }

        size
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> error::Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the number of rows in the builder.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }
}

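/// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.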
#[derive(Clone)]
pub(crate) struct Values {
    timestamp: VectorRef,
    sequence: Arc<UInt64Vector>,
    op_type: Arc<UInt8Vector>,
    fields: Vec<VectorRef>,
}

impl Values {
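    /// Converts [Values] to a [Batch], sorts the batch by `timestamp, sequence` desc and,
    /// when `dedup` is set, keeps only the latest row for the same timestamp.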
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

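    /// Returns a vector of all columns converted to arrow [Array](arrow::array::Array) in [Values].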
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

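    /// Builds a new [Values] instance from columns.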
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    v.finish()
                } else {
                    // Column never received a non-null value; build an all-null vector.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

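/// Builder that rebuilds the series iterator for a [MemtableRange].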
struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::row_converter::SortField;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // v0: NULL 1 2 2
        // v1: NULL NULL 10.2 NULL
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![Row {
                                    values: vec![
                                        api::v1::Value {
                                            value_data: Some(ValueData::StringValue(format!(
                                                "{}",
                                                j
                                            ))),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::TimestampMillisecondValue(
                                                j as i64,
                                            )),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::F64Value(j as f64)),
                                        },
                                    ],
                                }],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Barrier to synchronize writers, readers, and the main thread
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads that scan concurrently with the writers
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        // Start all threads
        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // Verify the data after all threads finish
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}