mito2/memtable/partition_tree.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation based on a partition tree.
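//!
//! A partition tree splits rows into partitions; each partition keeps primary keys
//! in shards (`ShardId`, `PkIndex`) and buffers rows in data parts that are frozen
//! once they reach the configured `data_freeze_threshold`.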

pub(crate) mod data;
mod dedup;
mod dict;
mod merger;
mod partition;
mod shard;
mod shard_builder;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::region::options::MergeMode;

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default maximum number of keys in an index shard.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default number of rows to freeze a data part.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;

/// Id of a primary key inside a tree.
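///
/// A key is addressed by the shard that stores it together with its index inside
/// that shard, e.g. `PkId { shard_id: 0, pk_index: 42 }`.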
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}

// TODO(yingwen): `fork_dictionary_bytes` is a per-region option, so multiple partition tree
// memtables can use a lot of memory. We should find a better way to control the
// dictionary size.
/// Config for the partition tree memtable.
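///
/// # Example
///
/// A minimal sketch of overriding individual fields while keeping the remaining
/// defaults (this mirrors how the tests in this file build configs; the import path
/// is an assumption):
///
/// ```ignore
/// use mito2::memtable::partition_tree::PartitionTreeConfig;
///
/// let config = PartitionTreeConfig {
///     index_max_keys_per_shard: 4096,
///     data_freeze_threshold: 65536,
///     ..Default::default()
/// };
/// // Deduplication stays enabled unless the table is append only.
/// assert!(config.dedup);
/// ```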
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicate rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}

impl Default for PartitionTreeConfig {
    fn default() -> Self {
        let mut fork_dictionary_bytes = ReadableSize::mb(512);
        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
            let adjust_dictionary_bytes =
                std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
            if adjust_dictionary_bytes.0 > 0 {
                fork_dictionary_bytes = adjust_dictionary_bytes;
            }
        }
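        // For example, a host with 2 GiB of memory gets min(2 GiB / 8, 512 MiB) = 256 MiB
        // of fork dictionary budget, while hosts with 4 GiB or more keep the 512 MiB cap.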

        Self {
            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
            dedup: true,
            fork_dictionary_bytes,
            merge_mode: MergeMode::LastRow,
        }
    }
}

/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    id: MemtableId,
    tree: Arc<PartitionTree>,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl fmt::Debug for PartitionTreeMemtable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartitionTreeMemtable")
            .field("id", &self.id)
            .finish()
    }
}

impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // TODO(yingwen): Validate schema while inserting rows.

        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        // Update stats, including the max sequence, only when the write succeeds.
        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence)
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}

impl PartitionTreeMemtable {
    /// Returns a new memtable.
    pub fn new(
        id: MemtableId,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: &PartitionTreeConfig,
    ) -> Self {
        Self::with_tree(
            id,
            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
        )
    }

    /// Creates a mutable memtable from the tree.
    ///
    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());

        Self {
            id,
            tree: Arc::new(tree),
            alloc_tracker,
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            num_rows: AtomicUsize::new(0),
            max_sequence: AtomicU64::new(0),
        }
    }

    /// Updates stats of the memtable.
    fn update_stats(&self, metrics: &WriteMetrics) {
        // Only let the tracker track value bytes.
        self.alloc_tracker.on_allocation(metrics.value_bytes);
        self.max_timestamp
            .fetch_max(metrics.max_ts, Ordering::SeqCst);
        self.min_timestamp
            .fetch_min(metrics.min_ts, Ordering::SeqCst);
        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
    }
}

/// Builder to build a [PartitionTreeMemtable].
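///
/// # Example
///
/// A sketch of building a memtable from region metadata (the import paths and the way
/// the metadata is obtained are assumptions; the tests in this file use `memtable_util`
/// helpers for that):
///
/// ```ignore
/// use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
/// use mito2::memtable::{Memtable, MemtableBuilder};
///
/// let builder = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None);
/// // `metadata` is a `RegionMetadataRef` describing the region schema.
/// let memtable = builder.build(1, &metadata);
/// assert!(memtable.is_empty());
/// ```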
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl PartitionTreeMemtableBuilder {
    /// Creates a new builder with the given `config` and `write_buffer_manager`.
    pub fn new(
        config: PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        Self {
            config,
            write_buffer_manager,
        }
    }
}

impl MemtableBuilder for PartitionTreeMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let codec = build_primary_key_codec(metadata);
        Arc::new(PartitionTreeMemtable::new(
            id,
            codec,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            &self.config,
        ))
    }
}

struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for PartitionTreeIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        self.tree.read(
            self.projection.as_deref(),
            self.predicate.clone(),
            self.sequence,
        )
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::Vector;
    use datatypes::scalars::ScalarVector;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::{Int64Vector, StringVector};
    use mito_codec::row_converter::DensePrimaryKeyCodec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::memtable_util::{
        self, collect_iter_timestamps, region_metadata_to_row_schema,
    };

    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }

    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }

    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // sequence 0, 1, 2, 3, 4
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // sequence 5, 6, 7, 8, 9
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }

    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }

    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }

    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // 4 partitions, each partition 4 pks.
        for i in 0..4 {
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }

    #[test]
    fn test_memtable_filter() {
        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }

    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Serializes a config with dedup = false; since `dedup` skips deserialization,
        // deserializing restores the default value (true).
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }

    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }

    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "k".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            },
        ]
    }

    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(0)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }

    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
}