// mito2/memtable/partition_tree.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation based on a partition tree.
16
17pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29
30use common_base::readable_size::ReadableSize;
31use common_stat::get_total_memory_readable;
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use serde::{Deserialize, Serialize};
35use store_api::metadata::RegionMetadataRef;
36use store_api::storage::{ColumnId, SequenceRange};
37use table::predicate::Predicate;
38
39use crate::error::{Result, UnsupportedOperationSnafu};
40use crate::flush::WriteBufferManagerRef;
41use crate::memtable::bulk::part::BulkPart;
42use crate::memtable::partition_tree::tree::PartitionTree;
43use crate::memtable::stats::WriteMetrics;
44use crate::memtable::{
45    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
46    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
47    MemtableStats, RangesOptions,
48};
49use crate::region::options::MergeMode;
50
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value for [PartitionTreeConfig::index_max_keys_per_shard].
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value for [PartitionTreeConfig::data_freeze_threshold].
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;

/// Id of a primary key inside a tree.
///
/// A primary key is addressed by the shard it lives in plus its index within that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    // Shard that stores the key.
    shard_id: ShardId,
    // Position of the key inside the shard.
    pk_index: PkIndex,
}
67
// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple partition tree
// memtable then we will use a lot memory. We should find a better way to control the
// dictionary size.
/// Config for the partition tree memtable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicate rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    ///
    /// Also skipped during deserialization; it is set programmatically.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
91
92impl Default for PartitionTreeConfig {
93    fn default() -> Self {
94        let mut fork_dictionary_bytes = ReadableSize::mb(512);
95        if let Some(total_memory) = get_total_memory_readable() {
96            let adjust_dictionary_bytes =
97                std::cmp::min(total_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
98            if adjust_dictionary_bytes.0 > 0 {
99                fork_dictionary_bytes = adjust_dictionary_bytes;
100            }
101        }
102
103        Self {
104            index_max_keys_per_shard: 8192,
105            data_freeze_threshold: 131072,
106            dedup: true,
107            fork_dictionary_bytes,
108            merge_mode: MergeMode::LastRow,
109        }
110    }
111}
112
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Tree that stores the rows.
    tree: Arc<PartitionTree>,
    /// Tracker for bytes allocated by this memtable (only value bytes, see `update_stats`).
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far; `i64::MIN` when nothing is written.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far; `i64::MAX` when nothing is written.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
124
125impl fmt::Debug for PartitionTreeMemtable {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_struct("PartitionTreeMemtable")
128            .field("id", &self.id)
129            .finish()
130    }
131}
132
133impl Memtable for PartitionTreeMemtable {
134    fn id(&self) -> MemtableId {
135        self.id
136    }
137
138    fn write(&self, kvs: &KeyValues) -> Result<()> {
139        if kvs.is_empty() {
140            return Ok(());
141        }
142
143        // TODO(yingwen): Validate schema while inserting rows.
144
145        let mut metrics = WriteMetrics::default();
146        let mut pk_buffer = Vec::new();
147        // Ensures the memtable always updates stats.
148        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);
149
150        if res.is_ok() {
151            metrics.max_sequence = kvs.max_sequence();
152            metrics.num_rows = kvs.num_rows();
153            self.update_stats(&metrics);
154        }
155        res
156    }
157
158    fn write_one(&self, key_value: KeyValue) -> Result<()> {
159        let mut metrics = WriteMetrics::default();
160        let mut pk_buffer = Vec::new();
161        // Ensures the memtable always updates stats.
162        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);
163
164        // update max_sequence
165        if res.is_ok() {
166            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
167            metrics.num_rows = 1;
168            self.update_stats(&metrics);
169        }
170        res
171    }
172
173    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
174        UnsupportedOperationSnafu {
175            err_msg: "PartitionTreeMemtable does not support write_bulk",
176        }
177        .fail()
178    }
179
180    #[cfg(any(test, feature = "test"))]
181    fn iter(
182        &self,
183        projection: Option<&[ColumnId]>,
184        predicate: Option<Predicate>,
185        sequence: Option<SequenceRange>,
186    ) -> Result<BoxedBatchIterator> {
187        self.tree.read(projection, predicate, sequence, None)
188    }
189
190    fn ranges(
191        &self,
192        projection: Option<&[ColumnId]>,
193        options: RangesOptions,
194    ) -> Result<MemtableRanges> {
195        let predicate = options.predicate;
196        let sequence = options.sequence;
197        let projection = projection.map(|ids| ids.to_vec());
198        let builder = Box::new(PartitionTreeIterBuilder {
199            tree: self.tree.clone(),
200            projection,
201            predicate: predicate.predicate().cloned(),
202            sequence,
203        });
204        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
205
206        let stats = self.stats();
207        Ok(MemtableRanges {
208            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
209            stats,
210        })
211    }
212
213    fn is_empty(&self) -> bool {
214        self.tree.is_empty()
215    }
216
217    fn freeze(&self) -> Result<()> {
218        self.alloc_tracker.done_allocating();
219
220        self.tree.freeze()
221    }
222
223    fn stats(&self) -> MemtableStats {
224        let estimated_bytes = self.alloc_tracker.bytes_allocated();
225
226        if estimated_bytes == 0 {
227            // no rows ever written
228            return MemtableStats {
229                estimated_bytes,
230                time_range: None,
231                num_rows: 0,
232                num_ranges: 0,
233                max_sequence: 0,
234                series_count: 0,
235            };
236        }
237
238        let ts_type = self
239            .tree
240            .metadata
241            .time_index_column()
242            .column_schema
243            .data_type
244            .clone()
245            .as_timestamp()
246            .expect("Timestamp column must have timestamp type");
247        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
248        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
249        let series_count = self.tree.series_count();
250        MemtableStats {
251            estimated_bytes,
252            time_range: Some((min_timestamp, max_timestamp)),
253            num_rows: self.num_rows.load(Ordering::Relaxed),
254            num_ranges: 1,
255            max_sequence: self.max_sequence.load(Ordering::Relaxed),
256            series_count,
257        }
258    }
259
260    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
261        let tree = self.tree.fork(metadata.clone());
262
263        let memtable = PartitionTreeMemtable::with_tree(id, tree);
264        Arc::new(memtable)
265    }
266}
267
268impl PartitionTreeMemtable {
269    /// Returns a new memtable.
270    pub fn new(
271        id: MemtableId,
272        row_codec: Arc<dyn PrimaryKeyCodec>,
273        metadata: RegionMetadataRef,
274        write_buffer_manager: Option<WriteBufferManagerRef>,
275        config: &PartitionTreeConfig,
276    ) -> Self {
277        Self::with_tree(
278            id,
279            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
280        )
281    }
282
283    /// Creates a mutable memtable from the tree.
284    ///
285    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
286    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
287        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
288
289        Self {
290            id,
291            tree: Arc::new(tree),
292            alloc_tracker,
293            max_timestamp: AtomicI64::new(i64::MIN),
294            min_timestamp: AtomicI64::new(i64::MAX),
295            num_rows: AtomicUsize::new(0),
296            max_sequence: AtomicU64::new(0),
297        }
298    }
299
300    /// Updates stats of the memtable.
301    fn update_stats(&self, metrics: &WriteMetrics) {
302        // Only let the tracker tracks value bytes.
303        self.alloc_tracker.on_allocation(metrics.value_bytes);
304        self.max_timestamp
305            .fetch_max(metrics.max_ts, Ordering::SeqCst);
306        self.min_timestamp
307            .fetch_min(metrics.min_ts, Ordering::SeqCst);
308        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
309        self.max_sequence
310            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
311    }
312
313    #[cfg(any(test, feature = "test"))]
314    pub fn iter(
315        &self,
316        projection: Option<&[ColumnId]>,
317        predicate: Option<Predicate>,
318        sequence: Option<SequenceRange>,
319    ) -> Result<BoxedBatchIterator> {
320        self.tree.read(projection, predicate, sequence, None)
321    }
322}
323
/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Config applied to every memtable this builder creates.
    config: PartitionTreeConfig,
    /// Optional write buffer manager passed to created memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
330
331impl PartitionTreeMemtableBuilder {
332    /// Creates a new builder with specific `write_buffer_manager`.
333    pub fn new(
334        config: PartitionTreeConfig,
335        write_buffer_manager: Option<WriteBufferManagerRef>,
336    ) -> Self {
337        Self {
338            config,
339            write_buffer_manager,
340        }
341    }
342}
343
344impl MemtableBuilder for PartitionTreeMemtableBuilder {
345    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
346        let codec = build_primary_key_codec(metadata);
347        Arc::new(PartitionTreeMemtable::new(
348            id,
349            codec,
350            metadata.clone(),
351            self.write_buffer_manager.clone(),
352            &self.config,
353        ))
354    }
355
356    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
357        false
358    }
359}
360
/// Lazily builds iterators over a [PartitionTree] for a memtable range.
struct PartitionTreeIterBuilder {
    /// Tree to read from.
    tree: Arc<PartitionTree>,
    /// Optional set of projected column ids.
    projection: Option<Vec<ColumnId>>,
    /// Optional predicate to push down to the tree scan.
    predicate: Option<Predicate>,
    /// Optional sequence range to filter rows by.
    sequence: Option<SequenceRange>,
}
367
368impl IterBuilder for PartitionTreeIterBuilder {
369    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
370        self.tree.read(
371            self.projection.as_deref(),
372            self.predicate.clone(),
373            self.sequence,
374            metrics,
375        )
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use std::collections::HashMap;
382    use std::sync::Arc;
383
384    use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
385    use api::v1::value::ValueData;
386    use api::v1::{Mutation, OpType, Rows, SemanticType};
387    use common_query::prelude::{greptime_timestamp, greptime_value};
388    use common_time::Timestamp;
389    use datafusion_common::Column;
390    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
391    use datatypes::data_type::ConcreteDataType;
392    use datatypes::prelude::Vector;
393    use datatypes::scalars::ScalarVector;
394    use datatypes::schema::ColumnSchema;
395    use datatypes::value::Value;
396    use datatypes::vectors::{Int64Vector, StringVector};
397    use mito_codec::row_converter::DensePrimaryKeyCodec;
398    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
399    use store_api::storage::RegionId;
400
401    use super::*;
402    use crate::test_util::memtable_util::{
403        self, collect_iter_timestamps, region_metadata_to_row_schema,
404    };
405
406    #[test]
407    fn test_memtable_sorted_input() {
408        write_iter_sorted_input(true);
409        write_iter_sorted_input(false);
410    }
411
412    fn write_iter_sorted_input(has_pk: bool) {
413        let metadata = if has_pk {
414            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
415        } else {
416            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
417        };
418        let timestamps = (0..100).collect::<Vec<_>>();
419        let kvs =
420            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
421        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
422        let memtable = PartitionTreeMemtable::new(
423            1,
424            codec,
425            metadata.clone(),
426            None,
427            &PartitionTreeConfig::default(),
428        );
429        memtable.write(&kvs).unwrap();
430
431        let expected_ts = kvs
432            .iter()
433            .map(|kv| {
434                kv.timestamp()
435                    .try_into_timestamp()
436                    .unwrap()
437                    .unwrap()
438                    .value()
439            })
440            .collect::<Vec<_>>();
441
442        let iter = memtable.iter(None, None, None).unwrap();
443        let read = collect_iter_timestamps(iter);
444        assert_eq!(expected_ts, read);
445
446        let stats = memtable.stats();
447        assert!(stats.bytes_allocated() > 0);
448        assert_eq!(
449            Some((
450                Timestamp::new_millisecond(0),
451                Timestamp::new_millisecond(99)
452            )),
453            stats.time_range()
454        );
455    }
456
457    #[test]
458    fn test_memtable_unsorted_input() {
459        write_iter_unsorted_input(true);
460        write_iter_unsorted_input(false);
461    }
462
463    fn write_iter_unsorted_input(has_pk: bool) {
464        let metadata = if has_pk {
465            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
466        } else {
467            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
468        };
469        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
470        let memtable = PartitionTreeMemtable::new(
471            1,
472            codec,
473            metadata.clone(),
474            None,
475            &PartitionTreeConfig::default(),
476        );
477
478        let kvs = memtable_util::build_key_values(
479            &metadata,
480            "hello".to_string(),
481            0,
482            &[1, 3, 7, 5, 6],
483            0, // sequence 0, 1, 2, 3, 4
484        );
485        memtable.write(&kvs).unwrap();
486
487        let kvs = memtable_util::build_key_values(
488            &metadata,
489            "hello".to_string(),
490            0,
491            &[5, 2, 4, 0, 7],
492            5, // sequence 5, 6, 7, 8, 9
493        );
494        memtable.write(&kvs).unwrap();
495
496        let iter = memtable.iter(None, None, None).unwrap();
497        let read = collect_iter_timestamps(iter);
498        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
499
500        let iter = memtable.iter(None, None, None).unwrap();
501        let read = iter
502            .flat_map(|batch| {
503                batch
504                    .unwrap()
505                    .sequences()
506                    .iter_data()
507                    .collect::<Vec<_>>()
508                    .into_iter()
509            })
510            .map(|v| v.unwrap())
511            .collect::<Vec<_>>();
512        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);
513
514        let stats = memtable.stats();
515        assert!(stats.bytes_allocated() > 0);
516        assert_eq!(
517            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
518            stats.time_range()
519        );
520    }
521
522    #[test]
523    fn test_memtable_projection() {
524        write_iter_projection(true);
525        write_iter_projection(false);
526    }
527
528    fn write_iter_projection(has_pk: bool) {
529        let metadata = if has_pk {
530            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
531        } else {
532            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
533        };
534        // Try to build a memtable via the builder.
535        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
536            .build(1, &metadata);
537
538        let expect = (0..100).collect::<Vec<_>>();
539        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
540        memtable.write(&kvs).unwrap();
541        let iter = memtable.iter(Some(&[3]), None, None).unwrap();
542
543        let mut v0_all = vec![];
544        for res in iter {
545            let batch = res.unwrap();
546            assert_eq!(1, batch.fields().len());
547            let v0 = batch
548                .fields()
549                .first()
550                .unwrap()
551                .data
552                .as_any()
553                .downcast_ref::<Int64Vector>()
554                .unwrap();
555            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
556        }
557        assert_eq!(expect, v0_all);
558    }
559
560    #[test]
561    fn test_write_iter_multi_keys() {
562        write_iter_multi_keys(1, 100);
563        write_iter_multi_keys(2, 100);
564        write_iter_multi_keys(4, 100);
565        write_iter_multi_keys(8, 5);
566        write_iter_multi_keys(2, 10);
567    }
568
569    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
570        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
571        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
572        let memtable = PartitionTreeMemtable::new(
573            1,
574            codec,
575            metadata.clone(),
576            None,
577            &PartitionTreeConfig {
578                index_max_keys_per_shard: max_keys,
579                data_freeze_threshold: freeze_threshold,
580                ..Default::default()
581            },
582        );
583
584        let mut data = Vec::new();
585        // 4 partitions, each partition 4 pks.
586        for i in 0..4 {
587            for j in 0..4 {
588                // key: i, a{j}
589                let timestamps = [11, 13, 1, 5, 3, 7, 9];
590                let key = format!("a{j}");
591                let kvs =
592                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
593                memtable.write(&kvs).unwrap();
594                for ts in timestamps {
595                    data.push((i, key.clone(), ts));
596                }
597            }
598            for j in 0..4 {
599                // key: i, a{j}
600                let timestamps = [10, 2, 4, 8, 6];
601                let key = format!("a{j}");
602                let kvs =
603                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
604                memtable.write(&kvs).unwrap();
605                for ts in timestamps {
606                    data.push((i, key.clone(), ts));
607                }
608            }
609        }
610        data.sort_unstable();
611
612        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
613        let iter = memtable.iter(None, None, None).unwrap();
614        let read = collect_iter_timestamps(iter);
615        assert_eq!(expect, read);
616    }
617
618    #[test]
619    fn test_memtable_filter() {
620        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
621        // Try to build a memtable via the builder.
622        let memtable = PartitionTreeMemtableBuilder::new(
623            PartitionTreeConfig {
624                index_max_keys_per_shard: 40,
625                ..Default::default()
626            },
627            None,
628        )
629        .build(1, &metadata);
630
631        for i in 0..100 {
632            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
633            let kvs =
634                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
635            memtable.write(&kvs).unwrap();
636        }
637
638        for i in 0..100 {
639            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
640            let expr = Expr::BinaryExpr(BinaryExpr {
641                left: Box::new(Expr::Column(Column::from_name("k1"))),
642                op: Operator::Eq,
643                right: Box::new((i as u32).lit()),
644            });
645            let iter = memtable
646                .iter(None, Some(Predicate::new(vec![expr])), None)
647                .unwrap();
648            let read = collect_iter_timestamps(iter);
649            assert_eq!(timestamps, read);
650        }
651    }
652
653    #[test]
654    fn test_deserialize_config() {
655        let config = PartitionTreeConfig {
656            dedup: false,
657            ..Default::default()
658        };
659        // Creates a json with dedup = false.
660        let json = serde_json::to_string(&config).unwrap();
661        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
662        assert!(config.dedup);
663        assert_eq!(PartitionTreeConfig::default(), config);
664    }
665
    /// Builds region metadata shaped like the metric engine's physical table:
    /// internal `__table_id`/`__tsid` tags, one user label tag, a millisecond
    /// timestamp column and a float value column.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            // Primary key: __table_id, __tsid, test_label (in that order).
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
718
719    fn build_key_values(
720        metadata: RegionMetadataRef,
721        labels: &[&str],
722        table_id: &[u32],
723        ts_id: &[u64],
724        ts: &[i64],
725        values: &[f64],
726        sequence: u64,
727    ) -> KeyValues {
728        let column_schema = region_metadata_to_row_schema(&metadata);
729
730        let rows = ts
731            .iter()
732            .zip(table_id.iter())
733            .zip(ts_id.iter())
734            .zip(labels.iter())
735            .zip(values.iter())
736            .map(|((((ts, table_id), ts_id), label), val)| {
737                row(vec![
738                    ValueData::U32Value(*table_id),
739                    ValueData::U64Value(*ts_id),
740                    ValueData::StringValue(label.to_string()),
741                    ValueData::TimestampMillisecondValue(*ts),
742                    ValueData::F64Value(*val),
743                ])
744            })
745            .collect();
746        let mutation = api::v1::Mutation {
747            op_type: 1,
748            sequence,
749            rows: Some(Rows {
750                schema: column_schema,
751                rows,
752            }),
753            write_hint: None,
754        };
755        KeyValues::new(metadata.as_ref(), mutation).unwrap()
756    }
757
    /// Writes two series, freezes, forks, writes again and verifies the fork
    /// returns the expected primary key first.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        // Write the "daily" and "10min" series at sequence 1.
        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        // Freeze, then fork into a new memtable and write only "10min" at sequence 2.
        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        // The first batch read from the fork must carry the "10min" label
        // (index 2 of the decoded primary key is the label tag).
        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
813
    /// Builds a simple key/value region metadata: timestamp `ts`, string tag
    /// `k` (the primary key) and string field `v`.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
840
    /// Row (wire) schema matching [kv_region_metadata]: `ts`, tag `k`, field `v`.
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            time_index_column_schema("ts", api::v1::ColumnDataType::TimestampMillisecond),
            tag_column_schema("k", api::v1::ColumnDataType::String),
            field_column_schema("v", api::v1::ColumnDataType::String),
        ]
    }
848
849    fn key_values<T: AsRef<str>>(
850        metadata: &RegionMetadataRef,
851        keys: impl Iterator<Item = T>,
852    ) -> KeyValues {
853        let rows = keys
854            .map(|c| {
855                row(vec![
856                    ValueData::TimestampMillisecondValue(0),
857                    ValueData::StringValue(c.as_ref().to_string()),
858                    ValueData::StringValue(c.as_ref().to_string()),
859                ])
860            })
861            .collect();
862        let mutation = Mutation {
863            op_type: OpType::Put as i32,
864            sequence: 0,
865            rows: Some(Rows {
866                schema: kv_column_schemas(),
867                rows,
868            }),
869            write_hint: None,
870        };
871        KeyValues::new(metadata, mutation).unwrap()
872    }
873
874    fn collect_kvs(
875        iter: BoxedBatchIterator,
876        region_meta: &RegionMetadataRef,
877    ) -> HashMap<String, String> {
878        let decoder = DensePrimaryKeyCodec::new(region_meta);
879        let mut res = HashMap::new();
880        for v in iter {
881            let batch = v.unwrap();
882            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
883            let field_vector = batch.fields()[0]
884                .data
885                .as_any()
886                .downcast_ref::<StringVector>()
887                .unwrap();
888            for row in 0..batch.num_rows() {
889                res.insert(
890                    values[0].as_string().unwrap(),
891                    field_vector.get(row).as_string().unwrap(),
892                );
893            }
894        }
895        res
896    }
897
    /// Writes sorted keys, then unsorted keys into successive forks, checking
    /// that every generation reads back the expected key/value pairs.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        // Sorted keys a..g, then freeze.
        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Unsorted keys (including the previously unseen "i") into the fork.
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        // A second fork with another unsorted batch.
        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
936}