Skip to main content

mito2/memtable/partition_tree/
shard.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Shard in a partition.
16
17use std::cmp::Ordering;
18use std::time::{Duration, Instant};
19
20use mito_codec::key_values::KeyValue;
21use mito_codec::row_converter::PrimaryKeyFilter;
22use snafu::ResultExt;
23use store_api::metadata::RegionMetadataRef;
24
25use crate::error::{DecodeSnafu, Result};
26use crate::memtable::partition_tree::data::{
27    DATA_INIT_CAP, DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder,
28};
29use crate::memtable::partition_tree::dict::KeyDictRef;
30use crate::memtable::partition_tree::merger::{Merger, Node};
31use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
32use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
33use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
34
/// Shard stores data related to the same key dictionary.
///
/// A shard pairs an optional key dictionary with the [DataParts] holding its rows.
/// Rows are written with a [PkId] whose shard id is expected to match `shard_id`.
pub struct Shard {
    /// Id of the shard.
    pub(crate) shard_id: ShardId,
    /// Key dictionary of the shard. `None` if the schema of the tree doesn't have a primary key.
    key_dict: Option<KeyDictRef>,
    /// Data in the shard.
    data_parts: DataParts,
    /// Dedup flag of the shard. It is passed to `DataParts::new` when the shard is forked.
    dedup: bool,
    /// Number of rows to freeze a data part.
    data_freeze_threshold: usize,
}
46
47impl Shard {
48    /// Returns a new shard.
49    pub fn new(
50        shard_id: ShardId,
51        key_dict: Option<KeyDictRef>,
52        data_parts: DataParts,
53        dedup: bool,
54        data_freeze_threshold: usize,
55    ) -> Shard {
56        Shard {
57            shard_id,
58            key_dict,
59            data_parts,
60            dedup,
61            data_freeze_threshold,
62        }
63    }
64
65    /// Writes a key value into the shard.
66    ///
67    /// It will freezes the active buffer if it is full.
68    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
69        debug_assert_eq!(self.shard_id, pk_id.shard_id);
70
71        if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
72            self.data_parts.freeze()?;
73        }
74
75        self.data_parts.write_row(pk_id.pk_index, key_value);
76        Ok(())
77    }
78
79    /// Scans the shard.
80    // TODO(yingwen): Push down projection to data parts.
81    pub fn read(&self) -> Result<ShardReaderBuilder> {
82        let parts_reader = self.data_parts.read()?;
83
84        Ok(ShardReaderBuilder {
85            shard_id: self.shard_id,
86            key_dict: self.key_dict.clone(),
87            inner: parts_reader,
88        })
89    }
90
91    /// Forks a shard.
92    pub fn fork(&self, metadata: RegionMetadataRef) -> Shard {
93        Shard {
94            shard_id: self.shard_id,
95            key_dict: self.key_dict.clone(),
96            data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
97            dedup: self.dedup,
98            data_freeze_threshold: self.data_freeze_threshold,
99        }
100    }
101
102    /// Returns true if the shard is empty (No data).
103    pub fn is_empty(&self) -> bool {
104        self.data_parts.is_empty()
105    }
106
107    /// Returns the memory size of the shard part.
108    pub(crate) fn shared_memory_size(&self) -> usize {
109        self.key_dict
110            .as_ref()
111            .map(|dict| dict.shared_memory_size())
112            .unwrap_or(0)
113    }
114}
115
/// Source that returns [DataBatch].
///
/// A source is a cursor: check [DataBatchSource::is_valid] before calling any of the
/// `current_*` accessors, and call [DataBatchSource::next] to move to the next batch.
pub trait DataBatchSource {
    /// Returns whether current source is still valid.
    fn is_valid(&self) -> bool;

    /// Advances source to next data batch.
    fn next(&mut self) -> Result<()>;

    /// Returns current pk id.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_pk_id(&self) -> PkId;

    /// Returns the current primary key bytes or None if it doesn't have primary key.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_key(&self) -> Option<&[u8]>;

    /// Returns the data part.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_data_batch(&self) -> DataBatch<'_>;
}
140
/// A boxed [DataBatchSource] trait object that is also `Send`.
pub type BoxedDataBatchSource = Box<dyn DataBatchSource + Send>;
142
/// Builder of a [ShardReader], returned by [Shard::read].
pub struct ShardReaderBuilder {
    /// Id of the shard to read.
    shard_id: ShardId,
    /// Key dictionary cloned from the shard, if any.
    key_dict: Option<KeyDictRef>,
    /// Builder of the reader over the shard's data parts.
    inner: DataPartsReaderBuilder,
}
148
149impl ShardReaderBuilder {
150    pub(crate) fn build(
151        self,
152        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
153    ) -> Result<ShardReader> {
154        let ShardReaderBuilder {
155            shard_id,
156            key_dict,
157            inner,
158        } = self;
159        let now = Instant::now();
160        let parts_reader = inner.build()?;
161        ShardReader::new(shard_id, key_dict, parts_reader, key_filter, now.elapsed())
162    }
163}
164
/// Reader to read rows in a shard.
///
/// The reader wraps a [DataPartsReader] and optionally skips batches whose primary
/// key doesn't match the key filter.
pub struct ShardReader {
    /// Id of the shard, used to build the [PkId] of the current batch.
    shard_id: ShardId,
    /// Key dictionary used to look up the primary key by pk index, if any.
    key_dict: Option<KeyDictRef>,
    /// Reader over the data parts of the shard.
    parts_reader: DataPartsReader,
    /// Primary key filter. Always `None` if `key_dict` is `None`.
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last key that matched the filter.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys evaluated by the filter.
    keys_before_pruning: usize,
    /// Number of keys that matched the filter.
    keys_after_pruning: usize,
    /// Total time spent on evaluating the filter.
    prune_pk_cost: Duration,
    /// Time spent on building the data parts reader, as reported by the builder.
    data_build_cost: Duration,
}
177
178impl ShardReader {
179    fn new(
180        shard_id: ShardId,
181        key_dict: Option<KeyDictRef>,
182        parts_reader: DataPartsReader,
183        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
184        data_build_cost: Duration,
185    ) -> Result<Self> {
186        let has_pk = key_dict.is_some();
187        let mut reader = Self {
188            shard_id,
189            key_dict,
190            parts_reader,
191            key_filter: if has_pk { key_filter } else { None },
192            last_yield_pk_index: None,
193            keys_before_pruning: 0,
194            keys_after_pruning: 0,
195            prune_pk_cost: Duration::default(),
196            data_build_cost,
197        };
198        reader.prune_batch_by_key()?;
199
200        Ok(reader)
201    }
202
203    fn is_valid(&self) -> bool {
204        self.parts_reader.is_valid()
205    }
206
207    fn next(&mut self) -> Result<()> {
208        self.parts_reader.next()?;
209        self.prune_batch_by_key()
210    }
211
212    fn current_key(&self) -> Option<&[u8]> {
213        let pk_index = self.parts_reader.current_data_batch().pk_index();
214        self.key_dict
215            .as_ref()
216            .map(|dict| dict.key_by_pk_index(pk_index))
217    }
218
219    fn current_pk_id(&self) -> PkId {
220        let pk_index = self.parts_reader.current_data_batch().pk_index();
221        PkId {
222            shard_id: self.shard_id,
223            pk_index,
224        }
225    }
226
227    fn current_data_batch(&self) -> DataBatch<'_> {
228        self.parts_reader.current_data_batch()
229    }
230
231    fn prune_batch_by_key(&mut self) -> Result<()> {
232        let Some(key_filter) = &mut self.key_filter else {
233            return Ok(());
234        };
235
236        while self.parts_reader.is_valid() {
237            let pk_index = self.parts_reader.current_data_batch().pk_index();
238            if let Some(yield_pk_index) = self.last_yield_pk_index
239                && pk_index == yield_pk_index
240            {
241                break;
242            }
243            self.keys_before_pruning += 1;
244            // Safety: `key_filter` is some so the shard has primary keys.
245            let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index);
246            let now = Instant::now();
247            if key_filter.matches(key).context(DecodeSnafu)? {
248                self.prune_pk_cost += now.elapsed();
249                self.last_yield_pk_index = Some(pk_index);
250                self.keys_after_pruning += 1;
251                break;
252            }
253            self.prune_pk_cost += now.elapsed();
254            self.parts_reader.next()?;
255        }
256
257        Ok(())
258    }
259}
260
261impl Drop for ShardReader {
262    fn drop(&mut self) {
263        let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
264        PARTITION_TREE_READ_STAGE_ELAPSED
265            .with_label_values(&["shard_prune_pk"])
266            .observe(shard_prune_pk);
267        if self.keys_before_pruning > 0 {
268            common_telemetry::debug!(
269                "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
270                self.parts_reader.num_parts(),
271                self.keys_before_pruning,
272                self.keys_after_pruning,
273                shard_prune_pk,
274                self.data_build_cost.as_secs_f64(),
275            );
276        }
277    }
278}
279
/// A merger that merges batches from multiple shards.
///
/// It exposes the merged output as a [DataBatchSource].
pub(crate) struct ShardMerger {
    /// Underlying merger over the shard nodes.
    merger: Merger<ShardNode>,
}
284
285impl ShardMerger {
286    pub(crate) fn try_new(nodes: Vec<ShardNode>) -> Result<Self> {
287        let merger = Merger::try_new(nodes)?;
288        Ok(ShardMerger { merger })
289    }
290}
291
292impl DataBatchSource for ShardMerger {
293    fn is_valid(&self) -> bool {
294        self.merger.is_valid()
295    }
296
297    fn next(&mut self) -> Result<()> {
298        self.merger.next()
299    }
300
301    fn current_pk_id(&self) -> PkId {
302        self.merger.current_node().current_pk_id()
303    }
304
305    fn current_key(&self) -> Option<&[u8]> {
306        self.merger.current_node().current_key()
307    }
308
309    fn current_data_batch(&self) -> DataBatch<'_> {
310        let batch = self.merger.current_node().current_data_batch();
311        batch.slice(0, self.merger.current_rows())
312    }
313}
314
/// Source of a [ShardNode]: either a reader of a shard builder or a reader of a shard.
pub(crate) enum ShardSource {
    /// Reads from a [ShardBuilderReader].
    Builder(ShardBuilderReader),
    /// Reads from a [ShardReader].
    Shard(ShardReader),
}
319
320impl ShardSource {
321    fn is_valid(&self) -> bool {
322        match self {
323            ShardSource::Builder(r) => r.is_valid(),
324            ShardSource::Shard(r) => r.is_valid(),
325        }
326    }
327
328    fn next(&mut self) -> Result<()> {
329        match self {
330            ShardSource::Builder(r) => r.next(),
331            ShardSource::Shard(r) => r.next(),
332        }
333    }
334
335    fn current_pk_id(&self) -> PkId {
336        match self {
337            ShardSource::Builder(r) => r.current_pk_id(),
338            ShardSource::Shard(r) => r.current_pk_id(),
339        }
340    }
341
342    fn current_key(&self) -> Option<&[u8]> {
343        match self {
344            ShardSource::Builder(r) => r.current_key(),
345            ShardSource::Shard(r) => r.current_key(),
346        }
347    }
348
349    fn current_data_batch(&self) -> DataBatch<'_> {
350        match self {
351            ShardSource::Builder(r) => r.current_data_batch(),
352            ShardSource::Shard(r) => r.current_data_batch(),
353        }
354    }
355}
356
/// Node for the merger to get items.
///
/// Nodes are compared by the current primary key of their source.
pub(crate) struct ShardNode {
    /// Source the node reads batches from.
    source: ShardSource,
}
361
362impl ShardNode {
363    pub(crate) fn new(source: ShardSource) -> Self {
364        Self { source }
365    }
366
367    fn current_pk_id(&self) -> PkId {
368        self.source.current_pk_id()
369    }
370
371    fn current_key(&self) -> Option<&[u8]> {
372        self.source.current_key()
373    }
374
375    fn current_data_batch(&self) -> DataBatch<'_> {
376        self.source.current_data_batch()
377    }
378}
379
380impl PartialEq for ShardNode {
381    fn eq(&self, other: &Self) -> bool {
382        self.source.current_key() == other.source.current_key()
383    }
384}
385
// Equality compares the current keys (`Option<&[u8]>`), which is a total equivalence.
impl Eq for ShardNode {}
387
388impl Ord for ShardNode {
389    fn cmp(&self, other: &Self) -> Ordering {
390        self.source
391            .current_key()
392            .cmp(&other.source.current_key())
393            .reverse()
394    }
395}
396
397impl PartialOrd for ShardNode {
398    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
399        Some(self.cmp(other))
400    }
401}
402
403impl Node for ShardNode {
404    fn is_valid(&self) -> bool {
405        self.source.is_valid()
406    }
407
408    fn is_behind(&self, other: &Self) -> bool {
409        // We expect a key only belongs to one shard.
410        debug_assert_ne!(self.source.current_key(), other.source.current_key());
411        self.source.current_key() < other.source.current_key()
412    }
413
414    fn advance(&mut self, len: usize) -> Result<()> {
415        debug_assert_eq!(self.source.current_data_batch().num_rows(), len);
416        self.source.next()
417    }
418
419    fn current_item_len(&self) -> usize {
420        self.current_data_batch().num_rows()
421    }
422
423    fn search_key_in_current_item(&self, _other: &Self) -> Result<usize, usize> {
424        Err(self.source.current_data_batch().num_rows())
425    }
426}
427
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::memtable::KeyValues;
    use crate::memtable::partition_tree::PkIndex;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::memtable::partition_tree::dict::KeyDictBuilder;
    use crate::memtable::stats::WriteMetrics;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_keys, metadata_for_test,
    };

    /// Returns key values and expect pk index.
    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<(KeyValues, PkIndex)> {
        // Each case is (numeric key argument, timestamps, last builder argument, expected pk index).
        // NOTE(review): the last builder argument is presumably the sequence; confirm against
        // `build_key_values_with_ts_seq_values`.
        [(2, [20, 21], 0, 2), (0, [0, 1], 1, 0), (1, [10, 11], 2, 1)]
            .into_iter()
            .map(|(key_num, timestamps, seq, pk_index)| {
                let key_values = build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    key_num,
                    timestamps.into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    seq,
                );
                (key_values, pk_index)
            })
            .collect()
    }

    /// Builds an empty shard whose dictionary contains the keys encoded from `input`.
    fn new_shard_with_dict(
        shard_id: ShardId,
        metadata: RegionMetadataRef,
        input: &[(KeyValues, PkIndex)],
        data_freeze_threshold: usize,
    ) -> Shard {
        let mut keys = Vec::with_capacity(input.len());
        input.iter().for_each(|(kvs, _)| {
            encode_keys(&metadata, kvs, &mut keys);
        });

        let mut dict_builder = KeyDictBuilder::new(1024);
        let mut metrics = WriteMetrics::default();
        keys.iter().for_each(|key| {
            dict_builder.insert_key(key, None, &mut metrics);
        });
        let (dict, _) = dict_builder.finish().unwrap();

        Shard::new(
            shard_id,
            Some(Arc::new(dict)),
            DataParts::new(metadata, DATA_INIT_CAP, true),
            true,
            data_freeze_threshold,
        )
    }

    /// Reads the whole shard without a key filter and collects the values of column 1
    /// of every batch as timestamps.
    fn collect_timestamps(shard: &Shard) -> Vec<i64> {
        let mut reader = shard.read().unwrap().build(None).unwrap();
        let mut collected = Vec::new();
        while reader.is_valid() {
            let record_batch = reader.current_data_batch().slice_record_batch();
            let batch_timestamps = timestamp_array_to_i64_slice(record_batch.column(1));
            collected.extend_from_slice(batch_timestamps);

            reader.next().unwrap();
        }
        collected
    }

    #[test]
    fn test_write_read_shard() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let mut shard = new_shard_with_dict(8, metadata, &input, 100);
        assert!(shard.is_empty());

        for (key_values, pk_index) in &input {
            let pk_id = PkId {
                shard_id: shard.shard_id,
                pk_index: *pk_index,
            };
            for kv in key_values.iter() {
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());

        // Rows come back ordered by pk index rather than by write order.
        assert_eq!(vec![0, 1, 10, 11, 20, 21], collect_timestamps(&shard));
    }

    #[test]
    fn test_shard_freeze() {
        let metadata = metadata_for_test();
        let dict_kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "shard".to_string(),
            0,
            [0].into_iter(),
            [Some(0.0)].into_iter(),
            0,
        );
        let mut shard = new_shard_with_dict(8, metadata.clone(), &[(dict_kvs, 0)], 50);

        let expected: Vec<_> = (0..200).collect();
        for &ts in &expected {
            let kvs = build_key_values_with_ts_seq_values(
                &metadata,
                "shard".to_string(),
                0,
                [ts].into_iter(),
                [Some(0.0)].into_iter(),
                ts as u64,
            );
            let pk_id = PkId {
                shard_id: shard.shard_id,
                pk_index: ts as PkIndex,
            };
            for kv in kvs.iter() {
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());
        // 200 rows with a threshold of 50: the active buffer is frozen before rows 50, 100 and 150.
        assert_eq!(3, shard.data_parts.frozen_len());

        assert_eq!(expected, collect_timestamps(&shard));
    }
}
580}