// mito2/memtable/partition_tree/shard_builder.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Builder of a shard.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use mito_codec::key_values::KeyValue;
22use mito_codec::row_converter::PrimaryKeyFilter;
23use snafu::ResultExt;
24use store_api::metadata::RegionMetadataRef;
25
26use crate::error::{DecodeSnafu, Result};
27use crate::memtable::partition_tree::data::{
28    DATA_INIT_CAP, DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts,
29};
30use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder};
31use crate::memtable::partition_tree::shard::Shard;
32use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
33use crate::memtable::stats::WriteMetrics;
34use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
35
/// Builder to write keys and data to a shard that the key dictionary
/// is still active.
pub struct ShardBuilder {
    /// Id of the current shard to build.
    current_shard_id: ShardId,
    /// Builder for the key dictionary.
    dict_builder: KeyDictBuilder,
    /// Buffer to store data.
    data_buffer: DataBuffer,
    /// Number of rows to freeze a data part.
    data_freeze_threshold: usize,
    /// Whether deduplication is enabled (taken from `PartitionTreeConfig::dedup`
    /// and forwarded to the data buffer, data parts and built shards).
    dedup: bool,
}
49
50impl ShardBuilder {
51    /// Returns a new builder.
52    pub fn new(
53        metadata: RegionMetadataRef,
54        config: &PartitionTreeConfig,
55        shard_id: ShardId,
56    ) -> ShardBuilder {
57        ShardBuilder {
58            current_shard_id: shard_id,
59            dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard),
60            data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, config.dedup),
61            data_freeze_threshold: config.data_freeze_threshold,
62            dedup: config.dedup,
63        }
64    }
65
66    /// Write a key value with given pk_index (caller must ensure the pk_index exist in dict_builder)
67    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
68        assert_eq!(self.current_shard_id, pk_id.shard_id);
69        self.data_buffer.write_row(pk_id.pk_index, key_value);
70    }
71
72    /// Write a key value with its encoded primary key.
73    pub fn write_with_key(
74        &mut self,
75        full_primary_key: &[u8],
76        sparse_key: Option<&[u8]>,
77        key_value: &KeyValue,
78        metrics: &mut WriteMetrics,
79    ) -> PkId {
80        // Safety: we check whether the builder need to freeze before.
81        let pk_index = self
82            .dict_builder
83            .insert_key(full_primary_key, sparse_key, metrics);
84        self.data_buffer.write_row(pk_index, key_value);
85        PkId {
86            shard_id: self.current_shard_id,
87            pk_index,
88        }
89    }
90
91    /// Returns true if the builder need to freeze.
92    pub fn should_freeze(&self) -> bool {
93        self.dict_builder.is_full() || self.data_buffer.num_rows() == self.data_freeze_threshold
94    }
95
96    /// Returns the current shard id of the builder.
97    pub fn current_shard_id(&self) -> ShardId {
98        self.current_shard_id
99    }
100
101    /// Builds a new shard and resets the builder.
102    ///
103    /// Returns `None` if the builder is empty.
104    pub fn finish(
105        &mut self,
106        metadata: RegionMetadataRef,
107        pk_to_pk_id: &mut HashMap<Vec<u8>, PkId>,
108    ) -> Result<Option<Shard>> {
109        if self.is_empty() {
110            return Ok(None);
111        }
112
113        let (data_part, key_dict) = match self.dict_builder.finish() {
114            Some((dict, pk_to_index)) => {
115                // Adds mapping to the map.
116                pk_to_pk_id.reserve(pk_to_index.len());
117                for (k, pk_index) in pk_to_index {
118                    pk_to_pk_id.insert(
119                        k,
120                        PkId {
121                            shard_id: self.current_shard_id,
122                            pk_index,
123                        },
124                    );
125                }
126
127                let pk_weights = dict.pk_weights_to_sort_data();
128                let part = self.data_buffer.freeze(Some(&pk_weights), true)?;
129                (part, Some(dict))
130            }
131            None => {
132                let pk_weights = [0];
133                (self.data_buffer.freeze(Some(&pk_weights), true)?, None)
134            }
135        };
136
137        // build data parts.
138        let data_parts =
139            DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]);
140        let key_dict = key_dict.map(Arc::new);
141        let shard_id = self.current_shard_id;
142        self.current_shard_id += 1;
143
144        Ok(Some(Shard::new(
145            shard_id,
146            key_dict,
147            data_parts,
148            self.dedup,
149            self.data_freeze_threshold,
150        )))
151    }
152
153    /// Scans the shard builder.
154    pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReaderBuilder> {
155        let dict_reader = {
156            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
157                .with_label_values(&["shard_builder_read_pk"])
158                .start_timer();
159            self.dict_builder.read()
160        };
161
162        {
163            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
164                .with_label_values(&["sort_pk"])
165                .start_timer();
166            dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
167        }
168
169        let data_reader = self.data_buffer.read()?;
170        Ok(ShardBuilderReaderBuilder {
171            shard_id: self.current_shard_id,
172            dict_reader,
173            data_reader,
174        })
175    }
176
177    /// Returns true if the builder is empty.
178    pub fn is_empty(&self) -> bool {
179        self.data_buffer.is_empty()
180    }
181}
182
/// Builder to create a [ShardBuilderReader] from a shard builder's state.
pub(crate) struct ShardBuilderReaderBuilder {
    /// Id of the shard being built.
    shard_id: ShardId,
    /// Reader over the key dictionary builder.
    dict_reader: DictBuilderReader,
    /// Builder for the data buffer reader.
    data_reader: DataBufferReaderBuilder,
}
188
189impl ShardBuilderReaderBuilder {
190    pub(crate) fn build(
191        self,
192        pk_weights: Option<&[u16]>,
193        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
194    ) -> Result<ShardBuilderReader> {
195        let now = Instant::now();
196        let data_reader = self.data_reader.build(pk_weights)?;
197        ShardBuilderReader::new(
198            self.shard_id,
199            self.dict_reader,
200            data_reader,
201            key_filter,
202            now.elapsed(),
203        )
204    }
205}
206
/// Reader to scan a shard builder.
pub struct ShardBuilderReader {
    /// Id of the shard being read.
    shard_id: ShardId,
    /// Reader over the key dictionary builder, used to look up keys by pk index.
    dict_reader: DictBuilderReader,
    /// Reader over the data buffer.
    data_reader: DataBufferReader,
    /// Optional filter to prune batches by primary key.
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last batch that passed the filter; batches with the
    /// same pk index skip re-evaluating the filter.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys checked against the filter.
    keys_before_pruning: usize,
    /// Number of keys that passed the filter.
    keys_after_pruning: usize,
    /// Accumulated time spent evaluating the key filter.
    prune_pk_cost: Duration,
    /// Time spent building the data reader (reported on drop).
    data_build_cost: Duration,
}
219
220impl ShardBuilderReader {
221    fn new(
222        shard_id: ShardId,
223        dict_reader: DictBuilderReader,
224        data_reader: DataBufferReader,
225        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
226        data_build_cost: Duration,
227    ) -> Result<Self> {
228        let mut reader = ShardBuilderReader {
229            shard_id,
230            dict_reader,
231            data_reader,
232            key_filter,
233            last_yield_pk_index: None,
234            keys_before_pruning: 0,
235            keys_after_pruning: 0,
236            prune_pk_cost: Duration::default(),
237            data_build_cost,
238        };
239        reader.prune_batch_by_key()?;
240
241        Ok(reader)
242    }
243
244    pub fn is_valid(&self) -> bool {
245        self.data_reader.is_valid()
246    }
247
248    pub fn next(&mut self) -> Result<()> {
249        self.data_reader.next()?;
250        self.prune_batch_by_key()
251    }
252
253    pub fn current_key(&self) -> Option<&[u8]> {
254        let pk_index = self.data_reader.current_data_batch().pk_index();
255        Some(self.dict_reader.key_by_pk_index(pk_index))
256    }
257
258    pub fn current_pk_id(&self) -> PkId {
259        let pk_index = self.data_reader.current_data_batch().pk_index();
260        PkId {
261            shard_id: self.shard_id,
262            pk_index,
263        }
264    }
265
266    pub fn current_data_batch(&self) -> DataBatch<'_> {
267        self.data_reader.current_data_batch()
268    }
269
270    fn prune_batch_by_key(&mut self) -> Result<()> {
271        let Some(key_filter) = &mut self.key_filter else {
272            return Ok(());
273        };
274
275        while self.data_reader.is_valid() {
276            let pk_index = self.data_reader.current_data_batch().pk_index();
277            if let Some(yield_pk_index) = self.last_yield_pk_index
278                && pk_index == yield_pk_index
279            {
280                break;
281            }
282            self.keys_before_pruning += 1;
283            let key = self.dict_reader.key_by_pk_index(pk_index);
284            let now = Instant::now();
285            if key_filter.matches(key).context(DecodeSnafu)? {
286                self.prune_pk_cost += now.elapsed();
287                self.last_yield_pk_index = Some(pk_index);
288                self.keys_after_pruning += 1;
289                break;
290            }
291            self.prune_pk_cost += now.elapsed();
292            self.data_reader.next()?;
293        }
294
295        Ok(())
296    }
297}
298
299impl Drop for ShardBuilderReader {
300    fn drop(&mut self) {
301        let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
302        PARTITION_TREE_READ_STAGE_ELAPSED
303            .with_label_values(&["shard_builder_prune_pk"])
304            .observe(shard_builder_prune_pk);
305        if self.keys_before_pruning > 0 {
306            common_telemetry::debug!(
307                "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
308                self.keys_before_pruning,
309                self.keys_after_pruning,
310                shard_builder_prune_pk,
311                self.data_build_cost.as_secs_f64(),
312            );
313        }
314    }
315}
316
#[cfg(test)]
mod tests {

    use super::*;
    use crate::memtable::KeyValues;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test,
    };

    // Builds three input batches with distinct keys, deliberately written out
    // of key order (2, 0, 1) so reads must rely on pk weights for sorting.
    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<KeyValues> {
        vec![
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                2,
                [20, 21].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                0,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                0,
                [0, 1].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                1,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                1,
                [10, 11].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                2,
            ),
        ]
    }

    // Verifies that finishing an empty builder yields None and keeps the shard
    // id, while finishing a non-empty builder yields a shard and bumps the id.
    #[test]
    fn test_write_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();
        // Empty builder: no shard is produced and the shard id is unchanged.
        assert!(
            shard_builder
                .finish(metadata.clone(), &mut HashMap::new())
                .unwrap()
                .is_none()
        );
        assert_eq!(1, shard_builder.current_shard_id);

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }
        // Non-empty builder: the built shard takes the current id and the
        // builder advances to the next id.
        let shard = shard_builder
            .finish(metadata, &mut HashMap::new())
            .unwrap()
            .unwrap();
        assert_eq!(1, shard.shard_id);
        assert_eq!(2, shard_builder.current_shard_id);
    }

    // Verifies that reading a shard builder yields rows sorted by primary key
    // (via pk weights) even though the input was written out of key order.
    #[test]
    fn test_write_read_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }

        let mut pk_weights = Vec::new();
        // No key filter: the reader yields every batch.
        let mut reader = shard_builder
            .read(&mut pk_weights)
            .unwrap()
            .build(Some(&pk_weights), None)
            .unwrap();
        let mut timestamps = Vec::new();
        while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
            // Column 1 of the record batch holds the timestamps.
            let ts_array = rb.column(1);
            let ts_slice = timestamp_array_to_i64_slice(ts_array);
            timestamps.extend_from_slice(ts_slice);

            reader.next().unwrap();
        }
        // Timestamps come back grouped by key in sorted key order.
        assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
        // Weights reflect the insertion order (keys were written as 2, 0, 1).
        assert_eq!(vec![2, 0, 1], pk_weights);
    }
}