mito2/memtable/partition_tree/
dict.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Key dictionary of a shard.
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
21
22use crate::memtable::partition_tree::PkIndex;
23use crate::memtable::stats::WriteMetrics;
24use crate::metrics::MEMTABLE_DICT_BYTES;
25
/// Maximum keys in a [DictBlock].
///
/// Readers divide a pk index (or key position) by this value to pick the block and use the
/// remainder as the offset of the key inside that block.
const MAX_KEYS_PER_BLOCK: u16 = 256;
28
/// Index from a full primary key to its pk index.
///
/// The key is the mcmp-encoded primary key (presumably "memcomparable" encoding; confirm
/// against the row codec), while the value holds the pk index and, optionally, the sparsely
/// encoded primary key.
type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;
32
/// Builder to build a key dictionary.
///
/// New keys are appended to an active [KeyBuffer]. When the buffer already holds
/// [MAX_KEYS_PER_BLOCK] keys it is frozen into a [DictBlock] before the next key is appended.
pub struct KeyDictBuilder {
    /// Max keys of the dictionary.
    capacity: usize,
    /// Number of keys in the builder.
    num_keys: usize,
    /// Maps primary key to pk index (and the optional sparse key).
    /// Iterating this map visits the keys in sorted order.
    pk_to_index: PkIndexMap,
    /// Buffer for active dict block.
    key_buffer: KeyBuffer,
    /// Dictionary blocks that have been frozen, in the order they were filled.
    dict_blocks: Vec<DictBlock>,
    /// Bytes allocated by keys in the index. This amount is subtracted from
    /// `MEMTABLE_DICT_BYTES` when the builder is dropped.
    key_bytes_in_index: usize,
}
48
49impl KeyDictBuilder {
50    /// Creates a new builder that can hold up to `capacity` keys.
51    pub fn new(capacity: usize) -> Self {
52        Self {
53            capacity,
54            num_keys: 0,
55            pk_to_index: BTreeMap::new(),
56            key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
57            dict_blocks: Vec::with_capacity(capacity / MAX_KEYS_PER_BLOCK as usize + 1),
58            key_bytes_in_index: 0,
59        }
60    }
61
62    /// Returns true if the builder is full.
63    pub fn is_full(&self) -> bool {
64        self.num_keys >= self.capacity
65    }
66
67    /// Adds the key to the builder and returns its index if the builder is not full.
68    ///
69    /// # Panics
70    /// Panics if the builder is full.
71    pub fn insert_key(
72        &mut self,
73        full_primary_key: &[u8],
74        sparse_key: Option<&[u8]>,
75        metrics: &mut WriteMetrics,
76    ) -> PkIndex {
77        assert!(!self.is_full());
78
79        if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
80            // Already in the builder.
81            return pk_index;
82        }
83
84        if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
85            // The write buffer is full. Freeze a dict block.
86            let dict_block = self.key_buffer.finish(false);
87            self.dict_blocks.push(dict_block);
88        }
89
90        // Safety: we have checked the buffer length.
91        let pk_index = self.key_buffer.push_key(full_primary_key);
92        let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
93            (Some(sparse_key.to_vec()), sparse_key.len())
94        } else {
95            (None, 0)
96        };
97        self.pk_to_index
98            .insert(full_primary_key.to_vec(), (pk_index, sparse_key));
99        self.num_keys += 1;
100
101        // Since we store the key twice so the bytes usage doubled.
102        metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
103        self.key_bytes_in_index += full_primary_key.len();
104
105        // Adds key size of index to the metrics.
106        MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);
107
108        pk_index
109    }
110
111    /// Memory size of the builder.
112    #[cfg(test)]
113    pub fn memory_size(&self) -> usize {
114        self.key_bytes_in_index
115            + self.key_buffer.buffer_memory_size()
116            + self
117                .dict_blocks
118                .iter()
119                .map(|block| block.buffer_memory_size())
120                .sum::<usize>()
121    }
122
123    /// Finishes the builder. The key of the second BTreeMap is sparse-encoded bytes.
124    pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
125        if self.key_buffer.is_empty() {
126            return None;
127        }
128        let mut pk_to_index_map = BTreeMap::new();
129
130        // Finishes current dict block and resets the pk index.
131        let dict_block = self.key_buffer.finish(true);
132        self.dict_blocks.push(dict_block);
133        // Computes key position and then alter pk index.
134        let mut key_positions = vec![0; self.pk_to_index.len()];
135
136        for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
137            .into_iter()
138            .enumerate()
139        {
140            // The position of the i-th key is the old pk index.
141            key_positions[i] = pk_index;
142            if let Some(sparse_key) = sparse_key {
143                pk_to_index_map.insert(sparse_key, i as PkIndex);
144            }
145            pk_to_index_map.insert(full_pk, i as PkIndex);
146        }
147
148        self.num_keys = 0;
149        let key_bytes_in_index = self.key_bytes_in_index;
150        self.key_bytes_in_index = 0;
151
152        Some((
153            KeyDict {
154                dict_blocks: std::mem::take(&mut self.dict_blocks),
155                key_positions,
156                key_bytes_in_index,
157            },
158            pk_to_index_map,
159        ))
160    }
161
162    /// Reads the builder.
163    pub fn read(&self) -> DictBuilderReader {
164        let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
165        let block = self.key_buffer.finish_cloned();
166        let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
167        blocks.extend_from_slice(&self.dict_blocks);
168        blocks.push(block);
169
170        DictBuilderReader::new(blocks, sorted_pk_indices)
171    }
172}
173
174impl Drop for KeyDictBuilder {
175    fn drop(&mut self) {
176        MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
177    }
178}
179
/// Reader to scan the [KeyDictBuilder].
#[derive(Default)]
pub struct DictBuilderReader {
    /// Blocks to read keys from. A pk index is resolved to the block at
    /// `pk_index / MAX_KEYS_PER_BLOCK`.
    blocks: Vec<DictBlock>,
    /// Pk indices ordered by their primary keys.
    sorted_pk_indices: Vec<PkIndex>,
}
186
187impl DictBuilderReader {
188    fn new(blocks: Vec<DictBlock>, sorted_pk_indices: Vec<PkIndex>) -> Self {
189        Self {
190            blocks,
191            sorted_pk_indices,
192        }
193    }
194
195    /// Returns the number of keys.
196    #[cfg(test)]
197    pub fn num_keys(&self) -> usize {
198        self.sorted_pk_indices.len()
199    }
200
201    /// Gets the i-th pk index.
202    #[cfg(test)]
203    pub fn pk_index(&self, offset: usize) -> PkIndex {
204        self.sorted_pk_indices[offset]
205    }
206
207    /// Gets the i-th key.
208    #[cfg(test)]
209    pub fn key(&self, offset: usize) -> &[u8] {
210        let pk_index = self.pk_index(offset);
211        self.key_by_pk_index(pk_index)
212    }
213
214    /// Gets the key by the pk index.
215    pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
216        let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
217        self.blocks[block_idx as usize].key_by_pk_index(pk_index)
218    }
219
220    /// Returns pk weights to sort a data part and replaces pk indices.
221    pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
222        compute_pk_weights(&self.sorted_pk_indices, pk_weights)
223    }
224}
225
226/// Returns pk weights to sort a data part and replaces pk indices.
227fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
228    pk_weights.resize(sorted_pk_indices.len(), 0);
229    for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
230        pk_weights[*pk_index as usize] = weight as u16;
231    }
232}
233
/// A key dictionary.
///
/// Keys are stored unsorted in blocks; `key_positions` translates a pk index into the
/// position of its key inside those blocks.
#[derive(Default)]
pub struct KeyDict {
    // TODO(yingwen): We can use key_positions to do a binary search.
    /// Unsorted key blocks.
    dict_blocks: Vec<DictBlock>,
    /// Maps pk index to position of the key in [Self::dict_blocks].
    key_positions: Vec<PkIndex>,
    /// Bytes of keys in the index. This amount is subtracted from `MEMTABLE_DICT_BYTES` when
    /// the dictionary is dropped.
    key_bytes_in_index: usize,
}
245
/// Reference-counted handle to a [KeyDict].
pub type KeyDictRef = Arc<KeyDict>;
247
248impl KeyDict {
249    /// Gets the primary key by its index.
250    ///
251    /// # Panics
252    /// Panics if the index is invalid.
253    pub fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
254        let position = self.key_positions[index as usize];
255        let block_index = position / MAX_KEYS_PER_BLOCK;
256        self.dict_blocks[block_index as usize].key_by_pk_index(position)
257    }
258
259    /// Returns pk weights to sort a data part and replaces pk indices.
260    pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
261        let mut pk_weights = Vec::with_capacity(self.key_positions.len());
262        compute_pk_weights(&self.key_positions, &mut pk_weights);
263        pk_weights
264    }
265
266    /// Returns the shared memory size.
267    pub(crate) fn shared_memory_size(&self) -> usize {
268        self.key_bytes_in_index
269            + self
270                .dict_blocks
271                .iter()
272                .map(|block| block.buffer_memory_size())
273                .sum::<usize>()
274    }
275}
276
277impl Drop for KeyDict {
278    fn drop(&mut self) {
279        MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
280    }
281}
282
/// Buffer to store unsorted primary keys.
struct KeyBuffer {
    /// Builder that accumulates the keys of the active block.
    key_builder: BinaryBuilder,
    /// Pk index to hand out to the next pushed key. It keeps growing across blocks and is
    /// only reset when `finish` is asked to do so.
    next_pk_index: usize,
}
288
289impl KeyBuffer {
290    fn new(item_capacity: usize) -> Self {
291        Self {
292            key_builder: BinaryBuilder::with_capacity(item_capacity, 0),
293            next_pk_index: 0,
294        }
295    }
296
297    /// Pushes a new key and returns its pk index.
298    ///
299    /// # Panics
300    /// Panics if the [PkIndex] type cannot represent the index.
301    fn push_key(&mut self, key: &[u8]) -> PkIndex {
302        let pk_index = self.next_pk_index.try_into().unwrap();
303        self.next_pk_index += 1;
304        self.key_builder.append_value(key);
305
306        pk_index
307    }
308
309    /// Returns number of items in the buffer.
310    fn len(&self) -> usize {
311        self.key_builder.len()
312    }
313
314    /// Returns whether the buffer is empty.
315    fn is_empty(&self) -> bool {
316        self.key_builder.is_empty()
317    }
318
319    /// Returns the buffer size of the builder.
320    #[cfg(test)]
321    fn buffer_memory_size(&self) -> usize {
322        self.key_builder.values_slice().len()
323            + std::mem::size_of_val(self.key_builder.offsets_slice())
324            + self
325                .key_builder
326                .validity_slice()
327                .map(|v| v.len())
328                .unwrap_or(0)
329    }
330
331    fn finish(&mut self, reset_index: bool) -> DictBlock {
332        let primary_key = self.key_builder.finish();
333        // Reserve capacity for the new builder. `finish()` the builder will leave the builder
334        // empty with capacity 0.
335        // TODO(yingwen): Do we need to reserve capacity for data?
336        self.key_builder = BinaryBuilder::with_capacity(primary_key.len(), 0);
337        if reset_index {
338            self.next_pk_index = 0;
339        }
340
341        DictBlock::new(primary_key)
342    }
343
344    fn finish_cloned(&self) -> DictBlock {
345        let primary_key = self.key_builder.finish_cloned();
346
347        DictBlock::new(primary_key)
348    }
349}
350
351/// A block in the key dictionary.
352///
353/// The block is cheap to clone. Keys in the block are unsorted.
354#[derive(Clone)]
355struct DictBlock {
356    /// Container of keys in the block.
357    keys: BinaryArray,
358}
359
360impl DictBlock {
361    fn new(keys: BinaryArray) -> Self {
362        let buffer_size = keys.get_buffer_memory_size();
363        MEMTABLE_DICT_BYTES.add(buffer_size as i64);
364
365        Self { keys }
366    }
367
368    fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
369        let pos = index % MAX_KEYS_PER_BLOCK;
370        self.keys.value(pos as usize)
371    }
372
373    fn buffer_memory_size(&self) -> usize {
374        self.keys.get_buffer_memory_size()
375    }
376}
377
378impl Drop for DictBlock {
379    fn drop(&mut self) {
380        let buffer_size = self.keys.get_buffer_memory_size();
381        MEMTABLE_DICT_BYTES.sub(buffer_size as i64);
382    }
383}
384
#[cfg(test)]
mod tests {
    use rand::Rng;

    use super::*;

    /// Generates `num_keys` distinct keys: a random one-letter prefix followed by the key's
    /// sequence number, so the insertion order differs from the sorted order.
    fn prepare_input_keys(num_keys: usize) -> Vec<Vec<u8>> {
        let prefix = ["a", "b", "c", "d", "e", "f"];
        let mut rng = rand::rng();
        let mut keys = Vec::with_capacity(num_keys);
        for i in 0..num_keys {
            let prefix_idx = rng.random_range(0..prefix.len());
            // We don't need to decode the primary key in index's test so we format the string
            // into the key.
            let key = format!("{}{}", prefix[prefix_idx], i);
            keys.push(key.into_bytes());
        }

        keys
    }

    /// Inserts 2.5 blocks of keys and checks that the reader returns them in sorted order
    /// together with the pk indices assigned at insertion time.
    #[test]
    fn test_write_scan_builder() {
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + MAX_KEYS_PER_BLOCK / 2;
        let keys = prepare_input_keys(num_keys.into());

        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut last_pk_index = None;
        let mut metrics = WriteMetrics::default();
        for key in &keys {
            assert!(!builder.is_full());
            let pk_index = builder.insert_key(key, None, &mut metrics);
            last_pk_index = Some(pk_index);
        }
        // Pk indices are handed out sequentially, starting from 0.
        assert_eq!(num_keys - 1, last_pk_index.unwrap());
        // Each key is counted twice in the write metrics.
        let key_bytes: usize = keys.iter().map(|key| key.len() * 2).sum();
        assert_eq!(key_bytes, metrics.key_bytes);

        let mut expect: Vec<_> = keys
            .into_iter()
            .enumerate()
            .map(|(i, key)| (key, i as PkIndex))
            .collect();
        expect.sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let mut result = Vec::with_capacity(expect.len());
        let reader = builder.read();
        for i in 0..reader.num_keys() {
            result.push((reader.key(i).to_vec(), reader.pk_index(i)));
        }
        assert_eq!(expect, result);
    }

    /// Checks the byte accounting of the builder and of the dictionary it produces.
    #[test]
    fn test_dict_memory_size() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut metrics = WriteMetrics::default();
        // 513 keys
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + 1;
        // Fills 2 blocks and leaves 1 key in the active buffer.
        for i in 0..num_keys {
            // Each key is 5 bytes.
            let key = format!("{i:05}");
            builder.insert_key(key.as_bytes(), None, &mut metrics);
        }
        let key_bytes = num_keys as usize * 5;
        assert_eq!(key_bytes * 2, metrics.key_bytes);
        assert_eq!(key_bytes, builder.key_bytes_in_index);
        // NOTE(review): 8850 presumably depends on how the arrow builders size their
        // buffers; confirm the value if the arrow dependency changes.
        assert_eq!(8850, builder.memory_size());

        let (dict, _) = builder.finish().unwrap();
        // The tracked key bytes move from the builder to the dictionary.
        assert_eq!(0, builder.key_bytes_in_index);
        assert_eq!(key_bytes, dict.key_bytes_in_index);
        assert!(dict.shared_memory_size() > key_bytes);
    }

    /// Checks that a full builder can be reused after `finish` and restarts pk indices at 0.
    #[test]
    fn test_builder_finish() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
        let mut metrics = WriteMetrics::default();
        for i in 0..MAX_KEYS_PER_BLOCK * 2 {
            let key = format!("{i:010}");
            assert!(!builder.is_full());
            builder.insert_key(key.as_bytes(), None, &mut metrics);
        }
        assert!(builder.is_full());
        builder.finish();

        assert!(!builder.is_full());
        assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
    }

    /// Checks that the map returned by `finish` resolves the sparse key and the full key of
    /// the same row to the same pk index.
    #[test]
    fn test_builder_finish_with_sparse_key() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
        let mut metrics = WriteMetrics::default();
        let full_key = "42".to_string();
        let sparse_key = &[42u8];

        builder.insert_key(full_key.as_bytes(), Some(sparse_key), &mut metrics);
        let (dict, pk_to_pk_id) = builder.finish().unwrap();
        assert_eq!(dict.key_positions.len(), 1);
        assert_eq!(dict.dict_blocks.len(), 1);
        assert_eq!(
            pk_to_pk_id.get(sparse_key.as_slice()),
            pk_to_pk_id.get(full_key.as_bytes())
        );
    }
}