1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29
30use common_base::readable_size::ReadableSize;
31use mito_codec::key_values::KeyValue;
32use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
45 MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
46 MemtableStats, PredicateGroup,
47};
48use crate::region::options::MergeMode;
49
/// Divisor applied to total system memory when bounding
/// `PartitionTreeConfig::fork_dictionary_bytes` (see `Default` impl below).
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value for `PartitionTreeConfig::index_max_keys_per_shard`.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value for `PartitionTreeConfig::data_freeze_threshold`.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
54
/// Id of a shard; paired with `PkIndex` in `PkId` to locate a primary key.
type ShardId = u32;
/// Index of a primary key inside a shard.
type PkIndex = u16;
59
/// Locates a primary key: the shard it belongs to plus its index within
/// that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}
66
/// Configuration of the partition tree memtable.
///
/// `#[serde(default)]` makes every missing field fall back to its
/// `Default` value when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    // Maximum number of keys a single index shard may hold.
    pub index_max_keys_per_shard: usize,
    // Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    // Whether to deduplicate rows. `skip_deserializing` means this flag is
    // never read from user input; deserialization always restores the
    // default (see `test_deserialize_config`).
    #[serde(skip_deserializing)]
    pub dedup: bool,
    // Byte budget related to forking the key dictionary; capped by system
    // memory in the `Default` impl. NOTE(review): exact fork semantics live
    // in the `dict`/`tree` modules — confirm there.
    pub fork_dictionary_bytes: ReadableSize,
    // Merge mode of the tree; also not deserializable from user input.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
90
91impl Default for PartitionTreeConfig {
92 fn default() -> Self {
93 let mut fork_dictionary_bytes = ReadableSize::mb(512);
94 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
95 let adjust_dictionary_bytes =
96 std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
97 if adjust_dictionary_bytes.0 > 0 {
98 fork_dictionary_bytes = adjust_dictionary_bytes;
99 }
100 }
101
102 Self {
103 index_max_keys_per_shard: 8192,
104 data_freeze_threshold: 131072,
105 dedup: true,
106 fork_dictionary_bytes,
107 merge_mode: MergeMode::LastRow,
108 }
109 }
110}
111
/// Memtable implementation backed by a partition tree.
pub struct PartitionTreeMemtable {
    id: MemtableId,
    /// The tree that stores the written rows.
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by this memtable (reports to the write buffer
    /// manager, see `with_tree`).
    alloc_tracker: AllocTracker,
    /// Max timestamp seen so far; starts at `i64::MIN` so any write updates it.
    max_timestamp: AtomicI64,
    /// Min timestamp seen so far; starts at `i64::MAX` so any write updates it.
    min_timestamp: AtomicI64,
    /// Max sequence number written so far.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
123
124impl fmt::Debug for PartitionTreeMemtable {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 f.debug_struct("PartitionTreeMemtable")
127 .field("id", &self.id)
128 .finish()
129 }
130}
131
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree and, on success, folds the
    /// per-write metrics into the memtable-wide statistics.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut metrics = WriteMetrics::default();
        // Scratch buffer for encoded primary keys during this write.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        // Only update stats when the write succeeded so failures don't skew
        // row counts or the time range.
        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Single-row variant of `write`.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk ingestion is not supported by this memtable implementation.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Test-only scan without per-range metrics.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence, None)
    }

    /// Exposes the whole memtable as a single scannable range (key 0).
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        _for_flush: bool,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Marks the memtable immutable: stops allocation tracking, then freezes
    /// the underlying tree.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Builds statistics from the tracked atomics.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing allocated means nothing was written; report an empty
            // memtable with no time range.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert the raw i64 extremes back into typed timestamps using the
        // region's time index type.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates a new memtable whose tree is forked from this one with the
    /// given (possibly updated) metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
266
267impl PartitionTreeMemtable {
268 pub fn new(
270 id: MemtableId,
271 row_codec: Arc<dyn PrimaryKeyCodec>,
272 metadata: RegionMetadataRef,
273 write_buffer_manager: Option<WriteBufferManagerRef>,
274 config: &PartitionTreeConfig,
275 ) -> Self {
276 Self::with_tree(
277 id,
278 PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
279 )
280 }
281
282 fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
286 let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
287
288 Self {
289 id,
290 tree: Arc::new(tree),
291 alloc_tracker,
292 max_timestamp: AtomicI64::new(i64::MIN),
293 min_timestamp: AtomicI64::new(i64::MAX),
294 num_rows: AtomicUsize::new(0),
295 max_sequence: AtomicU64::new(0),
296 }
297 }
298
299 fn update_stats(&self, metrics: &WriteMetrics) {
301 self.alloc_tracker.on_allocation(metrics.value_bytes);
303 self.max_timestamp
304 .fetch_max(metrics.max_ts, Ordering::SeqCst);
305 self.min_timestamp
306 .fetch_min(metrics.min_ts, Ordering::SeqCst);
307 self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
308 self.max_sequence
309 .fetch_max(metrics.max_sequence, Ordering::SeqCst);
310 }
311
312 #[cfg(any(test, feature = "test"))]
313 pub fn iter(
314 &self,
315 projection: Option<&[ColumnId]>,
316 predicate: Option<Predicate>,
317 sequence: Option<SequenceNumber>,
318 ) -> Result<BoxedBatchIterator> {
319 self.tree.read(projection, predicate, sequence, None)
320 }
321}
322
/// Builder that creates [PartitionTreeMemtable]s sharing one config and an
/// optional write buffer manager.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
329
330impl PartitionTreeMemtableBuilder {
331 pub fn new(
333 config: PartitionTreeConfig,
334 write_buffer_manager: Option<WriteBufferManagerRef>,
335 ) -> Self {
336 Self {
337 config,
338 write_buffer_manager,
339 }
340 }
341}
342
343impl MemtableBuilder for PartitionTreeMemtableBuilder {
344 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
345 let codec = build_primary_key_codec(metadata);
346 Arc::new(PartitionTreeMemtable::new(
347 id,
348 codec,
349 metadata.clone(),
350 self.write_buffer_manager.clone(),
351 &self.config,
352 ))
353 }
354
355 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
356 false
357 }
358}
359
/// Builds iterators over a shared partition tree with a fixed projection,
/// predicate and read sequence (used by `Memtable::ranges`).
struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}
366
367impl IterBuilder for PartitionTreeIterBuilder {
368 fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
369 self.tree.read(
370 self.projection.as_deref(),
371 self.predicate.clone(),
372 self.sequence,
373 metrics,
374 )
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use std::collections::HashMap;
381 use std::sync::Arc;
382
383 use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
384 use api::v1::value::ValueData;
385 use api::v1::{Mutation, OpType, Rows, SemanticType};
386 use common_time::Timestamp;
387 use datafusion_common::Column;
388 use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
389 use datatypes::data_type::ConcreteDataType;
390 use datatypes::prelude::Vector;
391 use datatypes::scalars::ScalarVector;
392 use datatypes::schema::ColumnSchema;
393 use datatypes::value::Value;
394 use datatypes::vectors::{Int64Vector, StringVector};
395 use mito_codec::row_converter::DensePrimaryKeyCodec;
396 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
397 use store_api::storage::RegionId;
398
399 use super::*;
400 use crate::test_util::memtable_util::{
401 self, collect_iter_timestamps, region_metadata_to_row_schema,
402 };
403
404 #[test]
405 fn test_memtable_sorted_input() {
406 write_iter_sorted_input(true);
407 write_iter_sorted_input(false);
408 }
409
410 fn write_iter_sorted_input(has_pk: bool) {
411 let metadata = if has_pk {
412 Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
413 } else {
414 Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
415 };
416 let timestamps = (0..100).collect::<Vec<_>>();
417 let kvs =
418 memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1);
419 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
420 let memtable = PartitionTreeMemtable::new(
421 1,
422 codec,
423 metadata.clone(),
424 None,
425 &PartitionTreeConfig::default(),
426 );
427 memtable.write(&kvs).unwrap();
428
429 let expected_ts = kvs
430 .iter()
431 .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
432 .collect::<Vec<_>>();
433
434 let iter = memtable.iter(None, None, None).unwrap();
435 let read = collect_iter_timestamps(iter);
436 assert_eq!(expected_ts, read);
437
438 let stats = memtable.stats();
439 assert!(stats.bytes_allocated() > 0);
440 assert_eq!(
441 Some((
442 Timestamp::new_millisecond(0),
443 Timestamp::new_millisecond(99)
444 )),
445 stats.time_range()
446 );
447 }
448
449 #[test]
450 fn test_memtable_unsorted_input() {
451 write_iter_unsorted_input(true);
452 write_iter_unsorted_input(false);
453 }
454
455 fn write_iter_unsorted_input(has_pk: bool) {
456 let metadata = if has_pk {
457 Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
458 } else {
459 Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
460 };
461 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
462 let memtable = PartitionTreeMemtable::new(
463 1,
464 codec,
465 metadata.clone(),
466 None,
467 &PartitionTreeConfig::default(),
468 );
469
470 let kvs = memtable_util::build_key_values(
471 &metadata,
472 "hello".to_string(),
473 0,
474 &[1, 3, 7, 5, 6],
475 0, );
477 memtable.write(&kvs).unwrap();
478
479 let kvs = memtable_util::build_key_values(
480 &metadata,
481 "hello".to_string(),
482 0,
483 &[5, 2, 4, 0, 7],
484 5, );
486 memtable.write(&kvs).unwrap();
487
488 let iter = memtable.iter(None, None, None).unwrap();
489 let read = collect_iter_timestamps(iter);
490 assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
491
492 let iter = memtable.iter(None, None, None).unwrap();
493 let read = iter
494 .flat_map(|batch| {
495 batch
496 .unwrap()
497 .sequences()
498 .iter_data()
499 .collect::<Vec<_>>()
500 .into_iter()
501 })
502 .map(|v| v.unwrap())
503 .collect::<Vec<_>>();
504 assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);
505
506 let stats = memtable.stats();
507 assert!(stats.bytes_allocated() > 0);
508 assert_eq!(
509 Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
510 stats.time_range()
511 );
512 }
513
514 #[test]
515 fn test_memtable_projection() {
516 write_iter_projection(true);
517 write_iter_projection(false);
518 }
519
    /// Writes 100 rows then scans with a projection of column id 3 only,
    /// verifying exactly one field column is materialized and its values
    /// match the written ones.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        // Build through the builder to also exercise that code path.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // The projection keeps exactly one field column.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
551
552 #[test]
553 fn test_write_iter_multi_keys() {
554 write_iter_multi_keys(1, 100);
555 write_iter_multi_keys(2, 100);
556 write_iter_multi_keys(4, 100);
557 write_iter_multi_keys(8, 5);
558 write_iter_multi_keys(2, 10);
559 }
560
561 fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
562 let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
563 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
564 let memtable = PartitionTreeMemtable::new(
565 1,
566 codec,
567 metadata.clone(),
568 None,
569 &PartitionTreeConfig {
570 index_max_keys_per_shard: max_keys,
571 data_freeze_threshold: freeze_threshold,
572 ..Default::default()
573 },
574 );
575
576 let mut data = Vec::new();
577 for i in 0..4 {
579 for j in 0..4 {
580 let timestamps = [11, 13, 1, 5, 3, 7, 9];
582 let key = format!("a{j}");
583 let kvs =
584 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 0);
585 memtable.write(&kvs).unwrap();
586 for ts in timestamps {
587 data.push((i, key.clone(), ts));
588 }
589 }
590 for j in 0..4 {
591 let timestamps = [10, 2, 4, 8, 6];
593 let key = format!("a{j}");
594 let kvs =
595 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 200);
596 memtable.write(&kvs).unwrap();
597 for ts in timestamps {
598 data.push((i, key.clone(), ts));
599 }
600 }
601 }
602 data.sort_unstable();
603
604 let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
605 let iter = memtable.iter(None, None, None).unwrap();
606 let read = collect_iter_timestamps(iter);
607 assert_eq!(expect, read);
608 }
609
610 #[test]
611 fn test_memtable_filter() {
612 let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
613 let memtable = PartitionTreeMemtableBuilder::new(
615 PartitionTreeConfig {
616 index_max_keys_per_shard: 40,
617 ..Default::default()
618 },
619 None,
620 )
621 .build(1, &metadata);
622
623 for i in 0..100 {
624 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
625 let kvs =
626 memtable_util::build_key_values(&metadata, "hello".to_string(), i, ×tamps, 1);
627 memtable.write(&kvs).unwrap();
628 }
629
630 for i in 0..100 {
631 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
632 let expr = Expr::BinaryExpr(BinaryExpr {
633 left: Box::new(Expr::Column(Column::from_name("k1"))),
634 op: Operator::Eq,
635 right: Box::new((i as u32).lit()),
636 });
637 let iter = memtable
638 .iter(None, Some(Predicate::new(vec![expr])), None)
639 .unwrap();
640 let read = collect_iter_timestamps(iter);
641 assert_eq!(timestamps, read);
642 }
643 }
644
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // `dedup` is `#[serde(skip_deserializing)]`, so the round trip must
        // restore it to the default `true` regardless of the serialized value.
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
657
    /// Builds region metadata shaped like a metric engine physical table:
    /// reserved `__table_id`/`__tsid` tags, a user label tag, a millisecond
    /// timestamp and a float value column.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                // Reserved column ids (high range) mimic internal columns.
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
710
    /// Builds a [KeyValues] batch for the metric-engine schema above.
    /// Rows are formed by zipping the slices element-wise (truncated to the
    /// shortest slice).
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| {
                // Column order must match `column_schema`.
                row(vec![
                    ValueData::U32Value(*table_id),
                    ValueData::U64Value(*ts_id),
                    ValueData::StringValue(label.to_string()),
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*val),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            // op_type 1 corresponds to a put.
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }
749
    /// Writes to a memtable, freezes it, forks it, writes to the fork, and
    /// verifies the fork returns the new data with a decodable primary key.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        // Two series ("daily" and "10min"), two rows each, at sequence 1.
        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        // Write one more "10min" row into the forked memtable.
        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        // pk[2] is the third primary key column: the `test_label` tag.
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
805
    /// Builds a simple region schema: timestamp `ts`, string tag `k`
    /// (the only primary key column) and string field `v`.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
832
    /// Row schema matching `kv_region_metadata`: `ts`, tag `k`, field `v`.
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            time_index_column_schema("ts", api::v1::ColumnDataType::TimestampMillisecond),
            tag_column_schema("k", api::v1::ColumnDataType::String),
            field_column_schema("v", api::v1::ColumnDataType::String),
        ]
    }
840
    /// Builds a [KeyValues] batch with one row per key: timestamp 0, and
    /// both tag `k` and field `v` set to the key string.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(c.as_ref().to_string()),
                    ValueData::StringValue(c.as_ref().to_string()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
865
    /// Drains the iterator into a tag -> field-value map, decoding the
    /// primary key with a dense codec. All rows of a batch share one
    /// primary key, so later rows for the same key overwrite earlier ones.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            // values[0] is the sole primary key column (`k`).
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
889
    /// Writes key sets across two freeze/fork generations, inserting keys
    /// out of order, and verifies each generation reads back exactly the
    /// keys written into it.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        // Generation 1: keys 'a'..'g' (range end exclusive) in order.
        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Generation 2: unsorted keys, including 'i' which is new.
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        // Generation 3: another unsorted permutation, read without freezing.
        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
928}