mito2/memtable/partition_tree/
shard.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Shard in a partition.
16
17use std::cmp::Ordering;
18use std::time::{Duration, Instant};
19
20use store_api::metadata::RegionMetadataRef;
21
22use crate::error::Result;
23use crate::memtable::key_values::KeyValue;
24use crate::memtable::partition_tree::data::{
25    DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
26};
27use crate::memtable::partition_tree::dict::KeyDictRef;
28use crate::memtable::partition_tree::merger::{Merger, Node};
29use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
30use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
31use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
32use crate::row_converter::PrimaryKeyFilter;
33
/// Shard stores data related to the same key dictionary.
pub struct Shard {
    pub(crate) shard_id: ShardId,
    /// Key dictionary of the shard. `None` if the schema of the tree doesn't have a primary key.
    key_dict: Option<KeyDictRef>,
    /// Data in the shard.
    data_parts: DataParts,
    /// Whether to dedup rows; forwarded to the [DataParts] created when the shard is forked.
    dedup: bool,
    /// Number of rows to freeze a data part.
    data_freeze_threshold: usize,
}
45
46impl Shard {
47    /// Returns a new shard.
48    pub fn new(
49        shard_id: ShardId,
50        key_dict: Option<KeyDictRef>,
51        data_parts: DataParts,
52        dedup: bool,
53        data_freeze_threshold: usize,
54    ) -> Shard {
55        Shard {
56            shard_id,
57            key_dict,
58            data_parts,
59            dedup,
60            data_freeze_threshold,
61        }
62    }
63
64    /// Writes a key value into the shard.
65    ///
66    /// It will freezes the active buffer if it is full.
67    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
68        debug_assert_eq!(self.shard_id, pk_id.shard_id);
69
70        if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
71            self.data_parts.freeze()?;
72        }
73
74        self.data_parts.write_row(pk_id.pk_index, key_value);
75        Ok(())
76    }
77
78    /// Scans the shard.
79    // TODO(yingwen): Push down projection to data parts.
80    pub fn read(&self) -> Result<ShardReaderBuilder> {
81        let parts_reader = self.data_parts.read()?;
82
83        Ok(ShardReaderBuilder {
84            shard_id: self.shard_id,
85            key_dict: self.key_dict.clone(),
86            inner: parts_reader,
87        })
88    }
89
90    /// Forks a shard.
91    pub fn fork(&self, metadata: RegionMetadataRef) -> Shard {
92        Shard {
93            shard_id: self.shard_id,
94            key_dict: self.key_dict.clone(),
95            data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
96            dedup: self.dedup,
97            data_freeze_threshold: self.data_freeze_threshold,
98        }
99    }
100
101    /// Returns true if the shard is empty (No data).
102    pub fn is_empty(&self) -> bool {
103        self.data_parts.is_empty()
104    }
105
106    /// Returns the memory size of the shard part.
107    pub(crate) fn shared_memory_size(&self) -> usize {
108        self.key_dict
109            .as_ref()
110            .map(|dict| dict.shared_memory_size())
111            .unwrap_or(0)
112    }
113}
114
/// Source that returns [DataBatch].
pub trait DataBatchSource {
    /// Returns whether current source is still valid.
    fn is_valid(&self) -> bool;

    /// Advances source to next data batch.
    ///
    /// # Errors
    /// Returns an error if reading the next batch fails.
    fn next(&mut self) -> Result<()>;

    /// Returns current pk id.
    /// # Panics
    /// If source is not valid.
    fn current_pk_id(&self) -> PkId;

    /// Returns the current primary key bytes or None if it doesn't have primary key.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_key(&self) -> Option<&[u8]>;

    /// Returns the data part.
    /// # Panics
    /// If source is not valid.
    fn current_data_batch(&self) -> DataBatch;
}
139
/// Boxed [DataBatchSource] that can be sent across threads.
pub type BoxedDataBatchSource = Box<dyn DataBatchSource + Send>;
141
/// Builder to create a [ShardReader].
pub struct ShardReaderBuilder {
    shard_id: ShardId,
    /// Key dictionary of the shard, `None` if there is no primary key.
    key_dict: Option<KeyDictRef>,
    /// Builder for the underlying data parts reader.
    inner: DataPartsReaderBuilder,
}
147
148impl ShardReaderBuilder {
149    pub(crate) fn build(
150        self,
151        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
152    ) -> Result<ShardReader> {
153        let ShardReaderBuilder {
154            shard_id,
155            key_dict,
156            inner,
157        } = self;
158        let now = Instant::now();
159        let parts_reader = inner.build()?;
160        ShardReader::new(shard_id, key_dict, parts_reader, key_filter, now.elapsed())
161    }
162}
163
/// Reader to read rows in a shard.
pub struct ShardReader {
    shard_id: ShardId,
    /// Key dictionary of the shard, `None` if there is no primary key.
    key_dict: Option<KeyDictRef>,
    /// Reader over the shard's data parts.
    parts_reader: DataPartsReader,
    /// Optional filter to prune batches by primary key.
    /// Always `None` when `key_dict` is `None` (see `ShardReader::new`).
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last batch that passed the filter, used to skip
    /// re-evaluating the filter for consecutive batches with the same key.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys the filter inspected.
    keys_before_pruning: usize,
    /// Number of keys that matched the filter.
    keys_after_pruning: usize,
    /// Total time spent evaluating the key filter.
    prune_pk_cost: Duration,
    /// Time the builder spent constructing `parts_reader`, recorded for metrics.
    data_build_cost: Duration,
}
176
impl ShardReader {
    /// Creates a reader and positions it at the first batch that passes `key_filter`.
    ///
    /// `data_build_cost` is the time the caller spent building `parts_reader`;
    /// it is only reported (logged) when the reader is dropped.
    fn new(
        shard_id: ShardId,
        key_dict: Option<KeyDictRef>,
        parts_reader: DataPartsReader,
        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
        data_build_cost: Duration,
    ) -> Result<Self> {
        let has_pk = key_dict.is_some();
        let mut reader = Self {
            shard_id,
            key_dict,
            parts_reader,
            // A key filter is only meaningful when the shard has primary keys.
            key_filter: if has_pk { key_filter } else { None },
            last_yield_pk_index: None,
            keys_before_pruning: 0,
            keys_after_pruning: 0,
            prune_pk_cost: Duration::default(),
            data_build_cost,
        };
        // Skip leading batches whose primary key doesn't match the filter.
        reader.prune_batch_by_key()?;

        Ok(reader)
    }

    /// Returns whether the reader still points at a batch.
    fn is_valid(&self) -> bool {
        self.parts_reader.is_valid()
    }

    /// Advances to the next batch, skipping batches pruned by the key filter.
    fn next(&mut self) -> Result<()> {
        self.parts_reader.next()?;
        self.prune_batch_by_key()
    }

    /// Returns the primary key of the current batch, or `None` if the shard
    /// has no key dictionary.
    ///
    /// # Panics
    /// Panics if the reader is not valid.
    fn current_key(&self) -> Option<&[u8]> {
        let pk_index = self.parts_reader.current_data_batch().pk_index();
        self.key_dict
            .as_ref()
            .map(|dict| dict.key_by_pk_index(pk_index))
    }

    /// Returns the pk id (shard id plus pk index) of the current batch.
    ///
    /// # Panics
    /// Panics if the reader is not valid.
    fn current_pk_id(&self) -> PkId {
        let pk_index = self.parts_reader.current_data_batch().pk_index();
        PkId {
            shard_id: self.shard_id,
            pk_index,
        }
    }

    /// Returns the current data batch.
    ///
    /// # Panics
    /// Panics if the reader is not valid.
    fn current_data_batch(&self) -> DataBatch {
        self.parts_reader.current_data_batch()
    }

    /// Advances the inner reader until the current batch's primary key matches
    /// the filter, or the reader is exhausted. No-op when there is no filter.
    ///
    /// Also accumulates pruning metrics (`keys_before_pruning`,
    /// `keys_after_pruning`, `prune_pk_cost`).
    fn prune_batch_by_key(&mut self) -> Result<()> {
        let Some(key_filter) = &mut self.key_filter else {
            return Ok(());
        };

        while self.parts_reader.is_valid() {
            let pk_index = self.parts_reader.current_data_batch().pk_index();
            if let Some(yield_pk_index) = self.last_yield_pk_index {
                // This key already passed the filter in a previous call; keep
                // the batch without evaluating the filter again.
                if pk_index == yield_pk_index {
                    break;
                }
            }
            self.keys_before_pruning += 1;
            // Safety: `key_filter` is some so the shard has primary keys.
            let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index);
            let now = Instant::now();
            if key_filter.matches(key) {
                self.prune_pk_cost += now.elapsed();
                self.last_yield_pk_index = Some(pk_index);
                self.keys_after_pruning += 1;
                break;
            }
            self.prune_pk_cost += now.elapsed();
            // Key rejected by the filter, skip this batch.
            self.parts_reader.next()?;
        }

        Ok(())
    }
}
259
260impl Drop for ShardReader {
261    fn drop(&mut self) {
262        let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
263        PARTITION_TREE_READ_STAGE_ELAPSED
264            .with_label_values(&["shard_prune_pk"])
265            .observe(shard_prune_pk);
266        if self.keys_before_pruning > 0 {
267            common_telemetry::debug!(
268                "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
269                self.parts_reader.num_parts(),
270                self.keys_before_pruning,
271                self.keys_after_pruning,
272                shard_prune_pk,
273                self.data_build_cost.as_secs_f64(),
274            );
275        }
276    }
277}
278
/// A merger that merges batches from multiple shards.
pub(crate) struct ShardMerger {
    /// Inner merger over shard nodes.
    merger: Merger<ShardNode>,
}
283
284impl ShardMerger {
285    pub(crate) fn try_new(nodes: Vec<ShardNode>) -> Result<Self> {
286        let merger = Merger::try_new(nodes)?;
287        Ok(ShardMerger { merger })
288    }
289}
290
291impl DataBatchSource for ShardMerger {
292    fn is_valid(&self) -> bool {
293        self.merger.is_valid()
294    }
295
296    fn next(&mut self) -> Result<()> {
297        self.merger.next()
298    }
299
300    fn current_pk_id(&self) -> PkId {
301        self.merger.current_node().current_pk_id()
302    }
303
304    fn current_key(&self) -> Option<&[u8]> {
305        self.merger.current_node().current_key()
306    }
307
308    fn current_data_batch(&self) -> DataBatch {
309        let batch = self.merger.current_node().current_data_batch();
310        batch.slice(0, self.merger.current_rows())
311    }
312}
313
/// Source of batches for a [ShardNode].
pub(crate) enum ShardSource {
    /// Reads batches from a shard builder.
    Builder(ShardBuilderReader),
    /// Reads batches from a shard.
    Shard(ShardReader),
}
318
319impl ShardSource {
320    fn is_valid(&self) -> bool {
321        match self {
322            ShardSource::Builder(r) => r.is_valid(),
323            ShardSource::Shard(r) => r.is_valid(),
324        }
325    }
326
327    fn next(&mut self) -> Result<()> {
328        match self {
329            ShardSource::Builder(r) => r.next(),
330            ShardSource::Shard(r) => r.next(),
331        }
332    }
333
334    fn current_pk_id(&self) -> PkId {
335        match self {
336            ShardSource::Builder(r) => r.current_pk_id(),
337            ShardSource::Shard(r) => r.current_pk_id(),
338        }
339    }
340
341    fn current_key(&self) -> Option<&[u8]> {
342        match self {
343            ShardSource::Builder(r) => r.current_key(),
344            ShardSource::Shard(r) => r.current_key(),
345        }
346    }
347
348    fn current_data_batch(&self) -> DataBatch {
349        match self {
350            ShardSource::Builder(r) => r.current_data_batch(),
351            ShardSource::Shard(r) => r.current_data_batch(),
352        }
353    }
354}
355
/// Node for the merger to get items.
pub(crate) struct ShardNode {
    /// Underlying source of batches.
    source: ShardSource,
}
360
361impl ShardNode {
362    pub(crate) fn new(source: ShardSource) -> Self {
363        Self { source }
364    }
365
366    fn current_pk_id(&self) -> PkId {
367        self.source.current_pk_id()
368    }
369
370    fn current_key(&self) -> Option<&[u8]> {
371        self.source.current_key()
372    }
373
374    fn current_data_batch(&self) -> DataBatch {
375        self.source.current_data_batch()
376    }
377}
378
379impl PartialEq for ShardNode {
380    fn eq(&self, other: &Self) -> bool {
381        self.source.current_key() == other.source.current_key()
382    }
383}
384
385impl Eq for ShardNode {}
386
387impl Ord for ShardNode {
388    fn cmp(&self, other: &Self) -> Ordering {
389        self.source
390            .current_key()
391            .cmp(&other.source.current_key())
392            .reverse()
393    }
394}
395
396impl PartialOrd for ShardNode {
397    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
398        Some(self.cmp(other))
399    }
400}
401
402impl Node for ShardNode {
403    fn is_valid(&self) -> bool {
404        self.source.is_valid()
405    }
406
407    fn is_behind(&self, other: &Self) -> bool {
408        // We expect a key only belongs to one shard.
409        debug_assert_ne!(self.source.current_key(), other.source.current_key());
410        self.source.current_key() < other.source.current_key()
411    }
412
413    fn advance(&mut self, len: usize) -> Result<()> {
414        debug_assert_eq!(self.source.current_data_batch().num_rows(), len);
415        self.source.next()
416    }
417
418    fn current_item_len(&self) -> usize {
419        self.current_data_batch().num_rows()
420    }
421
422    fn search_key_in_current_item(&self, _other: &Self) -> Result<usize, usize> {
423        Err(self.source.current_data_batch().num_rows())
424    }
425}
426
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::memtable::partition_tree::dict::KeyDictBuilder;
    use crate::memtable::partition_tree::PkIndex;
    use crate::memtable::stats::WriteMetrics;
    use crate::memtable::KeyValues;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_keys, metadata_for_test,
    };

    /// Returns key values and expect pk index.
    ///
    /// Entries are deliberately out of order (expected pk indices 2, 0, 1) so a
    /// read must come back sorted by primary key.
    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<(KeyValues, PkIndex)> {
        vec![
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    2,
                    [20, 21].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    0,
                ),
                2,
            ),
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    0,
                    [0, 1].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    1,
                ),
                0,
            ),
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    1,
                    [10, 11].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    2,
                ),
                1,
            ),
        ]
    }

    /// Creates a shard whose key dictionary is prebuilt from `input`.
    ///
    /// The shard itself starts without data; tests write rows separately.
    fn new_shard_with_dict(
        shard_id: ShardId,
        metadata: RegionMetadataRef,
        input: &[(KeyValues, PkIndex)],
        data_freeze_threshold: usize,
    ) -> Shard {
        let mut dict_builder = KeyDictBuilder::new(1024);
        let mut metrics = WriteMetrics::default();
        let mut keys = Vec::with_capacity(input.len());
        for (kvs, _) in input {
            encode_keys(&metadata, kvs, &mut keys);
        }
        for key in &keys {
            dict_builder.insert_key(key, None, &mut metrics);
        }

        let (dict, _) = dict_builder.finish().unwrap();
        let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

        Shard::new(
            shard_id,
            Some(Arc::new(dict)),
            data_parts,
            true,
            data_freeze_threshold,
        )
    }

    /// Reads the shard and collects all timestamps in read order.
    fn collect_timestamps(shard: &Shard) -> Vec<i64> {
        let mut reader = shard.read().unwrap().build(None).unwrap();
        let mut timestamps = Vec::new();
        while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
            // Column 1 holds the timestamps in this test layout.
            let ts_array = rb.column(1);
            let ts_slice = timestamp_array_to_i64_slice(ts_array);
            timestamps.extend_from_slice(ts_slice);

            reader.next().unwrap();
        }
        timestamps
    }

    #[test]
    fn test_write_read_shard() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let mut shard = new_shard_with_dict(8, metadata, &input, 100);
        assert!(shard.is_empty());
        for (key_values, pk_index) in &input {
            for kv in key_values.iter() {
                let pk_id = PkId {
                    shard_id: shard.shard_id,
                    pk_index: *pk_index,
                };
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());

        // Rows were written out of key order; the read must yield them sorted.
        let timestamps = collect_timestamps(&shard);
        assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
    }

    #[test]
    fn test_shard_freeze() {
        let metadata = metadata_for_test();
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "shard".to_string(),
            0,
            [0].into_iter(),
            [Some(0.0)].into_iter(),
            0,
        );
        let mut shard = new_shard_with_dict(8, metadata.clone(), &[(kvs, 0)], 50);
        let expected: Vec<_> = (0..200).collect();
        for i in &expected {
            let kvs = build_key_values_with_ts_seq_values(
                &metadata,
                "shard".to_string(),
                0,
                [*i].into_iter(),
                [Some(0.0)].into_iter(),
                *i as u64,
            );
            let pk_id = PkId {
                shard_id: shard.shard_id,
                pk_index: *i as PkIndex,
            };
            for kv in kvs.iter() {
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());
        // 200 rows with a freeze threshold of 50 trigger 3 freezes; the last
        // 50 rows remain in the active buffer.
        assert_eq!(3, shard.data_parts.frozen_len());

        let timestamps = collect_timestamps(&shard);
        assert_eq!(expected, timestamps);
    }
}
579}