mito2/memtable/partition_tree.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation based on a partition tree.

pub(crate) mod data;
mod dedup;
mod dict;
mod merger;
mod partition;
mod primary_key_filter;
mod shard;
mod shard_builder;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::region::options::MergeMode;
use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default maximum number of keys in an index shard.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default number of rows to freeze a data part.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;

/// Id of a primary key inside a tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}

// TODO(yingwen): `fork_dictionary_bytes` is a per-region option; if we have multiple partition tree
// memtables then we will use a lot of memory. We should find a better way to control the
// dictionary size.
/// Config for the partition tree memtable.
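///
/// # Example
///
/// A minimal sketch of overriding a couple of fields while keeping the rest of the
/// defaults, mirroring how the tests below tune the memtable. Not a doc test:
///
/// ```ignore
/// let config = PartitionTreeConfig {
///     index_max_keys_per_shard: 40,
///     data_freeze_threshold: 100,
///     ..Default::default()
/// };
/// ```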
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicate rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}

impl Default for PartitionTreeConfig {
    fn default() -> Self {
        let mut fork_dictionary_bytes = ReadableSize::mb(512);
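        // Cap the fork dictionary budget at `sys_memory / DICTIONARY_SIZE_FACTOR`, but never
        // raise it above the 512MiB default. E.g. a host with 2GiB of memory gets a 256MiB
        // budget, while a 64GiB host keeps the 512MiB cap.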
        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
            let adjust_dictionary_bytes =
                std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
            if adjust_dictionary_bytes.0 > 0 {
                fork_dictionary_bytes = adjust_dictionary_bytes;
            }
        }

        Self {
            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
            dedup: true,
            fork_dictionary_bytes,
            merge_mode: MergeMode::LastRow,
        }
    }
}

/// Memtable based on a partition tree.
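///
/// # Example
///
/// A rough sketch of the write/read lifecycle, assuming a `metadata: RegionMetadataRef`
/// and a `kvs: KeyValues` batch are built elsewhere (the tests below use `memtable_util`
/// helpers for both). Not a doc test:
///
/// ```ignore
/// let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
///     .build(1, &metadata);
/// memtable.write(&kvs)?;
/// let iter = memtable.iter(None, None, None)?;
/// memtable.freeze()?;
/// let forked = memtable.fork(2, &metadata);
/// ```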
pub struct PartitionTreeMemtable {
    id: MemtableId,
    tree: Arc<PartitionTree>,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl fmt::Debug for PartitionTreeMemtable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartitionTreeMemtable")
            .field("id", &self.id)
            .finish()
    }
}

impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // TODO(yingwen): Validate schema while inserting rows.

        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        // update max_sequence
        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence)
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}

impl PartitionTreeMemtable {
    /// Returns a new memtable.
    pub fn new(
        id: MemtableId,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: &PartitionTreeConfig,
    ) -> Self {
        Self::with_tree(
            id,
            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
        )
    }

    /// Creates a mutable memtable from the tree.
    ///
    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());

        Self {
            id,
            tree: Arc::new(tree),
            alloc_tracker,
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            num_rows: AtomicUsize::new(0),
            max_sequence: AtomicU64::new(0),
        }
    }

    /// Updates stats of the memtable.
    fn update_stats(&self, metrics: &WriteMetrics) {
        // Only let the tracker track value bytes.
        self.alloc_tracker.on_allocation(metrics.value_bytes);
        self.max_timestamp
            .fetch_max(metrics.max_ts, Ordering::SeqCst);
        self.min_timestamp
            .fetch_min(metrics.min_ts, Ordering::SeqCst);
        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
    }
}

/// Builder to build a [PartitionTreeMemtable].
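///
/// # Example
///
/// A minimal sketch mirroring how the tests below construct a memtable, assuming a
/// `metadata: RegionMetadataRef` is available. Not a doc test:
///
/// ```ignore
/// let builder = PartitionTreeMemtableBuilder::new(
///     PartitionTreeConfig {
///         index_max_keys_per_shard: 40,
///         ..Default::default()
///     },
///     None,
/// );
/// let memtable = builder.build(1, &metadata);
/// ```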
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl PartitionTreeMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        config: PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        Self {
            config,
            write_buffer_manager,
        }
    }
}

impl MemtableBuilder for PartitionTreeMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let codec = build_primary_key_codec(metadata);
        Arc::new(PartitionTreeMemtable::new(
            id,
            codec,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            &self.config,
        ))
    }
}

struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for PartitionTreeIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        self.tree.read(
            self.projection.as_deref(),
            self.predicate.clone(),
            self.sequence,
        )
    }
}

#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::scalars::ScalarVector;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::Int64Vector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::row_converter::DensePrimaryKeyCodec;
    use crate::test_util::memtable_util::{
        self, collect_iter_timestamps, region_metadata_to_row_schema,
    };

    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }

    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }

    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // sequence 0, 1, 2, 3, 4
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // sequence 5, 6, 7, 8, 9
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }

    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }

    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }

    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // 4 partitions, each partition 4 pks.
        for i in 0..4 {
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }

    #[test]
    fn test_memtable_filter() {
        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }

    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Creates a JSON string with dedup = false.
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }

    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
}