mito2/memtable/partition_tree.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation based on a partition tree.
16
17pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29
30use common_base::readable_size::ReadableSize;
31use mito_codec::key_values::KeyValue;
32use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
45    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
46    MemtableStats, PredicateGroup,
47};
48use crate::region::options::MergeMode;
49
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value of [PartitionTreeConfig::index_max_keys_per_shard].
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value of [PartitionTreeConfig::data_freeze_threshold].
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;
59
/// Id of a primary key inside a tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    /// Shard that holds the key.
    shard_id: ShardId,
    /// Position of the key inside that shard.
    pk_index: PkIndex,
}
66
// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple partition tree
// memtable then we will use a lot memory. We should find a better way to control the
// dictionary size.
/// Config for the partition tree memtable.
///
/// Fields missing from the serialized form fall back to their defaults
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicates rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    ///
    /// Also skipped on deserialize; it is derived from region options.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
90
91impl Default for PartitionTreeConfig {
92    fn default() -> Self {
93        let mut fork_dictionary_bytes = ReadableSize::mb(512);
94        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
95            let adjust_dictionary_bytes =
96                std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
97            if adjust_dictionary_bytes.0 > 0 {
98                fork_dictionary_bytes = adjust_dictionary_bytes;
99            }
100        }
101
102        Self {
103            index_max_keys_per_shard: 8192,
104            data_freeze_threshold: 131072,
105            dedup: true,
106            fork_dictionary_bytes,
107            merge_mode: MergeMode::LastRow,
108        }
109    }
110}
111
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Partition tree holding the actual data.
    tree: Arc<PartitionTree>,
    /// Tracks value-byte allocations; created with the tree's write buffer manager.
    alloc_tracker: AllocTracker,
    /// Max raw timestamp value written so far (starts at `i64::MIN`).
    max_timestamp: AtomicI64,
    /// Min raw timestamp value written so far (starts at `i64::MAX`).
    min_timestamp: AtomicI64,
    /// Max sequence number written so far.
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
123
impl fmt::Debug for PartitionTreeMemtable {
    // Manual impl: only the id is printed; the other fields are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartitionTreeMemtable")
            .field("id", &self.id)
            .finish()
    }
}
131
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // TODO(yingwen): Validate schema while inserting rows.

        let mut metrics = WriteMetrics::default();
        // Scratch buffer for encoded primary keys, reused across the batch.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        // Stats are only updated on success so a failed write doesn't
        // inflate row counts, sequence or the tracked time range.
        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        // Update max_sequence / num_rows for the single row, then publish
        // the stats — again only on success.
        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        // Bulk ingestion is not implemented for the partition tree memtable.
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    // Test-only direct scan; production reads go through `ranges()`.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence, None)
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        _for_flush: bool,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        // Capture the scan arguments; the iterator itself is built lazily
        // by the range via `PartitionTreeIterBuilder::build`.
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        // The whole memtable is exposed as a single range with id 0.
        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        // Stop tracking allocations before freezing the tree.
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // No value bytes ever tracked (no rows ever written): report
            // empty stats without touching the timestamp atomics.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // min/max are stored as raw i64s; rebuild timestamps using the time
        // index column's unit.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Fork the tree into a fresh, empty memtable for the new metadata;
        // dictionary reuse across forks is bounded by
        // `PartitionTreeConfig::fork_dictionary_bytes`.
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
266
267impl PartitionTreeMemtable {
268    /// Returns a new memtable.
269    pub fn new(
270        id: MemtableId,
271        row_codec: Arc<dyn PrimaryKeyCodec>,
272        metadata: RegionMetadataRef,
273        write_buffer_manager: Option<WriteBufferManagerRef>,
274        config: &PartitionTreeConfig,
275    ) -> Self {
276        Self::with_tree(
277            id,
278            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
279        )
280    }
281
282    /// Creates a mutable memtable from the tree.
283    ///
284    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
285    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
286        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
287
288        Self {
289            id,
290            tree: Arc::new(tree),
291            alloc_tracker,
292            max_timestamp: AtomicI64::new(i64::MIN),
293            min_timestamp: AtomicI64::new(i64::MAX),
294            num_rows: AtomicUsize::new(0),
295            max_sequence: AtomicU64::new(0),
296        }
297    }
298
299    /// Updates stats of the memtable.
300    fn update_stats(&self, metrics: &WriteMetrics) {
301        // Only let the tracker tracks value bytes.
302        self.alloc_tracker.on_allocation(metrics.value_bytes);
303        self.max_timestamp
304            .fetch_max(metrics.max_ts, Ordering::SeqCst);
305        self.min_timestamp
306            .fetch_min(metrics.min_ts, Ordering::SeqCst);
307        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
308        self.max_sequence
309            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
310    }
311
312    #[cfg(any(test, feature = "test"))]
313    pub fn iter(
314        &self,
315        projection: Option<&[ColumnId]>,
316        predicate: Option<Predicate>,
317        sequence: Option<SequenceNumber>,
318    ) -> Result<BoxedBatchIterator> {
319        self.tree.read(projection, predicate, sequence, None)
320    }
321}
322
/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Config applied to every memtable this builder creates.
    config: PartitionTreeConfig,
    /// Optional write buffer manager passed to the built memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
329
impl PartitionTreeMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    ///
    /// `config` is applied to every memtable built by this builder.
    pub fn new(
        config: PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        Self {
            config,
            write_buffer_manager,
        }
    }
}
342
343impl MemtableBuilder for PartitionTreeMemtableBuilder {
344    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
345        let codec = build_primary_key_codec(metadata);
346        Arc::new(PartitionTreeMemtable::new(
347            id,
348            codec,
349            metadata.clone(),
350            self.write_buffer_manager.clone(),
351            &self.config,
352        ))
353    }
354
355    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
356        false
357    }
358}
359
/// Captured scan arguments used to lazily build iterators for a
/// [MemtableRange].
struct PartitionTreeIterBuilder {
    /// Tree to scan.
    tree: Arc<PartitionTree>,
    /// Optional column projection.
    projection: Option<Vec<ColumnId>>,
    /// Optional filter predicate.
    predicate: Option<Predicate>,
    /// Optional sequence number bound for the scan.
    sequence: Option<SequenceNumber>,
}
366
367impl IterBuilder for PartitionTreeIterBuilder {
368    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
369        self.tree.read(
370            self.projection.as_deref(),
371            self.predicate.clone(),
372            self.sequence,
373            metrics,
374        )
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use std::collections::HashMap;
381    use std::sync::Arc;
382
383    use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
384    use api::v1::value::ValueData;
385    use api::v1::{Mutation, OpType, Rows, SemanticType};
386    use common_time::Timestamp;
387    use datafusion_common::Column;
388    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
389    use datatypes::data_type::ConcreteDataType;
390    use datatypes::prelude::Vector;
391    use datatypes::scalars::ScalarVector;
392    use datatypes::schema::ColumnSchema;
393    use datatypes::value::Value;
394    use datatypes::vectors::{Int64Vector, StringVector};
395    use mito_codec::row_converter::DensePrimaryKeyCodec;
396    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
397    use store_api::storage::RegionId;
398
399    use super::*;
400    use crate::test_util::memtable_util::{
401        self, collect_iter_timestamps, region_metadata_to_row_schema,
402    };
403
    /// Covers both the with-primary-key and no-primary-key schemas.
    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }
409
    /// Writes 100 already-sorted timestamps and checks the iterator returns
    /// them in the same order and that stats report the right time range.
    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
448
    /// Covers both the with-primary-key and no-primary-key schemas.
    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }
454
    /// Writes two unsorted, overlapping batches and checks the iterator
    /// yields rows sorted by timestamp, deduplicated, keeping the sequence
    /// of the latest write for overlapping timestamps.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // sequence 0, 1, 2, 3, 4
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // sequence 5, 6, 7, 8, 9
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        // ts 5 and 7 were rewritten by the second batch, so their sequences
        // (5 and 9) win over the first batch's (3 and 2).
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }
513
    /// Covers both the with-primary-key and no-primary-key schemas.
    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }
519
    /// Writes 100 rows and reads them back with a projection to a single
    /// column (id 3), checking only that field column is returned.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // Projection kept exactly one field column.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
551
    /// Exercises several shard-size / freeze-threshold combinations.
    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }
560
    /// Writes interleaved timestamps for 4 partitions x 4 primary keys with
    /// small shard/freeze limits, then checks a full scan returns all rows
    /// in (partition, key, ts) order.
    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // 4 partitions, each partition 4 pks.
        for i in 0..4 {
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            // Second round: even timestamps at a later sequence.
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }
609
    /// Writes 100 series and checks a `k1 == i` predicate prunes the scan
    /// down to exactly that series' timestamps.
    #[test]
    fn test_memtable_filter() {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new((i as u32).lit()),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }
644
    /// `dedup` is `#[serde(skip_deserializing)]`, so a serialized
    /// `dedup: false` must come back as the default (`true`).
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Creates a json with dedup = false.
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
657
    /// Builds metadata resembling a metric-engine physical table: internal
    /// `__table_id`/`__tsid` tags with high (reserved-range) column ids,
    /// a label tag, a millisecond timestamp and a float value.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            // Primary key order: __table_id, __tsid, test_label.
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
710
711    fn build_key_values(
712        metadata: RegionMetadataRef,
713        labels: &[&str],
714        table_id: &[u32],
715        ts_id: &[u64],
716        ts: &[i64],
717        values: &[f64],
718        sequence: u64,
719    ) -> KeyValues {
720        let column_schema = region_metadata_to_row_schema(&metadata);
721
722        let rows = ts
723            .iter()
724            .zip(table_id.iter())
725            .zip(ts_id.iter())
726            .zip(labels.iter())
727            .zip(values.iter())
728            .map(|((((ts, table_id), ts_id), label), val)| {
729                row(vec![
730                    ValueData::U32Value(*table_id),
731                    ValueData::U64Value(*ts_id),
732                    ValueData::StringValue(label.to_string()),
733                    ValueData::TimestampMillisecondValue(*ts),
734                    ValueData::F64Value(*val),
735                ])
736            })
737            .collect();
738        let mutation = api::v1::Mutation {
739            op_type: 1,
740            sequence,
741            rows: Some(Rows {
742                schema: column_schema,
743                rows,
744            }),
745            write_hint: None,
746        };
747        KeyValues::new(metadata.as_ref(), mutation).unwrap()
748    }
749
    /// Writes to a memtable, freezes and forks it, writes to the fork, and
    /// checks the forked tree still decodes primary keys correctly.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        // Only one row goes into the fork; the old rows must not reappear.
        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        // pk[2] is the third primary key column, i.e. `test_label`.
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
805
    /// Region with a millisecond timestamp `ts`, one string tag `k` (the
    /// whole primary key) and one string field `v`.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
832
    /// Row schema matching [kv_region_metadata]: `ts`, tag `k`, field `v`.
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            time_index_column_schema("ts", api::v1::ColumnDataType::TimestampMillisecond),
            tag_column_schema("k", api::v1::ColumnDataType::String),
            field_column_schema("v", api::v1::ColumnDataType::String),
        ]
    }
840
    /// Builds a `Put` mutation at sequence 0 and ts 0 where each key string
    /// is also used as its own field value.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(c.as_ref().to_string()),
                    ValueData::StringValue(c.as_ref().to_string()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
865
    /// Drains `iter` into a `key -> field value` map by decoding each
    /// batch's primary key and downcasting its first field column.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            // values[0] is the only primary key column (`k`).
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
889
    /// Repeatedly freezes/forks the memtable and re-inserts keys in shuffled
    /// order, verifying each generation only returns its own writes.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Out-of-order keys, including `i` which the first generation never saw.
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
928}