mito2/memtable/
partition_tree.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation based on a partition tree.
16
17pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29
30use common_base::readable_size::ReadableSize;
31use common_stat::get_total_memory_readable;
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use serde::{Deserialize, Serialize};
35use store_api::metadata::RegionMetadataRef;
36use store_api::storage::{ColumnId, SequenceRange};
37use table::predicate::Predicate;
38
39use crate::error::{Result, UnsupportedOperationSnafu};
40use crate::flush::WriteBufferManagerRef;
41use crate::memtable::bulk::part::BulkPart;
42use crate::memtable::partition_tree::tree::PartitionTree;
43use crate::memtable::stats::WriteMetrics;
44use crate::memtable::{
45    AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
46    MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext,
47    MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
48};
49use crate::region::options::MergeMode;
50
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value for [PartitionTreeConfig::index_max_keys_per_shard].
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value for [PartitionTreeConfig::data_freeze_threshold].
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
55
/// Id of a shard, only unique inside a partition.
type ShardId = u32;
/// Index of a primary key in a shard.
type PkIndex = u16;

/// Id of a primary key inside a tree.
///
/// A key is identified by the shard it lives in together with its
/// index inside that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    // Shard that stores this key.
    shard_id: ShardId,
    // Position of this key inside the shard.
    pk_index: PkIndex,
}
67
68// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple partition tree
69// memtable then we will use a lot memory. We should find a better way to control the
70// dictionary size.
/// Config for the partition tree memtable.
///
/// Deserialization fills missing fields from [Default] (`#[serde(default)]`);
/// `dedup` and `merge_mode` are never deserialized because they are derived
/// from table options at runtime.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Max keys in an index shard.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    pub data_freeze_threshold: usize,
    /// Whether to delete duplicates rows.
    ///
    /// Skips deserializing as it should be determined by whether the
    /// table is append only.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
91
92impl Default for PartitionTreeConfig {
93    fn default() -> Self {
94        let mut fork_dictionary_bytes = ReadableSize::mb(512);
95        if let Some(total_memory) = get_total_memory_readable() {
96            let adjust_dictionary_bytes =
97                std::cmp::min(total_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
98            if adjust_dictionary_bytes.0 > 0 {
99                fork_dictionary_bytes = adjust_dictionary_bytes;
100            }
101        }
102
103        Self {
104            index_max_keys_per_shard: 8192,
105            data_freeze_threshold: 131072,
106            dedup: true,
107            fork_dictionary_bytes,
108            merge_mode: MergeMode::LastRow,
109        }
110    }
111}
112
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Tree that stores the rows.
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by this memtable and reports them to the
    /// write buffer manager (if any).
    alloc_tracker: AllocTracker,
    /// Max raw timestamp value of written rows; `i64::MIN` before any write.
    max_timestamp: AtomicI64,
    /// Min raw timestamp value of written rows; `i64::MAX` before any write.
    min_timestamp: AtomicI64,
    /// Max sequence of written rows.
    max_sequence: AtomicU64,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
124
125impl fmt::Debug for PartitionTreeMemtable {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_struct("PartitionTreeMemtable")
128            .field("id", &self.id)
129            .finish()
130    }
131}
132
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree and, on success, folds the
    /// batch's row count and max sequence into the memtable stats.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // TODO(yingwen): Validate schema while inserting rows.

        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            // Stats are only updated when the whole batch succeeded.
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single row, updating stats only if the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        // Ensures the memtable always updates stats.
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        // update max_sequence
        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk ingestion is not supported by the partition tree memtable.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Returns the scan ranges of this memtable.
    ///
    /// The whole memtable is always exposed as a single range (key 0); the
    /// iterator itself is built lazily via [PartitionTreeIterBuilder].
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.tree.metadata, projection);
        // Own the projection so the builder can outlive the caller's slice.
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        // Context used to convert scanned batches into record batches.
        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
            self.tree.metadata.clone(),
            read_column_ids,
        ));
        let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
            self.id,
            builder,
            predicate,
            Some(adapter_context),
        ));

        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Marks allocation as finished and freezes the tree.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Returns the current stats of the memtable.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert the raw atomic i64 bounds back into typed timestamps.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            // The whole memtable is exposed as one range, see `ranges()`.
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Forks this memtable: creates an empty memtable (stats reset by
    /// `with_tree`) whose tree is forked from the current one.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
267
268impl PartitionTreeMemtable {
269    /// Returns a new memtable.
270    pub fn new(
271        id: MemtableId,
272        row_codec: Arc<dyn PrimaryKeyCodec>,
273        metadata: RegionMetadataRef,
274        write_buffer_manager: Option<WriteBufferManagerRef>,
275        config: &PartitionTreeConfig,
276    ) -> Self {
277        Self::with_tree(
278            id,
279            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
280        )
281    }
282
283    /// Creates a mutable memtable from the tree.
284    ///
285    /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
286    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
287        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
288
289        Self {
290            id,
291            tree: Arc::new(tree),
292            alloc_tracker,
293            max_timestamp: AtomicI64::new(i64::MIN),
294            min_timestamp: AtomicI64::new(i64::MAX),
295            num_rows: AtomicUsize::new(0),
296            max_sequence: AtomicU64::new(0),
297        }
298    }
299
300    /// Updates stats of the memtable.
301    fn update_stats(&self, metrics: &WriteMetrics) {
302        // Only let the tracker tracks value bytes.
303        self.alloc_tracker.on_allocation(metrics.value_bytes);
304        self.max_timestamp
305            .fetch_max(metrics.max_ts, Ordering::SeqCst);
306        self.min_timestamp
307            .fetch_min(metrics.min_ts, Ordering::SeqCst);
308        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
309        self.max_sequence
310            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
311    }
312
313    #[cfg(any(test, feature = "test"))]
314    pub fn iter(
315        &self,
316        projection: Option<&[ColumnId]>,
317        predicate: Option<Predicate>,
318        sequence: Option<SequenceRange>,
319    ) -> Result<BoxedBatchIterator> {
320        self.tree.read(projection, predicate, sequence, None)
321    }
322}
323
/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Config applied to every memtable this builder creates.
    config: PartitionTreeConfig,
    /// Optional write buffer manager shared by the built memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
330
331impl PartitionTreeMemtableBuilder {
332    /// Creates a new builder with specific `write_buffer_manager`.
333    pub fn new(
334        config: PartitionTreeConfig,
335        write_buffer_manager: Option<WriteBufferManagerRef>,
336    ) -> Self {
337        Self {
338            config,
339            write_buffer_manager,
340        }
341    }
342}
343
344impl MemtableBuilder for PartitionTreeMemtableBuilder {
345    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
346        let codec = build_primary_key_codec(metadata);
347        Arc::new(PartitionTreeMemtable::new(
348            id,
349            codec,
350            metadata.clone(),
351            self.write_buffer_manager.clone(),
352            &self.config,
353        ))
354    }
355
356    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
357        false
358    }
359}
360
/// [IterBuilder] that scans a partition tree.
struct PartitionTreeIterBuilder {
    /// Tree to read from.
    tree: Arc<PartitionTree>,
    /// Optional projection of column ids.
    projection: Option<Vec<ColumnId>>,
    /// Optional predicate to prune rows.
    predicate: Option<Predicate>,
    /// Optional visible sequence range.
    sequence: Option<SequenceRange>,
}
367
368impl IterBuilder for PartitionTreeIterBuilder {
369    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
370        self.tree.read(
371            self.projection.as_deref(),
372            self.predicate.clone(),
373            self.sequence,
374            metrics,
375        )
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use std::collections::HashMap;
382    use std::sync::Arc;
383
384    use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
385    use api::v1::value::ValueData;
386    use api::v1::{Mutation, OpType, Rows, SemanticType};
387    use common_query::prelude::{greptime_timestamp, greptime_value};
388    use common_time::Timestamp;
389    use datatypes::data_type::ConcreteDataType;
390    use datatypes::prelude::Vector;
391    use datatypes::scalars::ScalarVector;
392    use datatypes::schema::ColumnSchema;
393    use datatypes::value::Value;
394    use datatypes::vectors::{Int64Vector, StringVector};
395    use mito_codec::row_converter::DensePrimaryKeyCodec;
396    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
397    use store_api::storage::RegionId;
398
399    use super::*;
400    use crate::test_util::memtable_util::{
401        self, collect_iter_timestamps, region_metadata_to_row_schema,
402    };
403
    // Runs the sorted-input round trip with and without a primary key.
    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }
409
    /// Writes 100 rows with ascending timestamps and verifies that iteration
    /// returns them in the same order and that stats reflect the writes.
    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        // Expected timestamps are exactly the written ones, in input order.
        let expected_ts = kvs
            .iter()
            .map(|kv| {
                kv.timestamp()
                    .try_into_timestamp()
                    .unwrap()
                    .unwrap()
                    .value()
            })
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        // Stats must report allocation and the [0, 99] millisecond time range.
        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
454
    // Runs the unsorted-input round trip with and without a primary key.
    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }
460
    /// Writes two out-of-order batches for the same key and checks that the
    /// iterator yields sorted timestamps, with duplicated timestamps resolved
    /// to the row carrying the larger sequence.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0, // sequence 0, 1, 2, 3, 4
        );
        memtable.write(&kvs).unwrap();

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5, // sequence 5, 6, 7, 8, 9
        );
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        // Sequences follow timestamp order 0..=7; ts 5 and 7 were written
        // twice, so the later sequences (5 and 9) win.
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }
519
    // Runs the projection round trip with and without a primary key.
    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }
525
    /// Writes rows and reads back projecting only column id 3, checking that
    /// each batch carries exactly one field with the expected values.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        // Try to build a memtable via the builder.
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Project column id 3 only (a field column in the test metadata).
        let ranges = memtable
            .ranges(Some(&[3]), RangesOptions::default())
            .unwrap();
        let iter = ranges.build(None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // Projection keeps a single field column.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
560
    // Exercises several shard-size / freeze-threshold combinations so writes
    // trigger shard splits and data part freezes.
    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }
569
    /// Writes interleaved batches across 4 partitions x 4 primary keys with a
    /// custom shard size / freeze threshold, then verifies the iterator output
    /// matches the fully sorted input.
    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        // 4 partitions, each partition 4 pks.
        for i in 0..4 {
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            for j in 0..4 {
                // key: i, a{j}
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        // The iterator is expected to return rows ordered by (partition, key, ts).
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }
618
    // `dedup` is `#[serde(skip_deserializing)]`, so a serialized config with
    // dedup = false must come back with the default (true).
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // Creates a json with dedup = false.
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
631
    /// Builds region metadata mimicking a metric engine physical table:
    /// tags `__table_id`/`__tsid`/`test_label`, a millisecond time index and a
    /// float64 value field, with all three tags as the primary key.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                // Reserved-range column ids, as used by the metric engine.
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
684
685    fn build_key_values(
686        metadata: RegionMetadataRef,
687        labels: &[&str],
688        table_id: &[u32],
689        ts_id: &[u64],
690        ts: &[i64],
691        values: &[f64],
692        sequence: u64,
693    ) -> KeyValues {
694        let column_schema = region_metadata_to_row_schema(&metadata);
695
696        let rows = ts
697            .iter()
698            .zip(table_id.iter())
699            .zip(ts_id.iter())
700            .zip(labels.iter())
701            .zip(values.iter())
702            .map(|((((ts, table_id), ts_id), label), val)| {
703                row(vec![
704                    ValueData::U32Value(*table_id),
705                    ValueData::U64Value(*ts_id),
706                    ValueData::StringValue(label.to_string()),
707                    ValueData::TimestampMillisecondValue(*ts),
708                    ValueData::F64Value(*val),
709                ])
710            })
711            .collect();
712        let mutation = api::v1::Mutation {
713            op_type: 1,
714            sequence,
715            rows: Some(Rows {
716                schema: column_schema,
717                rows,
718            }),
719            write_hint: None,
720        };
721        KeyValues::new(metadata.as_ref(), mutation).unwrap()
722    }
723
    // Writes to a memtable, freezes it, forks it, writes again and checks the
    // first batch read from the fork decodes the expected primary key.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        // Freeze, then fork into a fresh memtable that shares the tree's state.
        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = reader.next().unwrap().unwrap();
        // The third primary key column is `test_label`; expect the "10min" row.
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
783
    /// Builds a simple key/value region: time index `ts`, string tag `k`
    /// (the primary key) and string field `v`.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
810
    /// Row schema matching [kv_region_metadata] for building gRPC rows.
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            time_index_column_schema("ts", api::v1::ColumnDataType::TimestampMillisecond),
            tag_column_schema("k", api::v1::ColumnDataType::String),
            field_column_schema("v", api::v1::ColumnDataType::String),
        ]
    }
818
    /// Builds a `Put` mutation (sequence 0, ts 0) where every key string is
    /// also used as its field value.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(c.as_ref().to_string()),
                    ValueData::StringValue(c.as_ref().to_string()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
843
    /// Drains the iterator into a map of decoded primary key (`k` tag) to the
    /// string field value (`v`).
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            // The primary key has a single column (`k`), so values[0] is the key.
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
867
    // Writes keys in varying orders across a chain of forks and checks that
    // every generation reads back exactly the keys written to it.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        // First generation: keys 'a'..'h' in order.
        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(
                memtable
                    .ranges(None, RangesOptions::default())
                    .unwrap()
                    .build(None)
                    .unwrap(),
                &metadata
            ),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Second generation: out-of-order keys, including 'i' unseen before.
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(
                forked
                    .ranges(None, RangesOptions::default())
                    .unwrap()
                    .build(None)
                    .unwrap(),
                &metadata
            ),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        // Third generation: another permutation, read without freezing.
        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(
            forked2
                .ranges(None, RangesOptions::default())
                .unwrap()
                .build(None)
                .unwrap(),
            &metadata,
        );
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
927
928    #[test]
929    fn test_build_record_batch_iter_from_memtable() {
930        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
931        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
932        let memtable = PartitionTreeMemtable::new(
933            1,
934            codec,
935            metadata.clone(),
936            None,
937            &PartitionTreeConfig::default(),
938        );
939
940        let kvs =
941            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &[1, 2, 3], 0);
942        memtable.write(&kvs).unwrap();
943
944        let read_column_ids: Vec<ColumnId> = metadata
945            .column_metadatas
946            .iter()
947            .map(|c| c.column_id)
948            .collect();
949        let ranges = memtable
950            .ranges(Some(&read_column_ids), RangesOptions::default())
951            .unwrap();
952        assert!(!ranges.ranges.is_empty());
953
954        let mut total_rows = 0;
955        for range in ranges.ranges.into_values() {
956            let mut iter = range.build_record_batch_iter(None, None).unwrap();
957            while let Some(rb) = iter.next().transpose().unwrap() {
958                total_rows += rb.num_rows();
959                let schema = rb.schema();
960                let column_names: Vec<_> =
961                    schema.fields().iter().map(|f| f.name().as_str()).collect();
962                assert_eq!(
963                    column_names,
964                    vec![
965                        "__table_id",
966                        "k0",
967                        "v0",
968                        "v1",
969                        "ts",
970                        "__primary_key",
971                        "__sequence",
972                        "__op_type",
973                    ]
974                );
975            }
976        }
977        assert_eq!(3, total_rows);
978    }
979
980    #[test]
981    fn test_build_record_batch_iter_with_time_range() {
982        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
983        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
984        let memtable = PartitionTreeMemtable::new(
985            1,
986            codec,
987            metadata.clone(),
988            None,
989            &PartitionTreeConfig::default(),
990        );
991
992        let kvs = memtable_util::build_key_values(
993            &metadata,
994            "hello".to_string(),
995            42,
996            &[1, 2, 3, 4, 5],
997            0,
998        );
999        memtable.write(&kvs).unwrap();
1000
1001        let read_column_ids: Vec<ColumnId> = metadata
1002            .column_metadatas
1003            .iter()
1004            .map(|c| c.column_id)
1005            .collect();
1006        let ranges = memtable
1007            .ranges(Some(&read_column_ids), RangesOptions::default())
1008            .unwrap();
1009        assert!(!ranges.ranges.is_empty());
1010
1011        let time_range = (Timestamp::new_millisecond(2), Timestamp::new_millisecond(4));
1012
1013        let mut total_rows = 0;
1014        let mut all_timestamps = Vec::new();
1015        for range in ranges.ranges.into_values() {
1016            let mut iter = range
1017                .build_record_batch_iter(Some(time_range), None)
1018                .unwrap();
1019            while let Some(rb) = iter.next().transpose().unwrap() {
1020                total_rows += rb.num_rows();
1021                // ts column is at index 4 (after __table_id, k0, v0, v1)
1022                let ts_col = rb
1023                    .column_by_name("ts")
1024                    .unwrap()
1025                    .as_any()
1026                    .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
1027                    .unwrap();
1028                for i in 0..ts_col.len() {
1029                    all_timestamps.push(ts_col.value(i));
1030                }
1031            }
1032        }
1033        assert_eq!(3, total_rows);
1034        all_timestamps.sort();
1035        assert_eq!(vec![2, 3, 4], all_timestamps);
1036    }
1037}