mito2/memtable/partition_tree/
dict.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Key dictionary of a shard.
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
21
22use crate::memtable::partition_tree::PkIndex;
23use crate::memtable::stats::WriteMetrics;
24use crate::metrics::MEMTABLE_DICT_BYTES;
25
/// Upper bound on the number of keys stored in a single [DictBlock].
const MAX_KEYS_PER_BLOCK: u16 = 256;
28
29/// The key is mcmp-encoded primary keys, while the values are the pk index and
30/// optionally sparsely encoded primary keys.
31type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;
32
33/// Builder to build a key dictionary.
34pub struct KeyDictBuilder {
35    /// Max keys of the dictionary.
36    capacity: usize,
37    /// Number of keys in the builder.
38    num_keys: usize,
39    /// Maps primary key to pk index.
40    pk_to_index: PkIndexMap,
41    /// Buffer for active dict block.
42    key_buffer: KeyBuffer,
43    /// Dictionary blocks.
44    dict_blocks: Vec<DictBlock>,
45    /// Bytes allocated by keys in the index.
46    key_bytes_in_index: usize,
47}
48
49impl KeyDictBuilder {
50    /// Creates a new builder that can hold up to `capacity` keys.
51    pub fn new(capacity: usize) -> Self {
52        Self {
53            capacity,
54            num_keys: 0,
55            pk_to_index: BTreeMap::new(),
56            key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
57            dict_blocks: Vec::with_capacity(capacity / MAX_KEYS_PER_BLOCK as usize + 1),
58            key_bytes_in_index: 0,
59        }
60    }
61
62    /// Returns true if the builder is full.
63    pub fn is_full(&self) -> bool {
64        self.num_keys >= self.capacity
65    }
66
67    /// Adds the key to the builder and returns its index if the builder is not full.
68    ///
69    /// # Panics
70    /// Panics if the builder is full.
71    pub fn insert_key(
72        &mut self,
73        full_primary_key: &[u8],
74        sparse_key: Option<&[u8]>,
75        metrics: &mut WriteMetrics,
76    ) -> PkIndex {
77        assert!(!self.is_full());
78
79        if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
80            // Already in the builder.
81            return pk_index;
82        }
83
84        if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
85            // The write buffer is full. Freeze a dict block.
86            let dict_block = self.key_buffer.finish(false);
87            self.dict_blocks.push(dict_block);
88        }
89
90        // Safety: we have checked the buffer length.
91        let pk_index = self.key_buffer.push_key(full_primary_key);
92        let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
93            (Some(sparse_key.to_vec()), sparse_key.len())
94        } else {
95            (None, 0)
96        };
97        self.pk_to_index
98            .insert(full_primary_key.to_vec(), (pk_index, sparse_key));
99        self.num_keys += 1;
100
101        // Since we store the key twice so the bytes usage doubled.
102        metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
103        self.key_bytes_in_index += full_primary_key.len();
104
105        // Adds key size of index to the metrics.
106        MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);
107
108        pk_index
109    }
110
111    /// Memory size of the builder.
112    #[cfg(test)]
113    pub fn memory_size(&self) -> usize {
114        self.key_bytes_in_index
115            + self.key_buffer.buffer_memory_size()
116            + self
117                .dict_blocks
118                .iter()
119                .map(|block| block.buffer_memory_size())
120                .sum::<usize>()
121    }
122
123    /// Finishes the builder. The key of the second BTreeMap is sparse-encoded bytes.
124    pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
125        if self.key_buffer.is_empty() {
126            return None;
127        }
128        let mut pk_to_index_map = BTreeMap::new();
129
130        // Finishes current dict block and resets the pk index.
131        let dict_block = self.key_buffer.finish(true);
132        self.dict_blocks.push(dict_block);
133        // Computes key position and then alter pk index.
134        let mut key_positions = vec![0; self.pk_to_index.len()];
135
136        for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
137            .into_iter()
138            .enumerate()
139        {
140            // The position of the i-th key is the old pk index.
141            key_positions[i] = pk_index;
142            if let Some(sparse_key) = sparse_key {
143                pk_to_index_map.insert(sparse_key, i as PkIndex);
144            }
145        }
146
147        self.num_keys = 0;
148        let key_bytes_in_index = self.key_bytes_in_index;
149        self.key_bytes_in_index = 0;
150
151        Some((
152            KeyDict {
153                dict_blocks: std::mem::take(&mut self.dict_blocks),
154                key_positions,
155                key_bytes_in_index,
156            },
157            pk_to_index_map,
158        ))
159    }
160
161    /// Reads the builder.
162    pub fn read(&self) -> DictBuilderReader {
163        let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
164        let block = self.key_buffer.finish_cloned();
165        let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
166        blocks.extend_from_slice(&self.dict_blocks);
167        blocks.push(block);
168
169        DictBuilderReader::new(blocks, sorted_pk_indices)
170    }
171}
172
173impl Drop for KeyDictBuilder {
174    fn drop(&mut self) {
175        MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
176    }
177}
178
179/// Reader to scan the [KeyDictBuilder].
180#[derive(Default)]
181pub struct DictBuilderReader {
182    blocks: Vec<DictBlock>,
183    sorted_pk_indices: Vec<PkIndex>,
184}
185
186impl DictBuilderReader {
187    fn new(blocks: Vec<DictBlock>, sorted_pk_indices: Vec<PkIndex>) -> Self {
188        Self {
189            blocks,
190            sorted_pk_indices,
191        }
192    }
193
194    /// Returns the number of keys.
195    #[cfg(test)]
196    pub fn num_keys(&self) -> usize {
197        self.sorted_pk_indices.len()
198    }
199
200    /// Gets the i-th pk index.
201    #[cfg(test)]
202    pub fn pk_index(&self, offset: usize) -> PkIndex {
203        self.sorted_pk_indices[offset]
204    }
205
206    /// Gets the i-th key.
207    #[cfg(test)]
208    pub fn key(&self, offset: usize) -> &[u8] {
209        let pk_index = self.pk_index(offset);
210        self.key_by_pk_index(pk_index)
211    }
212
213    /// Gets the key by the pk index.
214    pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
215        let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
216        self.blocks[block_idx as usize].key_by_pk_index(pk_index)
217    }
218
219    /// Returns pk weights to sort a data part and replaces pk indices.
220    pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
221        compute_pk_weights(&self.sorted_pk_indices, pk_weights)
222    }
223}
224
225/// Returns pk weights to sort a data part and replaces pk indices.
226fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
227    pk_weights.resize(sorted_pk_indices.len(), 0);
228    for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
229        pk_weights[*pk_index as usize] = weight as u16;
230    }
231}
232
233/// A key dictionary.
234#[derive(Default)]
235pub struct KeyDict {
236    // TODO(yingwen): We can use key_positions to do a binary search.
237    /// Unsorted key blocks.
238    dict_blocks: Vec<DictBlock>,
239    /// Maps pk index to position of the key in [Self::dict_blocks].
240    key_positions: Vec<PkIndex>,
241    /// Bytes of keys in the index.
242    key_bytes_in_index: usize,
243}
244
245pub type KeyDictRef = Arc<KeyDict>;
246
247impl KeyDict {
248    /// Gets the primary key by its index.
249    ///
250    /// # Panics
251    /// Panics if the index is invalid.
252    pub fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
253        let position = self.key_positions[index as usize];
254        let block_index = position / MAX_KEYS_PER_BLOCK;
255        self.dict_blocks[block_index as usize].key_by_pk_index(position)
256    }
257
258    /// Returns pk weights to sort a data part and replaces pk indices.
259    pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
260        let mut pk_weights = Vec::with_capacity(self.key_positions.len());
261        compute_pk_weights(&self.key_positions, &mut pk_weights);
262        pk_weights
263    }
264
265    /// Returns the shared memory size.
266    pub(crate) fn shared_memory_size(&self) -> usize {
267        self.key_bytes_in_index
268            + self
269                .dict_blocks
270                .iter()
271                .map(|block| block.buffer_memory_size())
272                .sum::<usize>()
273    }
274}
275
276impl Drop for KeyDict {
277    fn drop(&mut self) {
278        MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
279    }
280}
281
282/// Buffer to store unsorted primary keys.
283struct KeyBuffer {
284    key_builder: BinaryBuilder,
285    next_pk_index: usize,
286}
287
288impl KeyBuffer {
289    fn new(item_capacity: usize) -> Self {
290        Self {
291            key_builder: BinaryBuilder::with_capacity(item_capacity, 0),
292            next_pk_index: 0,
293        }
294    }
295
296    /// Pushes a new key and returns its pk index.
297    ///
298    /// # Panics
299    /// Panics if the [PkIndex] type cannot represent the index.
300    fn push_key(&mut self, key: &[u8]) -> PkIndex {
301        let pk_index = self.next_pk_index.try_into().unwrap();
302        self.next_pk_index += 1;
303        self.key_builder.append_value(key);
304
305        pk_index
306    }
307
308    /// Returns number of items in the buffer.
309    fn len(&self) -> usize {
310        self.key_builder.len()
311    }
312
313    /// Returns whether the buffer is empty.
314    fn is_empty(&self) -> bool {
315        self.key_builder.is_empty()
316    }
317
318    /// Returns the buffer size of the builder.
319    #[cfg(test)]
320    fn buffer_memory_size(&self) -> usize {
321        self.key_builder.values_slice().len()
322            + std::mem::size_of_val(self.key_builder.offsets_slice())
323            + self
324                .key_builder
325                .validity_slice()
326                .map(|v| v.len())
327                .unwrap_or(0)
328    }
329
330    fn finish(&mut self, reset_index: bool) -> DictBlock {
331        let primary_key = self.key_builder.finish();
332        // Reserve capacity for the new builder. `finish()` the builder will leave the builder
333        // empty with capacity 0.
334        // TODO(yingwen): Do we need to reserve capacity for data?
335        self.key_builder = BinaryBuilder::with_capacity(primary_key.len(), 0);
336        if reset_index {
337            self.next_pk_index = 0;
338        }
339
340        DictBlock::new(primary_key)
341    }
342
343    fn finish_cloned(&self) -> DictBlock {
344        let primary_key = self.key_builder.finish_cloned();
345
346        DictBlock::new(primary_key)
347    }
348}
349
350/// A block in the key dictionary.
351///
352/// The block is cheap to clone. Keys in the block are unsorted.
353#[derive(Clone)]
354struct DictBlock {
355    /// Container of keys in the block.
356    keys: BinaryArray,
357}
358
359impl DictBlock {
360    fn new(keys: BinaryArray) -> Self {
361        let buffer_size = keys.get_buffer_memory_size();
362        MEMTABLE_DICT_BYTES.add(buffer_size as i64);
363
364        Self { keys }
365    }
366
367    fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
368        let pos = index % MAX_KEYS_PER_BLOCK;
369        self.keys.value(pos as usize)
370    }
371
372    fn buffer_memory_size(&self) -> usize {
373        self.keys.get_buffer_memory_size()
374    }
375}
376
377impl Drop for DictBlock {
378    fn drop(&mut self) {
379        let buffer_size = self.keys.get_buffer_memory_size();
380        MEMTABLE_DICT_BYTES.sub(buffer_size as i64);
381    }
382}
383
#[cfg(test)]
mod tests {
    use rand::Rng;

    use super::*;

    /// Generates `num_keys` distinct keys. Each key is a random one-letter prefix followed by
    /// its ordinal, so keys are unique but not inserted in sorted order.
    fn prepare_input_keys(num_keys: usize) -> Vec<Vec<u8>> {
        const PREFIXES: [&str; 6] = ["a", "b", "c", "d", "e", "f"];
        let mut rng = rand::rng();
        // The index tests never decode keys, so a formatted string is a sufficient key.
        (0..num_keys)
            .map(|i| {
                let prefix = PREFIXES[rng.random_range(0..PREFIXES.len())];
                format!("{prefix}{i}").into_bytes()
            })
            .collect()
    }

    #[test]
    fn test_write_scan_builder() {
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + MAX_KEYS_PER_BLOCK / 2;
        let keys = prepare_input_keys(num_keys.into());

        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut metrics = WriteMetrics::default();
        let mut last_pk_index = None;
        for key in &keys {
            assert!(!builder.is_full());
            last_pk_index = Some(builder.insert_key(key, None, &mut metrics));
        }
        // Keys are distinct, so pk indices are assigned sequentially from 0.
        assert_eq!(Some(num_keys - 1), last_pk_index);
        let expect_key_bytes: usize = keys.iter().map(|key| key.len() * 2).sum();
        assert_eq!(expect_key_bytes, metrics.key_bytes);

        // The reader yields keys in sorted order with the pk index assigned at insertion.
        let mut expect: Vec<(Vec<u8>, PkIndex)> = keys
            .into_iter()
            .enumerate()
            .map(|(i, key)| (key, i as PkIndex))
            .collect();
        expect.sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let reader = builder.read();
        let result: Vec<_> = (0..reader.num_keys())
            .map(|i| (reader.key(i).to_vec(), reader.pk_index(i)))
            .collect();
        assert_eq!(expect, result);
    }

    #[test]
    fn test_dict_memory_size() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut metrics = WriteMetrics::default();
        // 513 keys of 5 bytes each: two frozen blocks plus one key in the active buffer.
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + 1;
        for i in 0..num_keys {
            builder.insert_key(format!("{i:05}").as_bytes(), None, &mut metrics);
        }
        let key_bytes = usize::from(num_keys) * 5;
        assert_eq!(key_bytes * 2, metrics.key_bytes);
        assert_eq!(key_bytes, builder.key_bytes_in_index);
        assert_eq!(8850, builder.memory_size());

        let (dict, _) = builder.finish().unwrap();
        // Finishing moves the accounted index bytes from the builder to the dict.
        assert_eq!(0, builder.key_bytes_in_index);
        assert_eq!(key_bytes, dict.key_bytes_in_index);
        assert!(dict.shared_memory_size() > key_bytes);
    }

    #[test]
    fn test_builder_finish() {
        let capacity = MAX_KEYS_PER_BLOCK * 2;
        let mut builder = KeyDictBuilder::new(capacity.into());
        let mut metrics = WriteMetrics::default();
        for i in 0..capacity {
            assert!(!builder.is_full());
            builder.insert_key(format!("{i:010}").as_bytes(), None, &mut metrics);
        }
        assert!(builder.is_full());
        builder.finish();

        // Finishing resets the builder: it accepts keys again and pk indices restart from 0.
        assert!(!builder.is_full());
        assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
    }
}