mito2/memtable/partition_tree/
dict.rs1use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
21
22use crate::memtable::partition_tree::PkIndex;
23use crate::memtable::stats::WriteMetrics;
24use crate::metrics::MEMTABLE_DICT_BYTES;
25
/// Maximum number of keys stored in a single dictionary block.
///
/// A pk index is split with this constant: `index / MAX_KEYS_PER_BLOCK` selects
/// the block and `index % MAX_KEYS_PER_BLOCK` selects the key inside the block.
const MAX_KEYS_PER_BLOCK: u16 = 256;
28
29type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;
32
33pub struct KeyDictBuilder {
35 capacity: usize,
37 num_keys: usize,
39 pk_to_index: PkIndexMap,
41 key_buffer: KeyBuffer,
43 dict_blocks: Vec<DictBlock>,
45 key_bytes_in_index: usize,
47}
48
49impl KeyDictBuilder {
50 pub fn new(capacity: usize) -> Self {
52 Self {
53 capacity,
54 num_keys: 0,
55 pk_to_index: BTreeMap::new(),
56 key_buffer: KeyBuffer::new(MAX_KEYS_PER_BLOCK.into()),
57 dict_blocks: Vec::with_capacity(capacity / MAX_KEYS_PER_BLOCK as usize + 1),
58 key_bytes_in_index: 0,
59 }
60 }
61
62 pub fn is_full(&self) -> bool {
64 self.num_keys >= self.capacity
65 }
66
67 pub fn insert_key(
72 &mut self,
73 full_primary_key: &[u8],
74 sparse_key: Option<&[u8]>,
75 metrics: &mut WriteMetrics,
76 ) -> PkIndex {
77 assert!(!self.is_full());
78
79 if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
80 return pk_index;
82 }
83
84 if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
85 let dict_block = self.key_buffer.finish(false);
87 self.dict_blocks.push(dict_block);
88 }
89
90 let pk_index = self.key_buffer.push_key(full_primary_key);
92 let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
93 (Some(sparse_key.to_vec()), sparse_key.len())
94 } else {
95 (None, 0)
96 };
97 self.pk_to_index
98 .insert(full_primary_key.to_vec(), (pk_index, sparse_key));
99 self.num_keys += 1;
100
101 metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
103 self.key_bytes_in_index += full_primary_key.len();
104
105 MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);
107
108 pk_index
109 }
110
111 #[cfg(test)]
113 pub fn memory_size(&self) -> usize {
114 self.key_bytes_in_index
115 + self.key_buffer.buffer_memory_size()
116 + self
117 .dict_blocks
118 .iter()
119 .map(|block| block.buffer_memory_size())
120 .sum::<usize>()
121 }
122
123 pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
125 if self.key_buffer.is_empty() {
126 return None;
127 }
128 let mut pk_to_index_map = BTreeMap::new();
129
130 let dict_block = self.key_buffer.finish(true);
132 self.dict_blocks.push(dict_block);
133 let mut key_positions = vec![0; self.pk_to_index.len()];
135
136 for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
137 .into_iter()
138 .enumerate()
139 {
140 key_positions[i] = pk_index;
142 if let Some(sparse_key) = sparse_key {
143 pk_to_index_map.insert(sparse_key, i as PkIndex);
144 }
145 }
146
147 self.num_keys = 0;
148 let key_bytes_in_index = self.key_bytes_in_index;
149 self.key_bytes_in_index = 0;
150
151 Some((
152 KeyDict {
153 dict_blocks: std::mem::take(&mut self.dict_blocks),
154 key_positions,
155 key_bytes_in_index,
156 },
157 pk_to_index_map,
158 ))
159 }
160
161 pub fn read(&self) -> DictBuilderReader {
163 let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
164 let block = self.key_buffer.finish_cloned();
165 let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
166 blocks.extend_from_slice(&self.dict_blocks);
167 blocks.push(block);
168
169 DictBuilderReader::new(blocks, sorted_pk_indices)
170 }
171}
172
173impl Drop for KeyDictBuilder {
174 fn drop(&mut self) {
175 MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
176 }
177}
178
179#[derive(Default)]
181pub struct DictBuilderReader {
182 blocks: Vec<DictBlock>,
183 sorted_pk_indices: Vec<PkIndex>,
184}
185
186impl DictBuilderReader {
187 fn new(blocks: Vec<DictBlock>, sorted_pk_indices: Vec<PkIndex>) -> Self {
188 Self {
189 blocks,
190 sorted_pk_indices,
191 }
192 }
193
194 #[cfg(test)]
196 pub fn num_keys(&self) -> usize {
197 self.sorted_pk_indices.len()
198 }
199
200 #[cfg(test)]
202 pub fn pk_index(&self, offset: usize) -> PkIndex {
203 self.sorted_pk_indices[offset]
204 }
205
206 #[cfg(test)]
208 pub fn key(&self, offset: usize) -> &[u8] {
209 let pk_index = self.pk_index(offset);
210 self.key_by_pk_index(pk_index)
211 }
212
213 pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
215 let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
216 self.blocks[block_idx as usize].key_by_pk_index(pk_index)
217 }
218
219 pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
221 compute_pk_weights(&self.sorted_pk_indices, pk_weights)
222 }
223}
224
225fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
227 pk_weights.resize(sorted_pk_indices.len(), 0);
228 for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
229 pk_weights[*pk_index as usize] = weight as u16;
230 }
231}
232
233#[derive(Default)]
235pub struct KeyDict {
236 dict_blocks: Vec<DictBlock>,
239 key_positions: Vec<PkIndex>,
241 key_bytes_in_index: usize,
243}
244
245pub type KeyDictRef = Arc<KeyDict>;
246
247impl KeyDict {
248 pub fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
253 let position = self.key_positions[index as usize];
254 let block_index = position / MAX_KEYS_PER_BLOCK;
255 self.dict_blocks[block_index as usize].key_by_pk_index(position)
256 }
257
258 pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
260 let mut pk_weights = Vec::with_capacity(self.key_positions.len());
261 compute_pk_weights(&self.key_positions, &mut pk_weights);
262 pk_weights
263 }
264
265 pub(crate) fn shared_memory_size(&self) -> usize {
267 self.key_bytes_in_index
268 + self
269 .dict_blocks
270 .iter()
271 .map(|block| block.buffer_memory_size())
272 .sum::<usize>()
273 }
274}
275
276impl Drop for KeyDict {
277 fn drop(&mut self) {
278 MEMTABLE_DICT_BYTES.sub(self.key_bytes_in_index as i64);
279 }
280}
281
282struct KeyBuffer {
284 key_builder: BinaryBuilder,
285 next_pk_index: usize,
286}
287
288impl KeyBuffer {
289 fn new(item_capacity: usize) -> Self {
290 Self {
291 key_builder: BinaryBuilder::with_capacity(item_capacity, 0),
292 next_pk_index: 0,
293 }
294 }
295
296 fn push_key(&mut self, key: &[u8]) -> PkIndex {
301 let pk_index = self.next_pk_index.try_into().unwrap();
302 self.next_pk_index += 1;
303 self.key_builder.append_value(key);
304
305 pk_index
306 }
307
308 fn len(&self) -> usize {
310 self.key_builder.len()
311 }
312
313 fn is_empty(&self) -> bool {
315 self.key_builder.is_empty()
316 }
317
318 #[cfg(test)]
320 fn buffer_memory_size(&self) -> usize {
321 self.key_builder.values_slice().len()
322 + std::mem::size_of_val(self.key_builder.offsets_slice())
323 + self
324 .key_builder
325 .validity_slice()
326 .map(|v| v.len())
327 .unwrap_or(0)
328 }
329
330 fn finish(&mut self, reset_index: bool) -> DictBlock {
331 let primary_key = self.key_builder.finish();
332 self.key_builder = BinaryBuilder::with_capacity(primary_key.len(), 0);
336 if reset_index {
337 self.next_pk_index = 0;
338 }
339
340 DictBlock::new(primary_key)
341 }
342
343 fn finish_cloned(&self) -> DictBlock {
344 let primary_key = self.key_builder.finish_cloned();
345
346 DictBlock::new(primary_key)
347 }
348}
349
350#[derive(Clone)]
354struct DictBlock {
355 keys: BinaryArray,
357}
358
359impl DictBlock {
360 fn new(keys: BinaryArray) -> Self {
361 let buffer_size = keys.get_buffer_memory_size();
362 MEMTABLE_DICT_BYTES.add(buffer_size as i64);
363
364 Self { keys }
365 }
366
367 fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
368 let pos = index % MAX_KEYS_PER_BLOCK;
369 self.keys.value(pos as usize)
370 }
371
372 fn buffer_memory_size(&self) -> usize {
373 self.keys.get_buffer_memory_size()
374 }
375}
376
377impl Drop for DictBlock {
378 fn drop(&mut self) {
379 let buffer_size = self.keys.get_buffer_memory_size();
380 MEMTABLE_DICT_BYTES.sub(buffer_size as i64);
381 }
382}
383
#[cfg(test)]
mod tests {
    use rand::Rng;

    use super::*;

    /// Generates `num_keys` distinct keys: a random one-letter prefix followed
    /// by the key's sequence number.
    fn prepare_input_keys(num_keys: usize) -> Vec<Vec<u8>> {
        let prefix = ["a", "b", "c", "d", "e", "f"];
        let mut rng = rand::rng();
        (0..num_keys)
            .map(|i| {
                let prefix_idx = rng.random_range(0..prefix.len());
                // The dictionary never decodes keys, so a formatted string
                // is a good enough primary key here.
                format!("{}{}", prefix[prefix_idx], i).into_bytes()
            })
            .collect()
    }

    /// Keys spanning several blocks are readable in sorted order and keep the
    /// pk indices assigned at insertion.
    #[test]
    fn test_write_scan_builder() {
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + MAX_KEYS_PER_BLOCK / 2;
        let keys = prepare_input_keys(num_keys.into());

        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut metrics = WriteMetrics::default();
        let mut last_pk_index = None;
        for key in &keys {
            assert!(!builder.is_full());
            last_pk_index = Some(builder.insert_key(key, None, &mut metrics));
        }
        assert_eq!(num_keys - 1, last_pk_index.unwrap());
        let key_bytes: usize = keys.iter().map(|key| key.len() * 2).sum();
        assert_eq!(key_bytes, metrics.key_bytes);

        let mut expect: Vec<_> = keys
            .into_iter()
            .enumerate()
            .map(|(i, key)| (key, i as PkIndex))
            .collect();
        expect.sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let reader = builder.read();
        let result: Vec<_> = (0..reader.num_keys())
            .map(|i| (reader.key(i).to_vec(), reader.pk_index(i)))
            .collect();
        assert_eq!(expect, result);
    }

    /// Byte accounting of the builder and of the dictionary it produces.
    #[test]
    fn test_dict_memory_size() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 3).into());
        let mut metrics = WriteMetrics::default();
        // Two full blocks plus one key in the buffer.
        let num_keys = MAX_KEYS_PER_BLOCK * 2 + 1;
        for i in 0..num_keys {
            // Every key is exactly 5 bytes long.
            let key = format!("{i:05}");
            builder.insert_key(key.as_bytes(), None, &mut metrics);
        }
        let key_bytes = num_keys as usize * 5;
        assert_eq!(key_bytes * 2, metrics.key_bytes);
        assert_eq!(key_bytes, builder.key_bytes_in_index);
        assert_eq!(8850, builder.memory_size());

        let (dict, _) = builder.finish().unwrap();
        assert_eq!(0, builder.key_bytes_in_index);
        assert_eq!(key_bytes, dict.key_bytes_in_index);
        assert!(dict.shared_memory_size() > key_bytes);
    }

    /// A finished builder is empty again and restarts pk indices from 0.
    #[test]
    fn test_builder_finish() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
        let mut metrics = WriteMetrics::default();
        for i in 0..MAX_KEYS_PER_BLOCK * 2 {
            let key = format!("{i:010}");
            assert!(!builder.is_full());
            builder.insert_key(key.as_bytes(), None, &mut metrics);
        }
        assert!(builder.is_full());
        builder.finish();

        assert!(!builder.is_full());
        assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
    }
}