// mito2/memtable/partition_tree/partition.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Partition of a partition tree.
16//!
17//! We only support partitioning the tree by pre-defined internal columns.
18
19use std::collections::{HashMap, HashSet};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::SemanticType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use mito_codec::key_values::KeyValue;
26use mito_codec::primary_key_filter::is_partition_column;
27use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
28use snafu::ResultExt;
29use store_api::codec::PrimaryKeyEncoding;
30use store_api::metadata::RegionMetadataRef;
31use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
32use store_api::storage::ColumnId;
33
34use crate::error::{EncodeSnafu, Result};
35use crate::memtable::partition_tree::data::{DATA_INIT_CAP, DataBatch, DataParts};
36use crate::memtable::partition_tree::dedup::DedupReader;
37use crate::memtable::partition_tree::shard::{
38    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
39};
40use crate::memtable::partition_tree::shard_builder::ShardBuilder;
41use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
42use crate::memtable::stats::WriteMetrics;
43use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
44use crate::read::{Batch, BatchBuilder};
45
/// Key of a partition.
///
/// Rows are routed to partitions by this key (see [Partition::get_partition_key]).
pub type PartitionKey = u32;

/// A tree partition.
pub struct Partition {
    /// Mutable state (shards, shard builder, pk index) guarded by a lock.
    /// Writes take the write lock; scans only take the read lock briefly.
    inner: RwLock<Inner>,
    /// Whether to dedup batches.
    dedup: bool,
}

/// Shared reference to a [Partition].
pub type PartitionRef = Arc<Partition>;
57
impl Partition {
    /// Creates a new partition.
    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        Partition {
            inner: RwLock::new(Inner::new(metadata, config)),
            dedup: config.dedup,
        }
    }

    /// Writes to the partition with a primary key.
    ///
    /// - `primary_key` is the encoded key buffer. It may be cleared, re-encoded
    ///   or taken by this method, so callers must not rely on its content afterwards.
    /// - `re_encode` indicates the buffer holds a sparse-encoded key that must be
    ///   re-encoded into a full key when the codec uses the dense encoding.
    /// - `metrics` accumulates write statistics.
    pub fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        row_codec: &dyn PrimaryKeyCodec,
        key_value: KeyValue,
        re_encode: bool,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Freeze the shard builder if needed.
        if inner.shard_builder.should_freeze() {
            inner.freeze_active_shard()?;
        }

        // Finds key in shards, now we ensure one key only exists in one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            inner.write_to_shard(pk_id, &key_value)?;
            inner.num_rows += 1;
            return Ok(());
        }

        // Key does not yet exist in shard or builder, encode and insert the full primary key.
        if re_encode {
            match row_codec.encoding() {
                PrimaryKeyEncoding::Dense => {
                    // `primary_key` is sparse, re-encode the full primary key.
                    let sparse_key = primary_key.clone();
                    primary_key.clear();
                    row_codec
                        .encode_key_value(&key_value, primary_key)
                        .context(EncodeSnafu)?;
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    // Index by the sparse key so later lookups (which also use the
                    // sparse form) can find the pk id.
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
                PrimaryKeyEncoding::Sparse => {
                    // Codec already uses the sparse encoding; no re-encode needed.
                    let sparse_key = primary_key.clone();
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
            }
        } else {
            // `primary_key` is already the full primary key.
            let pk_id = inner
                .shard_builder
                .write_with_key(primary_key, None, &key_value, metrics);
            // Take the buffer instead of cloning it; the caller's Vec is left empty.
            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
        };

        inner.num_rows += 1;
        Ok(())
    }

    /// Writes to the partition without a primary key.
    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // If no primary key, always write to the first shard.
        // `Inner::new` pre-creates shard 0 for key-less regions and starts the
        // builder at shard id 1, which these asserts check.
        debug_assert!(!inner.shards.is_empty());
        debug_assert_eq!(1, inner.shard_builder.current_shard_id());

        // A dummy pk id.
        let pk_id = PkId {
            shard_id: 0,
            pk_index: 0,
        };
        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
        inner.num_rows += 1;

        Ok(())
    }

    /// Builds a primary key filter from `filters` when key pruning is needed,
    /// otherwise returns `None`.
    fn build_primary_key_filter(
        need_prune_key: bool,
        metadata: &RegionMetadataRef,
        row_codec: &dyn PrimaryKeyCodec,
        filters: &Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Option<Box<dyn PrimaryKeyFilter>> {
        if need_prune_key {
            // TODO(yingwen): Remove `skip_partition_column` after dropping PartitionTreeMemtable.
            let filter = row_codec.primary_key_filter(metadata, filters.clone(), true);
            Some(filter)
        } else {
            None
        }
    }

    /// Scans data in the partition.
    ///
    /// Collects reader builders while holding the read lock, then builds the
    /// (potentially expensive) readers and the merger after releasing it.
    pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
        let start = Instant::now();
        let (builder_source, shard_reader_builders) = {
            let inner = self.inner.read().unwrap();
            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
            let builder_reader = if !inner.shard_builder.is_empty() {
                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
                Some(builder_reader)
            } else {
                None
            };
            for shard in &inner.shards {
                if !shard.is_empty() {
                    let shard_reader_builder = shard.read()?;
                    shard_source.push(shard_reader_builder);
                }
            }
            (builder_reader, shard_source)
        };

        context.metrics.num_shards += shard_reader_builders.len();

        // Each shard gets its own primary key filter instance.
        let mut nodes = shard_reader_builders
            .into_iter()
            .map(|builder| {
                let primary_key_filter = Self::build_primary_key_filter(
                    context.need_prune_key,
                    &context.metadata,
                    context.row_codec.as_ref(),
                    &context.filters,
                );
                Ok(ShardNode::new(ShardSource::Shard(
                    builder.build(primary_key_filter)?,
                )))
            })
            .collect::<Result<Vec<_>>>()?;

        if let Some(builder) = builder_source {
            context.metrics.num_builder += 1;
            let primary_key_filter = Self::build_primary_key_filter(
                context.need_prune_key,
                &context.metadata,
                context.row_codec.as_ref(),
                &context.filters,
            );
            // Move the initialization of ShardBuilderReader out of read lock.
            let shard_builder_reader =
                builder.build(Some(&context.pk_weights), primary_key_filter)?;
            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
        }

        // Creating a shard merger will invoke next so we do it outside the lock.
        let merger = ShardMerger::try_new(nodes)?;
        if self.dedup {
            let source = DedupReader::try_new(merger)?;
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(source))
        } else {
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(merger))
        }
    }

    /// Freezes the partition.
    ///
    /// Finishes the active shard builder and moves the resulting shard (if any)
    /// into the frozen shard list.
    pub fn freeze(&self) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        inner.freeze_active_shard()?;
        Ok(())
    }

    /// Forks the partition.
    ///
    /// Must freeze the partition before fork.
    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
        let (shards, shard_builder) = {
            let inner = self.inner.read().unwrap();
            // The builder must be empty (frozen) so every key lives in `shards`.
            debug_assert!(inner.shard_builder.is_empty());
            let shard_builder = ShardBuilder::new(
                metadata.clone(),
                config,
                inner.shard_builder.current_shard_id(),
            );
            let shards = inner
                .shards
                .iter()
                .map(|shard| shard.fork(metadata.clone()))
                .collect();

            (shards, shard_builder)
        };
        // Steal the pk index under a write lock; the forked partition reuses it
        // instead of rebuilding the map from scratch.
        let pk_to_pk_id = {
            let mut inner = self.inner.write().unwrap();
            std::mem::take(&mut inner.pk_to_pk_id)
        };

        Partition {
            inner: RwLock::new(Inner {
                metadata: metadata.clone(),
                shard_builder,
                shards,
                num_rows: 0,
                pk_to_pk_id,
                frozen: false,
            }),
            dedup: self.dedup,
        }
    }

    /// Returns true if the partition has data.
    pub fn has_data(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.num_rows > 0
    }

    /// Gets the stats of the partition.
    pub(crate) fn stats(&self) -> PartitionStats {
        let inner = self.inner.read().unwrap();
        let num_rows = inner.num_rows;
        let shard_num = inner.shards.len();
        let shared_memory_size = inner
            .shards
            .iter()
            .map(|shard| shard.shared_memory_size())
            .sum();
        PartitionStats {
            num_rows,
            shard_num,
            shared_memory_size,
        }
    }

    /// Get partition key from the key value.
    ///
    /// Returns the default key (0) for unpartitioned trees.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        key_value.partition_key()
    }

    /// Returns true if the region can be partitioned.
    ///
    /// True only when the first primary key column is the metric engine's
    /// internal table-id column.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }

    /// Returns the number of distinct primary keys (time series) in the partition.
    pub(crate) fn series_count(&self) -> usize {
        self.inner.read().unwrap().series_count()
    }
}
317
/// Statistics of a [Partition], snapshotted by [Partition::stats].
pub(crate) struct PartitionStats {
    /// Total number of rows written to the partition.
    pub(crate) num_rows: usize,
    /// Number of shards with frozen dictionaries.
    pub(crate) shard_num: usize,
    /// Sum of `Shard::shared_memory_size` over all frozen shards.
    pub(crate) shared_memory_size: usize,
}
323
/// Metrics collected while reading a partition.
///
/// Flushed to histograms/logs when the owning [ReadPartitionContext] is dropped.
#[derive(Default)]
struct PartitionReaderMetrics {
    /// Time spent building the partition reader in [Partition::read].
    build_partition_reader: Duration,
    /// Time spent advancing the underlying data source.
    read_source: Duration,
    /// Time spent converting [DataBatch]es into [Batch]es.
    data_batch_to_batch: Duration,
    /// Number of shard builders read (0 or 1 per partition read).
    num_builder: usize,
    /// Number of frozen shards read.
    num_shards: usize,
}
332
/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
pub struct PartitionReader {
    /// Reusable read state and metrics; recovered via `into_context`.
    context: ReadPartitionContext,
    /// Merged (and optionally deduplicated) data source over the shards.
    source: BoxedDataBatchSource,
}
340
341impl PartitionReader {
342    fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
343        let reader = Self { context, source };
344
345        Ok(reader)
346    }
347
348    /// Returns true if the reader is valid.
349    pub fn is_valid(&self) -> bool {
350        self.source.is_valid()
351    }
352
353    /// Advances the reader.
354    ///
355    /// # Panics
356    /// Panics if the reader is invalid.
357    pub fn next(&mut self) -> Result<()> {
358        self.advance_source()
359    }
360
361    /// Converts current data batch into a [Batch].
362    ///
363    /// # Panics
364    /// Panics if the reader is invalid.
365    pub fn convert_current_batch(&mut self) -> Result<Batch> {
366        let start = Instant::now();
367        let data_batch = self.source.current_data_batch();
368        let batch = data_batch_to_batch(
369            &self.context.metadata,
370            &self.context.projection,
371            self.source.current_key(),
372            data_batch,
373        )?;
374        self.context.metrics.data_batch_to_batch += start.elapsed();
375        Ok(batch)
376    }
377
378    pub(crate) fn into_context(self) -> ReadPartitionContext {
379        self.context
380    }
381
382    fn advance_source(&mut self) -> Result<()> {
383        let read_source = Instant::now();
384        self.source.next()?;
385        self.context.metrics.read_source += read_source.elapsed();
386        Ok(())
387    }
388}
389
/// Structs to reuse across readers to avoid allocating for each reader.
pub(crate) struct ReadPartitionContext {
    metadata: RegionMetadataRef,
    /// Codec used to encode/decode primary keys and build key filters.
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Ids of columns to project in output batches.
    projection: HashSet<ColumnId>,
    /// Simple filters pushed down to prune primary keys.
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// Buffer to store pk weights.
    pk_weights: Vec<u16>,
    /// Whether `filters` contain predicates on non-partition tag columns,
    /// i.e. whether primary key pruning is worthwhile.
    need_prune_key: bool,
    /// Read metrics, reported when this context is dropped.
    metrics: PartitionReaderMetrics,
}
401
impl Drop for ReadPartitionContext {
    /// Reports accumulated read metrics to the Prometheus histogram and the
    /// debug log once the context goes out of scope.
    fn drop(&mut self) {
        let partition_read_source = self.metrics.read_source.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
        let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);

        common_telemetry::debug!(
            "TreeIter partitions metrics, \
            num_builder: {}, \
            num_shards: {}, \
            build_partition_reader: {}s, \
            partition_read_source: {}s, \
            partition_data_batch_to_batch: {}s",
            self.metrics.num_builder,
            self.metrics.num_shards,
            self.metrics.build_partition_reader.as_secs_f64(),
            partition_read_source,
            partition_data_batch_to_batch,
        );
    }
}
428
429impl ReadPartitionContext {
430    pub(crate) fn new(
431        metadata: RegionMetadataRef,
432        row_codec: Arc<dyn PrimaryKeyCodec>,
433        projection: HashSet<ColumnId>,
434        filters: Arc<Vec<SimpleFilterEvaluator>>,
435    ) -> ReadPartitionContext {
436        let need_prune_key = Self::need_prune_key(&metadata, &filters);
437        ReadPartitionContext {
438            metadata,
439            row_codec,
440            projection,
441            filters,
442            pk_weights: Vec::new(),
443            need_prune_key,
444            metrics: Default::default(),
445        }
446    }
447
448    /// Does filter contain predicate on primary key columns after pruning the
449    /// partition column.
450    fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
451        for filter in filters {
452            // We already pruned partitions before so we skip the partition column.
453            if is_partition_column(filter.column_name()) {
454                continue;
455            }
456            let Some(column) = metadata.column_by_name(filter.column_name()) else {
457                continue;
458            };
459            if column.semantic_type != SemanticType::Tag {
460                continue;
461            }
462
463            return true;
464        }
465
466        false
467    }
468}
469
// TODO(yingwen): Pushdown projection to shard readers.
/// Converts a [DataBatch] to a [Batch].
///
/// `key` is the encoded primary key of the batch; `None` (key-less regions)
/// yields an empty key. Field columns not in `projection` are skipped.
fn data_batch_to_batch(
    metadata: &RegionMetadataRef,
    projection: &HashSet<ColumnId>,
    key: Option<&[u8]>,
    data_batch: DataBatch,
) -> Result<Batch> {
    let record_batch = data_batch.slice_record_batch();
    let primary_key = key.map(|k| k.to_vec()).unwrap_or_default();
    let mut builder = BatchBuilder::new(primary_key);
    // Fixed layout: columns 1..=3 are timestamps, sequences and op types;
    // field columns start at index 4. (Column 0 is presumably the pk index —
    // confirm against the DataBatch record batch schema.)
    builder
        .timestamps_array(record_batch.column(1).clone())?
        .sequences_array(record_batch.column(2).clone())?
        .op_types_array(record_batch.column(3).clone())?;

    if record_batch.num_columns() <= 4 {
        // No fields.
        return builder.build();
    }

    // Iterate all field columns.
    for (array, field) in record_batch
        .columns()
        .iter()
        .zip(record_batch.schema().fields().iter())
        .skip(4)
    {
        // TODO(yingwen): Avoid finding column by name. We know the schema of a DataBatch.
        // Safety: metadata should contain all fields.
        let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
        if !projection.contains(&column_id) {
            continue;
        }
        builder.push_field_array(column_id, array.clone())?;
    }

    builder.build()
}
509
/// Inner struct of the partition.
///
/// A key only exists in one shard.
struct Inner {
    metadata: RegionMetadataRef,
    /// Map to index pk to pk id.
    pk_to_pk_id: HashMap<Vec<u8>, PkId>,
    /// Shard whose dictionary is active.
    shard_builder: ShardBuilder,
    /// Shards with frozen dictionary.
    shards: Vec<Shard>,
    /// Total number of rows written (builder and shards combined).
    num_rows: usize,
    /// Whether the partition is frozen; asserted in `find_key_in_shards`.
    frozen: bool,
}
524
525impl Inner {
526    fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
527        let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
528            let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
529            (
530                vec![Shard::new(
531                    0,
532                    None,
533                    data_parts,
534                    config.dedup,
535                    config.data_freeze_threshold,
536                )],
537                1,
538            )
539        } else {
540            (Vec::new(), 0)
541        };
542        let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
543        Self {
544            metadata,
545            pk_to_pk_id: HashMap::new(),
546            shard_builder,
547            shards,
548            num_rows: 0,
549            frozen: false,
550        }
551    }
552
553    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
554        assert!(!self.frozen);
555        self.pk_to_pk_id.get(primary_key).copied()
556    }
557
558    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
559        if pk_id.shard_id == self.shard_builder.current_shard_id() {
560            self.shard_builder.write_with_pk_id(pk_id, key_value);
561            return Ok(());
562        }
563
564        // Safety: We find the shard by shard id.
565        let shard = self
566            .shards
567            .iter_mut()
568            .find(|shard| shard.shard_id == pk_id.shard_id)
569            .unwrap();
570        shard.write_with_pk_id(pk_id, key_value)?;
571        self.num_rows += 1;
572
573        Ok(())
574    }
575
576    fn freeze_active_shard(&mut self) -> Result<()> {
577        if let Some(shard) = self
578            .shard_builder
579            .finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
580        {
581            self.shards.push(shard);
582        }
583        Ok(())
584    }
585
586    /// Returns count of timeseries.
587    fn series_count(&self) -> usize {
588        self.pk_to_pk_id.len()
589    }
590}