1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29
30use common_base::readable_size::ReadableSize;
31use common_stat::get_total_memory_readable;
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use serde::{Deserialize, Serialize};
35use store_api::metadata::RegionMetadataRef;
36use store_api::storage::{ColumnId, SequenceRange};
37use table::predicate::Predicate;
38
39use crate::error::{Result, UnsupportedOperationSnafu};
40use crate::flush::WriteBufferManagerRef;
41use crate::memtable::bulk::part::BulkPart;
42use crate::memtable::partition_tree::tree::PartitionTree;
43use crate::memtable::stats::WriteMetrics;
44use crate::memtable::{
45 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
46 MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
47 MemtableStats, RangesOptions,
48};
49use crate::region::options::MergeMode;
50
/// Divisor applied to total system memory to cap the fork dictionary size
/// (i.e. the dictionary may use up to 1/8 of total memory; see `Default for
/// PartitionTreeConfig`).
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value for [PartitionTreeConfig::index_max_keys_per_shard].
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value for [PartitionTreeConfig::data_freeze_threshold].
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
55
/// Identifier of a shard inside the partition tree.
type ShardId = u32;
/// Index of a primary key inside a shard (bounded by
/// `index_max_keys_per_shard`, which fits in `u16`).
type PkIndex = u16;
60
/// Globally unique id of a primary key: the shard it lives in plus its
/// index within that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}
67
/// Configuration of the partition tree memtable.
///
/// All fields fall back to their defaults when absent (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max number of keys a shard index may hold (default 8192).
    pub index_max_keys_per_shard: usize,
    /// Number of rows after which an active data buffer is frozen
    /// (default 131072).
    pub data_freeze_threshold: usize,
    /// Whether to deduplicate rows by primary key. Never read from user
    /// config: `skip_deserializing` forces the default on deserialization
    /// (the engine sets it programmatically).
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionaries to keep when forking the tree
    /// (capped at total memory / DICTIONARY_SIZE_FACTOR by `Default`).
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree. Like `dedup`, skipped on deserialization and
    /// set programmatically.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
91
92impl Default for PartitionTreeConfig {
93 fn default() -> Self {
94 let mut fork_dictionary_bytes = ReadableSize::mb(512);
95 if let Some(total_memory) = get_total_memory_readable() {
96 let adjust_dictionary_bytes =
97 std::cmp::min(total_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
98 if adjust_dictionary_bytes.0 > 0 {
99 fork_dictionary_bytes = adjust_dictionary_bytes;
100 }
101 }
102
103 Self {
104 index_max_keys_per_shard: 8192,
105 data_freeze_threshold: 131072,
106 dedup: true,
107 fork_dictionary_bytes,
108 merge_mode: MergeMode::LastRow,
109 }
110 }
111}
112
/// Memtable that stores rows in a partition tree.
pub struct PartitionTreeMemtable {
    // Unique id of this memtable.
    id: MemtableId,
    // The tree holding the actual data; shared with iterator builders.
    tree: Arc<PartitionTree>,
    // Tracks bytes allocated by this memtable (reports to the write buffer
    // manager owned by the tree).
    alloc_tracker: AllocTracker,
    // Max timestamp seen so far; i64::MIN before the first write.
    max_timestamp: AtomicI64,
    // Min timestamp seen so far; i64::MAX before the first write.
    min_timestamp: AtomicI64,
    // Max sequence number written.
    max_sequence: AtomicU64,
    // Total number of rows written.
    num_rows: AtomicUsize,
}
124
125impl fmt::Debug for PartitionTreeMemtable {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 f.debug_struct("PartitionTreeMemtable")
128 .field("id", &self.id)
129 .finish()
130 }
131}
132
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree, updating memtable-level
    /// stats only when the whole write succeeds.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut metrics = WriteMetrics::default();
        // Scratch buffer for encoded primary keys — presumably reused across
        // rows by the tree; TODO confirm in `PartitionTree::write`.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            // The tree fills timestamp/byte metrics; sequence and row count
            // come from the input batch itself.
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single key value; stats are updated only on success.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk ingestion is not supported by this memtable implementation.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Test-only scan over the tree without per-scan metrics.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence, None)
    }

    /// Returns the scannable ranges of this memtable: always a single range
    /// (key 0) backed by a lazily-built tree iterator.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Stops allocation tracking and freezes the tree so it can be forked.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Returns current statistics. Atomic counters are read with `Relaxed`
    /// ordering, so values may be slightly stale under concurrent writes.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing has been written: report empty stats with no time range.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        // Safe to expect: the time index column always has a timestamp type.
        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Forks this (frozen) memtable into a new empty one that shares the
    /// tree's dictionaries, under the given region metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
267
268impl PartitionTreeMemtable {
269 pub fn new(
271 id: MemtableId,
272 row_codec: Arc<dyn PrimaryKeyCodec>,
273 metadata: RegionMetadataRef,
274 write_buffer_manager: Option<WriteBufferManagerRef>,
275 config: &PartitionTreeConfig,
276 ) -> Self {
277 Self::with_tree(
278 id,
279 PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
280 )
281 }
282
283 fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
287 let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
288
289 Self {
290 id,
291 tree: Arc::new(tree),
292 alloc_tracker,
293 max_timestamp: AtomicI64::new(i64::MIN),
294 min_timestamp: AtomicI64::new(i64::MAX),
295 num_rows: AtomicUsize::new(0),
296 max_sequence: AtomicU64::new(0),
297 }
298 }
299
300 fn update_stats(&self, metrics: &WriteMetrics) {
302 self.alloc_tracker.on_allocation(metrics.value_bytes);
304 self.max_timestamp
305 .fetch_max(metrics.max_ts, Ordering::SeqCst);
306 self.min_timestamp
307 .fetch_min(metrics.min_ts, Ordering::SeqCst);
308 self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
309 self.max_sequence
310 .fetch_max(metrics.max_sequence, Ordering::SeqCst);
311 }
312
313 #[cfg(any(test, feature = "test"))]
314 pub fn iter(
315 &self,
316 projection: Option<&[ColumnId]>,
317 predicate: Option<Predicate>,
318 sequence: Option<SequenceRange>,
319 ) -> Result<BoxedBatchIterator> {
320 self.tree.read(projection, predicate, sequence, None)
321 }
322}
323
/// Builder that produces [PartitionTreeMemtable]s sharing one config and
/// (optionally) one write buffer manager.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
330
331impl PartitionTreeMemtableBuilder {
332 pub fn new(
334 config: PartitionTreeConfig,
335 write_buffer_manager: Option<WriteBufferManagerRef>,
336 ) -> Self {
337 Self {
338 config,
339 write_buffer_manager,
340 }
341 }
342}
343
344impl MemtableBuilder for PartitionTreeMemtableBuilder {
345 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
346 let codec = build_primary_key_codec(metadata);
347 Arc::new(PartitionTreeMemtable::new(
348 id,
349 codec,
350 metadata.clone(),
351 self.write_buffer_manager.clone(),
352 &self.config,
353 ))
354 }
355
356 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
357 false
358 }
359}
360
/// Captures everything needed to lazily open an iterator over a tree:
/// the tree itself plus the projection/predicate/sequence of the scan.
struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceRange>,
}
367
impl IterBuilder for PartitionTreeIterBuilder {
    /// Opens a batch iterator over the captured tree with the captured scan
    /// parameters, attaching optional scan metrics.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        self.tree.read(
            self.projection.as_deref(),
            self.predicate.clone(),
            self.sequence,
            metrics,
        )
    }
}
378
379#[cfg(test)]
380mod tests {
381 use std::collections::HashMap;
382 use std::sync::Arc;
383
384 use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
385 use api::v1::value::ValueData;
386 use api::v1::{Mutation, OpType, Rows, SemanticType};
387 use common_query::prelude::{greptime_timestamp, greptime_value};
388 use common_time::Timestamp;
389 use datafusion_common::Column;
390 use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
391 use datatypes::data_type::ConcreteDataType;
392 use datatypes::prelude::Vector;
393 use datatypes::scalars::ScalarVector;
394 use datatypes::schema::ColumnSchema;
395 use datatypes::value::Value;
396 use datatypes::vectors::{Int64Vector, StringVector};
397 use mito_codec::row_converter::DensePrimaryKeyCodec;
398 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
399 use store_api::storage::RegionId;
400
401 use super::*;
402 use crate::test_util::memtable_util::{
403 self, collect_iter_timestamps, region_metadata_to_row_schema,
404 };
405
    // Sorted-input round trip, with and without a primary key.
    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }
411
412 fn write_iter_sorted_input(has_pk: bool) {
413 let metadata = if has_pk {
414 Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
415 } else {
416 Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
417 };
418 let timestamps = (0..100).collect::<Vec<_>>();
419 let kvs =
420 memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1);
421 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
422 let memtable = PartitionTreeMemtable::new(
423 1,
424 codec,
425 metadata.clone(),
426 None,
427 &PartitionTreeConfig::default(),
428 );
429 memtable.write(&kvs).unwrap();
430
431 let expected_ts = kvs
432 .iter()
433 .map(|kv| {
434 kv.timestamp()
435 .try_into_timestamp()
436 .unwrap()
437 .unwrap()
438 .value()
439 })
440 .collect::<Vec<_>>();
441
442 let iter = memtable.iter(None, None, None).unwrap();
443 let read = collect_iter_timestamps(iter);
444 assert_eq!(expected_ts, read);
445
446 let stats = memtable.stats();
447 assert!(stats.bytes_allocated() > 0);
448 assert_eq!(
449 Some((
450 Timestamp::new_millisecond(0),
451 Timestamp::new_millisecond(99)
452 )),
453 stats.time_range()
454 );
455 }
456
    // Unsorted-input round trip, with and without a primary key.
    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }
462
    // Writes two overlapping, unsorted timestamp batches and verifies that
    // reads come back sorted by timestamp with the matching sequences.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // first batch: sequences start at 0
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // second batch: sequences start at 5, overwriting ts 5 and 7
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        // Sequences follow the per-timestamp winners of the two batches.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }
521
    // Projection pushdown, with and without a primary key.
    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }
527
    // Writes 100 rows and reads back only column id 3, checking the batch
    // carries exactly one field with the expected values.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // Projection keeps only the single requested field column.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
559
    // Exercises various shard-size / freeze-threshold combinations.
    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }
568
569 fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
570 let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
571 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
572 let memtable = PartitionTreeMemtable::new(
573 1,
574 codec,
575 metadata.clone(),
576 None,
577 &PartitionTreeConfig {
578 index_max_keys_per_shard: max_keys,
579 data_freeze_threshold: freeze_threshold,
580 ..Default::default()
581 },
582 );
583
584 let mut data = Vec::new();
585 for i in 0..4 {
587 for j in 0..4 {
588 let timestamps = [11, 13, 1, 5, 3, 7, 9];
590 let key = format!("a{j}");
591 let kvs =
592 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 0);
593 memtable.write(&kvs).unwrap();
594 for ts in timestamps {
595 data.push((i, key.clone(), ts));
596 }
597 }
598 for j in 0..4 {
599 let timestamps = [10, 2, 4, 8, 6];
601 let key = format!("a{j}");
602 let kvs =
603 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 200);
604 memtable.write(&kvs).unwrap();
605 for ts in timestamps {
606 data.push((i, key.clone(), ts));
607 }
608 }
609 }
610 data.sort_unstable();
611
612 let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
613 let iter = memtable.iter(None, None, None).unwrap();
614 let read = collect_iter_timestamps(iter);
615 assert_eq!(expect, read);
616 }
617
618 #[test]
619 fn test_memtable_filter() {
620 let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
621 let memtable = PartitionTreeMemtableBuilder::new(
623 PartitionTreeConfig {
624 index_max_keys_per_shard: 40,
625 ..Default::default()
626 },
627 None,
628 )
629 .build(1, &metadata);
630
631 for i in 0..100 {
632 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
633 let kvs =
634 memtable_util::build_key_values(&metadata, "hello".to_string(), i, ×tamps, 1);
635 memtable.write(&kvs).unwrap();
636 }
637
638 for i in 0..100 {
639 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
640 let expr = Expr::BinaryExpr(BinaryExpr {
641 left: Box::new(Expr::Column(Column::from_name("k1"))),
642 op: Operator::Eq,
643 right: Box::new((i as u32).lit()),
644 });
645 let iter = memtable
646 .iter(None, Some(Predicate::new(vec![expr])), None)
647 .unwrap();
648 let read = collect_iter_timestamps(iter);
649 assert_eq!(timestamps, read);
650 }
651 }
652
    // `dedup` is marked `skip_deserializing`, so a round trip through serde
    // must restore it (and everything else) to the default.
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
665
    // Builds region metadata shaped like the metric engine's physical table:
    // internal `__table_id`/`__tsid` tags plus one user label tag, a
    // millisecond time index and a float value field.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
718
    // Builds a `KeyValues` mutation for the metric-engine schema above.
    // All slice parameters are zipped row-wise, so they must share a length.
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| {
                row(vec![
                    ValueData::U32Value(*table_id),
                    ValueData::U64Value(*ts_id),
                    ValueData::StringValue(label.to_string()),
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*val),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1, // OpType::Put
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }
757
    // Writes metric-engine style rows, freezes, forks, writes again, and
    // checks the first batch of the forked memtable decodes the expected
    // primary key (dictionary state must survive the fork).
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        // pk layout: [__table_id, __tsid, test_label]; check the label.
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
813
    // Minimal key/value schema: ts (time index), k (tag, the primary key),
    // v (string field).
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
840
    // gRPC row schema matching `kv_region_metadata` (same column order).
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            time_index_column_schema("ts", api::v1::ColumnDataType::TimestampMillisecond),
            tag_column_schema("k", api::v1::ColumnDataType::String),
            field_column_schema("v", api::v1::ColumnDataType::String),
        ]
    }
848
    // Builds a Put mutation where each key is used as both tag and field
    // value, all at timestamp 0 and sequence 0.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(c.as_ref().to_string()),
                    ValueData::StringValue(c.as_ref().to_string()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
873
    // Drains the iterator into a map of decoded primary key (column "k")
    // to the first field's string value.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
897
    // Forks a memtable twice and writes out-of-order keys each generation,
    // verifying that reads after each fork return exactly the keys written
    // to that generation (forks start empty but reuse dictionaries).
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Mix of keys seen by the parent and a new one ("i").
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
936}