mito2/memtable/partition_tree/tree.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implementation of the partition tree.
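//!
//! A `PartitionTree` maps `PartitionKey`s to `Partition`s. Writes route each
//! row to a partition computed by `Partition::get_partition_key`, and reads
//! prune partitions by evaluating filters against their partition keys.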

use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{
    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec, SortField};

/// The partition tree.
pub struct PartitionTree {
    /// Config of the tree.
    config: PartitionTreeConfig,
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Primary key codec.
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Partitions in the tree.
    partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
    /// Whether the tree has multiple partitions.
    is_partitioned: bool,
    /// Manager to report size of the tree.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Encoder for sparse primary keys.
    sparse_encoder: Arc<SparseEncoder>,
}

impl PartitionTree {
    /// Creates a new partition tree.
    pub fn new(
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let sparse_encoder = SparseEncoder {
            fields: metadata
                .primary_key_columns()
                .map(|c| FieldWithId {
                    field: SortField::new(c.column_schema.data_type.clone()),
                    column_id: c.column_id,
                })
                .collect(),
        };
        let is_partitioned = Partition::has_multi_partitions(&metadata);
        let mut config = config.clone();
        if config.merge_mode == MergeMode::LastNonNull {
            config.dedup = false;
        }

        PartitionTree {
            config,
            metadata,
            row_codec,
            partitions: Default::default(),
            is_partitioned,
            write_buffer_manager,
            sparse_encoder: Arc::new(sparse_encoder),
        }
    }

    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
        // The sparse primary key codec does not have a fixed number of fields.
        if let Some(expected_num_fields) = self.row_codec.num_fields() {
            ensure!(
                expected_num_fields == kv.num_primary_keys(),
                PrimaryKeyLengthMismatchSnafu {
                    expect: expected_num_fields,
                    actual: kv.num_primary_keys(),
                }
            );
        }
        // TODO(weny): verify the primary key length for sparse primary key codec.
        Ok(())
    }

    /// Encodes the given key value into a sparse primary key.
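    ///
    /// # Example
    ///
    /// A minimal usage sketch (not a runnable doctest; obtaining a `KeyValue`
    /// is elided):
    ///
    /// ```ignore
    /// let mut pk_buffer = Vec::new();
    /// // Sparse-encoded input is copied through as-is; a dense key is
    /// // re-encoded with the sparse encoder.
    /// tree.encode_sparse_primary_key(&kv, &mut pk_buffer)?;
    /// ```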
    fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
        if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
            // If the primary key encoding is sparse and already encoded in the metric engine,
            // we only need to copy the encoded primary key into the destination buffer.
            let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
                return EncodeSparsePrimaryKeySnafu {
                    reason: "sparse primary key is not binary".to_string(),
                }
                .fail();
            };
            buffer.extend_from_slice(primary_key);
        } else {
            // For compatibility, use the sparse encoder for dense primary key.
            self.sparse_encoder
                .encode_to_vec(kv.primary_keys(), buffer)?;
        }
        Ok(())
    }

    // TODO(yingwen): The size computed from values is inaccurate.
    /// Write key-values into the tree.
    ///
    /// # Panics
    /// Panics if the tree is immutable (frozen).
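    ///
    /// # Example
    ///
    /// A minimal sketch (not a runnable doctest; building `KeyValues` is
    /// elided, and `WriteMetrics` is assumed to implement `Default`):
    ///
    /// ```ignore
    /// let mut pk_buffer = Vec::new();
    /// let mut metrics = WriteMetrics::default();
    /// tree.write(&kvs, &mut pk_buffer, &mut metrics)?;
    /// ```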
    pub fn write(
        &self,
        kvs: &KeyValues,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        for kv in kvs.iter() {
            self.verify_primary_key_length(&kv)?;
            // Safety: timestamp of kv must be both present and a valid timestamp value.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
            metrics.min_ts = metrics.min_ts.min(ts);
            metrics.max_ts = metrics.max_ts.max(ts);
            metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

            if !has_pk {
                // No primary key.
                self.write_no_key(kv)?;
                continue;
            }

            // Encode primary key.
            pk_buffer.clear();
            if self.is_partitioned {
                self.encode_sparse_primary_key(&kv, pk_buffer)?;
            } else {
                self.row_codec.encode_key_value(&kv, pk_buffer)?;
            }
            // Write rows with the encoded primary key.
            self.write_with_key(pk_buffer, kv, metrics)?;
        }

        metrics.value_bytes +=
            kvs.num_rows() * (std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>());

        Ok(())
    }

    /// Write one key value pair into the tree.
    ///
    /// # Panics
    /// Panics if the tree is immutable (frozen).
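    ///
    /// # Example
    ///
    /// A minimal sketch of the per-row variant (not a runnable doctest;
    /// `kvs`, `pk_buffer`, and `metrics` are assumed from the caller's scope):
    ///
    /// ```ignore
    /// for kv in kvs.iter() {
    ///     tree.write_one(kv, &mut pk_buffer, &mut metrics)?;
    /// }
    /// ```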
    pub fn write_one(
        &self,
        kv: KeyValue,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        self.verify_primary_key_length(&kv)?;
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        metrics.min_ts = metrics.min_ts.min(ts);
        metrics.max_ts = metrics.max_ts.max(ts);
        metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

        if !has_pk {
            // No primary key.
            return self.write_no_key(kv);
        }

        // Encode primary key.
        pk_buffer.clear();
        if self.is_partitioned {
            self.encode_sparse_primary_key(&kv, pk_buffer)?;
        } else {
            self.row_codec.encode_key_value(&kv, pk_buffer)?;
        }
        // Write the row with the encoded primary key.
        self.write_with_key(pk_buffer, kv, metrics)?;

        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        Ok(())
    }

    /// Scans the tree.
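    ///
    /// # Example
    ///
    /// A minimal sketch (not a runnable doctest; the projected column ids are
    /// hypothetical):
    ///
    /// ```ignore
    /// // Project two field columns, with no predicate and no sequence filter.
    /// let iter = tree.read(Some(&[2, 3]), None, None)?;
    /// for batch in iter {
    ///     let batch = batch?;
    ///     // consume the batch...
    /// }
    /// ```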
    pub fn read(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let start = Instant::now();
        // Creates the projection set.
        let projection: HashSet<_> = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.metadata.field_columns().map(|c| c.column_id).collect()
        };

        let filters = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();

        let mut tree_iter_metric = TreeIterMetrics::default();
        let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);

        let mut iter = TreeIter {
            sequence,
            partitions,
            current_reader: None,
            metrics: tree_iter_metric,
        };
        let context = ReadPartitionContext::new(
            self.metadata.clone(),
            self.row_codec.clone(),
            projection,
            Arc::new(filters),
        );
        iter.fetch_next_partition(context)?;

        iter.metrics.iter_elapsed += start.elapsed();

        if self.config.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Returns true if the tree is empty.
    ///
    /// A tree is empty if no partition has data.
    pub fn is_empty(&self) -> bool {
        let partitions = self.partitions.read().unwrap();
        partitions.values().all(|part| !part.has_data())
    }

    /// Marks the tree as immutable.
    ///
    /// Once the tree becomes immutable, callers should not write to it again.
    pub fn freeze(&self) -> Result<()> {
        let partitions = self.partitions.read().unwrap();
        for partition in partitions.values() {
            partition.freeze()?;
        }
        Ok(())
    }

    /// Forks an immutable tree. Returns a mutable tree that inherits the index
    /// of this tree.
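    ///
    /// # Example
    ///
    /// A minimal sketch of the freeze-then-fork cycle (not a runnable doctest;
    /// `metadata` is assumed from the caller's scope):
    ///
    /// ```ignore
    /// tree.freeze()?;
    /// let new_tree = tree.fork(metadata.clone());
    /// ```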
    pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
        if self.metadata.schema_version != metadata.schema_version
            || self.metadata.column_metadatas != metadata.column_metadatas
        {
            // The schema has changed, so we can't reuse the tree.
            return PartitionTree::new(
                self.row_codec.clone(),
                metadata,
                &self.config,
                self.write_buffer_manager.clone(),
            );
        }

        let mut total_shared_size = 0;
        let mut part_infos = {
            let partitions = self.partitions.read().unwrap();
            partitions
                .iter()
                .filter_map(|(part_key, part)| {
                    let stats = part.stats();
                    if stats.num_rows > 0 {
                        // Only fork partitions that have data.
                        total_shared_size += stats.shared_memory_size;
                        Some((*part_key, part.clone(), stats))
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>()
        };

        // TODO(yingwen): Optimize eviction strategy. Now we evict the whole partition.
        let fork_size = self.config.fork_dictionary_bytes.as_bytes() as usize;
        if total_shared_size > fork_size {
            // Sort partitions by shared memory size in ascending order, so `pop()`
            // below evicts the largest partitions first.
            part_infos.sort_unstable_by_key(|info| info.2.shared_memory_size);
            while total_shared_size > fork_size {
                let Some(info) = part_infos.pop() else {
                    break;
                };

                common_telemetry::debug!(
                    "Evict partition {} with memory size {}, {} shards",
                    info.0,
                    info.2.shared_memory_size,
                    info.2.shard_num,
                );

                total_shared_size -= info.2.shared_memory_size;
            }
        }

        let mut forked = BTreeMap::new();
        for (part_key, part, _) in part_infos {
            let forked_part = part.fork(&metadata, &self.config);
            forked.insert(part_key, Arc::new(forked_part));
        }

        PartitionTree {
            config: self.config.clone(),
            metadata,
            row_codec: self.row_codec.clone(),
            partitions: RwLock::new(forked),
            is_partitioned: self.is_partitioned,
            write_buffer_manager: self.write_buffer_manager.clone(),
            sparse_encoder: self.sparse_encoder.clone(),
        }
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }

    fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_with_key(
            primary_key,
            self.row_codec.as_ref(),
            key_value,
            self.is_partitioned, // If the tree is partitioned, re-encoding is required to get the full primary key.
            metrics,
        )
    }

    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_no_key(key_value)
    }

    fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef {
        let mut partitions = self.partitions.write().unwrap();
        partitions
            .entry(partition_key)
            .or_insert_with(|| Arc::new(Partition::new(self.metadata.clone(), &self.config)))
            .clone()
    }

    fn prune_partitions(
        &self,
        filters: &[SimpleFilterEvaluator],
        metrics: &mut TreeIterMetrics,
    ) -> VecDeque<PartitionRef> {
        let partitions = self.partitions.read().unwrap();
        metrics.partitions_total = partitions.len();
        if !self.is_partitioned {
            return partitions.values().cloned().collect();
        }

        let mut pruned = VecDeque::new();
        // Prune partition keys.
        for (key, partition) in partitions.iter() {
            let mut is_needed = true;
            for filter in filters {
                if !Partition::is_partition_column(filter.column_name()) {
                    continue;
                }

                if !filter
                    .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
                    .unwrap_or(true)
                {
                    is_needed = false;
                }
            }

            if is_needed {
                pruned.push_back(partition.clone());
            }
        }
        metrics.partitions_after_pruning = pruned.len();
        pruned
    }
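
    // Pruning sketch (hypothetical values): with partition keys `0..=3` and a
    // filter on the partition column that matches key `2`, evaluating the
    // filter against `ScalarValue::UInt32(Some(key))` keeps only partition `2`.
    // A filter that fails to evaluate keeps the partition: pruning must be
    // conservative and never drop data that might match.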
}

struct FieldWithId {
    field: SortField,
    column_id: ColumnId,
}

struct SparseEncoder {
    fields: Vec<FieldWithId>,
}

impl SparseEncoder {
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        let mut serializer = Serializer::new(buffer);
        for (value, field) in row.zip(self.fields.iter()) {
            if !value.is_null() {
                field
                    .column_id
                    .serialize(&mut serializer)
                    .context(SerializeFieldSnafu)?;
                field.field.serialize(&mut serializer, &value)?;
            }
        }
        Ok(())
    }
}
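
// Layout sketch of a sparse key produced by `encode_to_vec` above:
//
//   [column_id_1][value_1][column_id_2][value_2]...
//
// Null values are skipped entirely, so the number of (column id, value) pairs
// varies per row. Both parts go through the memcomparable serializer, so
// encoded keys compare in the same order as the original values.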

#[derive(Default)]
struct TreeIterMetrics {
    iter_elapsed: Duration,
    fetch_partition_elapsed: Duration,
    rows_fetched: usize,
    batches_fetched: usize,
    partitions_total: usize,
    partitions_after_pruning: usize,
}

struct TreeIter {
    /// Optional sequence number of the current reader, which limits result
    /// batches to rows with sequence numbers lower than this value.
    sequence: Option<SequenceNumber>,
    partitions: VecDeque<PartitionRef>,
    current_reader: Option<PartitionReader>,
    metrics: TreeIterMetrics,
}

impl Drop for TreeIter {
    fn drop(&mut self) {
        READ_ROWS_TOTAL
            .with_label_values(&["partition_tree_memtable"])
            .inc_by(self.metrics.rows_fetched as u64);
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["fetch_next_partition"])
            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
        let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(scan_elapsed);
        common_telemetry::debug!(
            "TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}",
            self.metrics.partitions_total,
            self.metrics.partitions_after_pruning,
            self.metrics.rows_fetched,
            self.metrics.batches_fetched,
            scan_elapsed
        );
    }
}

impl Iterator for TreeIter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let res = self.next_batch().transpose();
        self.metrics.iter_elapsed += start.elapsed();
        res
    }
}

impl TreeIter {
    /// Fetches the next partition that has a valid reader.
    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
        let start = Instant::now();
        while let Some(partition) = self.partitions.pop_front() {
            let part_reader = partition.read(context)?;
            if !part_reader.is_valid() {
                context = part_reader.into_context();
                continue;
            }
            self.current_reader = Some(part_reader);
            break;
        }
        self.metrics.fetch_partition_elapsed += start.elapsed();
        Ok(())
    }

    /// Fetches next batch.
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(part_reader) = &mut self.current_reader else {
            return Ok(None);
        };

        debug_assert!(part_reader.is_valid());
        let batch = part_reader.convert_current_batch()?;
        part_reader.next()?;
        if part_reader.is_valid() {
            self.metrics.rows_fetched += batch.num_rows();
            self.metrics.batches_fetched += 1;
            let mut batch = batch;
            batch.filter_by_sequence(self.sequence)?;
            return Ok(Some(batch));
        }

        // Safety: current reader is Some.
        let part_reader = self.current_reader.take().unwrap();
        let context = part_reader.into_context();
        self.fetch_next_partition(context)?;

        self.metrics.rows_fetched += batch.num_rows();
        self.metrics.batches_fetched += 1;
        let mut batch = batch;
        batch.filter_by_sequence(self.sequence)?;
        Ok(Some(batch))
    }
}