1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
28use std::sync::Arc;
29
30use common_base::readable_size::ReadableSize;
31use mito_codec::key_values::KeyValue;
32use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
45 MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
46 MemtableStats, PredicateGroup,
47};
48use crate::region::options::MergeMode;
49
/// Use `1/DICTIONARY_SIZE_FACTOR` of the total system memory as the upper
/// bound for the fork dictionary size (see `PartitionTreeConfig::default`).
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value for `PartitionTreeConfig::index_max_keys_per_shard`.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value for `PartitionTreeConfig::data_freeze_threshold`.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
54
/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key inside a shard.
type PkIndex = u16;
59
/// Id of a primary key: the shard it belongs to plus its index inside that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}
66
/// Config of the partition tree memtable.
///
/// `#[serde(default)]` makes every missing field fall back to
/// [`PartitionTreeConfig::default`] during deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to deduplicate rows. Not exposed to config files: the
    /// `skip_deserializing` attribute resets it to the default on load.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of the dictionary to keep when forking the tree.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree. Not exposed to config files (see `dedup`).
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
90
91impl Default for PartitionTreeConfig {
92 fn default() -> Self {
93 let mut fork_dictionary_bytes = ReadableSize::mb(512);
94 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
95 let adjust_dictionary_bytes =
96 std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
97 if adjust_dictionary_bytes.0 > 0 {
98 fork_dictionary_bytes = adjust_dictionary_bytes;
99 }
100 }
101
102 Self {
103 index_max_keys_per_shard: 8192,
104 data_freeze_threshold: 131072,
105 dedup: true,
106 fork_dictionary_bytes,
107 merge_mode: MergeMode::LastRow,
108 }
109 }
110}
111
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    id: MemtableId,
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by writes; feeds the write buffer manager.
    alloc_tracker: AllocTracker,
    /// Max/min timestamps written so far. Initialized to `i64::MIN`/`i64::MAX`
    /// so the first write establishes both bounds via fetch_max/fetch_min.
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
123
124impl fmt::Debug for PartitionTreeMemtable {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 f.debug_struct("PartitionTreeMemtable")
127 .field("id", &self.id)
128 .finish()
129 }
130}
131
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows in `kvs` into the tree, updating stats on success.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut metrics = WriteMetrics::default();
        // Buffer reused for encoded primary keys during this write.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        // Only update stats for successful writes so failed writes don't
        // skew row counts or the time range.
        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single row into the tree, updating stats on success.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk insertion is not supported by the partition tree memtable.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Test-only iterator over the memtable (no scan metrics collected).
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence, None)
    }

    /// Returns the scannable ranges of this memtable.
    ///
    /// The whole memtable is exposed as a single range (index 0) whose
    /// iterator is built lazily by [`PartitionTreeIterBuilder`].
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Freezes the memtable: stops allocation tracking and freezes the tree.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Returns the statistics of this memtable.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        // Nothing has been written yet; return empty stats so callers don't
        // read the uninitialized min/max timestamps.
        if estimated_bytes == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert the raw atomic i64 values back into typed timestamps using
        // the time index column's unit.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Forks this memtable into a new, empty one that reuses part of the
    /// tree's state (e.g. dictionaries) under the new region metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
265
266impl PartitionTreeMemtable {
267 pub fn new(
269 id: MemtableId,
270 row_codec: Arc<dyn PrimaryKeyCodec>,
271 metadata: RegionMetadataRef,
272 write_buffer_manager: Option<WriteBufferManagerRef>,
273 config: &PartitionTreeConfig,
274 ) -> Self {
275 Self::with_tree(
276 id,
277 PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
278 )
279 }
280
281 fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
285 let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
286
287 Self {
288 id,
289 tree: Arc::new(tree),
290 alloc_tracker,
291 max_timestamp: AtomicI64::new(i64::MIN),
292 min_timestamp: AtomicI64::new(i64::MAX),
293 num_rows: AtomicUsize::new(0),
294 max_sequence: AtomicU64::new(0),
295 }
296 }
297
298 fn update_stats(&self, metrics: &WriteMetrics) {
300 self.alloc_tracker.on_allocation(metrics.value_bytes);
302 self.max_timestamp
303 .fetch_max(metrics.max_ts, Ordering::SeqCst);
304 self.min_timestamp
305 .fetch_min(metrics.min_ts, Ordering::SeqCst);
306 self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
307 self.max_sequence
308 .fetch_max(metrics.max_sequence, Ordering::SeqCst);
309 }
310
311 #[cfg(any(test, feature = "test"))]
312 pub fn iter(
313 &self,
314 projection: Option<&[ColumnId]>,
315 predicate: Option<Predicate>,
316 sequence: Option<SequenceNumber>,
317 ) -> Result<BoxedBatchIterator> {
318 self.tree.read(projection, predicate, sequence, None)
319 }
320}
321
/// Builder that creates [PartitionTreeMemtable]s sharing one config.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Tree config applied to every memtable built.
    config: PartitionTreeConfig,
    /// Write buffer manager propagated to the created memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
328
329impl PartitionTreeMemtableBuilder {
330 pub fn new(
332 config: PartitionTreeConfig,
333 write_buffer_manager: Option<WriteBufferManagerRef>,
334 ) -> Self {
335 Self {
336 config,
337 write_buffer_manager,
338 }
339 }
340}
341
342impl MemtableBuilder for PartitionTreeMemtableBuilder {
343 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
344 let codec = build_primary_key_codec(metadata);
345 Arc::new(PartitionTreeMemtable::new(
346 id,
347 codec,
348 metadata.clone(),
349 self.write_buffer_manager.clone(),
350 &self.config,
351 ))
352 }
353
354 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
355 false
356 }
357}
358
/// Captured scan arguments used to lazily build iterators for a memtable
/// range (see `PartitionTreeMemtable::ranges`).
struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}
365
366impl IterBuilder for PartitionTreeIterBuilder {
367 fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
368 self.tree.read(
369 self.projection.as_deref(),
370 self.predicate.clone(),
371 self.sequence,
372 metrics,
373 )
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use std::collections::HashMap;
380 use std::sync::Arc;
381
382 use api::v1::value::ValueData;
383 use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
384 use common_time::Timestamp;
385 use datafusion_common::{Column, ScalarValue};
386 use datafusion_expr::{BinaryExpr, Expr, Operator};
387 use datatypes::data_type::ConcreteDataType;
388 use datatypes::prelude::Vector;
389 use datatypes::scalars::ScalarVector;
390 use datatypes::schema::ColumnSchema;
391 use datatypes::value::Value;
392 use datatypes::vectors::{Int64Vector, StringVector};
393 use mito_codec::row_converter::DensePrimaryKeyCodec;
394 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
395 use store_api::storage::RegionId;
396
397 use super::*;
398 use crate::test_util::memtable_util::{
399 self, collect_iter_timestamps, region_metadata_to_row_schema,
400 };
401
402 #[test]
403 fn test_memtable_sorted_input() {
404 write_iter_sorted_input(true);
405 write_iter_sorted_input(false);
406 }
407
408 fn write_iter_sorted_input(has_pk: bool) {
409 let metadata = if has_pk {
410 Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
411 } else {
412 Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
413 };
414 let timestamps = (0..100).collect::<Vec<_>>();
415 let kvs =
416 memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1);
417 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
418 let memtable = PartitionTreeMemtable::new(
419 1,
420 codec,
421 metadata.clone(),
422 None,
423 &PartitionTreeConfig::default(),
424 );
425 memtable.write(&kvs).unwrap();
426
427 let expected_ts = kvs
428 .iter()
429 .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
430 .collect::<Vec<_>>();
431
432 let iter = memtable.iter(None, None, None).unwrap();
433 let read = collect_iter_timestamps(iter);
434 assert_eq!(expected_ts, read);
435
436 let stats = memtable.stats();
437 assert!(stats.bytes_allocated() > 0);
438 assert_eq!(
439 Some((
440 Timestamp::new_millisecond(0),
441 Timestamp::new_millisecond(99)
442 )),
443 stats.time_range()
444 );
445 }
446
447 #[test]
448 fn test_memtable_unsorted_input() {
449 write_iter_unsorted_input(true);
450 write_iter_unsorted_input(false);
451 }
452
    /// Writes two unsorted, overlapping batches and checks that reads return
    /// timestamps in order with the later sequence winning for duplicates.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        // First batch, sequence numbers starting at 0.
        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, );
        memtable.write(&kvs).unwrap();

        // Second batch, sequence numbers starting at 5; timestamps 5 and 7
        // overlap the first batch.
        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, );
        memtable.write(&kvs).unwrap();

        // Timestamps come back sorted and deduplicated.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        // For the duplicated timestamps the larger sequence is kept.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }
511
512 #[test]
513 fn test_memtable_projection() {
514 write_iter_projection(true);
515 write_iter_projection(false);
516 }
517
    /// Writes 100 rows and reads back only column 3, checking the projected
    /// batches expose exactly one field with the expected values.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Project only column id 3.
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // Projection keeps exactly one field per batch.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
549
550 #[test]
551 fn test_write_iter_multi_keys() {
552 write_iter_multi_keys(1, 100);
553 write_iter_multi_keys(2, 100);
554 write_iter_multi_keys(4, 100);
555 write_iter_multi_keys(8, 5);
556 write_iter_multi_keys(2, 10);
557 }
558
559 fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
560 let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
561 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
562 let memtable = PartitionTreeMemtable::new(
563 1,
564 codec,
565 metadata.clone(),
566 None,
567 &PartitionTreeConfig {
568 index_max_keys_per_shard: max_keys,
569 data_freeze_threshold: freeze_threshold,
570 ..Default::default()
571 },
572 );
573
574 let mut data = Vec::new();
575 for i in 0..4 {
577 for j in 0..4 {
578 let timestamps = [11, 13, 1, 5, 3, 7, 9];
580 let key = format!("a{j}");
581 let kvs =
582 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 0);
583 memtable.write(&kvs).unwrap();
584 for ts in timestamps {
585 data.push((i, key.clone(), ts));
586 }
587 }
588 for j in 0..4 {
589 let timestamps = [10, 2, 4, 8, 6];
591 let key = format!("a{j}");
592 let kvs =
593 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 200);
594 memtable.write(&kvs).unwrap();
595 for ts in timestamps {
596 data.push((i, key.clone(), ts));
597 }
598 }
599 }
600 data.sort_unstable();
601
602 let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
603 let iter = memtable.iter(None, None, None).unwrap();
604 let read = collect_iter_timestamps(iter);
605 assert_eq!(expect, read);
606 }
607
608 #[test]
609 fn test_memtable_filter() {
610 let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
611 let memtable = PartitionTreeMemtableBuilder::new(
613 PartitionTreeConfig {
614 index_max_keys_per_shard: 40,
615 ..Default::default()
616 },
617 None,
618 )
619 .build(1, &metadata);
620
621 for i in 0..100 {
622 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
623 let kvs =
624 memtable_util::build_key_values(&metadata, "hello".to_string(), i, ×tamps, 1);
625 memtable.write(&kvs).unwrap();
626 }
627
628 for i in 0..100 {
629 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
630 let expr = Expr::BinaryExpr(BinaryExpr {
631 left: Box::new(Expr::Column(Column::from_name("k1"))),
632 op: Operator::Eq,
633 right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
634 });
635 let iter = memtable
636 .iter(None, Some(Predicate::new(vec![expr])), None)
637 .unwrap();
638 let read = collect_iter_timestamps(iter);
639 assert_eq!(timestamps, read);
640 }
641 }
642
    /// Checks that `dedup` is not round-tripped through serde: it carries
    /// `#[serde(skip_deserializing)]`, so deserialization resets it to the
    /// default (`true`) even though we serialized `false`.
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
655
    /// Builds region metadata shaped like a metric-engine physical region:
    /// internal `__table_id`/`__tsid` tags plus a user label tag, a
    /// millisecond timestamp and a float value column.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                // Reserved/internal column id range.
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
708
    /// Builds a `KeyValues` mutation for the metric-engine schema by zipping
    /// the parallel slices into rows; all slices must have equal length
    /// (zip truncates to the shortest otherwise).
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        // Value order must match the schema produced above:
        // __table_id, __tsid, test_label, greptime_timestamp, greptime_value.
        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            // 1 == OpType::Put.
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }
757
    /// Writes to a memtable, freezes and forks it, writes again, then checks
    /// the forked memtable decodes primary keys correctly on read.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        // Two series ("daily" / "10min"), each written twice.
        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        // Freeze, then fork into a fresh memtable that reuses tree state.
        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        // The forked memtable should only contain the "10min" series row,
        // and its primary key must decode back to the written label.
        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        // pk layout: [__table_id, __tsid, test_label].
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
813
    /// Builds a minimal key-value region schema: `ts` time index, string tag
    /// `k` (the only primary key column) and string field `v`.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
840
    /// gRPC row schema matching [`kv_region_metadata`]: ts, k, v in order.
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "k".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            },
        ]
    }
863
    /// Builds a Put mutation (sequence 0) where each key in `keys` becomes a
    /// row with ts = 0 and both `k` and `v` set to the key itself.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(0)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
894
    /// Drains the iterator into a `k -> v` map by decoding each batch's
    /// primary key (the `k` tag) and reading its string field column.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                // All rows in a batch share the same primary key (values[0]).
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
918
    /// Writes out-of-order keys across two fork generations and checks reads
    /// return the expected key/value pairs after each freeze/fork cycle.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        // Generation 1: sorted keys a..g.
        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Generation 2: unsorted keys, including a new key "i".
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        // Generation 3: another unsorted permutation, read without freezing.
        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
957}