mito2/memtable/
partition_tree.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation based on a partition tree.
16
17pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
28use std::sync::Arc;
29
30use common_base::readable_size::ReadableSize;
31use mito_codec::key_values::KeyValue;
32use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
45    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
46    MemtableStats, PredicateGroup,
47};
48use crate::region::options::MergeMode;
49
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default maximum number of primary keys in an index shard.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default number of rows to buffer before freezing a data part.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
54
/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
///
/// `u16` bounds a shard to at most 65536 distinct primary keys.
type PkIndex = u16;

/// Id of a primary key inside a tree.
///
/// A key is addressed by the shard it lives in plus its index within that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    /// Shard that holds the primary key.
    shard_id: ShardId,
    /// Position of the key inside the shard.
    pk_index: PkIndex,
}
66
67// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple partition tree
68// memtable then we will use a lot memory. We should find a better way to control the
69// dictionary size.
70/// Config for the partition tree memtable.
/// Config for the partition tree memtable.
///
/// Deserialized with `#[serde(default)]`, so missing fields fall back to
/// [`PartitionTreeConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicates rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    ///
    /// Also skipped during deserialization: derived from region options,
    /// not from the serialized config.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
90
91impl Default for PartitionTreeConfig {
92    fn default() -> Self {
93        let mut fork_dictionary_bytes = ReadableSize::mb(512);
94        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
95            let adjust_dictionary_bytes =
96                std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
97            if adjust_dictionary_bytes.0 > 0 {
98                fork_dictionary_bytes = adjust_dictionary_bytes;
99            }
100        }
101
102        Self {
103            index_max_keys_per_shard: 8192,
104            data_freeze_threshold: 131072,
105            dedup: true,
106            fork_dictionary_bytes,
107            merge_mode: MergeMode::LastRow,
108        }
109    }
110}
111
112/// Memtable based on a partition tree.
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// The underlying partition tree holding the data.
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by written values and reports them to the
    /// write buffer manager.
    alloc_tracker: AllocTracker,
    /// Max timestamp written so far (i64::MIN until the first write).
    max_timestamp: AtomicI64,
    /// Min timestamp written so far (i64::MAX until the first write).
    min_timestamp: AtomicI64,
    /// Max sequence number written so far.
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
123
124impl fmt::Debug for PartitionTreeMemtable {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        f.debug_struct("PartitionTreeMemtable")
127            .field("id", &self.id)
128            .finish()
129    }
130}
131
impl Memtable for PartitionTreeMemtable {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree and updates stats on success.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // TODO(yingwen): Validate schema while inserting rows.

        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        // Only fold the batch's sequence/row count into stats when the
        // write succeeded; a failed write must not inflate the counters.
        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single key value into the tree and updates stats on success.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        // update max_sequence
        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk writes are not supported by the partition tree memtable.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Test-only iterator over the tree (no scan metrics).
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence, None)
    }

    /// Returns the scannable ranges of this memtable.
    ///
    /// A partition tree memtable always exposes exactly one range (id 0)
    /// covering the whole tree.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    /// Returns whether the tree contains no rows.
    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Marks the memtable immutable and freezes the tree.
    fn freeze(&self) -> Result<()> {
        // Stop tracking allocations before freezing the tree.
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Returns the current statistics of this memtable.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert the raw atomic i64 bounds back into typed timestamps.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Forks this memtable into a new one with the given id and metadata,
    /// reusing shared parts of the tree (e.g. the key dictionary).
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
265
266impl PartitionTreeMemtable {
267    /// Returns a new memtable.
268    pub fn new(
269        id: MemtableId,
270        row_codec: Arc<dyn PrimaryKeyCodec>,
271        metadata: RegionMetadataRef,
272        write_buffer_manager: Option<WriteBufferManagerRef>,
273        config: &PartitionTreeConfig,
274    ) -> Self {
275        Self::with_tree(
276            id,
277            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
278        )
279    }
280
281    /// Creates a mutable memtable from the tree.
282    ///
283    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
284    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
285        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
286
287        Self {
288            id,
289            tree: Arc::new(tree),
290            alloc_tracker,
291            max_timestamp: AtomicI64::new(i64::MIN),
292            min_timestamp: AtomicI64::new(i64::MAX),
293            num_rows: AtomicUsize::new(0),
294            max_sequence: AtomicU64::new(0),
295        }
296    }
297
298    /// Updates stats of the memtable.
299    fn update_stats(&self, metrics: &WriteMetrics) {
300        // Only let the tracker tracks value bytes.
301        self.alloc_tracker.on_allocation(metrics.value_bytes);
302        self.max_timestamp
303            .fetch_max(metrics.max_ts, Ordering::SeqCst);
304        self.min_timestamp
305            .fetch_min(metrics.min_ts, Ordering::SeqCst);
306        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
307        self.max_sequence
308            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
309    }
310
311    #[cfg(any(test, feature = "test"))]
312    pub fn iter(
313        &self,
314        projection: Option<&[ColumnId]>,
315        predicate: Option<Predicate>,
316        sequence: Option<SequenceNumber>,
317    ) -> Result<BoxedBatchIterator> {
318        self.tree.read(projection, predicate, sequence, None)
319    }
320}
321
322/// Builder to build a [PartitionTreeMemtable].
/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Config applied to every memtable built by this builder.
    config: PartitionTreeConfig,
    /// Optional write buffer manager shared by the built memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
328
329impl PartitionTreeMemtableBuilder {
330    /// Creates a new builder with specific `write_buffer_manager`.
331    pub fn new(
332        config: PartitionTreeConfig,
333        write_buffer_manager: Option<WriteBufferManagerRef>,
334    ) -> Self {
335        Self {
336            config,
337            write_buffer_manager,
338        }
339    }
340}
341
342impl MemtableBuilder for PartitionTreeMemtableBuilder {
343    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
344        let codec = build_primary_key_codec(metadata);
345        Arc::new(PartitionTreeMemtable::new(
346            id,
347            codec,
348            metadata.clone(),
349            self.write_buffer_manager.clone(),
350            &self.config,
351        ))
352    }
353
354    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
355        false
356    }
357}
358
/// Builds iterators over a shared [PartitionTree] for a memtable range.
struct PartitionTreeIterBuilder {
    /// Tree to read from.
    tree: Arc<PartitionTree>,
    /// Optional column projection for the scan.
    projection: Option<Vec<ColumnId>>,
    /// Optional predicate to filter rows.
    predicate: Option<Predicate>,
    /// Optional sequence number to read rows up to.
    sequence: Option<SequenceNumber>,
}
365
366impl IterBuilder for PartitionTreeIterBuilder {
367    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
368        self.tree.read(
369            self.projection.as_deref(),
370            self.predicate.clone(),
371            self.sequence,
372            metrics,
373        )
374    }
375}
376
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_time::Timestamp;
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::Vector;
    use datatypes::scalars::ScalarVector;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::{Int64Vector, StringVector};
    use mito_codec::row_converter::DensePrimaryKeyCodec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::memtable_util::{
        self, collect_iter_timestamps, region_metadata_to_row_schema,
    };

    #[test]
    fn test_memtable_sorted_input() {
        // Exercise both with and without a primary key.
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }

    /// Writes rows with sorted timestamps and verifies the iterator returns
    /// them in the same order, and that the stats report the written range.
    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_unsorted_input() {
        // Exercise both with and without a primary key.
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }

    /// Writes two overlapping, unsorted batches and verifies rows come back
    /// sorted by timestamp, with duplicated timestamps deduplicated to the
    /// row with the larger sequence.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // sequence 0, 1, 2, 3, 4
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // sequence 5, 6, 7, 8, 9
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        // Check the surviving sequence per timestamp; ts 5 and 7 were
        // overwritten by the second batch (sequences 5 and 9).
        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        // Exercise both with and without a primary key.
        write_iter_projection(true);
        write_iter_projection(false);
    }

    /// Writes rows and reads them back projecting a single field column.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Project only column id 3 (a field column).
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // Only the projected field should be present.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }

    #[test]
    fn test_write_iter_multi_keys() {
        // Vary shard key limits and freeze thresholds to cover shard
        // splitting and data-part freezing paths.
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }

    /// Writes multiple partitions/keys in two passes with unsorted timestamps
    /// and verifies the merged scan order matches a fully sorted expectation.
    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // 4 partitions, each partition 4 pks.
        for i in 0..4 {
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        // Expected scan order: sorted by (partition, key, timestamp).
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }

    /// Verifies predicate pushdown on the tag column `k1` filters partitions.
    #[test]
    fn test_memtable_filter() {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            // k1 == i should select exactly the rows written for that key.
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new((i as u32).lit()),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }

    /// Verifies `dedup` is skipped during deserialization and falls back to
    /// the default (`true`) even when serialized as `false`.
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Creates a json with dedup = false.
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        // `#[serde(skip_deserializing)]` resets dedup to its default.
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }

    /// Builds metadata resembling a metric engine physical region:
    /// internal `__table_id`/`__tsid` tags plus a label tag.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    /// Builds `KeyValues` rows for the metric-engine-like schema above.
    /// All slices must have the same length (zipped per row).
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| {
                row(vec![
                    ValueData::U32Value(*table_id),
                    ValueData::U64Value(*ts_id),
                    ValueData::StringValue(label.to_string()),
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*val),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            // op_type 1 is a put.
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    /// Writes, freezes, forks, writes again and verifies the forked memtable
    /// decodes primary keys correctly after reusing the frozen dictionary.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        // The third primary key column is the label; it must decode back to
        // the value written into the fork.
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }

    /// Builds a simple (ts, tag k, field v) region metadata.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    /// Row schema matching [kv_region_metadata].
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            time_index_column_schema("ts", api::v1::ColumnDataType::TimestampMillisecond),
            tag_column_schema("k", api::v1::ColumnDataType::String),
            field_column_schema("v", api::v1::ColumnDataType::String),
        ]
    }

    /// Builds `KeyValues` where both tag and field equal the given key,
    /// all at timestamp 0 and sequence 0.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(c.as_ref().to_string()),
                    ValueData::StringValue(c.as_ref().to_string()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Scans the iterator and collects (decoded tag -> field value) pairs.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }

    /// Writes keys, freezes, then forks twice, inserting unsorted keys into
    /// each fork, verifying each generation reads back exactly what it wrote.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Keys deliberately out of order; 'i' is new to the dictionary.
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
}
927}