mito2/memtable/partition_tree.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation based on a partition tree.

pub(crate) mod data;
mod dedup;
mod dict;
mod merger;
mod partition;
mod shard;
mod shard_builder;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::region::options::MergeMode;

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default maximum number of keys in an index shard.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default number of rows to freeze a data part.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;

/// Id of a primary key inside a tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}

// TODO(yingwen): `fork_dictionary_bytes` is a per-region option. If we have multiple partition tree
// memtables then we will use a lot of memory. We should find a better way to control the
// dictionary size.
/// Config for the partition tree memtable.
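/// All fields have serde defaults, so a config is typically built by overriding only the
/// fields of interest. Minimal sketch (illustrative values, not compiled as a doctest):
///
/// ```ignore
/// let config = PartitionTreeConfig {
///     index_max_keys_per_shard: 40,
///     ..Default::default()
/// };
/// ```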
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicate rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}

impl Default for PartitionTreeConfig {
    fn default() -> Self {
        let mut fork_dictionary_bytes = ReadableSize::mb(512);
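        // Illustrative arithmetic (values not from the source): with 16 GiB of system memory,
        // min(16 GiB / DICTIONARY_SIZE_FACTOR, 512 MiB) = min(2 GiB, 512 MiB) = 512 MiB;
        // with 2 GiB of system memory it is min(256 MiB, 512 MiB) = 256 MiB.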
        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
            let adjust_dictionary_bytes =
                std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
            if adjust_dictionary_bytes.0 > 0 {
                fork_dictionary_bytes = adjust_dictionary_bytes;
            }
        }

        Self {
            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
            dedup: true,
            fork_dictionary_bytes,
            merge_mode: MergeMode::LastRow,
        }
    }
}

/// Memtable based on a partition tree.
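///
/// Read/write flow sketch (illustrative only, not compiled as a doctest; building the
/// `KeyValues` batch is elided):
///
/// ```ignore
/// memtable.write(&kvs)?;
/// let iter = memtable.iter(None, None, None)?;
/// ```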
pub struct PartitionTreeMemtable {
    id: MemtableId,
    tree: Arc<PartitionTree>,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl fmt::Debug for PartitionTreeMemtable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartitionTreeMemtable")
            .field("id", &self.id)
            .finish()
    }
}

impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // TODO(yingwen): Validate schema while inserting rows.

        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        // Updates the max sequence on success.
        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence)
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // No rows have been written yet.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}

impl PartitionTreeMemtable {
    /// Returns a new memtable.
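    ///
    /// Construction sketch (illustrative only, not compiled as a doctest; assumes a
    /// `RegionMetadataRef` named `metadata` is in scope):
    ///
    /// ```ignore
    /// let codec = build_primary_key_codec(&metadata);
    /// let memtable =
    ///     PartitionTreeMemtable::new(1, codec, metadata, None, &PartitionTreeConfig::default());
    /// ```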
    pub fn new(
        id: MemtableId,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: &PartitionTreeConfig,
    ) -> Self {
        Self::with_tree(
            id,
            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
        )
    }

    /// Creates a mutable memtable from the tree.
    ///
    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());

        Self {
            id,
            tree: Arc::new(tree),
            alloc_tracker,
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            num_rows: AtomicUsize::new(0),
            max_sequence: AtomicU64::new(0),
        }
    }

    /// Updates stats of the memtable.
    fn update_stats(&self, metrics: &WriteMetrics) {
        // Only let the tracker track value bytes.
        self.alloc_tracker.on_allocation(metrics.value_bytes);
        self.max_timestamp
            .fetch_max(metrics.max_ts, Ordering::SeqCst);
        self.min_timestamp
            .fetch_min(metrics.min_ts, Ordering::SeqCst);
        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
    }
}

/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl PartitionTreeMemtableBuilder {
    /// Creates a new builder with a specific `write_buffer_manager`.
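    ///
    /// Usage sketch (illustrative only, not compiled as a doctest; assumes a
    /// `RegionMetadataRef` named `metadata` is in scope):
    ///
    /// ```ignore
    /// let builder = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None);
    /// let memtable = builder.build(1, &metadata);
    /// ```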
    pub fn new(
        config: PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        Self {
            config,
            write_buffer_manager,
        }
    }
}

impl MemtableBuilder for PartitionTreeMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let codec = build_primary_key_codec(metadata);
        Arc::new(PartitionTreeMemtable::new(
            id,
            codec,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            &self.config,
        ))
    }
}

struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for PartitionTreeIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        self.tree.read(
            self.projection.as_deref(),
            self.predicate.clone(),
            self.sequence,
        )
    }
}

#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::scalars::ScalarVector;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::Int64Vector;
    use mito_codec::row_converter::DensePrimaryKeyCodec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::memtable_util::{
        self, collect_iter_timestamps, region_metadata_to_row_schema,
    };

    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }

    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }

    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // sequence 0, 1, 2, 3, 4
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // sequence 5, 6, 7, 8, 9
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }

    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }

    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }

    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // 4 partitions, each with 4 primary keys.
        for i in 0..4 {
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }

    #[test]
    fn test_memtable_filter() {
        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }

    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Creates a JSON string with dedup = false. Since `dedup` skips deserializing,
        // the parsed config falls back to the default value (true).
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }

    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
}