mito2/memtable/partition_tree/shard_builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Builder of a shard.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use store_api::metadata::RegionMetadataRef;

use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
    DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
};
use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder};
use crate::memtable::partition_tree::shard::Shard;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;

/// Builder to write keys and data to a shard whose key dictionary
/// is still mutable.
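///
/// A minimal usage sketch (hypothetical caller; `metadata`, `config`,
/// `pk_to_pk_id`, `full_primary_key` and `kv` are assumed to be in scope,
/// see the tests at the bottom of this file for a runnable version):
///
/// ```ignore
/// let mut builder = ShardBuilder::new(metadata.clone(), &config, 0);
/// let mut metrics = WriteMetrics::default();
/// // Freeze the accumulated keys and rows into an immutable shard when full.
/// if builder.should_freeze() {
///     let shard = builder.finish(metadata.clone(), &mut pk_to_pk_id)?;
/// }
/// let pk_id = builder.write_with_key(&full_primary_key, None, &kv, &mut metrics);
/// ```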
pub struct ShardBuilder {
    /// Id of the current shard to build.
    current_shard_id: ShardId,
    /// Builder for the key dictionary.
    dict_builder: KeyDictBuilder,
    /// Buffer to store data.
    data_buffer: DataBuffer,
    /// Number of rows to freeze a data part.
    data_freeze_threshold: usize,
    /// Whether to dedup rows with the same key on freeze.
    dedup: bool,
}

impl ShardBuilder {
    /// Returns a new builder.
    pub fn new(
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        shard_id: ShardId,
    ) -> ShardBuilder {
        ShardBuilder {
            current_shard_id: shard_id,
            dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard),
            data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, config.dedup),
            data_freeze_threshold: config.data_freeze_threshold,
            dedup: config.dedup,
        }
    }

    /// Writes a key value with the given pk_id (the caller must ensure the pk_index exists in dict_builder).
    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
        assert_eq!(self.current_shard_id, pk_id.shard_id);
        self.data_buffer.write_row(pk_id.pk_index, key_value);
    }

    /// Writes a key value with its encoded primary key.
    pub fn write_with_key(
        &mut self,
        full_primary_key: &[u8],
        sparse_key: Option<&[u8]>,
        key_value: &KeyValue,
        metrics: &mut WriteMetrics,
    ) -> PkId {
        // Safety: callers check whether the builder needs to freeze before
        // writing, so the dictionary still has room for this key.
        let pk_index = self
            .dict_builder
            .insert_key(full_primary_key, sparse_key, metrics);
        self.data_buffer.write_row(pk_index, key_value);
        PkId {
            shard_id: self.current_shard_id,
            pk_index,
        }
    }

    /// Returns true if the builder needs to freeze.
    pub fn should_freeze(&self) -> bool {
        self.dict_builder.is_full() || self.data_buffer.num_rows() >= self.data_freeze_threshold
    }

    /// Returns the current shard id of the builder.
    pub fn current_shard_id(&self) -> ShardId {
        self.current_shard_id
    }

    /// Builds a new shard from the accumulated keys and rows, then resets the
    /// builder and advances to the next shard id.
    ///
    /// Returns `None` if the builder is empty.
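    ///
    /// A hedged sketch (hypothetical `builder`, `metadata` and `shards` in scope):
    ///
    /// ```ignore
    /// let mut pk_to_pk_id = HashMap::new();
    /// if let Some(shard) = builder.finish(metadata.clone(), &mut pk_to_pk_id)? {
    ///     // `pk_to_pk_id` now maps each encoded key to its `PkId` in the shard.
    ///     shards.push(shard);
    /// }
    /// ```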
    pub fn finish(
        &mut self,
        metadata: RegionMetadataRef,
        pk_to_pk_id: &mut HashMap<Vec<u8>, PkId>,
    ) -> Result<Option<Shard>> {
        if self.is_empty() {
            return Ok(None);
        }

        let (data_part, key_dict) = match self.dict_builder.finish() {
            Some((dict, pk_to_index)) => {
                // Adds the key -> pk id mappings of the new shard to the map.
                pk_to_pk_id.reserve(pk_to_index.len());
                for (k, pk_index) in pk_to_index {
                    pk_to_pk_id.insert(
                        k,
                        PkId {
                            shard_id: self.current_shard_id,
                            pk_index,
                        },
                    );
                }

                let pk_weights = dict.pk_weights_to_sort_data();
                let part = self.data_buffer.freeze(Some(&pk_weights), true)?;
                (part, Some(dict))
            }
            None => {
                let pk_weights = [0];
                (self.data_buffer.freeze(Some(&pk_weights), true)?, None)
            }
        };

        // Builds data parts from the frozen buffer.
        let data_parts =
            DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]);
        let key_dict = key_dict.map(Arc::new);
        let shard_id = self.current_shard_id;
        self.current_shard_id += 1;

        Ok(Some(Shard::new(
            shard_id,
            key_dict,
            data_parts,
            self.dedup,
            self.data_freeze_threshold,
        )))
    }

    /// Returns a builder of the reader to scan the shard builder.
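    ///
    /// A hedged sketch of the scan flow (assuming a populated `shard_builder`;
    /// `test_write_read_shard_builder` below is a runnable version):
    ///
    /// ```ignore
    /// let mut pk_weights = Vec::new();
    /// let mut reader = shard_builder
    ///     .read(&mut pk_weights)?
    ///     .build(Some(&pk_weights), None)?;
    /// while reader.is_valid() {
    ///     let batch = reader.current_data_batch();
    ///     reader.next()?;
    /// }
    /// ```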
    pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReaderBuilder> {
        let dict_reader = {
            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
                .with_label_values(&["shard_builder_read_pk"])
                .start_timer();
            self.dict_builder.read()
        };

        {
            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
                .with_label_values(&["sort_pk"])
                .start_timer();
            dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
        }

        let data_reader = self.data_buffer.read()?;
        Ok(ShardBuilderReaderBuilder {
            shard_id: self.current_shard_id,
            dict_reader,
            data_reader,
        })
    }

    /// Returns true if the builder is empty.
    pub fn is_empty(&self) -> bool {
        self.data_buffer.is_empty()
    }
}

pub(crate) struct ShardBuilderReaderBuilder {
    shard_id: ShardId,
    dict_reader: DictBuilderReader,
    data_reader: DataBufferReaderBuilder,
}

impl ShardBuilderReaderBuilder {
    pub(crate) fn build(
        self,
        pk_weights: Option<&[u16]>,
        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    ) -> Result<ShardBuilderReader> {
        let now = Instant::now();
        let data_reader = self.data_reader.build(pk_weights)?;
        ShardBuilderReader::new(
            self.shard_id,
            self.dict_reader,
            data_reader,
            key_filter,
            now.elapsed(),
        )
    }
}

/// Reader to scan a shard builder.
pub struct ShardBuilderReader {
    shard_id: ShardId,
    dict_reader: DictBuilderReader,
    data_reader: DataBufferReader,
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last batch that passed the key filter.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys visited by the filter.
    keys_before_pruning: usize,
    /// Number of keys that passed the filter.
    keys_after_pruning: usize,
    /// Total cost of evaluating the key filter.
    prune_pk_cost: Duration,
    /// Cost of building the data reader.
    data_build_cost: Duration,
}

impl ShardBuilderReader {
    fn new(
        shard_id: ShardId,
        dict_reader: DictBuilderReader,
        data_reader: DataBufferReader,
        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
        data_build_cost: Duration,
    ) -> Result<Self> {
        let mut reader = ShardBuilderReader {
            shard_id,
            dict_reader,
            data_reader,
            key_filter,
            last_yield_pk_index: None,
            keys_before_pruning: 0,
            keys_after_pruning: 0,
            prune_pk_cost: Duration::default(),
            data_build_cost,
        };
        reader.prune_batch_by_key()?;

        Ok(reader)
    }

    pub fn is_valid(&self) -> bool {
        self.data_reader.is_valid()
    }

    pub fn next(&mut self) -> Result<()> {
        self.data_reader.next()?;
        self.prune_batch_by_key()
    }

    pub fn current_key(&self) -> Option<&[u8]> {
        let pk_index = self.data_reader.current_data_batch().pk_index();
        Some(self.dict_reader.key_by_pk_index(pk_index))
    }

    pub fn current_pk_id(&self) -> PkId {
        let pk_index = self.data_reader.current_data_batch().pk_index();
        PkId {
            shard_id: self.shard_id,
            pk_index,
        }
    }

    pub fn current_data_batch(&self) -> DataBatch {
        self.data_reader.current_data_batch()
    }

    /// Advances the data reader until the current batch's key matches the key
    /// filter, or the reader is exhausted.
    fn prune_batch_by_key(&mut self) -> Result<()> {
        let Some(key_filter) = &mut self.key_filter else {
            return Ok(());
        };

        while self.data_reader.is_valid() {
            let pk_index = self.data_reader.current_data_batch().pk_index();
            // A batch with the same pk index as the last yielded batch shares a
            // key that already matched the filter, so yield it directly.
            if let Some(yield_pk_index) = self.last_yield_pk_index {
                if pk_index == yield_pk_index {
                    break;
                }
            }
            self.keys_before_pruning += 1;
            let key = self.dict_reader.key_by_pk_index(pk_index);
            let now = Instant::now();
            if key_filter.matches(key) {
                self.prune_pk_cost += now.elapsed();
                self.last_yield_pk_index = Some(pk_index);
                self.keys_after_pruning += 1;
                break;
            }
            self.prune_pk_cost += now.elapsed();
            self.data_reader.next()?;
        }

        Ok(())
    }
}

impl Drop for ShardBuilderReader {
    fn drop(&mut self) {
        let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["shard_builder_prune_pk"])
            .observe(shard_builder_prune_pk);
        if self.keys_before_pruning > 0 {
            common_telemetry::debug!(
                "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
                self.keys_before_pruning,
                self.keys_after_pruning,
                shard_builder_prune_pk,
                self.data_build_cost.as_secs_f64(),
            );
        }
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::memtable::KeyValues;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test,
    };

    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<KeyValues> {
        vec![
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                2,
                [20, 21].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                0,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                0,
                [0, 1].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                1,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                1,
                [10, 11].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                2,
            ),
        ]
    }

    #[test]
    fn test_write_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();
        assert!(shard_builder
            .finish(metadata.clone(), &mut HashMap::new())
            .unwrap()
            .is_none());
        assert_eq!(1, shard_builder.current_shard_id);

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }
        let shard = shard_builder
            .finish(metadata, &mut HashMap::new())
            .unwrap()
            .unwrap();
        assert_eq!(1, shard.shard_id);
        assert_eq!(2, shard_builder.current_shard_id);
    }

    #[test]
    fn test_write_read_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }

        let mut pk_weights = Vec::new();
        let mut reader = shard_builder
            .read(&mut pk_weights)
            .unwrap()
            .build(Some(&pk_weights), None)
            .unwrap();
        let mut timestamps = Vec::new();
        while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
            let ts_array = rb.column(1);
            let ts_slice = timestamp_array_to_i64_slice(ts_array);
            timestamps.extend_from_slice(ts_slice);

            reader.next().unwrap();
        }
        // Rows come out in sorted key order, and `pk_weights` maps each pk index
        // (insertion order) to its position in that order.
        assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
        assert_eq!(vec![2, 0, 1], pk_weights);
    }
}