use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt8Vector, UInt64Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
    MemtableStats, PredicateGroup,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

/// Initial capacity of the builders for a new series.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Maximum number of rows in the active builder of a series before it is frozen.
const BUILDER_CAPACITY: usize = 512;

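/// Builder to build a [TimeSeriesMemtable].
///
/// A minimal usage sketch (hypothetical setup; `metadata` is assumed to be a
/// `RegionMetadataRef` with a non-empty primary key):
///
/// ```ignore
/// let builder = TimeSeriesMemtableBuilder::new(None, true, MergeMode::LastRow);
/// let memtable = builder.build(1, &metadata);
/// ```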
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with the specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        // This memtable writes rows through the key-value path.
        false
    }
}

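/// Memtable implementation that groups rows by their primary key and keeps the
/// rows of each series in a dedicated [Series].
///
/// A write/read sketch (hypothetical; `metadata` is an assumed `RegionMetadataRef`
/// and `kvs` an assumed `KeyValues` mutation for that region):
///
/// ```ignore
/// let memtable = TimeSeriesMemtable::new(metadata, 1, None, true, MergeMode::LastRow);
/// memtable.write(&kvs)?;
/// let stats = memtable.stats();
/// ```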
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in the memtable, including deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // Safety: the time index value is always present and is a valid timestamp.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // Stats are updated once for the whole mutation. If a row fails to write,
        // rows already pushed to series are not reflected in the stats.
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Convert the bulk part to a mutation and write it row by row.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence, None)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        _for_flush: bool,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // No rows have been written to this memtable.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

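/// A set of series, keyed by the encoded primary key.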
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a [KeyValue] to the series identified by `primary_key` and returns
    /// the sizes of the allocated key and values.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, push under the read lock.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // Another writer inserted the series before we acquired the write lock.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Returns an iterator over the series in the set.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
            mem_scan_metrics,
        )
    }
}

/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains
/// the primary key columns of the given region schema.
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}

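/// [Iterator] over the series in a [SeriesMap] that yields one [Batch] per
/// series, in primary key order.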
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: self.metrics.total_series,
                num_rows: self.metrics.num_rows,
                num_batches: self.metrics.num_batches,
                scan_cost: self.metrics.scan_cost,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report scan metrics in case the iterator was dropped before exhaustion.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // Yield the first series that is not pruned by the predicates.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // Read the next series.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let mut batch = batch;
            batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        drop(map);
        self.metrics.scan_cost += start.elapsed();

        // All series have been visited; report scan metrics once.
        self.report_mem_scan_metrics();

        None
    }
}

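/// Returns whether a series may match the given predicates, judged only by its
/// decoded primary key values. Decoding or evaluation errors keep the series.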
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key, simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Retrieve the primary key values from the cache, or decode them from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // Evaluate the predicates against the primary key values.
    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that do not reference primary key columns.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

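/// A `Series` holds a list of field values of some given primary key.
///
/// A sketch of how a series accumulates and compacts rows (hypothetical values;
/// `metadata` is an assumed `RegionMetadataRef` matching the pushed fields):
///
/// ```ignore
/// let mut series = Series::new(&metadata);
/// series.push(ts_value, 0, OpType::Put, field_values.into_iter());
/// let values = series.compact(&metadata)?;
/// ```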
pub struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    capacity: usize,
}

impl Series {
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the series and returns the size of the values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active builder if it is about to exceed its capacity.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and pushes it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active part and compacts all frozen parts into one [Values]
    /// to reduce memory fragmentation, returning a reference to the result.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // 3 extra columns for timestamp, sequence and op_type.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}

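/// `ValueBuilder` holds the mutable vectors of the timestamp, sequence, op type
/// and field columns for the active part of a [Series].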
pub(crate) struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        // Field builders are created lazily on the first non-null value.
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of field builders that have been created.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row to the builder and returns the estimated size of the
    /// pushed values. Primary keys are not stored here since they are handled
    /// by [SeriesSet].
    ///
    /// # Panics
    /// Panics if `ts` is not a valid timestamp value.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    // Lazily create the builder and backfill nulls for rows pushed
                    // before the first non-null value.
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Checks whether the active string builders can accommodate the given field
    /// vectors without overflowing their `i32` offsets. Returns `Ok(false)` if
    /// the builder should be frozen before appending.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            // String builders use i32 offsets; appending must not overflow them.
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the number of rows in the builder.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    /// Builds [Values] from the current state without consuming the builder.
    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    // A field that was never written is materialized as all nulls.
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

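/// [Values] holds an immutable snapshot of the columns in a series, including
/// the `timestamp`, `sequence` and `op_type` columns.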
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    pub(crate) fields: Vec<VectorRef>,
}

impl Values {
    /// Converts the [Values] to a [Batch] with the projected field columns,
    /// sorting rows and deduplicating by timestamp when `dedup` is true.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

    /// Returns all columns as arrow arrays in the order
    /// `timestamp, sequence, op_type, fields..`.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds [Values] from columns in the same order as [Values::columns] returns.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

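/// [IterBuilder] that recreates a series iterator for a [MemtableRange].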
struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
            metrics,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use mito_codec::row_converter::SortField;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // Push rows with null fields:
        // v0: null, 1, 2, 2
        // v1: null, null, 10.2, null
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                // In each written row, timestamp, sequence and v0 all equal `j`.
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![row(vec![
                                    ValueData::StringValue(format!("{}", j)),
                                    ValueData::I64Value(j as i64),
                                    ValueData::TimestampMillisecondValue(j as i64),
                                    ValueData::I64Value(j as i64),
                                    ValueData::F64Value(j as f64),
                                ])],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        // Only project field column v0 (column id 3).
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads.
        let num_writers = 10;
        // Number of reader threads.
        let num_readers = 5;
        // Number of series each writer writes.
        let series_per_writer = 100;
        // Number of rows per series.
        let rows_per_series = 10;
        // Total number of series written.
        let total_series = num_writers * series_per_writer;

        // Barrier to synchronize the start of all writers and readers.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads.
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Each writer writes to its own set of series.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads that scan the memtable while writers are running.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // Verify that all series and rows are visible after all threads finish.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}