mito2/memtable/partition_tree/tree.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implementation of the partition tree.

use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::partition::{
    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;

/// The partition tree.
pub struct PartitionTree {
    /// Config of the tree.
    config: PartitionTreeConfig,
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Primary key codec.
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Partitions in the tree.
    partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
    /// Whether the tree has multiple partitions.
    is_partitioned: bool,
    /// Manager to report size of the tree.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Encoder for sparse primary keys.
    sparse_encoder: Arc<SparseEncoder>,
}

impl PartitionTree {
    /// Creates a new partition tree.
    pub fn new(
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let sparse_encoder = SparseEncoder::new(
            metadata
                .primary_key_columns()
                .map(|c| FieldWithId {
                    field: SortField::new(c.column_schema.data_type.clone()),
                    column_id: c.column_id,
                })
                .collect(),
        );
        let is_partitioned = Partition::has_multi_partitions(&metadata);
        let mut config = config.clone();
        if config.merge_mode == MergeMode::LastNonNull {
            // `LastNonNull` merges duplicate rows at read time (see `LastNonNullIter`
            // in `read()`), so the tree must keep duplicates instead of
            // deduplicating on write.
            config.dedup = false;
        }

        PartitionTree {
            config,
            metadata,
            row_codec,
            partitions: Default::default(),
            is_partitioned,
            write_buffer_manager,
            sparse_encoder: Arc::new(sparse_encoder),
        }
    }
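
    // Example (sketch): constructing a tree, assuming `metadata: RegionMetadataRef`
    // is available and a matching codec is built for it (the helper name
    // `build_primary_key_codec` and the `Default` config are assumptions here):
    //
    //     let codec = build_primary_key_codec(&metadata);
    //     let tree =
    //         PartitionTree::new(codec, metadata, &PartitionTreeConfig::default(), None);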

    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
        // The sparse primary key codec does not have a fixed number of fields.
        if let Some(expected_num_fields) = self.row_codec.num_fields() {
            ensure!(
                expected_num_fields == kv.num_primary_keys(),
                PrimaryKeyLengthMismatchSnafu {
                    expect: expected_num_fields,
                    actual: kv.num_primary_keys(),
                }
            );
        }
        // TODO(weny): verify the primary key length for sparse primary key codec.
        Ok(())
    }

    /// Encodes the given key value into a sparse primary key.
    fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
        if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
            // If the primary key encoding is sparse and already encoded in the metric engine,
            // we only need to copy the encoded primary key into the destination buffer.
            let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
                return EncodeSparsePrimaryKeySnafu {
                    reason: "sparse primary key is not binary".to_string(),
                }
                .fail();
            };
            buffer.extend_from_slice(primary_key);
        } else {
            // For compatibility, use the sparse encoder for dense primary keys.
            self.sparse_encoder
                .encode_to_vec(kv.primary_keys(), buffer)
                .context(EncodeSnafu)?;
        }
        Ok(())
    }
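
    // Note: `write` and `write_one` below call this method only when the tree is
    // partitioned; dense primary keys are re-encoded with the sparse encoder so
    // that every row in a partitioned tree shares the sparse key format.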

    // TODO(yingwen): The size computed from values is inaccurate.
    /// Writes key-values into the tree.
    ///
    /// # Panics
    /// Panics if the tree is immutable (frozen).
    pub fn write(
        &self,
        kvs: &KeyValues,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        for kv in kvs.iter() {
            self.verify_primary_key_length(&kv)?;
            // Safety: timestamp of kv must be both present and a valid timestamp value.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
            metrics.min_ts = metrics.min_ts.min(ts);
            metrics.max_ts = metrics.max_ts.max(ts);
            metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

            if !has_pk {
                // No primary key.
                self.write_no_key(kv)?;
                continue;
            }

            // Encode primary key.
            pk_buffer.clear();
            if self.is_partitioned {
                self.encode_sparse_primary_key(&kv, pk_buffer)?;
            } else {
                self.row_codec
                    .encode_key_value(&kv, pk_buffer)
                    .context(EncodeSnafu)?;
            }

            // Write the row with the encoded primary key.
            self.write_with_key(pk_buffer, kv, metrics)?;
        }

        metrics.value_bytes +=
            kvs.num_rows() * (std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>());

        Ok(())
    }

    /// Writes one key-value pair into the tree.
    ///
    /// # Panics
    /// Panics if the tree is immutable (frozen).
    pub fn write_one(
        &self,
        kv: KeyValue,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        self.verify_primary_key_length(&kv)?;
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        metrics.min_ts = metrics.min_ts.min(ts);
        metrics.max_ts = metrics.max_ts.max(ts);
        metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

        if !has_pk {
            // No primary key.
            return self.write_no_key(kv);
        }

        // Encode primary key.
        pk_buffer.clear();
        if self.is_partitioned {
            self.encode_sparse_primary_key(&kv, pk_buffer)?;
        } else {
            self.row_codec
                .encode_key_value(&kv, pk_buffer)
                .context(EncodeSnafu)?;
        }

        // Write the row with the encoded primary key.
        self.write_with_key(pk_buffer, kv, metrics)?;

        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        Ok(())
    }
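
    // Example (sketch): callers reuse one key buffer across writes via `write`,
    // assuming an initialized `tree: PartitionTree`, a batch of mutations
    // `kvs: KeyValues`, and that `WriteMetrics` provides a `Default` impl:
    //
    //     let mut pk_buffer = Vec::new();
    //     let mut metrics = WriteMetrics::default();
    //     tree.write(&kvs, &mut pk_buffer, &mut metrics)?;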

    /// Scans the tree.
    pub fn read(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let start = Instant::now();
        // Creates the projection set.
        let projection: HashSet<_> = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.metadata.field_columns().map(|c| c.column_id).collect()
        };

        let filters = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();

        let mut tree_iter_metric = TreeIterMetrics::default();
        let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);

        let mut iter = TreeIter {
            sequence,
            partitions,
            current_reader: None,
            metrics: tree_iter_metric,
        };
        let context = ReadPartitionContext::new(
            self.metadata.clone(),
            self.row_codec.clone(),
            projection,
            Arc::new(filters),
        );
        iter.fetch_next_partition(context)?;

        iter.metrics.iter_elapsed += start.elapsed();

        if self.config.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
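
    // Example (sketch): a full scan with no projection, predicate, or sequence
    // bound, assuming an initialized `tree: PartitionTree`:
    //
    //     let iter = tree.read(None, None, None)?;
    //     for batch in iter {
    //         let batch = batch?;
    //         // Process the `Batch`...
    //     }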

    /// Returns true if the tree is empty.
    ///
    /// A tree is empty if no partition has data.
    pub fn is_empty(&self) -> bool {
        let partitions = self.partitions.read().unwrap();
        partitions.values().all(|part| !part.has_data())
    }

    /// Marks the tree as immutable.
    ///
    /// Once the tree becomes immutable, callers should not write to it again.
    pub fn freeze(&self) -> Result<()> {
        let partitions = self.partitions.read().unwrap();
        for partition in partitions.values() {
            partition.freeze()?;
        }
        Ok(())
    }

    /// Forks an immutable tree. Returns a mutable tree that inherits the index
    /// of this tree.
    pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
        if self.metadata.schema_version != metadata.schema_version
            || self.metadata.column_metadatas != metadata.column_metadatas
        {
            // The schema has changed, we can't reuse the tree.
            return PartitionTree::new(
                self.row_codec.clone(),
                metadata,
                &self.config,
                self.write_buffer_manager.clone(),
            );
        }

        let mut total_shared_size = 0;
        let mut part_infos = {
            let partitions = self.partitions.read().unwrap();
            partitions
                .iter()
                .filter_map(|(part_key, part)| {
                    let stats = part.stats();
                    if stats.num_rows > 0 {
                        // Only fork partitions that have data.
                        total_shared_size += stats.shared_memory_size;
                        Some((*part_key, part.clone(), stats))
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>()
        };

        // TODO(yingwen): Optimize eviction strategy. Now we evict the whole partition.
        let fork_size = self.config.fork_dictionary_bytes.as_bytes() as usize;
        if total_shared_size > fork_size {
            // Sort partitions by memory size in ascending order so that `pop()`
            // evicts the largest partition first.
            part_infos.sort_unstable_by_key(|info| info.2.shared_memory_size);
            while total_shared_size > fork_size {
                let Some(info) = part_infos.pop() else {
                    break;
                };

                common_telemetry::debug!(
                    "Evict partition {} with memory size {}, {} shards",
                    info.0,
                    info.2.shared_memory_size,
                    info.2.shard_num,
                );

                total_shared_size -= info.2.shared_memory_size;
            }
        }

        let mut forked = BTreeMap::new();
        for (part_key, part, _) in part_infos {
            let forked_part = part.fork(&metadata, &self.config);
            forked.insert(part_key, Arc::new(forked_part));
        }

        PartitionTree {
            config: self.config.clone(),
            metadata,
            row_codec: self.row_codec.clone(),
            partitions: RwLock::new(forked),
            is_partitioned: self.is_partitioned,
            write_buffer_manager: self.write_buffer_manager.clone(),
            sparse_encoder: self.sparse_encoder.clone(),
        }
    }
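
    // Note: the expected lifecycle is freeze-then-fork; a minimal sketch,
    // assuming an initialized `tree: PartitionTree` and updated region
    // metadata `metadata: RegionMetadataRef`:
    //
    //     tree.freeze()?;                      // stop accepting writes
    //     let new_tree = tree.fork(metadata);  // mutable tree reusing this tree's indices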

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }

    fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_with_key(
            primary_key,
            self.row_codec.as_ref(),
            key_value,
            self.is_partitioned, // If the tree is partitioned, re-encoding is required to get the full primary key.
            metrics,
        )
    }

    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_no_key(key_value)
    }

    fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef {
        let mut partitions = self.partitions.write().unwrap();
        partitions
            .entry(partition_key)
            .or_insert_with(|| Arc::new(Partition::new(self.metadata.clone(), &self.config)))
            .clone()
    }

    fn prune_partitions(
        &self,
        filters: &[SimpleFilterEvaluator],
        metrics: &mut TreeIterMetrics,
    ) -> VecDeque<PartitionRef> {
        let partitions = self.partitions.read().unwrap();
        metrics.partitions_total = partitions.len();
        if !self.is_partitioned {
            return partitions.values().cloned().collect();
        }

        let mut pruned = VecDeque::new();
        // Prune partition keys.
        for (key, partition) in partitions.iter() {
            let mut is_needed = true;
            for filter in filters {
                if !is_partition_column(filter.column_name()) {
                    continue;
                }

                // Keep the partition if the filter fails to evaluate.
                if !filter
                    .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
                    .unwrap_or(true)
                {
                    is_needed = false;
                }
            }

            if is_needed {
                pruned.push_back(partition.clone());
            }
        }
        metrics.partitions_after_pruning = pruned.len();
        pruned
    }
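
    // Example (sketch): if partitions are keyed by table id and a scan carries a
    // filter such as `__table_id = 42` (assuming `__table_id` is the partition
    // column recognized by `is_partition_column`), each key is evaluated as
    // `ScalarValue::UInt32(Some(key))` and only partition 42 survives pruning.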
}

#[derive(Default)]
struct TreeIterMetrics {
    iter_elapsed: Duration,
    fetch_partition_elapsed: Duration,
    rows_fetched: usize,
    batches_fetched: usize,
    partitions_total: usize,
    partitions_after_pruning: usize,
}

struct TreeIter {
    /// Optional sequence number bound; when set, returned batches only keep
    /// rows visible at this sequence (see `Batch::filter_by_sequence`).
    sequence: Option<SequenceNumber>,
    partitions: VecDeque<PartitionRef>,
    current_reader: Option<PartitionReader>,
    metrics: TreeIterMetrics,
}

impl Drop for TreeIter {
    fn drop(&mut self) {
        READ_ROWS_TOTAL
            .with_label_values(&["partition_tree_memtable"])
            .inc_by(self.metrics.rows_fetched as u64);
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["fetch_next_partition"])
            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
        let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(scan_elapsed);
        common_telemetry::debug!(
            "TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}",
            self.metrics.partitions_total,
            self.metrics.partitions_after_pruning,
            self.metrics.rows_fetched,
            self.metrics.batches_fetched,
            scan_elapsed
        );
    }
}

impl Iterator for TreeIter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let res = self.next_batch().transpose();
        self.metrics.iter_elapsed += start.elapsed();
        res
    }
}

impl TreeIter {
    /// Fetches the reader of the next partition that has data to read.
    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
        let start = Instant::now();
        while let Some(partition) = self.partitions.pop_front() {
            let part_reader = partition.read(context)?;
            if !part_reader.is_valid() {
                // This partition has nothing to read; recycle the context and
                // try the next one.
                context = part_reader.into_context();
                continue;
            }
            self.current_reader = Some(part_reader);
            break;
        }
        self.metrics.fetch_partition_elapsed += start.elapsed();
        Ok(())
    }

    /// Fetches the next batch.
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(part_reader) = &mut self.current_reader else {
            return Ok(None);
        };

        debug_assert!(part_reader.is_valid());
        let batch = part_reader.convert_current_batch()?;
        part_reader.next()?;
        if part_reader.is_valid() {
            self.metrics.rows_fetched += batch.num_rows();
            self.metrics.batches_fetched += 1;
            let mut batch = batch;
            batch.filter_by_sequence(self.sequence)?;
            return Ok(Some(batch));
        }

        // The current reader is exhausted. Move its context to the next
        // partition before returning the last batch.
        // Safety: current reader is Some.
        let part_reader = self.current_reader.take().unwrap();
        let context = part_reader.into_context();
        self.fetch_next_partition(context)?;

        self.metrics.rows_fetched += batch.num_rows();
        self.metrics.batches_fetched += 1;
        let mut batch = batch;
        batch.filter_by_sequence(self.sequence)?;
        Ok(Some(batch))
    }
}
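
// Note: `TreeIter` reports its read metrics (rows fetched, fetch/scan elapsed)
// in its `Drop` impl above, so the counters are flushed once a scan finishes
// and the iterator goes out of scope.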