mito2/memtable/partition_tree/
dict.rs1use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
21
22use crate::memtable::partition_tree::PkIndex;
23use crate::memtable::stats::WriteMetrics;
24use crate::metrics::MEMTABLE_DICT_BYTES;
25
/// Maximum number of keys stored in a single dictionary block.
const MAX_KEYS_PER_BLOCK: u16 = 256;

/// Ordered map from a full primary key to its `PkIndex` and, when one was supplied on
/// insertion, the sparse encoding of the same key.
type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;
32
33pub struct KeyDictBuilder {
35 capacity: usize,
37 num_keys: usize,
39 pk_to_index: PkIndexMap,
41 key_buffer: KeyBuffer,
43 dict_blocks: Vec<DictBlock>,
45 key_bytes_in_index: usize,
47}
48
49impl KeyDictBuilder {
50 pub fn new(capacity: usize) -> Self {
52 Self {
53 capacity,
54 num_keys: 0,
55 pk_to_index: BTreeMap::new(),
56 key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
57 dict_blocks: Vec::with_capacity(capacity / MAX_KEYS_PER_BLOCK as usize + 1),
58 key_bytes_in_index: 0,
59 }
60 }
61
62 pub fn is_full(&self) -> bool {
64 self.num_keys >= self.capacity
65 }
66
67 pub fn insert_key(
72 &mut self,
73 full_primary_key: &[u8],
74 sparse_key: Option<&[u8]>,
75 metrics: &mut WriteMetrics,
76 ) -> PkIndex {
77 assert!(!self.is_full());
78
79 if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
80 return pk_index;
82 }
83
84 if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
85 let dict_block = self.key_buffer.finish(false);
87 self.dict_blocks.push(dict_block);
88 }
89
90 let pk_index = self.key_buffer.push_key(full_primary_key);
92 let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
93 (Some(sparse_key.to_vec()), sparse_key.len())
94 } else {
95 (None, 0)
96 };
97 self.pk_to_index
98 .insert(full_primary_key.to_vec(), (pk_index, sparse_key));
99 self.num_keys += 1;
100
101 metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
103 self.key_bytes_in_index += full_primary_key.len();
104
105 MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);
107
108 pk_index
109 }
110
111 #[cfg(test)]
113 pub fn memory_size(&self) -> usize {
114 self.key_bytes_in_index
115 + self.key_buffer.buffer_memory_size()
116 + self
117 .dict_blocks
118 .iter()
119 .map(|block| block.buffer_memory_size())
120 .sum::<usize>()
121 }
122
123 pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
125 if self.key_buffer.is_empty() {
126 return None;
127 }
128 let mut pk_to_index_map = BTreeMap::new();
129
130 let dict_block = self.key_buffer.finish(true);
132 self.dict_blocks.push(dict_block);
133 let mut key_positions = vec![0; self.pk_to_index.len()];
135
136 for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
137 .into_iter()
138 .enumerate()
139 {
140 key_positions[i] = pk_index;
142 if let Some(sparse_key) = sparse_key {
143 pk_to_index_map.insert(sparse_key, i as PkIndex);
144 }
145 pk_to_index_map.insert(full_pk, i as PkIndex);
146 }
147
148 self.num_keys = 0;
149 let key_bytes_in_index = self.key_bytes_in_index;
150 self.key_bytes_in_index = 0;
151
152 Some((
153 KeyDict {
154 dict_blocks: std::mem::take(&mut self.dict_blocks),
155 key_positions,
156 key_bytes_in_index,
157 },
158 pk_to_index_map,
159 ))
160 }
161
162 pub fn read(&self) -> DictBuilderReader {
164 let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
165 let block = self.key_buffer.finish_cloned();
166 let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
167 blocks.extend_from_slice(&self.dict_blocks);
168 blocks.push(block);
169
170 DictBuilderReader::new(blocks, sorted_pk_indices)
171 }
172}
173
174impl Drop for KeyDictBuilder {
175 fn drop(&mut self) {
176 MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
177 }
178}
179
180#[derive(Default)]
182pub struct DictBuilderReader {
183 blocks: Vec<DictBlock>,
184 sorted_pk_indices: Vec<PkIndex>,
185}
186
187impl DictBuilderReader {
188 fn new(blocks: Vec<DictBlock>, sorted_pk_indices: Vec<PkIndex>) -> Self {
189 Self {
190 blocks,
191 sorted_pk_indices,
192 }
193 }
194
195 #[cfg(test)]
197 pub fn num_keys(&self) -> usize {
198 self.sorted_pk_indices.len()
199 }
200
201 #[cfg(test)]
203 pub fn pk_index(&self, offset: usize) -> PkIndex {
204 self.sorted_pk_indices[offset]
205 }
206
207 #[cfg(test)]
209 pub fn key(&self, offset: usize) -> &[u8] {
210 let pk_index = self.pk_index(offset);
211 self.key_by_pk_index(pk_index)
212 }
213
214 pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
216 let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
217 self.blocks[block_idx as usize].key_by_pk_index(pk_index)
218 }
219
220 pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
222 compute_pk_weights(&self.sorted_pk_indices, pk_weights)
223 }
224}
225
226fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
228 pk_weights.resize(sorted_pk_indices.len(), 0);
229 for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
230 pk_weights[*pk_index as usize] = weight as u16;
231 }
232}
233
234#[derive(Default)]
236pub struct KeyDict {
237 dict_blocks: Vec<DictBlock>,
240 key_positions: Vec<PkIndex>,
242 key_bytes_in_index: usize,
244}
245
246pub type KeyDictRef = Arc<KeyDict>;
247
248impl KeyDict {
249 pub fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
254 let position = self.key_positions[index as usize];
255 let block_index = position / MAX_KEYS_PER_BLOCK;
256 self.dict_blocks[block_index as usize].key_by_pk_index(position)
257 }
258
259 pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
261 let mut pk_weights = Vec::with_capacity(self.key_positions.len());
262 compute_pk_weights(&self.key_positions, &mut pk_weights);
263 pk_weights
264 }
265
266 pub(crate) fn shared_memory_size(&self) -> usize {
268 self.key_bytes_in_index
269 + self
270 .dict_blocks
271 .iter()
272 .map(|block| block.buffer_memory_size())
273 .sum::<usize>()
274 }
275}
276
277impl Drop for KeyDict {
278 fn drop(&mut self) {
279 MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
280 }
281}
282
283struct KeyBuffer {
285 key_builder: BinaryBuilder,
286 next_pk_index: usize,
287}
288
289impl KeyBuffer {
290 fn new(item_capacity: usize) -> Self {
291 Self {
292 key_builder: BinaryBuilder::with_capacity(item_capacity, 0),
293 next_pk_index: 0,
294 }
295 }
296
297 fn push_key(&mut self, key: &[u8]) -> PkIndex {
302 let pk_index = self.next_pk_index.try_into().unwrap();
303 self.next_pk_index += 1;
304 self.key_builder.append_value(key);
305
306 pk_index
307 }
308
309 fn len(&self) -> usize {
311 self.key_builder.len()
312 }
313
314 fn is_empty(&self) -> bool {
316 self.key_builder.is_empty()
317 }
318
319 #[cfg(test)]
321 fn buffer_memory_size(&self) -> usize {
322 self.key_builder.values_slice().len()
323 + std::mem::size_of_val(self.key_builder.offsets_slice())
324 + self
325 .key_builder
326 .validity_slice()
327 .map(|v| v.len())
328 .unwrap_or(0)
329 }
330
331 fn finish(&mut self, reset_index: bool) -> DictBlock {
332 let primary_key = self.key_builder.finish();
333 self.key_builder = BinaryBuilder::with_capacity(primary_key.len(), 0);
337 if reset_index {
338 self.next_pk_index = 0;
339 }
340
341 DictBlock::new(primary_key)
342 }
343
344 fn finish_cloned(&self) -> DictBlock {
345 let primary_key = self.key_builder.finish_cloned();
346
347 DictBlock::new(primary_key)
348 }
349}
350
351#[derive(Clone)]
355struct DictBlock {
356 keys: BinaryArray,
358}
359
360impl DictBlock {
361 fn new(keys: BinaryArray) -> Self {
362 let buffer_size = keys.get_buffer_memory_size();
363 MEMTABLE_DICT_BYTES.add(buffer_size as i64);
364
365 Self { keys }
366 }
367
368 fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
369 let pos = index % MAX_KEYS_PER_BLOCK;
370 self.keys.value(pos as usize)
371 }
372
373 fn buffer_memory_size(&self) -> usize {
374 self.keys.get_buffer_memory_size()
375 }
376}
377
378impl Drop for DictBlock {
379 fn drop(&mut self) {
380 let buffer_size = self.keys.get_buffer_memory_size();
381 MEMTABLE_DICT_BYTES.sub(buffer_size as i64);
382 }
383}
384
#[cfg(test)]
mod tests {
    use rand::Rng;

    use super::*;

    /// Generates `num_keys` keys, each a random one-letter prefix followed by the key's ordinal.
    fn prepare_input_keys(num_keys: usize) -> Vec<Vec<u8>> {
        let prefix = ["a", "b", "c", "d", "e", "f"];
        let mut rng = rand::rng();
        (0..num_keys)
            .map(|i| {
                let prefix_idx = rng.random_range(0..prefix.len());
                format!("{}{}", prefix[prefix_idx], i).into_bytes()
            })
            .collect()
    }

    /// Keys read back from the builder come out sorted and keep their insertion indices.
    #[test]
    fn test_write_scan_builder() {
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + MAX_KEYS_PER_BLOCK / 2;
        let keys = prepare_input_keys(num_keys.into());

        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut metrics = WriteMetrics::default();
        let last_pk_index = keys
            .iter()
            .map(|key| {
                assert!(!builder.is_full());
                builder.insert_key(key, None, &mut metrics)
            })
            .last();
        assert_eq!(num_keys - 1, last_pk_index.unwrap());
        let key_bytes: usize = keys.iter().map(|key| key.len() * 2).sum();
        assert_eq!(key_bytes, metrics.key_bytes);

        let mut expect: Vec<_> = keys
            .into_iter()
            .enumerate()
            .map(|(i, key)| (key, i as PkIndex))
            .collect();
        expect.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));

        let reader = builder.read();
        let result: Vec<_> = (0..reader.num_keys())
            .map(|i| (reader.key(i).to_vec(), reader.pk_index(i)))
            .collect();
        assert_eq!(expect, result);
    }

    /// Byte accounting of the builder and of the dictionary it produces.
    #[test]
    fn test_dict_memory_size() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut metrics = WriteMetrics::default();
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + 1;
        (0..num_keys).for_each(|i| {
            // Every key is exactly five bytes long.
            let key = format!("{i:05}");
            builder.insert_key(key.as_bytes(), None, &mut metrics);
        });
        let key_bytes = num_keys as usize * 5;
        assert_eq!(key_bytes * 2, metrics.key_bytes);
        assert_eq!(key_bytes, builder.key_bytes_in_index);
        assert_eq!(8850, builder.memory_size());

        let (dict, _) = builder.finish().unwrap();
        assert_eq!(0, builder.key_bytes_in_index);
        assert_eq!(key_bytes, dict.key_bytes_in_index);
        assert!(dict.shared_memory_size() > key_bytes);
    }

    /// A finished builder is empty again and restarts index assignment from zero.
    #[test]
    fn test_builder_finish() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
        let mut metrics = WriteMetrics::default();
        for i in 0..MAX_KEYS_PER_BLOCK * 2 {
            assert!(!builder.is_full());
            let key = format!("{i:010}");
            builder.insert_key(key.as_bytes(), None, &mut metrics);
        }
        assert!(builder.is_full());
        builder.finish();

        assert!(!builder.is_full());
        let first_index = builder.insert_key(b"a0", None, &mut metrics);
        assert_eq!(0, first_index);
    }

    /// The sparse key and the full key resolve to the same index after finishing.
    #[test]
    fn test_builder_finish_with_sparse_key() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
        let mut metrics = WriteMetrics::default();
        let full_key = "42".to_string();
        let sparse_key = &[42u8];

        builder.insert_key(full_key.as_bytes(), Some(sparse_key), &mut metrics);
        let (dict, pk_to_pk_id) = builder.finish().unwrap();
        assert_eq!(dict.key_positions.len(), 1);
        assert_eq!(dict.dict_blocks.len(), 1);

        let by_sparse_key = pk_to_pk_id.get(sparse_key.as_slice());
        let by_full_key = pk_to_pk_id.get(full_key.as_bytes());
        assert_eq!(by_sparse_key, by_full_key);
    }
}