mito2/memtable/partition_tree/partition.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Partition of a partition tree.
//!
//! We only support partitioning the tree by pre-defined internal columns.
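//!
//! In practice, a region qualifies for partitioning when its first primary key
//! column is the internal table id column (see `Partition::has_multi_partitions`);
//! otherwise all rows share the default partition key.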

use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;

use crate::error::{EncodeSnafu, Result};
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
};
use crate::memtable::partition_tree::shard_builder::ShardBuilder;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};

/// Key of a partition.
pub type PartitionKey = u32;

/// A tree partition.
pub struct Partition {
    inner: RwLock<Inner>,
    /// Whether to dedup batches.
    dedup: bool,
}

pub type PartitionRef = Arc<Partition>;

impl Partition {
    /// Creates a new partition.
    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        Partition {
            inner: RwLock::new(Inner::new(metadata, config)),
            dedup: config.dedup,
        }
    }

    /// Writes to the partition with a primary key.
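    ///
    /// `primary_key` holds the encoded key of the row. When `re_encode` is true and
    /// the codec uses dense encoding, the incoming key is treated as a sparse key and
    /// re-encoded into the full primary key before insertion.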
    pub fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        row_codec: &dyn PrimaryKeyCodec,
        key_value: KeyValue,
        re_encode: bool,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Freeze the shard builder if needed.
        if inner.shard_builder.should_freeze() {
            inner.freeze_active_shard()?;
        }

        // Find the key in existing shards; we ensure a key exists in at most one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            inner.write_to_shard(pk_id, &key_value)?;
            inner.num_rows += 1;
            return Ok(());
        }

        // The key is not in any shard or the builder yet; encode and insert the full primary key.
        if re_encode {
            match row_codec.encoding() {
                PrimaryKeyEncoding::Dense => {
                    // `primary_key` is sparse; re-encode it into the full primary key.
                    let sparse_key = primary_key.clone();
                    primary_key.clear();
                    row_codec
                        .encode_key_value(&key_value, primary_key)
                        .context(EncodeSnafu)?;
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
                PrimaryKeyEncoding::Sparse => {
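                    // With sparse encoding, the incoming `primary_key` is stored as-is
                    // and also serves as the lookup key.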
                    let sparse_key = primary_key.clone();
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
            }
        } else {
            // `primary_key` is already the full primary key.
            let pk_id = inner
                .shard_builder
                .write_with_key(primary_key, None, &key_value, metrics);
            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
        };

        inner.num_rows += 1;
        Ok(())
    }

    /// Writes to the partition without a primary key.
    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Without a primary key, we always write to the first shard.
        debug_assert!(!inner.shards.is_empty());
        debug_assert_eq!(1, inner.shard_builder.current_shard_id());

        // A dummy pk id.
        let pk_id = PkId {
            shard_id: 0,
            pk_index: 0,
        };
        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
        inner.num_rows += 1;

        Ok(())
    }

    fn build_primary_key_filter(
        need_prune_key: bool,
        metadata: &RegionMetadataRef,
        row_codec: &dyn PrimaryKeyCodec,
        filters: &Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Option<Box<dyn PrimaryKeyFilter>> {
        if need_prune_key {
            let filter = row_codec.primary_key_filter(metadata, filters.clone());
            Some(filter)
        } else {
            None
        }
    }

    /// Scans data in the partition.
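    ///
    /// Merges data from the active shard builder and all non-empty shards, and wraps
    /// the merged source in a `DedupReader` when deduplication is enabled.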
    pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
        let start = Instant::now();
        let (builder_source, shard_reader_builders) = {
            let inner = self.inner.read().unwrap();
            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
            let builder_reader = if !inner.shard_builder.is_empty() {
                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
                Some(builder_reader)
            } else {
                None
            };
            for shard in &inner.shards {
                if !shard.is_empty() {
                    let shard_reader_builder = shard.read()?;
                    shard_source.push(shard_reader_builder);
                }
            }
            (builder_reader, shard_source)
        };

        context.metrics.num_shards += shard_reader_builders.len();

        let mut nodes = shard_reader_builders
            .into_iter()
            .map(|builder| {
                let primary_key_filter = Self::build_primary_key_filter(
                    context.need_prune_key,
                    &context.metadata,
                    context.row_codec.as_ref(),
                    &context.filters,
                );
                Ok(ShardNode::new(ShardSource::Shard(
                    builder.build(primary_key_filter)?,
                )))
            })
            .collect::<Result<Vec<_>>>()?;

        if let Some(builder) = builder_source {
            context.metrics.num_builder += 1;
            let primary_key_filter = Self::build_primary_key_filter(
                context.need_prune_key,
                &context.metadata,
                context.row_codec.as_ref(),
                &context.filters,
            );
            // Build the ShardBuilderReader outside of the read lock.
            let shard_builder_reader =
                builder.build(Some(&context.pk_weights), primary_key_filter)?;
            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
        }

        // Creating a shard merger invokes `next()`, so we do it outside the lock.
        let merger = ShardMerger::try_new(nodes)?;
        if self.dedup {
            let source = DedupReader::try_new(merger)?;
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(source))
        } else {
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(merger))
        }
    }

    /// Freezes the partition.
    pub fn freeze(&self) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        inner.freeze_active_shard()?;
        Ok(())
    }

    /// Forks the partition.
    ///
    /// The partition must be frozen before forking.
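    ///
    /// The forked partition takes over this partition's primary key to pk id map,
    /// forks each shard, and starts counting rows from zero.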
    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
        let (shards, shard_builder) = {
            let inner = self.inner.read().unwrap();
            debug_assert!(inner.shard_builder.is_empty());
            let shard_builder = ShardBuilder::new(
                metadata.clone(),
                config,
                inner.shard_builder.current_shard_id(),
            );
            let shards = inner
                .shards
                .iter()
                .map(|shard| shard.fork(metadata.clone()))
                .collect();

            (shards, shard_builder)
        };
        let pk_to_pk_id = {
            let mut inner = self.inner.write().unwrap();
            std::mem::take(&mut inner.pk_to_pk_id)
        };

        Partition {
            inner: RwLock::new(Inner {
                metadata: metadata.clone(),
                shard_builder,
                shards,
                num_rows: 0,
                pk_to_pk_id,
                frozen: false,
            }),
            dedup: self.dedup,
        }
    }

    /// Returns true if the partition has data.
    pub fn has_data(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.num_rows > 0
    }

    /// Gets the stats of the partition.
    pub(crate) fn stats(&self) -> PartitionStats {
        let inner = self.inner.read().unwrap();
        let num_rows = inner.num_rows;
        let shard_num = inner.shards.len();
        let shared_memory_size = inner
            .shards
            .iter()
            .map(|shard| shard.shared_memory_size())
            .sum();
        PartitionStats {
            num_rows,
            shard_num,
            shared_memory_size,
        }
    }

    /// Gets the partition key from the key value.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        key_value.partition_key()
    }

    /// Returns true if the region can be partitioned.
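    ///
    /// A region is partitioned when its first primary key column is the internal
    /// table id column.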
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }

    pub(crate) fn series_count(&self) -> usize {
        self.inner.read().unwrap().series_count()
    }
}

pub(crate) struct PartitionStats {
    pub(crate) num_rows: usize,
    pub(crate) shard_num: usize,
    pub(crate) shared_memory_size: usize,
}

#[derive(Default)]
struct PartitionReaderMetrics {
    build_partition_reader: Duration,
    read_source: Duration,
    data_batch_to_batch: Duration,
    num_builder: usize,
    num_shards: usize,
}

/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
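///
/// Typical read loop (a sketch; assumes a prepared `ReadPartitionContext`):
///
/// ```ignore
/// let mut reader = partition.read(context)?;
/// while reader.is_valid() {
///     let batch = reader.convert_current_batch()?;
///     // Consume `batch` here.
///     reader.next()?;
/// }
/// // Recover the context so it can be reused for the next partition.
/// let context = reader.into_context();
/// ```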
pub struct PartitionReader {
    context: ReadPartitionContext,
    source: BoxedDataBatchSource,
}

impl PartitionReader {
    fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
        let reader = Self { context, source };

        Ok(reader)
    }

    /// Returns true if the reader is valid.
    pub fn is_valid(&self) -> bool {
        self.source.is_valid()
    }

    /// Advances the reader.
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn next(&mut self) -> Result<()> {
        self.advance_source()
    }

    /// Converts the current data batch into a [Batch].
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn convert_current_batch(&mut self) -> Result<Batch> {
        let start = Instant::now();
        let data_batch = self.source.current_data_batch();
        let batch = data_batch_to_batch(
            &self.context.metadata,
            &self.context.projection,
            self.source.current_key(),
            data_batch,
        )?;
        self.context.metrics.data_batch_to_batch += start.elapsed();
        Ok(batch)
    }

    pub(crate) fn into_context(self) -> ReadPartitionContext {
        self.context
    }

    fn advance_source(&mut self) -> Result<()> {
        let read_source = Instant::now();
        self.source.next()?;
        self.context.metrics.read_source += read_source.elapsed();
        Ok(())
    }
}

/// Context reused across readers to avoid allocating for each reader.
pub(crate) struct ReadPartitionContext {
    metadata: RegionMetadataRef,
    row_codec: Arc<dyn PrimaryKeyCodec>,
    projection: HashSet<ColumnId>,
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// Buffer to store pk weights.
    pk_weights: Vec<u16>,
    need_prune_key: bool,
    metrics: PartitionReaderMetrics,
}

impl Drop for ReadPartitionContext {
    fn drop(&mut self) {
        let partition_read_source = self.metrics.read_source.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
        let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);

        common_telemetry::debug!(
            "TreeIter partitions metrics, \
            num_builder: {}, \
            num_shards: {}, \
            build_partition_reader: {}s, \
            partition_read_source: {}s, \
            partition_data_batch_to_batch: {}s",
            self.metrics.num_builder,
            self.metrics.num_shards,
            self.metrics.build_partition_reader.as_secs_f64(),
            partition_read_source,
            partition_data_batch_to_batch,
        );
    }
}

impl ReadPartitionContext {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        projection: HashSet<ColumnId>,
        filters: Arc<Vec<SimpleFilterEvaluator>>,
    ) -> ReadPartitionContext {
        let need_prune_key = Self::need_prune_key(&metadata, &filters);
        ReadPartitionContext {
            metadata,
            row_codec,
            projection,
            filters,
            pk_weights: Vec::new(),
            need_prune_key,
            metrics: Default::default(),
        }
    }

    /// Returns true if the filters contain predicates on primary key columns other
    /// than the partition column.
    fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
        for filter in filters {
            // Partitions are already pruned, so we skip the partition column.
            if is_partition_column(filter.column_name()) {
                continue;
            }
            let Some(column) = metadata.column_by_name(filter.column_name()) else {
                continue;
            };
            if column.semantic_type != SemanticType::Tag {
                continue;
            }

            return true;
        }

        false
    }
}

// TODO(yingwen): Pushdown projection to shard readers.
/// Converts a [DataBatch] to a [Batch].
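///
/// The sliced record batch has a fixed layout: column 1 holds timestamps, column 2
/// sequences, column 3 op types, and field columns start at index 4.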
fn data_batch_to_batch(
    metadata: &RegionMetadataRef,
    projection: &HashSet<ColumnId>,
    key: Option<&[u8]>,
    data_batch: DataBatch,
) -> Result<Batch> {
    let record_batch = data_batch.slice_record_batch();
    let primary_key = key.map(|k| k.to_vec()).unwrap_or_default();
    let mut builder = BatchBuilder::new(primary_key);
    builder
        .timestamps_array(record_batch.column(1).clone())?
        .sequences_array(record_batch.column(2).clone())?
        .op_types_array(record_batch.column(3).clone())?;

    if record_batch.num_columns() <= 4 {
        // No fields.
        return builder.build();
    }

    // Iterate all field columns.
    for (array, field) in record_batch
        .columns()
        .iter()
        .zip(record_batch.schema().fields().iter())
        .skip(4)
    {
        // TODO(yingwen): Avoid finding column by name. We know the schema of a DataBatch.
        // Safety: metadata should contain all fields.
        let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
        if !projection.contains(&column_id) {
            continue;
        }
        builder.push_field_array(column_id, array.clone())?;
    }

    builder.build()
}

/// Inner struct of the partition.
///
/// A key only exists in one shard.
struct Inner {
    metadata: RegionMetadataRef,
    /// Maps a primary key to its pk id.
    pk_to_pk_id: HashMap<Vec<u8>, PkId>,
    /// Shard whose dictionary is active.
    shard_builder: ShardBuilder,
    /// Shards with frozen dictionaries.
    shards: Vec<Shard>,
    num_rows: usize,
    frozen: bool,
}

impl Inner {
    fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
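        // Regions without a primary key write every row to a single pre-created
        // shard (id 0), so the shard builder starts from shard id 1.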
        let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
            let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
            (
                vec![Shard::new(
                    0,
                    None,
                    data_parts,
                    config.dedup,
                    config.data_freeze_threshold,
                )],
                1,
            )
        } else {
            (Vec::new(), 0)
        };
        let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
        Self {
            metadata,
            pk_to_pk_id: HashMap::new(),
            shard_builder,
            shards,
            num_rows: 0,
            frozen: false,
        }
    }

    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
        assert!(!self.frozen);
        self.pk_to_pk_id.get(primary_key).copied()
    }

    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
        if pk_id.shard_id == self.shard_builder.current_shard_id() {
            self.shard_builder.write_with_pk_id(pk_id, key_value);
            return Ok(());
        }

        // Safety: We find the shard by shard id.
        let shard = self
            .shards
            .iter_mut()
            .find(|shard| shard.shard_id == pk_id.shard_id)
            .unwrap();
        shard.write_with_pk_id(pk_id, key_value)?;
        self.num_rows += 1;

        Ok(())
    }

    fn freeze_active_shard(&mut self) -> Result<()> {
        if let Some(shard) = self
            .shard_builder
            .finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
        {
            self.shards.push(shard);
        }
        Ok(())
    }

    /// Returns the number of time series.
    fn series_count(&self) -> usize {
        self.pk_to_pk_id.len()
    }
}