mito2/memtable/partition_tree/tree.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implementation of the partition tree.

use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use snafu::{ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::partition_tree::partition::{
    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::Batch;
use crate::read::dedup::LastNonNullIter;
use crate::region::options::MergeMode;

/// The partition tree.
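///
/// Rows are routed to partitions by their [`PartitionKey`]; the tree keeps
/// one [`Partition`] per key in an ordered map.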
pub struct PartitionTree {
    /// Config of the tree.
    config: PartitionTreeConfig,
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Primary key codec.
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Partitions in the tree.
    partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
    /// Whether the tree has multiple partitions.
    is_partitioned: bool,
    /// Manager to report size of the tree.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Encoder for sparse primary keys.
    sparse_encoder: Arc<SparseEncoder>,
}

impl PartitionTree {
    /// Creates a new partition tree.
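    ///
    /// Note: when `config.merge_mode` is [`MergeMode::LastNonNull`], the tree
    /// disables deduplication so duplicate rows survive until the read path
    /// merges them field by field.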
    pub fn new(
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let sparse_encoder = SparseEncoder::new(
            metadata
                .primary_key_columns()
                .map(|c| FieldWithId {
                    field: SortField::new(c.column_schema.data_type.clone()),
                    column_id: c.column_id,
                })
                .collect(),
        );
        let is_partitioned = Partition::has_multi_partitions(&metadata);
        let mut config = config.clone();
        if config.merge_mode == MergeMode::LastNonNull {
            config.dedup = false;
        }

        PartitionTree {
            config,
            metadata,
            row_codec,
            partitions: Default::default(),
            is_partitioned,
            write_buffer_manager,
            sparse_encoder: Arc::new(sparse_encoder),
        }
    }

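    /// Verifies that the number of primary key columns in `kv` matches what
    /// the codec expects. Sparse codecs report no fixed field count, so they
    /// are skipped here.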
    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
        // The sparse primary key codec does not have a fixed number of fields.
        if let Some(expected_num_fields) = self.row_codec.num_fields() {
            ensure!(
                expected_num_fields == kv.num_primary_keys(),
                PrimaryKeyLengthMismatchSnafu {
                    expect: expected_num_fields,
                    actual: kv.num_primary_keys(),
                }
            );
        }
        // TODO(weny): verify the primary key length for sparse primary key codec.
        Ok(())
    }

    /// Encodes the given key value into a sparse primary key.
    fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
        if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
            // If the primary key encoding is sparse and already encoded in the metric engine,
            // we only need to copy the encoded primary key into the destination buffer.
            let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
                return EncodeSparsePrimaryKeySnafu {
                    reason: "sparse primary key is not binary".to_string(),
                }
                .fail();
            };
            buffer.extend_from_slice(primary_key);
        } else {
            // For compatibility, use the sparse encoder for dense primary key.
            self.sparse_encoder
                .encode_to_vec(kv.primary_keys(), buffer)
                .context(EncodeSnafu)?;
        }
        Ok(())
    }

    // TODO(yingwen): The size computed from values is inaccurate.
    /// Writes key-values into the tree.
    ///
    /// # Panics
    /// Panics if the tree is immutable (frozen).
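    ///
    /// # Example
    ///
    /// A minimal sketch; `tree` and `kvs` are assumed to be built elsewhere,
    /// and `pk_buffer` is reused across calls to avoid reallocation:
    ///
    /// ```ignore
    /// let mut pk_buffer = Vec::new();
    /// let mut metrics = WriteMetrics::default();
    /// tree.write(&kvs, &mut pk_buffer, &mut metrics)?;
    /// ```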
    pub fn write(
        &self,
        kvs: &KeyValues,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        for kv in kvs.iter() {
            self.verify_primary_key_length(&kv)?;
            // Safety: timestamp of kv must be both present and a valid timestamp value.
            let ts = kv
                .timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value();
            metrics.min_ts = metrics.min_ts.min(ts);
            metrics.max_ts = metrics.max_ts.max(ts);
            metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

            if !has_pk {
                // No primary key.
                self.write_no_key(kv)?;
                continue;
            }

            // Encode primary key.
            pk_buffer.clear();
            if self.is_partitioned {
                self.encode_sparse_primary_key(&kv, pk_buffer)?;
            } else {
                self.row_codec
                    .encode_key_value(&kv, pk_buffer)
                    .context(EncodeSnafu)?;
            }

            // Write rows with the encoded primary key.
            self.write_with_key(pk_buffer, kv, metrics)?;
        }

        metrics.value_bytes +=
            kvs.num_rows() * (std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>());

        Ok(())
    }

    /// Writes one key-value pair into the tree.
    ///
    /// # Panics
    /// Panics if the tree is immutable (frozen).
    pub fn write_one(
        &self,
        kv: KeyValue,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        self.verify_primary_key_length(&kv)?;
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        metrics.min_ts = metrics.min_ts.min(ts);
        metrics.max_ts = metrics.max_ts.max(ts);
        metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

        if !has_pk {
            // No primary key.
            return self.write_no_key(kv);
        }

        // Encode primary key.
        pk_buffer.clear();
        if self.is_partitioned {
            self.encode_sparse_primary_key(&kv, pk_buffer)?;
        } else {
            self.row_codec
                .encode_key_value(&kv, pk_buffer)
                .context(EncodeSnafu)?;
        }

        // Write rows with the encoded primary key.
        self.write_with_key(pk_buffer, kv, metrics)?;

        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        Ok(())
    }

    /// Scans the tree.
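    ///
    /// A minimal usage sketch, assuming `tree` already holds data:
    ///
    /// ```ignore
    /// // Scan all field columns with no predicate or sequence filter.
    /// let iter = tree.read(None, None, None, None)?;
    /// for batch in iter {
    ///     let batch = batch?;
    ///     // consume the batch...
    /// }
    /// ```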
    pub fn read(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<crate::memtable::MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let start = Instant::now();
        // Creates the projection set.
        let projection: HashSet<_> = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.metadata.field_columns().map(|c| c.column_id).collect()
        };

        let filters = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();

        let mut tree_iter_metric = TreeIterMetrics::default();
        let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);

        let mut iter = TreeIter {
            sequence,
            partitions,
            current_reader: None,
            metrics: tree_iter_metric,
            mem_scan_metrics,
        };
        let context = ReadPartitionContext::new(
            self.metadata.clone(),
            self.row_codec.clone(),
            projection,
            Arc::new(filters),
        );
        iter.fetch_next_partition(context)?;

        iter.metrics.iter_elapsed += start.elapsed();

        if self.config.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Returns true if the tree is empty.
    ///
    /// A tree is empty if no partition has data.
    pub fn is_empty(&self) -> bool {
        let partitions = self.partitions.read().unwrap();
        partitions.values().all(|part| !part.has_data())
    }

    /// Marks the tree as immutable.
    ///
    /// Once the tree becomes immutable, callers should not write to it again.
    pub fn freeze(&self) -> Result<()> {
        let partitions = self.partitions.read().unwrap();
        for partition in partitions.values() {
            partition.freeze()?;
        }
        Ok(())
    }

    /// Forks an immutable tree. Returns a mutable tree that inherits the index
    /// of this tree.
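    ///
    /// Only partitions that contain data are forked. If their total shared
    /// memory exceeds `fork_dictionary_bytes`, the largest partitions are
    /// evicted instead of forked until the remainder fits.
    ///
    /// A minimal sketch of the freeze-then-fork flow, assuming `old_tree` and
    /// `metadata` exist:
    ///
    /// ```ignore
    /// old_tree.freeze()?;
    /// let new_tree = old_tree.fork(metadata.clone());
    /// ```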
    pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
        if self.metadata.schema_version != metadata.schema_version
            || self.metadata.column_metadatas != metadata.column_metadatas
        {
            // The schema has changed, we can't reuse the tree.
            return PartitionTree::new(
                self.row_codec.clone(),
                metadata,
                &self.config,
                self.write_buffer_manager.clone(),
            );
        }

        let mut total_shared_size = 0;
        let mut part_infos = {
            let partitions = self.partitions.read().unwrap();
            partitions
                .iter()
                .filter_map(|(part_key, part)| {
                    let stats = part.stats();
                    if stats.num_rows > 0 {
                        // Only fork partitions that have data.
                        total_shared_size += stats.shared_memory_size;
                        Some((*part_key, part.clone(), stats))
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>()
        };

        // TODO(yingwen): Optimize eviction strategy. Now we evict the whole partition.
        let fork_size = self.config.fork_dictionary_bytes.as_bytes() as usize;
        if total_shared_size > fork_size {
            // Sort partitions by memory size in ascending order so that `pop()`
            // evicts the partition with the largest memory size first.
            part_infos.sort_unstable_by_key(|info| info.2.shared_memory_size);
            while total_shared_size > fork_size {
                let Some(info) = part_infos.pop() else {
                    break;
                };

                common_telemetry::debug!(
                    "Evict partition {} with memory size {}, {} shards",
                    info.0,
                    info.2.shared_memory_size,
                    info.2.shard_num,
                );

                total_shared_size -= info.2.shared_memory_size;
            }
        }

        let mut forked = BTreeMap::new();
        for (part_key, part, _) in part_infos {
            let forked_part = part.fork(&metadata, &self.config);
            forked.insert(part_key, Arc::new(forked_part));
        }

        PartitionTree {
            config: self.config.clone(),
            metadata,
            row_codec: self.row_codec.clone(),
            partitions: RwLock::new(forked),
            is_partitioned: self.is_partitioned,
            write_buffer_manager: self.write_buffer_manager.clone(),
            sparse_encoder: self.sparse_encoder.clone(),
        }
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }

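    /// Writes `key_value` into the partition chosen by its partition key,
    /// using `primary_key` as the encoded primary key.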
    fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_with_key(
            primary_key,
            self.row_codec.as_ref(),
            key_value,
            self.is_partitioned, // If the tree is partitioned, re-encoding is required to get the full primary key.
            metrics,
        )
    }

    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_no_key(key_value)
    }

    fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef {
        let mut partitions = self.partitions.write().unwrap();
        partitions
            .entry(partition_key)
            .or_insert_with(|| Arc::new(Partition::new(self.metadata.clone(), &self.config)))
            .clone()
    }

    fn prune_partitions(
        &self,
        filters: &[SimpleFilterEvaluator],
        metrics: &mut TreeIterMetrics,
    ) -> VecDeque<PartitionRef> {
        let partitions = self.partitions.read().unwrap();
        metrics.partitions_total = partitions.len();
        if !self.is_partitioned {
            return partitions.values().cloned().collect();
        }

        let mut pruned = VecDeque::new();
        // Prune partition keys.
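        // Each partition key is evaluated as a `UInt32` scalar against every
        // filter on a partition column; a partition is kept unless some filter
        // definitely excludes it (evaluation errors keep the partition).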
        for (key, partition) in partitions.iter() {
            let mut is_needed = true;
            for filter in filters {
                if !is_partition_column(filter.column_name()) {
                    continue;
                }

                if !filter
                    .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
                    .unwrap_or(true)
                {
                    is_needed = false;
                }
            }

            if is_needed {
                pruned.push_back(partition.clone());
            }
        }
        metrics.partitions_after_pruning = pruned.len();
        pruned
    }

    /// Returns the total series count across all partitions.
    pub(crate) fn series_count(&self) -> usize {
        self.partitions
            .read()
            .unwrap()
            .values()
            .map(|p| p.series_count())
            .sum()
    }
}

#[derive(Default)]
struct TreeIterMetrics {
    iter_elapsed: Duration,
    fetch_partition_elapsed: Duration,
    rows_fetched: usize,
    batches_fetched: usize,
    partitions_total: usize,
    partitions_after_pruning: usize,
}

struct TreeIter {
    /// Optional sequence range; rows whose sequences fall outside it are
    /// filtered out of the returned batches.
    sequence: Option<SequenceRange>,
    partitions: VecDeque<PartitionRef>,
    current_reader: Option<PartitionReader>,
    metrics: TreeIterMetrics,
    mem_scan_metrics: Option<crate::memtable::MemScanMetrics>,
}

impl TreeIter {
    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: 0, // This is unavailable.
                num_rows: self.metrics.rows_fetched,
                num_batches: self.metrics.batches_fetched,
                scan_cost: self.metrics.iter_elapsed,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}

impl Drop for TreeIter {
    fn drop(&mut self) {
        // Report MemScanMetrics if not already reported.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["partition_tree_memtable"])
            .inc_by(self.metrics.rows_fetched as u64);
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["fetch_next_partition"])
            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
        let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(scan_elapsed);
        common_telemetry::debug!(
            "TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}",
            self.metrics.partitions_total,
            self.metrics.partitions_after_pruning,
            self.metrics.rows_fetched,
            self.metrics.batches_fetched,
            scan_elapsed
        );
    }
}

impl Iterator for TreeIter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let res = self.next_batch().transpose();
        self.metrics.iter_elapsed += start.elapsed();
        res
    }
}

impl TreeIter {
    /// Fetches the next partition.
    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
        let start = Instant::now();
        while let Some(partition) = self.partitions.pop_front() {
            let part_reader = partition.read(context)?;
            if !part_reader.is_valid() {
                context = part_reader.into_context();
                continue;
            }
            self.current_reader = Some(part_reader);
            break;
        }
        self.metrics.fetch_partition_elapsed += start.elapsed();
        Ok(())
    }

    /// Fetches next batch.
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(part_reader) = &mut self.current_reader else {
            // Report MemScanMetrics before returning None.
            self.report_mem_scan_metrics();
            return Ok(None);
        };

        debug_assert!(part_reader.is_valid());
        let batch = part_reader.convert_current_batch()?;
        part_reader.next()?;
        if part_reader.is_valid() {
            self.metrics.rows_fetched += batch.num_rows();
            self.metrics.batches_fetched += 1;
            let mut batch = batch;
            batch.filter_by_sequence(self.sequence)?;
            return Ok(Some(batch));
        }

        // Safety: current reader is Some.
        let part_reader = self.current_reader.take().unwrap();
        let context = part_reader.into_context();
        self.fetch_next_partition(context)?;

        self.metrics.rows_fetched += batch.num_rows();
        self.metrics.batches_fetched += 1;
        let mut batch = batch;
        batch.filter_by_sequence(self.sequence)?;
        Ok(Some(batch))
    }
}