1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
28use std::sync::Arc;
29
30use common_base::readable_size::ReadableSize;
31use mito_codec::key_values::KeyValue;
32use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
45 MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
46 PredicateGroup,
47};
48use crate::region::options::MergeMode;
49
/// Divisor applied to the total system memory when capping the default
/// `fork_dictionary_bytes` (see `PartitionTreeConfig::default`).
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value for `PartitionTreeConfig::index_max_keys_per_shard`.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value for `PartitionTreeConfig::data_freeze_threshold`.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
54
/// Identifier of a shard inside the partition tree.
type ShardId = u32;
/// Index of a primary key within a shard (see [`PkId`]).
type PkIndex = u16;
59
/// Locates a primary key in the tree: the shard it belongs to plus its
/// index within that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}
66
/// Configuration of the partition-tree memtable.
///
/// The container-level `#[serde(default)]` means any field missing from the
/// serialized form falls back to the value produced by [`Default`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Maximum number of keys a single index shard may hold.
    pub index_max_keys_per_shard: usize,
    /// Number of buffered rows that triggers freezing the active data buffer.
    pub data_freeze_threshold: usize,
    /// Whether to deduplicate rows. Never read back from serialized config
    /// (`skip_deserializing`): it is always taken from `Default` (true).
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Dictionary-size threshold that controls forking behavior; adjusted to
    /// the host's memory in `Default` (see `DICTIONARY_SIZE_FACTOR`).
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode. Never read back from serialized config
    /// (`skip_deserializing`): always taken from `Default` (`LastRow`).
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
90
91impl Default for PartitionTreeConfig {
92 fn default() -> Self {
93 let mut fork_dictionary_bytes = ReadableSize::mb(512);
94 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
95 let adjust_dictionary_bytes =
96 std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
97 if adjust_dictionary_bytes.0 > 0 {
98 fork_dictionary_bytes = adjust_dictionary_bytes;
99 }
100 }
101
102 Self {
103 index_max_keys_per_shard: 8192,
104 data_freeze_threshold: 131072,
105 dedup: true,
106 fork_dictionary_bytes,
107 merge_mode: MergeMode::LastRow,
108 }
109 }
110}
111
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    id: MemtableId,
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by writes; feeds `stats()` and the write
    /// buffer manager.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far; `i64::MIN` when nothing is written.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far; `i64::MAX` when nothing is written.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
123
124impl fmt::Debug for PartitionTreeMemtable {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 f.debug_struct("PartitionTreeMemtable")
127 .field("id", &self.id)
128 .finish()
129 }
130}
131
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows in `kvs` into the tree.
    ///
    /// Stats (timestamp range, row count, max sequence) are updated only
    /// when the tree write succeeds.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut metrics = WriteMetrics::default();
        // Scratch buffer for encoded primary keys, reused across rows.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single row; stats are updated only on success.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk ingestion is not supported by this memtable implementation.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Creates an iterator over rows visible up to `sequence`, optionally
    /// projected and filtered.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence)
    }

    /// Returns the memtable as a single range (key 0) whose iterator is
    /// built lazily by [`PartitionTreeIterBuilder`].
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Stops allocation tracking and freezes the tree.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        // No allocation recorded means nothing was written: return empty
        // stats without reading the (still sentinel-valued) timestamps.
        if estimated_bytes == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            // This memtable always exposes exactly one range (see `ranges`).
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Forks the tree with the new region metadata and wraps it in a fresh
    /// memtable with reset stats.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
263
264impl PartitionTreeMemtable {
265 pub fn new(
267 id: MemtableId,
268 row_codec: Arc<dyn PrimaryKeyCodec>,
269 metadata: RegionMetadataRef,
270 write_buffer_manager: Option<WriteBufferManagerRef>,
271 config: &PartitionTreeConfig,
272 ) -> Self {
273 Self::with_tree(
274 id,
275 PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
276 )
277 }
278
279 fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
283 let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
284
285 Self {
286 id,
287 tree: Arc::new(tree),
288 alloc_tracker,
289 max_timestamp: AtomicI64::new(i64::MIN),
290 min_timestamp: AtomicI64::new(i64::MAX),
291 num_rows: AtomicUsize::new(0),
292 max_sequence: AtomicU64::new(0),
293 }
294 }
295
296 fn update_stats(&self, metrics: &WriteMetrics) {
298 self.alloc_tracker.on_allocation(metrics.value_bytes);
300 self.max_timestamp
301 .fetch_max(metrics.max_ts, Ordering::SeqCst);
302 self.min_timestamp
303 .fetch_min(metrics.min_ts, Ordering::SeqCst);
304 self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
305 self.max_sequence
306 .fetch_max(metrics.max_sequence, Ordering::SeqCst);
307 }
308}
309
/// Builder that creates [`PartitionTreeMemtable`]s sharing one config and
/// (optionally) one write buffer manager.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
316
317impl PartitionTreeMemtableBuilder {
318 pub fn new(
320 config: PartitionTreeConfig,
321 write_buffer_manager: Option<WriteBufferManagerRef>,
322 ) -> Self {
323 Self {
324 config,
325 write_buffer_manager,
326 }
327 }
328}
329
330impl MemtableBuilder for PartitionTreeMemtableBuilder {
331 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
332 let codec = build_primary_key_codec(metadata);
333 Arc::new(PartitionTreeMemtable::new(
334 id,
335 codec,
336 metadata.clone(),
337 self.write_buffer_manager.clone(),
338 &self.config,
339 ))
340 }
341}
342
/// Captures everything needed to lazily build an iterator over the tree
/// for a [`MemtableRange`] (see `PartitionTreeMemtable::ranges`).
struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}
349
350impl IterBuilder for PartitionTreeIterBuilder {
351 fn build(&self) -> Result<BoxedBatchIterator> {
352 self.tree.read(
353 self.projection.as_deref(),
354 self.predicate.clone(),
355 self.sequence,
356 )
357 }
358}
359
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::Vector;
    use datatypes::scalars::ScalarVector;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::{Int64Vector, StringVector};
    use mito_codec::row_converter::DensePrimaryKeyCodec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::memtable_util::{
        self, collect_iter_timestamps, region_metadata_to_row_schema,
    };

    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }

    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        // NOTE: fixed mojibake — `&timestamps` was garbled to `×tamps`
        // (an HTML-decoded `&times;`) and would not compile.
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }

    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0,
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5,
        );
        memtable.write(&kvs).unwrap();

        // Rows come back sorted by timestamp regardless of insertion order.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        // Sequences follow the timestamp order of the merged batches.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }

    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Project only column id 3; each batch must expose a single field.
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }

    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }

    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // Writes rows with sequence i for multiple keys.
        for i in 0..4 {
            for j in 0..4 {
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            for j in 0..4 {
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }

    #[test]
    fn test_memtable_filter() {
        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
        // Small shard capacity so the test exercises multiple shards.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }

    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Creates a json with dedup = false.
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        // `dedup` is `skip_deserializing`, so it must fall back to the
        // default (true) even though the serialized value was false.
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }

    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    /// Builds `KeyValues` for the metric-engine schema; the slice arguments
    /// are zipped row-wise and must have equal lengths.
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }

    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "k".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            },
        ]
    }

    /// Builds `KeyValues` where key and value are both the given string and
    /// the timestamp is fixed at 0.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(0)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Drains the iterator into a key -> field-value map.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }

    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
}