use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt8Vector, UInt64Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
    MemtableStats, RangesOptions,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

/// Initial capacity of a series' value builder.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Capacity threshold at which a series freezes its active value builder.
const BUILDER_CAPACITY: usize = 512;

/// Builder to build a [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with the given `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        // The time-series memtable does not use the bulk insert path.
        false
    }
}
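
// Usage sketch (illustrative; `region_metadata` is assumed to be a prepared
// `RegionMetadataRef`, e.g. built with `RegionMetadataBuilder` as in the tests
// at the bottom of this file):
//
//     let builder = TimeSeriesMemtableBuilder::new(None, true, MergeMode::LastRow);
//     let memtable = builder.build(0, &region_metadata);
//
// Regions without a primary key get a `SimpleBulkMemtable` instead, since there
// are no series to group rows by.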

/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total rows written to this memtable, including deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // Safety: the timestamp of a key value must be present and a valid timestamp.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence, None)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
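
// Write-path sketch (illustrative; `metadata` and `kvs` are assumed to be a
// prepared `RegionMetadataRef` and `KeyValues`, as built in the tests below):
//
//     let memtable = TimeSeriesMemtable::new(metadata, 1, None, true, MergeMode::LastRow);
//     memtable.write(&kvs)?;
//     let stats = memtable.stats(); // estimated bytes, time range, row and series counts
//
// Note that `iter` is only compiled for tests; production reads go through
// `ranges`, which returns a single range backed by a `TimeSeriesIterBuilder`.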

#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a row to the series identified by `primary_key`, creating the series if it
    /// is absent. Returns the bytes newly allocated for the key and for the values.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so only the map's read lock is needed.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the write lock and insert the series if it is still missing.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            Entry::Occupied(v) => {
                // Another writer created the series between the two lock acquisitions.
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in the set.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
            mem_scan_metrics,
        )
    }
}
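
// `push_to_series` above uses the classic read-then-write ("double-checked")
// RwLock pattern. A minimal standalone sketch of the same idea, with
// hypothetical `key`/`make_value` names:
//
//     let map: RwLock<BTreeMap<Vec<u8>, u64>> = RwLock::new(BTreeMap::new());
//     if let Some(v) = map.read().unwrap().get(&key) {
//         return *v; // fast path: shared lock only
//     }
//     *map.write().unwrap().entry(key).or_insert_with(make_value)
//
// The `Entry::Occupied` arm handles the race where another writer inserts the
// series between releasing the read lock and acquiring the write lock.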

/// Creates an arrow schema that contains only the primary key columns of the region.
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}
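
// For example (illustrative), with the test schema at the bottom of this file
// (tags `k0: String` and `k1: Int64`), the returned schema contains exactly:
//
//     Field { name: "k0", data_type: Utf8, nullable: false }
//     Field { name: "k1", data_type: Int64, nullable: false }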

/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total number of series visited.
    total_series: usize,
    /// Number of series pruned by predicates on the primary key.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration of the scan.
    scan_cost: Duration,
}

struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: self.metrics.total_series,
                num_rows: self.metrics.num_rows,
                num_batches: self.metrics.num_batches,
                scan_cost: self.metrics.scan_cost,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}
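
// `report_mem_scan_metrics` takes `self.mem_scan_metrics` out of the `Option`,
// so the metrics are merged into the caller-provided `MemScanMetrics` exactly
// once: either when the iterator is exhausted in `next`, or on `Drop` if the
// caller stops early.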

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // Find the next series that is not pruned by the predicates.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // The primary key is pruned.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        drop(map);
        self.metrics.scan_cost += start.elapsed();

        self.report_mem_scan_metrics();

        None
    }
}
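
// The iterator yields one `Batch` per series, in primary-key order. `last_key`
// lets every `next` call re-enter the map right after the previously returned
// key, so the map's read lock is never held across calls and concurrent
// writers are not blocked between batches.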

/// Returns `false` if the series identified by `pk` is pruned by the predicates and can
/// be skipped. Pruning is conservative: any decode or evaluation failure keeps the series.
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key columns, nothing to prune on.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Reuse the decoded primary key values cached on the series when possible.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // Evaluate each predicate against the primary key values; filters on columns
    // that are not part of the primary key are skipped.
    let mut result = true;
    for predicate in predicates {
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: the schema and datatypes are built from the same primary key columns.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}
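
// Pruning example (illustrative): with primary key `(k0: String, k1: i64)` and
// the simple filter `k1 = 42`, a series whose decoded key is ("host-a", 42) is
// kept while ("host-a", 7) is skipped. A filter on a field column such as
// `v0 > 0` never matches a primary key column name and is ignored here.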

/// A `Series` holds the rows of one primary key: an active builder accepting new
/// writes and a list of frozen, immutable value blocks.
pub struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    capacity: usize,
}

impl Series {
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the series and returns the approximate bytes allocated.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active builder when it approaches its capacity.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and moves it to the frozen list.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active part and compacts all frozen parts into a single `Values` block.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // 3 extra columns: timestamp, sequence and op_type.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}
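
// Buffer lifecycle sketch (illustrative; `ts_at` and `fields_at` are
// hypothetical helpers producing `ValueRef`s): rows accumulate in `active`
// until it nears `BUILDER_CAPACITY`, at which point the builder is frozen into
// an immutable block; `compact` then concatenates all frozen blocks into one.
//
//     let mut series = Series::new(&region_metadata);
//     for i in 0..2048u64 {
//         series.push(ts_at(i), i, OpType::Put, fields_at(i));
//     }
//     let values = series.compact(&region_metadata)?; // single block of 2048 rows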

/// `ValueBuilder` buffers the mutable vectors of one series: timestamps, sequences,
/// op types and one optional builder per field column.
pub(crate) struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of field builders that have been allocated.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row to the builder and returns the approximate data size of the row.
    /// Field builders are created lazily: a builder is only allocated when the first
    /// non-null value of that field arrives, and is backfilled with nulls so all columns
    /// stay aligned.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for the rows written before this field appeared.
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }
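
    // Lazy-allocation example (illustrative): pushing rows whose `v0` values are
    // NULL, NULL, then 7 allocates the `v0` builder only at the third row and
    // backfills it with two nulls, keeping it aligned with `timestamp`,
    // `sequence` and `op_type`. A field that never sees a non-null value never
    // allocates a builder; `finish_cloned` and `Values::from` materialize such
    // fields as all-null vectors instead.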

    /// Returns whether the builder can accommodate the given fields without the
    /// string builders' `i32` offsets overflowing.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

/// `Values` holds an immutable view of one series: timestamp, sequence, op type and
/// field vectors of equal length.
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    pub(crate) fields: Vec<VectorRef>,
}

impl Values {
    /// Converts the values to a `Batch` with the projected field columns, then sorts the
    /// batch and optionally dedups rows with the same timestamp.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

    /// Returns all columns as arrow arrays, in the order `[timestamp, sequence, op_type, fields..]`.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a new `Values` instance from columns laid out as produced by [Values::columns].
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}
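
// `columns` and `from_columns` are inverses over the fixed layout
// `[timestamp, sequence, op_type, field_0, .., field_n]`. This is why
// `Series::compact` concatenates `fields.len() + 3` arrays and can rebuild a
// single `Values` from the result:
//
//     let cols = values.columns();                  // len == fields.len() + 3
//     let roundtrip = Values::from_columns(&cols)?; // equivalent content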

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceRange>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
            metrics,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use mito_codec::row_converter::SortField;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![row(vec![
                                    ValueData::StringValue(format!("{}", j)),
                                    ValueData::I64Value(j as i64),
                                    ValueData::TimestampMillisecondValue(j as i64),
                                    ValueData::I64Value(j as i64),
                                    ValueData::F64Value(j as f64),
                                ])],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs.iter().map(|kv| {
            kv.timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value()
        }) {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads, each writing its own set of series.
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads that iterate the memtable while writes are in flight.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // All series and rows must be visible once the writers finish.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}