mito2/memtable/partition_tree/partition.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Partition of a partition tree.
//!
//! We only support partitioning the tree by pre-defined internal columns.
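//!
//! Rough sketch of how a partition fits into the tree (summarizing the types
//! below): the partition key is a `u32`; when the first primary key column is
//! the reserved table id column (metric engine regions), rows are partitioned
//! by that column's value, otherwise the whole tree uses a single default
//! partition with key `0`. Inside a partition, rows live in one mutable
//! `ShardBuilder` plus a list of frozen `Shard`s:
//!
//! ```text
//! PartitionTree
//! └── Partition (keyed by PartitionKey)
//!     ├── ShardBuilder  (active shard, mutable key dictionary)
//!     └── Vec<Shard>    (shards with frozen key dictionaries)
//! ```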

use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
};
use crate::memtable::partition_tree::shard_builder::ShardBuilder;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};

/// Key of a partition.
pub type PartitionKey = u32;

/// A tree partition.
pub struct Partition {
    inner: RwLock<Inner>,
    /// Whether to dedup batches.
    dedup: bool,
}

pub type PartitionRef = Arc<Partition>;

impl Partition {
    /// Creates a new partition.
    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        Partition {
            inner: RwLock::new(Inner::new(metadata, config)),
            dedup: config.dedup,
        }
    }

    /// Writes to the partition with a primary key.
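    ///
    /// Rough flow (see the body below): freeze the active shard builder if
    /// needed, then look the key up in the pk map; on a hit the row goes to
    /// the shard that already owns the key, otherwise the key is inserted into
    /// the shard builder. When `re_encode` is true, `primary_key` holds a
    /// sparse-encoded key; for dense codecs it is re-encoded into the full
    /// primary key, but the sparse key is still used as the lookup key.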
    pub fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        row_codec: &dyn PrimaryKeyCodec,
        key_value: KeyValue,
        re_encode: bool,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Freeze the shard builder if needed.
        if inner.shard_builder.should_freeze() {
            inner.freeze_active_shard()?;
        }

        // Find the key in existing shards; a key exists in at most one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            inner.write_to_shard(pk_id, &key_value)?;
            inner.num_rows += 1;
            return Ok(());
        }

        // The key is not in any shard or the builder yet; encode and insert the full primary key.
        if re_encode {
            match row_codec.encoding() {
                PrimaryKeyEncoding::Dense => {
                    // `primary_key` is sparse, re-encode the full primary key.
                    let sparse_key = primary_key.clone();
                    primary_key.clear();
                    row_codec.encode_key_value(&key_value, primary_key)?;
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
                PrimaryKeyEncoding::Sparse => {
                    let sparse_key = primary_key.clone();
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
            }
        } else {
            // `primary_key` is already the full primary key.
            let pk_id = inner
                .shard_builder
                .write_with_key(primary_key, None, &key_value, metrics);
            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
        };

        inner.num_rows += 1;
        Ok(())
    }

    /// Writes to the partition without a primary key.
    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // If no primary key, always write to the first shard.
        debug_assert!(!inner.shards.is_empty());
        debug_assert_eq!(1, inner.shard_builder.current_shard_id());

        // A dummy pk id.
        let pk_id = PkId {
            shard_id: 0,
            pk_index: 0,
        };
        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
        inner.num_rows += 1;

        Ok(())
    }

    fn build_primary_key_filter(
        need_prune_key: bool,
        metadata: &RegionMetadataRef,
        row_codec: &dyn PrimaryKeyCodec,
        filters: &Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Option<Box<dyn PrimaryKeyFilter>> {
        if need_prune_key {
            let filter = row_codec.primary_key_filter(metadata, filters.clone());
            Some(filter)
        } else {
            None
        }
    }

    /// Scans data in the partition.
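    ///
    /// Builds one reader per non-empty source (the active shard builder and
    /// each frozen shard), merges them with a `ShardMerger`, and wraps the
    /// result in a `DedupReader` when dedup is enabled:
    ///
    /// ```text
    /// ShardBuilderReader --+
    /// ShardReader 0      --+--> ShardMerger --> (DedupReader) --> PartitionReader
    /// ShardReader N      --+
    /// ```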
    pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
        let start = Instant::now();
        let (builder_source, shard_reader_builders) = {
            let inner = self.inner.read().unwrap();
            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
            let builder_reader = if !inner.shard_builder.is_empty() {
                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
                Some(builder_reader)
            } else {
                None
            };
            for shard in &inner.shards {
                if !shard.is_empty() {
                    let shard_reader_builder = shard.read()?;
                    shard_source.push(shard_reader_builder);
                }
            }
            (builder_reader, shard_source)
        };

        context.metrics.num_shards += shard_reader_builders.len();

        let mut nodes = shard_reader_builders
            .into_iter()
            .map(|builder| {
                let primary_key_filter = Self::build_primary_key_filter(
                    context.need_prune_key,
                    &context.metadata,
                    context.row_codec.as_ref(),
                    &context.filters,
                );
                Ok(ShardNode::new(ShardSource::Shard(
                    builder.build(primary_key_filter)?,
                )))
            })
            .collect::<Result<Vec<_>>>()?;

        if let Some(builder) = builder_source {
            context.metrics.num_builder += 1;
            let primary_key_filter = Self::build_primary_key_filter(
                context.need_prune_key,
                &context.metadata,
                context.row_codec.as_ref(),
                &context.filters,
            );
            // Initialize the ShardBuilderReader outside the read lock.
            let shard_builder_reader =
                builder.build(Some(&context.pk_weights), primary_key_filter)?;
            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
        }

        // Creating a shard merger invokes `next()`, so do it outside the lock.
        let merger = ShardMerger::try_new(nodes)?;
        if self.dedup {
            let source = DedupReader::try_new(merger)?;
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(source))
        } else {
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(merger))
        }
    }

    /// Freezes the partition.
    pub fn freeze(&self) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        inner.freeze_active_shard()?;
        Ok(())
    }

    /// Forks the partition.
    ///
    /// The partition must be frozen before forking.
    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
        let (shards, shard_builder) = {
            let inner = self.inner.read().unwrap();
            debug_assert!(inner.shard_builder.is_empty());
            let shard_builder = ShardBuilder::new(
                metadata.clone(),
                config,
                inner.shard_builder.current_shard_id(),
            );
            let shards = inner
                .shards
                .iter()
                .map(|shard| shard.fork(metadata.clone()))
                .collect();

            (shards, shard_builder)
        };
        let pk_to_pk_id = {
            let mut inner = self.inner.write().unwrap();
            std::mem::take(&mut inner.pk_to_pk_id)
        };

        Partition {
            inner: RwLock::new(Inner {
                metadata: metadata.clone(),
                shard_builder,
                shards,
                num_rows: 0,
                pk_to_pk_id,
                frozen: false,
            }),
            dedup: self.dedup,
        }
    }

    /// Returns true if the partition has data.
    pub fn has_data(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.num_rows > 0
    }

    /// Gets the stats of the partition.
    pub(crate) fn stats(&self) -> PartitionStats {
        let inner = self.inner.read().unwrap();
        let num_rows = inner.num_rows;
        let shard_num = inner.shards.len();
        let shared_memory_size = inner
            .shards
            .iter()
            .map(|shard| shard.shared_memory_size())
            .sum();
        PartitionStats {
            num_rows,
            shard_num,
            shared_memory_size,
        }
    }

    /// Gets the partition key from the key value.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        key_value.partition_key()
    }

    /// Returns true if the region should be partitioned, i.e. its first primary
    /// key column is the reserved table id column.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }

    /// Returns true if `name` is the partition column.
    pub(crate) fn is_partition_column(name: &str) -> bool {
        name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
    }
}

pub(crate) struct PartitionStats {
    pub(crate) num_rows: usize,
    pub(crate) shard_num: usize,
    pub(crate) shared_memory_size: usize,
}

#[derive(Default)]
struct PartitionReaderMetrics {
    build_partition_reader: Duration,
    read_source: Duration,
    data_batch_to_batch: Duration,
    num_builder: usize,
    num_shards: usize,
}

/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
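///
/// A minimal sketch of the intended read loop, assuming `partition` is a
/// `PartitionRef` and `context` is a prepared `ReadPartitionContext`:
///
/// ```ignore
/// let mut reader = partition.read(context)?;
/// while reader.is_valid() {
///     // Convert the current data batch before advancing the reader.
///     let batch = reader.convert_current_batch()?;
///     reader.next()?;
/// }
/// // Take the context back so the next partition can reuse its buffers.
/// let context = reader.into_context();
/// ```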
pub struct PartitionReader {
    context: ReadPartitionContext,
    source: BoxedDataBatchSource,
}

impl PartitionReader {
    fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
        let reader = Self { context, source };

        Ok(reader)
    }

    /// Returns true if the reader is valid.
    pub fn is_valid(&self) -> bool {
        self.source.is_valid()
    }

    /// Advances the reader.
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn next(&mut self) -> Result<()> {
        self.advance_source()
    }

    /// Converts current data batch into a [Batch].
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn convert_current_batch(&mut self) -> Result<Batch> {
        let start = Instant::now();
        let data_batch = self.source.current_data_batch();
        let batch = data_batch_to_batch(
            &self.context.metadata,
            &self.context.projection,
            self.source.current_key(),
            data_batch,
        )?;
        self.context.metrics.data_batch_to_batch += start.elapsed();
        Ok(batch)
    }

    pub(crate) fn into_context(self) -> ReadPartitionContext {
        self.context
    }

    fn advance_source(&mut self) -> Result<()> {
        let read_source = Instant::now();
        self.source.next()?;
        self.context.metrics.read_source += read_source.elapsed();
        Ok(())
    }
}

/// Context reused across partition readers to avoid allocating for each reader.
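///
/// A [PartitionReader] takes ownership of the context and hands it back via
/// `into_context()`, so buffers such as `pk_weights` and the accumulated read
/// metrics survive across partitions instead of being rebuilt per reader.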
pub(crate) struct ReadPartitionContext {
    metadata: RegionMetadataRef,
    row_codec: Arc<dyn PrimaryKeyCodec>,
    projection: HashSet<ColumnId>,
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// Buffer to store pk weights.
    pk_weights: Vec<u16>,
    need_prune_key: bool,
    metrics: PartitionReaderMetrics,
}

impl Drop for ReadPartitionContext {
    fn drop(&mut self) {
        let partition_read_source = self.metrics.read_source.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
        let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);

        common_telemetry::debug!(
            "TreeIter partitions metrics, \
            num_builder: {}, \
            num_shards: {}, \
            build_partition_reader: {}s, \
            partition_read_source: {}s, \
            partition_data_batch_to_batch: {}s",
            self.metrics.num_builder,
            self.metrics.num_shards,
            self.metrics.build_partition_reader.as_secs_f64(),
            partition_read_source,
            partition_data_batch_to_batch,
        );
    }
}

impl ReadPartitionContext {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        projection: HashSet<ColumnId>,
        filters: Arc<Vec<SimpleFilterEvaluator>>,
    ) -> ReadPartitionContext {
        let need_prune_key = Self::need_prune_key(&metadata, &filters);
        ReadPartitionContext {
            metadata,
            row_codec,
            projection,
            filters,
            pk_weights: Vec::new(),
            need_prune_key,
            metrics: Default::default(),
        }
    }

    /// Returns true if the filters contain a predicate on a primary key column
    /// other than the partition column.
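    ///
    /// For example, a predicate on a tag column such as `host = "a"` requires
    /// pruning by primary key, while a predicate that only touches the
    /// partition column (the internal table id) or a non-tag column does not.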
    fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
        for filter in filters {
            // Partitions are already pruned, so skip the partition column.
            if Partition::is_partition_column(filter.column_name()) {
                continue;
            }
            let Some(column) = metadata.column_by_name(filter.column_name()) else {
                continue;
            };
            if column.semantic_type != SemanticType::Tag {
                continue;
            }

            return true;
        }

        false
    }
}

// TODO(yingwen): Pushdown projection to shard readers.
/// Converts a [DataBatch] to a [Batch].
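///
/// Assumes the record batch layout produced by the data parts of this memtable:
/// column 1 is the timestamp, column 2 the sequence, column 3 the op type and
/// columns 4.. are field columns; column 0 (the pk index) is not copied into
/// the output batch.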
fn data_batch_to_batch(
    metadata: &RegionMetadataRef,
    projection: &HashSet<ColumnId>,
    key: Option<&[u8]>,
    data_batch: DataBatch,
) -> Result<Batch> {
    let record_batch = data_batch.slice_record_batch();
    let primary_key = key.map(|k| k.to_vec()).unwrap_or_default();
    let mut builder = BatchBuilder::new(primary_key);
    builder
        .timestamps_array(record_batch.column(1).clone())?
        .sequences_array(record_batch.column(2).clone())?
        .op_types_array(record_batch.column(3).clone())?;

    if record_batch.num_columns() <= 4 {
        // No fields.
        return builder.build();
    }

    // Iterate all field columns.
    for (array, field) in record_batch
        .columns()
        .iter()
        .zip(record_batch.schema().fields().iter())
        .skip(4)
    {
        // TODO(yingwen): Avoid finding column by name. We know the schema of a DataBatch.
        // Safety: metadata should contain all fields.
        let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
        if !projection.contains(&column_id) {
            continue;
        }
        builder.push_field_array(column_id, array.clone())?;
    }

    builder.build()
}

/// Inner struct of the partition.
///
/// A key only exists in one shard.
struct Inner {
    metadata: RegionMetadataRef,
    /// Maps a primary key to its pk id.
    pk_to_pk_id: HashMap<Vec<u8>, PkId>,
    /// Builder for the active shard, whose key dictionary is still mutable.
    shard_builder: ShardBuilder,
    /// Shards with frozen dictionaries.
    shards: Vec<Shard>,
    num_rows: usize,
    frozen: bool,
}

impl Inner {
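    /// Creates the inner state of a partition.
    ///
    /// A region without a primary key gets a pre-created shard 0 (writes in
    /// `Partition::write_no_key` always go to `shards[0]`) and the shard
    /// builder starts from shard id 1; otherwise there are no shards yet and
    /// the builder starts from shard id 0.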
    fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
            let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
            (
                vec![Shard::new(
                    0,
                    None,
                    data_parts,
                    config.dedup,
                    config.data_freeze_threshold,
                )],
                1,
            )
        } else {
            (Vec::new(), 0)
        };
        let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
        Self {
            metadata,
            pk_to_pk_id: HashMap::new(),
            shard_builder,
            shards,
            num_rows: 0,
            frozen: false,
        }
    }

    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
        assert!(!self.frozen);
        self.pk_to_pk_id.get(primary_key).copied()
    }

    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
        if pk_id.shard_id == self.shard_builder.current_shard_id() {
            self.shard_builder.write_with_pk_id(pk_id, key_value);
            return Ok(());
        }

        // Safety: We find the shard by shard id.
        let shard = self
            .shards
            .iter_mut()
            .find(|shard| shard.shard_id == pk_id.shard_id)
            .unwrap();
        shard.write_with_pk_id(pk_id, key_value)?;
        self.num_rows += 1;

        Ok(())
    }

    fn freeze_active_shard(&mut self) -> Result<()> {
        if let Some(shard) = self
            .shard_builder
            .finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
        {
            self.shards.push(shard);
        }
        Ok(())
    }
}