mito2/memtable/
partition_tree.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation based on a partition tree.
16
17pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod primary_key_filter;
23mod shard;
24mod shard_builder;
25mod tree;
26
27use std::fmt;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29use std::sync::Arc;
30
31use common_base::readable_size::ReadableSize;
32pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::key_values::KeyValue;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44    AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
45    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
46    PredicateGroup,
47};
48use crate::region::options::MergeMode;
49use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
50
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Max keys per index shard (8192), the same value `PartitionTreeConfig::default()` assigns to
/// `index_max_keys_per_shard`.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8 * 1024;
/// Row count at which a data part is frozen (131072), the same value
/// `PartitionTreeConfig::default()` assigns to `data_freeze_threshold`.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 128 * 1024;
55
/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;

/// Id of a primary key inside a tree.
///
/// It pairs the shard id with the index of the key inside that shard.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct PkId {
    /// Shard that holds the primary key.
    shard_id: ShardId,
    /// Position of the primary key within the shard.
    pk_index: PkIndex,
}
67
// TODO(yingwen): `fork_dictionary_bytes` is a per-region option, so if we have multiple
// partition tree memtables they will use a lot of memory. We should find a better way to
// control the dictionary size.
71/// Config for the partition tree memtable.
72#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
73#[serde(default)]
74pub struct PartitionTreeConfig {
75    /// Max keys in an index shard.
76    pub index_max_keys_per_shard: usize,
77    /// Number of rows to freeze a data part.
78    pub data_freeze_threshold: usize,
79    /// Whether to delete duplicates rows.
80    ///
81    /// Skips deserializing as it should be determined by whether the
82    /// table is append only.
83    #[serde(skip_deserializing)]
84    pub dedup: bool,
85    /// Total bytes of dictionary to keep in fork.
86    pub fork_dictionary_bytes: ReadableSize,
87    /// Merge mode of the tree.
88    #[serde(skip_deserializing)]
89    pub merge_mode: MergeMode,
90}
91
92impl Default for PartitionTreeConfig {
93    fn default() -> Self {
94        let mut fork_dictionary_bytes = ReadableSize::mb(512);
95        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
96            let adjust_dictionary_bytes =
97                std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
98            if adjust_dictionary_bytes.0 > 0 {
99                fork_dictionary_bytes = adjust_dictionary_bytes;
100            }
101        }
102
103        Self {
104            index_max_keys_per_shard: 8192,
105            data_freeze_threshold: 131072,
106            dedup: true,
107            fork_dictionary_bytes,
108            merge_mode: MergeMode::LastRow,
109        }
110    }
111}
112
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Tree that stores the written rows. It is shared with the iterator builders
    /// created by `ranges()`.
    tree: Arc<PartitionTree>,
    /// Tracks the value bytes reported by each write.
    alloc_tracker: AllocTracker,
    /// Max timestamp reported by writes so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Min timestamp reported by writes so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Max sequence among successful writes; starts at 0.
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
124
125impl fmt::Debug for PartitionTreeMemtable {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_struct("PartitionTreeMemtable")
128            .field("id", &self.id)
129            .finish()
130    }
131}
132
133impl Memtable for PartitionTreeMemtable {
134    fn id(&self) -> MemtableId {
135        self.id
136    }
137
138    fn write(&self, kvs: &KeyValues) -> Result<()> {
139        if kvs.is_empty() {
140            return Ok(());
141        }
142
143        // TODO(yingwen): Validate schema while inserting rows.
144
145        let mut metrics = WriteMetrics::default();
146        let mut pk_buffer = Vec::new();
147        // Ensures the memtable always updates stats.
148        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);
149
150        self.update_stats(&metrics);
151
152        // update max_sequence
153        if res.is_ok() {
154            let sequence = kvs.max_sequence();
155            self.max_sequence.fetch_max(sequence, Ordering::Relaxed);
156        }
157
158        self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
159        res
160    }
161
162    fn write_one(&self, key_value: KeyValue) -> Result<()> {
163        let mut metrics = WriteMetrics::default();
164        let mut pk_buffer = Vec::new();
165        // Ensures the memtable always updates stats.
166        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);
167
168        self.update_stats(&metrics);
169
170        // update max_sequence
171        if res.is_ok() {
172            self.max_sequence
173                .fetch_max(key_value.sequence(), Ordering::Relaxed);
174        }
175
176        self.num_rows.fetch_add(1, Ordering::Relaxed);
177        res
178    }
179
180    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
181        UnsupportedOperationSnafu {
182            err_msg: "PartitionTreeMemtable does not support write_bulk",
183        }
184        .fail()
185    }
186
187    fn iter(
188        &self,
189        projection: Option<&[ColumnId]>,
190        predicate: Option<Predicate>,
191        sequence: Option<SequenceNumber>,
192    ) -> Result<BoxedBatchIterator> {
193        self.tree.read(projection, predicate, sequence)
194    }
195
196    fn ranges(
197        &self,
198        projection: Option<&[ColumnId]>,
199        predicate: PredicateGroup,
200        sequence: Option<SequenceNumber>,
201    ) -> MemtableRanges {
202        let projection = projection.map(|ids| ids.to_vec());
203        let builder = Box::new(PartitionTreeIterBuilder {
204            tree: self.tree.clone(),
205            projection,
206            predicate: predicate.predicate().cloned(),
207            sequence,
208        });
209        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
210
211        MemtableRanges {
212            ranges: [(0, MemtableRange::new(context))].into(),
213            stats: self.stats(),
214        }
215    }
216
217    fn is_empty(&self) -> bool {
218        self.tree.is_empty()
219    }
220
221    fn freeze(&self) -> Result<()> {
222        self.alloc_tracker.done_allocating();
223
224        self.tree.freeze()
225    }
226
227    fn stats(&self) -> MemtableStats {
228        let estimated_bytes = self.alloc_tracker.bytes_allocated();
229
230        if estimated_bytes == 0 {
231            // no rows ever written
232            return MemtableStats {
233                estimated_bytes,
234                time_range: None,
235                num_rows: 0,
236                num_ranges: 0,
237                max_sequence: 0,
238            };
239        }
240
241        let ts_type = self
242            .tree
243            .metadata
244            .time_index_column()
245            .column_schema
246            .data_type
247            .clone()
248            .as_timestamp()
249            .expect("Timestamp column must have timestamp type");
250        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
251        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
252        MemtableStats {
253            estimated_bytes,
254            time_range: Some((min_timestamp, max_timestamp)),
255            num_rows: self.num_rows.load(Ordering::Relaxed),
256            num_ranges: 1,
257            max_sequence: self.max_sequence.load(Ordering::Relaxed),
258        }
259    }
260
261    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
262        let tree = self.tree.fork(metadata.clone());
263
264        let memtable = PartitionTreeMemtable::with_tree(id, tree);
265        Arc::new(memtable)
266    }
267}
268
269impl PartitionTreeMemtable {
270    /// Returns a new memtable.
271    pub fn new(
272        id: MemtableId,
273        row_codec: Arc<dyn PrimaryKeyCodec>,
274        metadata: RegionMetadataRef,
275        write_buffer_manager: Option<WriteBufferManagerRef>,
276        config: &PartitionTreeConfig,
277    ) -> Self {
278        Self::with_tree(
279            id,
280            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
281        )
282    }
283
284    /// Creates a mutable memtable from the tree.
285    ///
286    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
287    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
288        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
289
290        Self {
291            id,
292            tree: Arc::new(tree),
293            alloc_tracker,
294            max_timestamp: AtomicI64::new(i64::MIN),
295            min_timestamp: AtomicI64::new(i64::MAX),
296            num_rows: AtomicUsize::new(0),
297            max_sequence: AtomicU64::new(0),
298        }
299    }
300
301    /// Updates stats of the memtable.
302    fn update_stats(&self, metrics: &WriteMetrics) {
303        // Only let the tracker tracks value bytes.
304        self.alloc_tracker.on_allocation(metrics.value_bytes);
305        self.max_timestamp
306            .fetch_max(metrics.max_ts, Ordering::SeqCst);
307        self.min_timestamp
308            .fetch_min(metrics.min_ts, Ordering::SeqCst);
309    }
310}
311
/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Config passed to every memtable built by this builder.
    config: PartitionTreeConfig,
    /// Optional write buffer manager handed to every memtable built by this builder.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
318
319impl PartitionTreeMemtableBuilder {
320    /// Creates a new builder with specific `write_buffer_manager`.
321    pub fn new(
322        config: PartitionTreeConfig,
323        write_buffer_manager: Option<WriteBufferManagerRef>,
324    ) -> Self {
325        Self {
326            config,
327            write_buffer_manager,
328        }
329    }
330}
331
332impl MemtableBuilder for PartitionTreeMemtableBuilder {
333    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
334        let codec = build_primary_key_codec(metadata);
335        Arc::new(PartitionTreeMemtable::new(
336            id,
337            codec,
338            metadata.clone(),
339            self.write_buffer_manager.clone(),
340            &self.config,
341        ))
342    }
343}
344
/// Iterator builder that reads from a shared [PartitionTree] with fixed read options.
struct PartitionTreeIterBuilder {
    /// Tree to read from.
    tree: Arc<PartitionTree>,
    /// Ids of the columns to read, `None` if the caller gave no projection.
    projection: Option<Vec<ColumnId>>,
    /// Predicate passed to every read.
    predicate: Option<Predicate>,
    /// Sequence passed to every read.
    sequence: Option<SequenceNumber>,
}
351
352impl IterBuilder for PartitionTreeIterBuilder {
353    fn build(&self) -> Result<BoxedBatchIterator> {
354        self.tree.read(
355            self.projection.as_deref(),
356            self.predicate.clone(),
357            self.sequence,
358        )
359    }
360}
361
362#[cfg(test)]
363mod tests {
364    use api::v1::value::ValueData;
365    use api::v1::{Row, Rows, SemanticType};
366    use common_time::Timestamp;
367    use datafusion_common::{Column, ScalarValue};
368    use datafusion_expr::{BinaryExpr, Expr, Operator};
369    use datatypes::data_type::ConcreteDataType;
370    use datatypes::scalars::ScalarVector;
371    use datatypes::schema::ColumnSchema;
372    use datatypes::value::Value;
373    use datatypes::vectors::Int64Vector;
374    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
375    use store_api::storage::RegionId;
376
377    use super::*;
378    use crate::row_converter::DensePrimaryKeyCodec;
379    use crate::test_util::memtable_util::{
380        self, collect_iter_timestamps, region_metadata_to_row_schema,
381    };
382
383    #[test]
384    fn test_memtable_sorted_input() {
385        write_iter_sorted_input(true);
386        write_iter_sorted_input(false);
387    }
388
389    fn write_iter_sorted_input(has_pk: bool) {
390        let metadata = if has_pk {
391            memtable_util::metadata_with_primary_key(vec![1, 0], true)
392        } else {
393            memtable_util::metadata_with_primary_key(vec![], false)
394        };
395        let timestamps = (0..100).collect::<Vec<_>>();
396        let kvs =
397            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
398        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
399        let memtable = PartitionTreeMemtable::new(
400            1,
401            codec,
402            metadata.clone(),
403            None,
404            &PartitionTreeConfig::default(),
405        );
406        memtable.write(&kvs).unwrap();
407
408        let expected_ts = kvs
409            .iter()
410            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
411            .collect::<Vec<_>>();
412
413        let iter = memtable.iter(None, None, None).unwrap();
414        let read = collect_iter_timestamps(iter);
415        assert_eq!(expected_ts, read);
416
417        let stats = memtable.stats();
418        assert!(stats.bytes_allocated() > 0);
419        assert_eq!(
420            Some((
421                Timestamp::new_millisecond(0),
422                Timestamp::new_millisecond(99)
423            )),
424            stats.time_range()
425        );
426    }
427
428    #[test]
429    fn test_memtable_unsorted_input() {
430        write_iter_unsorted_input(true);
431        write_iter_unsorted_input(false);
432    }
433
434    fn write_iter_unsorted_input(has_pk: bool) {
435        let metadata = if has_pk {
436            memtable_util::metadata_with_primary_key(vec![1, 0], true)
437        } else {
438            memtable_util::metadata_with_primary_key(vec![], false)
439        };
440        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
441        let memtable = PartitionTreeMemtable::new(
442            1,
443            codec,
444            metadata.clone(),
445            None,
446            &PartitionTreeConfig::default(),
447        );
448
449        let kvs = memtable_util::build_key_values(
450            &metadata,
451            "hello".to_string(),
452            0,
453            &[1, 3, 7, 5, 6],
454            0, // sequence 0, 1, 2, 3, 4
455        );
456        memtable.write(&kvs).unwrap();
457
458        let kvs = memtable_util::build_key_values(
459            &metadata,
460            "hello".to_string(),
461            0,
462            &[5, 2, 4, 0, 7],
463            5, // sequence 5, 6, 7, 8, 9
464        );
465        memtable.write(&kvs).unwrap();
466
467        let iter = memtable.iter(None, None, None).unwrap();
468        let read = collect_iter_timestamps(iter);
469        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
470
471        let iter = memtable.iter(None, None, None).unwrap();
472        let read = iter
473            .flat_map(|batch| {
474                batch
475                    .unwrap()
476                    .sequences()
477                    .iter_data()
478                    .collect::<Vec<_>>()
479                    .into_iter()
480            })
481            .map(|v| v.unwrap())
482            .collect::<Vec<_>>();
483        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);
484
485        let stats = memtable.stats();
486        assert!(stats.bytes_allocated() > 0);
487        assert_eq!(
488            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
489            stats.time_range()
490        );
491    }
492
493    #[test]
494    fn test_memtable_projection() {
495        write_iter_projection(true);
496        write_iter_projection(false);
497    }
498
499    fn write_iter_projection(has_pk: bool) {
500        let metadata = if has_pk {
501            memtable_util::metadata_with_primary_key(vec![1, 0], true)
502        } else {
503            memtable_util::metadata_with_primary_key(vec![], false)
504        };
505        // Try to build a memtable via the builder.
506        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
507            .build(1, &metadata);
508
509        let expect = (0..100).collect::<Vec<_>>();
510        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
511        memtable.write(&kvs).unwrap();
512        let iter = memtable.iter(Some(&[3]), None, None).unwrap();
513
514        let mut v0_all = vec![];
515        for res in iter {
516            let batch = res.unwrap();
517            assert_eq!(1, batch.fields().len());
518            let v0 = batch
519                .fields()
520                .first()
521                .unwrap()
522                .data
523                .as_any()
524                .downcast_ref::<Int64Vector>()
525                .unwrap();
526            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
527        }
528        assert_eq!(expect, v0_all);
529    }
530
531    #[test]
532    fn test_write_iter_multi_keys() {
533        write_iter_multi_keys(1, 100);
534        write_iter_multi_keys(2, 100);
535        write_iter_multi_keys(4, 100);
536        write_iter_multi_keys(8, 5);
537        write_iter_multi_keys(2, 10);
538    }
539
540    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
541        let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
542        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
543        let memtable = PartitionTreeMemtable::new(
544            1,
545            codec,
546            metadata.clone(),
547            None,
548            &PartitionTreeConfig {
549                index_max_keys_per_shard: max_keys,
550                data_freeze_threshold: freeze_threshold,
551                ..Default::default()
552            },
553        );
554
555        let mut data = Vec::new();
556        // 4 partitions, each partition 4 pks.
557        for i in 0..4 {
558            for j in 0..4 {
559                // key: i, a{j}
560                let timestamps = [11, 13, 1, 5, 3, 7, 9];
561                let key = format!("a{j}");
562                let kvs =
563                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
564                memtable.write(&kvs).unwrap();
565                for ts in timestamps {
566                    data.push((i, key.clone(), ts));
567                }
568            }
569            for j in 0..4 {
570                // key: i, a{j}
571                let timestamps = [10, 2, 4, 8, 6];
572                let key = format!("a{j}");
573                let kvs =
574                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
575                memtable.write(&kvs).unwrap();
576                for ts in timestamps {
577                    data.push((i, key.clone(), ts));
578                }
579            }
580        }
581        data.sort_unstable();
582
583        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
584        let iter = memtable.iter(None, None, None).unwrap();
585        let read = collect_iter_timestamps(iter);
586        assert_eq!(expect, read);
587    }
588
589    #[test]
590    fn test_memtable_filter() {
591        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
592        // Try to build a memtable via the builder.
593        let memtable = PartitionTreeMemtableBuilder::new(
594            PartitionTreeConfig {
595                index_max_keys_per_shard: 40,
596                ..Default::default()
597            },
598            None,
599        )
600        .build(1, &metadata);
601
602        for i in 0..100 {
603            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
604            let kvs =
605                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
606            memtable.write(&kvs).unwrap();
607        }
608
609        for i in 0..100 {
610            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
611            let expr = Expr::BinaryExpr(BinaryExpr {
612                left: Box::new(Expr::Column(Column::from_name("k1"))),
613                op: Operator::Eq,
614                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
615            });
616            let iter = memtable
617                .iter(None, Some(Predicate::new(vec![expr])), None)
618                .unwrap();
619            let read = collect_iter_timestamps(iter);
620            assert_eq!(timestamps, read);
621        }
622    }
623
624    #[test]
625    fn test_deserialize_config() {
626        let config = PartitionTreeConfig {
627            dedup: false,
628            ..Default::default()
629        };
630        // Creates a json with dedup = false.
631        let json = serde_json::to_string(&config).unwrap();
632        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
633        assert!(config.dedup);
634        assert_eq!(PartitionTreeConfig::default(), config);
635    }
636
637    fn metadata_for_metric_engine() -> RegionMetadataRef {
638        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
639        builder
640            .push_column_metadata(ColumnMetadata {
641                column_schema: ColumnSchema::new(
642                    "__table_id",
643                    ConcreteDataType::uint32_datatype(),
644                    false,
645                ),
646                semantic_type: SemanticType::Tag,
647                column_id: 2147483652,
648            })
649            .push_column_metadata(ColumnMetadata {
650                column_schema: ColumnSchema::new(
651                    "__tsid",
652                    ConcreteDataType::uint64_datatype(),
653                    false,
654                ),
655                semantic_type: SemanticType::Tag,
656                column_id: 2147483651,
657            })
658            .push_column_metadata(ColumnMetadata {
659                column_schema: ColumnSchema::new(
660                    "test_label",
661                    ConcreteDataType::string_datatype(),
662                    false,
663                ),
664                semantic_type: SemanticType::Tag,
665                column_id: 2,
666            })
667            .push_column_metadata(ColumnMetadata {
668                column_schema: ColumnSchema::new(
669                    "greptime_timestamp",
670                    ConcreteDataType::timestamp_millisecond_datatype(),
671                    false,
672                ),
673                semantic_type: SemanticType::Timestamp,
674                column_id: 0,
675            })
676            .push_column_metadata(ColumnMetadata {
677                column_schema: ColumnSchema::new(
678                    "greptime_value",
679                    ConcreteDataType::float64_datatype(),
680                    true,
681                ),
682                semantic_type: SemanticType::Field,
683                column_id: 1,
684            })
685            .primary_key(vec![2147483652, 2147483651, 2]);
686        let region_metadata = builder.build().unwrap();
687        Arc::new(region_metadata)
688    }
689
690    fn build_key_values(
691        metadata: RegionMetadataRef,
692        labels: &[&str],
693        table_id: &[u32],
694        ts_id: &[u64],
695        ts: &[i64],
696        values: &[f64],
697        sequence: u64,
698    ) -> KeyValues {
699        let column_schema = region_metadata_to_row_schema(&metadata);
700
701        let rows = ts
702            .iter()
703            .zip(table_id.iter())
704            .zip(ts_id.iter())
705            .zip(labels.iter())
706            .zip(values.iter())
707            .map(|((((ts, table_id), ts_id), label), val)| Row {
708                values: vec![
709                    api::v1::Value {
710                        value_data: Some(ValueData::U32Value(*table_id)),
711                    },
712                    api::v1::Value {
713                        value_data: Some(ValueData::U64Value(*ts_id)),
714                    },
715                    api::v1::Value {
716                        value_data: Some(ValueData::StringValue(label.to_string())),
717                    },
718                    api::v1::Value {
719                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
720                    },
721                    api::v1::Value {
722                        value_data: Some(ValueData::F64Value(*val)),
723                    },
724                ],
725            })
726            .collect();
727        let mutation = api::v1::Mutation {
728            op_type: 1,
729            sequence,
730            rows: Some(Rows {
731                schema: column_schema,
732                rows,
733            }),
734            write_hint: None,
735        };
736        KeyValues::new(metadata.as_ref(), mutation).unwrap()
737    }
738
739    #[test]
740    fn test_write_freeze() {
741        let metadata = metadata_for_metric_engine();
742        let memtable = PartitionTreeMemtableBuilder::new(
743            PartitionTreeConfig {
744                index_max_keys_per_shard: 40,
745                ..Default::default()
746            },
747            None,
748        )
749        .build(1, &metadata);
750
751        let codec = DensePrimaryKeyCodec::new(&metadata);
752
753        memtable
754            .write(&build_key_values(
755                metadata.clone(),
756                &["daily", "10min", "daily", "10min"],
757                &[1025, 1025, 1025, 1025],
758                &[
759                    16442255374049317291,
760                    5686004715529701024,
761                    16442255374049317291,
762                    5686004715529701024,
763                ],
764                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
765                &[0.0, 0.0, 0.0, 0.0],
766                1,
767            ))
768            .unwrap();
769
770        memtable.freeze().unwrap();
771        let new_memtable = memtable.fork(2, &metadata);
772
773        new_memtable
774            .write(&build_key_values(
775                metadata.clone(),
776                &["10min"],
777                &[1025],
778                &[5686004715529701024],
779                &[1714643131000],
780                &[0.1],
781                2,
782            ))
783            .unwrap();
784
785        let mut reader = new_memtable.iter(None, None, None).unwrap();
786        let batch = reader.next().unwrap().unwrap();
787        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
788        if let Value::String(s) = &pk[2] {
789            assert_eq!("10min", s.as_utf8());
790        } else {
791            unreachable!()
792        }
793    }
794}