// mito2/memtable/partition_tree.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation based on a partition tree.
16
17pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
28use std::sync::Arc;
29
30use common_base::readable_size::ReadableSize;
31use mito_codec::key_values::KeyValue;
32use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
45    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
46    MemtableStats, PredicateGroup,
47};
48use crate::region::options::MergeMode;
49
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default max number of keys in an index shard (matches `PartitionTreeConfig::default`).
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default number of rows to freeze a data part (matches `PartitionTreeConfig::default`).
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;
59
/// Id of a primary key inside a tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    /// Shard that stores the key.
    shard_id: ShardId,
    /// Position of the key inside that shard.
    pk_index: PkIndex,
}
66
// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple partition tree
// memtable then we will use a lot memory. We should find a better way to control the
// dictionary size.
/// Config for the partition tree memtable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicated rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    ///
    /// Skips deserializing; it falls back to the default when parsing the config.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
90
91impl Default for PartitionTreeConfig {
92    fn default() -> Self {
93        let mut fork_dictionary_bytes = ReadableSize::mb(512);
94        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
95            let adjust_dictionary_bytes =
96                std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
97            if adjust_dictionary_bytes.0 > 0 {
98                fork_dictionary_bytes = adjust_dictionary_bytes;
99            }
100        }
101
102        Self {
103            index_max_keys_per_shard: 8192,
104            data_freeze_threshold: 131072,
105            dedup: true,
106            fork_dictionary_bytes,
107            merge_mode: MergeMode::LastRow,
108        }
109    }
110}
111
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Tree that stores the actual data.
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by writes (value bytes only, see `update_stats`).
    alloc_tracker: AllocTracker,
    /// Max timestamp written so far; `i64::MIN` before any write.
    max_timestamp: AtomicI64,
    /// Min timestamp written so far; `i64::MAX` before any write.
    min_timestamp: AtomicI64,
    /// Max sequence number written so far.
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
123
124impl fmt::Debug for PartitionTreeMemtable {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        f.debug_struct("PartitionTreeMemtable")
127            .field("id", &self.id)
128            .finish()
129    }
130}
131
132impl Memtable for PartitionTreeMemtable {
133    fn id(&self) -> MemtableId {
134        self.id
135    }
136
137    fn write(&self, kvs: &KeyValues) -> Result<()> {
138        if kvs.is_empty() {
139            return Ok(());
140        }
141
142        // TODO(yingwen): Validate schema while inserting rows.
143
144        let mut metrics = WriteMetrics::default();
145        let mut pk_buffer = Vec::new();
146        // Ensures the memtable always updates stats.
147        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);
148
149        if res.is_ok() {
150            metrics.max_sequence = kvs.max_sequence();
151            metrics.num_rows = kvs.num_rows();
152            self.update_stats(&metrics);
153        }
154        res
155    }
156
157    fn write_one(&self, key_value: KeyValue) -> Result<()> {
158        let mut metrics = WriteMetrics::default();
159        let mut pk_buffer = Vec::new();
160        // Ensures the memtable always updates stats.
161        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);
162
163        // update max_sequence
164        if res.is_ok() {
165            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
166            metrics.num_rows = 1;
167            self.update_stats(&metrics);
168        }
169        res
170    }
171
172    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
173        UnsupportedOperationSnafu {
174            err_msg: "PartitionTreeMemtable does not support write_bulk",
175        }
176        .fail()
177    }
178
179    #[cfg(any(test, feature = "test"))]
180    fn iter(
181        &self,
182        projection: Option<&[ColumnId]>,
183        predicate: Option<Predicate>,
184        sequence: Option<SequenceNumber>,
185    ) -> Result<BoxedBatchIterator> {
186        self.tree.read(projection, predicate, sequence, None)
187    }
188
189    fn ranges(
190        &self,
191        projection: Option<&[ColumnId]>,
192        predicate: PredicateGroup,
193        sequence: Option<SequenceNumber>,
194    ) -> Result<MemtableRanges> {
195        let projection = projection.map(|ids| ids.to_vec());
196        let builder = Box::new(PartitionTreeIterBuilder {
197            tree: self.tree.clone(),
198            projection,
199            predicate: predicate.predicate().cloned(),
200            sequence,
201        });
202        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
203
204        let stats = self.stats();
205        Ok(MemtableRanges {
206            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
207            stats,
208        })
209    }
210
211    fn is_empty(&self) -> bool {
212        self.tree.is_empty()
213    }
214
215    fn freeze(&self) -> Result<()> {
216        self.alloc_tracker.done_allocating();
217
218        self.tree.freeze()
219    }
220
221    fn stats(&self) -> MemtableStats {
222        let estimated_bytes = self.alloc_tracker.bytes_allocated();
223
224        if estimated_bytes == 0 {
225            // no rows ever written
226            return MemtableStats {
227                estimated_bytes,
228                time_range: None,
229                num_rows: 0,
230                num_ranges: 0,
231                max_sequence: 0,
232                series_count: 0,
233            };
234        }
235
236        let ts_type = self
237            .tree
238            .metadata
239            .time_index_column()
240            .column_schema
241            .data_type
242            .clone()
243            .as_timestamp()
244            .expect("Timestamp column must have timestamp type");
245        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
246        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
247        let series_count = self.tree.series_count();
248        MemtableStats {
249            estimated_bytes,
250            time_range: Some((min_timestamp, max_timestamp)),
251            num_rows: self.num_rows.load(Ordering::Relaxed),
252            num_ranges: 1,
253            max_sequence: self.max_sequence.load(Ordering::Relaxed),
254            series_count,
255        }
256    }
257
258    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
259        let tree = self.tree.fork(metadata.clone());
260
261        let memtable = PartitionTreeMemtable::with_tree(id, tree);
262        Arc::new(memtable)
263    }
264}
265
266impl PartitionTreeMemtable {
267    /// Returns a new memtable.
268    pub fn new(
269        id: MemtableId,
270        row_codec: Arc<dyn PrimaryKeyCodec>,
271        metadata: RegionMetadataRef,
272        write_buffer_manager: Option<WriteBufferManagerRef>,
273        config: &PartitionTreeConfig,
274    ) -> Self {
275        Self::with_tree(
276            id,
277            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
278        )
279    }
280
281    /// Creates a mutable memtable from the tree.
282    ///
283    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
284    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
285        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
286
287        Self {
288            id,
289            tree: Arc::new(tree),
290            alloc_tracker,
291            max_timestamp: AtomicI64::new(i64::MIN),
292            min_timestamp: AtomicI64::new(i64::MAX),
293            num_rows: AtomicUsize::new(0),
294            max_sequence: AtomicU64::new(0),
295        }
296    }
297
298    /// Updates stats of the memtable.
299    fn update_stats(&self, metrics: &WriteMetrics) {
300        // Only let the tracker tracks value bytes.
301        self.alloc_tracker.on_allocation(metrics.value_bytes);
302        self.max_timestamp
303            .fetch_max(metrics.max_ts, Ordering::SeqCst);
304        self.min_timestamp
305            .fetch_min(metrics.min_ts, Ordering::SeqCst);
306        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
307        self.max_sequence
308            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
309    }
310
311    #[cfg(any(test, feature = "test"))]
312    pub fn iter(
313        &self,
314        projection: Option<&[ColumnId]>,
315        predicate: Option<Predicate>,
316        sequence: Option<SequenceNumber>,
317    ) -> Result<BoxedBatchIterator> {
318        self.tree.read(projection, predicate, sequence, None)
319    }
320}
321
/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Config shared by every memtable this builder creates.
    config: PartitionTreeConfig,
    /// Manager handed to each created memtable to account for write buffer usage.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
328
329impl PartitionTreeMemtableBuilder {
330    /// Creates a new builder with specific `write_buffer_manager`.
331    pub fn new(
332        config: PartitionTreeConfig,
333        write_buffer_manager: Option<WriteBufferManagerRef>,
334    ) -> Self {
335        Self {
336            config,
337            write_buffer_manager,
338        }
339    }
340}
341
342impl MemtableBuilder for PartitionTreeMemtableBuilder {
343    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
344        let codec = build_primary_key_codec(metadata);
345        Arc::new(PartitionTreeMemtable::new(
346            id,
347            codec,
348            metadata.clone(),
349            self.write_buffer_manager.clone(),
350            &self.config,
351        ))
352    }
353
354    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
355        false
356    }
357}
358
/// Builds batch iterators over a shared [PartitionTree] for a memtable range.
struct PartitionTreeIterBuilder {
    /// Tree to read from.
    tree: Arc<PartitionTree>,
    /// Projection forwarded to `PartitionTree::read`.
    projection: Option<Vec<ColumnId>>,
    /// Predicate forwarded to `PartitionTree::read`.
    predicate: Option<Predicate>,
    /// Sequence visibility bound forwarded to `PartitionTree::read`.
    sequence: Option<SequenceNumber>,
}
365
366impl IterBuilder for PartitionTreeIterBuilder {
367    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
368        self.tree.read(
369            self.projection.as_deref(),
370            self.predicate.clone(),
371            self.sequence,
372            metrics,
373        )
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use std::collections::HashMap;
380    use std::sync::Arc;
381
382    use api::v1::value::ValueData;
383    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
384    use common_time::Timestamp;
385    use datafusion_common::{Column, ScalarValue};
386    use datafusion_expr::{BinaryExpr, Expr, Operator};
387    use datatypes::data_type::ConcreteDataType;
388    use datatypes::prelude::Vector;
389    use datatypes::scalars::ScalarVector;
390    use datatypes::schema::ColumnSchema;
391    use datatypes::value::Value;
392    use datatypes::vectors::{Int64Vector, StringVector};
393    use mito_codec::row_converter::DensePrimaryKeyCodec;
394    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
395    use store_api::storage::RegionId;
396
397    use super::*;
398    use crate::test_util::memtable_util::{
399        self, collect_iter_timestamps, region_metadata_to_row_schema,
400    };
401
402    #[test]
403    fn test_memtable_sorted_input() {
404        write_iter_sorted_input(true);
405        write_iter_sorted_input(false);
406    }
407
    /// Writes 100 sorted timestamps and checks iteration order and stats.
    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        // Expected timestamps are exactly the input ones, in write order.
        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        // Stats should reflect the written time range [0, 99].
        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
446
447    #[test]
448    fn test_memtable_unsorted_input() {
449        write_iter_unsorted_input(true);
450        write_iter_unsorted_input(false);
451    }
452
    /// Writes two overlapping unsorted batches and checks that iteration
    /// returns sorted, deduplicated timestamps with the latest sequence kept
    /// for each duplicated timestamp.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // sequence 0, 1, 2, 3, 4
        );
        memtable.write(&kvs).unwrap();

        // Timestamps 5 and 7 overlap with the first batch.
        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // sequence 5, 6, 7, 8, 9
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        // For duplicated timestamps (5 and 7), the larger sequence wins.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }
511
512    #[test]
513    fn test_memtable_projection() {
514        write_iter_projection(true);
515        write_iter_projection(false);
516    }
517
    /// Writes 100 rows and reads back with a projection on column id 3,
    /// checking only that field is returned and its values match.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Project only column id 3.
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // Only the projected field should be present in each batch.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
549
550    #[test]
551    fn test_write_iter_multi_keys() {
552        write_iter_multi_keys(1, 100);
553        write_iter_multi_keys(2, 100);
554        write_iter_multi_keys(4, 100);
555        write_iter_multi_keys(8, 5);
556        write_iter_multi_keys(2, 10);
557    }
558
    /// Writes two interleaved batches for 4 partitions x 4 primary keys and
    /// checks iteration returns all rows in (partition, key, timestamp) order.
    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // 4 partitions, each partition 4 pks.
        for i in 0..4 {
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        // The expected order is the tuples sorted by (partition, key, ts).
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }
607
    /// Writes 100 batches with distinct key values and verifies an equality
    /// predicate on `k1` only returns the timestamps written for that key.
    #[test]
    fn test_memtable_filter() {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            // Each key i gets its own 10-timestamp window starting at i * 1000.
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            // Filter: k1 == i.
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }
642
    /// Checks that `dedup` is not deserialized from the config json.
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Creates a json with dedup = false.
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        // `dedup` is `#[serde(skip_deserializing)]`, so it falls back to the
        // default (true) even though the json said false.
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
655
    /// Builds region metadata resembling a metric engine physical table:
    /// tags `__table_id`, `__tsid`, `test_label`; time index `greptime_timestamp`;
    /// field `greptime_value`.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                // Large ids mimic the reserved internal column ids.
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
708
    /// Builds [KeyValues] for the schema of [metadata_for_metric_engine].
    ///
    /// Input slices are zipped together, so they should all have the same
    /// length; extra elements are silently dropped by `zip`.
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                // Value order matches the column order of the metadata.
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            // op_type 1 — presumably `OpType::Put` (cf. `key_values` below); confirm in proto.
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }
757
    /// Writes metric-like rows, freezes, forks, writes into the fork, and
    /// checks the first batch read from the fork decodes the expected key.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        // Freeze then fork under a new memtable id.
        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        // pk[2] is the `test_label` tag of the primary key.
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
813
    /// Region metadata with time index `ts`, string tag `k` (the primary key)
    /// and string field `v`.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
840
841    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
842        vec![
843            api::v1::ColumnSchema {
844                column_name: "ts".to_string(),
845                datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
846                semantic_type: SemanticType::Timestamp as i32,
847                ..Default::default()
848            },
849            api::v1::ColumnSchema {
850                column_name: "k".to_string(),
851                datatype: api::v1::ColumnDataType::String as i32,
852                semantic_type: SemanticType::Tag as i32,
853                ..Default::default()
854            },
855            api::v1::ColumnSchema {
856                column_name: "v".to_string(),
857                datatype: api::v1::ColumnDataType::String as i32,
858                semantic_type: SemanticType::Field as i32,
859                ..Default::default()
860            },
861        ]
862    }
863
    /// Builds [KeyValues] for [kv_region_metadata]: each key becomes one row
    /// with ts = 0 and both `k` and `v` set to the key. The mutation uses
    /// sequence 0 and `OpType::Put`.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(0)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
894
    /// Collects `(k, v)` pairs from the iterator by decoding each batch's
    /// primary key and reading the first field column.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            // values[0] is the decoded `k` tag of this batch's primary key.
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                // Later rows with the same key overwrite earlier entries.
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
918
    /// Writes keys into a memtable and two successive forks (freezing between
    /// generations) and verifies each generation reads back exactly the keys
    /// written to it, even when keys are inserted out of order.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        // `'a'..'h'` is exclusive, so this writes keys a..=g.
        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Unsorted keys exercise reordering on insert.
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
957}