mito2/memtable/partition_tree/partition.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Partition of a partition tree.
//!
//! We only support partitioning the tree by pre-defined internal columns.

use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;

use crate::error::{EncodeSnafu, Result};
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
};
use crate::memtable::partition_tree::shard_builder::ShardBuilder;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};

/// Key of a partition.
pub type PartitionKey = u32;

/// A tree partition.
pub struct Partition {
    inner: RwLock<Inner>,
    /// Whether to dedup batches.
    dedup: bool,
}

pub type PartitionRef = Arc<Partition>;

impl Partition {
    /// Creates a new partition.
    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        Partition {
            inner: RwLock::new(Inner::new(metadata, config)),
            dedup: config.dedup,
        }
    }

    /// Writes to the partition with a primary key.
    pub fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        row_codec: &dyn PrimaryKeyCodec,
        key_value: KeyValue,
        re_encode: bool,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Freeze the shard builder if needed.
        if inner.shard_builder.should_freeze() {
            inner.freeze_active_shard()?;
        }

        // Find the key in existing shards; we ensure a key only exists in one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            inner.write_to_shard(pk_id, &key_value)?;
            inner.num_rows += 1;
            return Ok(());
        }

        // The key does not exist in any shard or the builder yet, so encode and insert the full primary key.
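        // When re-encoding, the dense codec rebuilds the full key from the key value while the
        // sparse codec writes the key as-is; either way the original sparse key indexes
        // `pk_to_pk_id`. Without re-encoding, the full key itself is used as the index.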
        if re_encode {
            match row_codec.encoding() {
                PrimaryKeyEncoding::Dense => {
                    // `primary_key` is sparse, re-encode the full primary key.
                    let sparse_key = primary_key.clone();
                    primary_key.clear();
                    row_codec
                        .encode_key_value(&key_value, primary_key)
                        .context(EncodeSnafu)?;
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
                PrimaryKeyEncoding::Sparse => {
                    let sparse_key = primary_key.clone();
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
            }
        } else {
            // `primary_key` is already the full primary key.
            let pk_id = inner
                .shard_builder
                .write_with_key(primary_key, None, &key_value, metrics);
            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
        };

        inner.num_rows += 1;
        Ok(())
    }

    /// Writes to the partition without a primary key.
    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // If no primary key, always write to the first shard.
        debug_assert!(!inner.shards.is_empty());
        debug_assert_eq!(1, inner.shard_builder.current_shard_id());

        // A dummy pk id.
        let pk_id = PkId {
            shard_id: 0,
            pk_index: 0,
        };
        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
        inner.num_rows += 1;

        Ok(())
    }

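    /// Builds a primary key filter from the row codec if pruning by primary key is needed.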
    fn build_primary_key_filter(
        need_prune_key: bool,
        metadata: &RegionMetadataRef,
        row_codec: &dyn PrimaryKeyCodec,
        filters: &Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Option<Box<dyn PrimaryKeyFilter>> {
        if need_prune_key {
            let filter = row_codec.primary_key_filter(metadata, filters.clone());
            Some(filter)
        } else {
            None
        }
    }

    /// Scans data in the partition.
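    ///
    /// Builds a reader over the active shard builder and every non-empty shard, merges them,
    /// and deduplicates rows when dedup is enabled.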
    pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
        let start = Instant::now();
        let (builder_source, shard_reader_builders) = {
            let inner = self.inner.read().unwrap();
            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
            let builder_reader = if !inner.shard_builder.is_empty() {
                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
                Some(builder_reader)
            } else {
                None
            };
            for shard in &inner.shards {
                if !shard.is_empty() {
                    let shard_reader_builder = shard.read()?;
                    shard_source.push(shard_reader_builder);
                }
            }
            (builder_reader, shard_source)
        };

        context.metrics.num_shards += shard_reader_builders.len();

        let mut nodes = shard_reader_builders
            .into_iter()
            .map(|builder| {
                let primary_key_filter = Self::build_primary_key_filter(
                    context.need_prune_key,
                    &context.metadata,
                    context.row_codec.as_ref(),
                    &context.filters,
                );
                Ok(ShardNode::new(ShardSource::Shard(
                    builder.build(primary_key_filter)?,
                )))
            })
            .collect::<Result<Vec<_>>>()?;

        if let Some(builder) = builder_source {
            context.metrics.num_builder += 1;
            let primary_key_filter = Self::build_primary_key_filter(
                context.need_prune_key,
                &context.metadata,
                context.row_codec.as_ref(),
                &context.filters,
            );
            // Move the initialization of the ShardBuilderReader out of the read lock.
            let shard_builder_reader =
                builder.build(Some(&context.pk_weights), primary_key_filter)?;
            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
        }

        // Creating a shard merger will invoke `next()`, so we do it outside the lock.
        let merger = ShardMerger::try_new(nodes)?;
        if self.dedup {
            let source = DedupReader::try_new(merger)?;
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(source))
        } else {
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(merger))
        }
    }

    /// Freezes the partition.
    pub fn freeze(&self) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        inner.freeze_active_shard()?;
        Ok(())
    }

    /// Forks the partition.
    ///
    /// Must freeze the partition before fork.
    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
        let (shards, shard_builder) = {
            let inner = self.inner.read().unwrap();
            debug_assert!(inner.shard_builder.is_empty());
            let shard_builder = ShardBuilder::new(
                metadata.clone(),
                config,
                inner.shard_builder.current_shard_id(),
            );
            let shards = inner
                .shards
                .iter()
                .map(|shard| shard.fork(metadata.clone()))
                .collect();

            (shards, shard_builder)
        };
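        // Take the pk to pk id mapping from the old partition so the forked partition can
        // reuse the pk ids of keys that already exist in the forked shards.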
        let pk_to_pk_id = {
            let mut inner = self.inner.write().unwrap();
            std::mem::take(&mut inner.pk_to_pk_id)
        };

        Partition {
            inner: RwLock::new(Inner {
                metadata: metadata.clone(),
                shard_builder,
                shards,
                num_rows: 0,
                pk_to_pk_id,
                frozen: false,
            }),
            dedup: self.dedup,
        }
    }

    /// Returns true if the partition has data.
    pub fn has_data(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.num_rows > 0
    }

    /// Gets the stats of the partition.
    pub(crate) fn stats(&self) -> PartitionStats {
        let inner = self.inner.read().unwrap();
        let num_rows = inner.num_rows;
        let shard_num = inner.shards.len();
        let shared_memory_size = inner
            .shards
            .iter()
            .map(|shard| shard.shared_memory_size())
            .sum();
        PartitionStats {
            num_rows,
            shard_num,
            shared_memory_size,
        }
    }

    /// Gets the partition key from the key value.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        key_value.partition_key()
    }

    /// Returns true if the region can be partitioned.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
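        // The region is partitioned when its first primary key column is the metric
        // engine's table id column.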
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }
}

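/// Statistics of a partition.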
pub(crate) struct PartitionStats {
    pub(crate) num_rows: usize,
    pub(crate) shard_num: usize,
    pub(crate) shared_memory_size: usize,
}

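/// Metrics collected while reading partitions, reported when the read context is dropped.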
#[derive(Default)]
struct PartitionReaderMetrics {
    build_partition_reader: Duration,
    read_source: Duration,
    data_batch_to_batch: Duration,
    num_builder: usize,
    num_shards: usize,
}

/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
pub struct PartitionReader {
    context: ReadPartitionContext,
    source: BoxedDataBatchSource,
}

impl PartitionReader {
    fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
        let reader = Self { context, source };

        Ok(reader)
    }

    /// Returns true if the reader is valid.
    pub fn is_valid(&self) -> bool {
        self.source.is_valid()
    }

    /// Advances the reader.
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn next(&mut self) -> Result<()> {
        self.advance_source()
    }

    /// Converts current data batch into a [Batch].
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn convert_current_batch(&mut self) -> Result<Batch> {
        let start = Instant::now();
        let data_batch = self.source.current_data_batch();
        let batch = data_batch_to_batch(
            &self.context.metadata,
            &self.context.projection,
            self.source.current_key(),
            data_batch,
        )?;
        self.context.metrics.data_batch_to_batch += start.elapsed();
        Ok(batch)
    }

    pub(crate) fn into_context(self) -> ReadPartitionContext {
        self.context
    }

    fn advance_source(&mut self) -> Result<()> {
        let read_source = Instant::now();
        self.source.next()?;
        self.context.metrics.read_source += read_source.elapsed();
        Ok(())
    }
}

/// Context reused across partition readers to avoid allocating for each reader.
pub(crate) struct ReadPartitionContext {
    metadata: RegionMetadataRef,
    row_codec: Arc<dyn PrimaryKeyCodec>,
    projection: HashSet<ColumnId>,
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// Buffer to store pk weights.
    pk_weights: Vec<u16>,
    need_prune_key: bool,
    metrics: PartitionReaderMetrics,
}

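// Reports the accumulated read metrics when the context is dropped.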
impl Drop for ReadPartitionContext {
    fn drop(&mut self) {
        let partition_read_source = self.metrics.read_source.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
        let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);

        common_telemetry::debug!(
            "TreeIter partitions metrics, \
            num_builder: {}, \
            num_shards: {}, \
            build_partition_reader: {}s, \
            partition_read_source: {}s, \
            partition_data_batch_to_batch: {}s",
            self.metrics.num_builder,
            self.metrics.num_shards,
            self.metrics.build_partition_reader.as_secs_f64(),
            partition_read_source,
            partition_data_batch_to_batch,
        );
    }
}

impl ReadPartitionContext {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        projection: HashSet<ColumnId>,
        filters: Arc<Vec<SimpleFilterEvaluator>>,
    ) -> ReadPartitionContext {
        let need_prune_key = Self::need_prune_key(&metadata, &filters);
        ReadPartitionContext {
            metadata,
            row_codec,
            projection,
            filters,
            pk_weights: Vec::new(),
            need_prune_key,
            metrics: Default::default(),
        }
    }

    /// Returns whether the filters contain a predicate on a primary key column
    /// other than the partition column.
    fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
        for filter in filters {
            // Partitions are already pruned, so skip the partition column.
            if is_partition_column(filter.column_name()) {
                continue;
            }
            let Some(column) = metadata.column_by_name(filter.column_name()) else {
                continue;
            };
            if column.semantic_type != SemanticType::Tag {
                continue;
            }

            return true;
        }

        false
    }
}

// TODO(yingwen): Pushdown projection to shard readers.
/// Converts a [DataBatch] to a [Batch].
fn data_batch_to_batch(
    metadata: &RegionMetadataRef,
    projection: &HashSet<ColumnId>,
    key: Option<&[u8]>,
    data_batch: DataBatch,
) -> Result<Batch> {
    let record_batch = data_batch.slice_record_batch();
    let primary_key = key.map(|k| k.to_vec()).unwrap_or_default();
    let mut builder = BatchBuilder::new(primary_key);
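    // Columns 1, 2 and 3 of the data batch are timestamp, sequence and op type;
    // field columns start at index 4.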
    builder
        .timestamps_array(record_batch.column(1).clone())?
        .sequences_array(record_batch.column(2).clone())?
        .op_types_array(record_batch.column(3).clone())?;

    if record_batch.num_columns() <= 4 {
        // No fields.
        return builder.build();
    }

    // Iterate all field columns.
    for (array, field) in record_batch
        .columns()
        .iter()
        .zip(record_batch.schema().fields().iter())
        .skip(4)
    {
        // TODO(yingwen): Avoid finding column by name. We know the schema of a DataBatch.
        // Safety: metadata should contain all fields.
        let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
        if !projection.contains(&column_id) {
            continue;
        }
        builder.push_field_array(column_id, array.clone())?;
    }

    builder.build()
}

/// Inner struct of the partition.
///
/// A key only exists in one shard.
struct Inner {
    metadata: RegionMetadataRef,
    /// Maps a primary key to its pk id.
    pk_to_pk_id: HashMap<Vec<u8>, PkId>,
    /// Builder for the shard whose dictionary is still active.
    shard_builder: ShardBuilder,
    /// Shards with frozen dictionaries.
    shards: Vec<Shard>,
    num_rows: usize,
    frozen: bool,
}

impl Inner {
    fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
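        // Regions without a primary key write every row to a single shard created
        // upfront (shard id 0), so the shard builder starts from shard id 1.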
        let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
            let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
            (
                vec![Shard::new(
                    0,
                    None,
                    data_parts,
                    config.dedup,
                    config.data_freeze_threshold,
                )],
                1,
            )
        } else {
            (Vec::new(), 0)
        };
        let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
        Self {
            metadata,
            pk_to_pk_id: HashMap::new(),
            shard_builder,
            shards,
            num_rows: 0,
            frozen: false,
        }
    }

    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
        assert!(!self.frozen);
        self.pk_to_pk_id.get(primary_key).copied()
    }

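    /// Writes the key value to the shard (or the shard builder) that owns the pk id.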
    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
        if pk_id.shard_id == self.shard_builder.current_shard_id() {
            self.shard_builder.write_with_pk_id(pk_id, key_value);
            return Ok(());
        }

        // Safety: the pk id comes from an existing shard, so the lookup by shard id must succeed.
        let shard = self
            .shards
            .iter_mut()
            .find(|shard| shard.shard_id == pk_id.shard_id)
            .unwrap();
        shard.write_with_pk_id(pk_id, key_value)?;
        self.num_rows += 1;

        Ok(())
    }

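    /// Finishes the active shard builder and moves the resulting shard into `shards`.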
    fn freeze_active_shard(&mut self) -> Result<()> {
        if let Some(shard) = self
            .shard_builder
            .finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
        {
            self.shards.push(shard);
        }
        Ok(())
    }
}