mito2/memtable/partition_tree/merger.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::cmp::{Ordering, Reverse};
16use std::collections::BinaryHeap;
17use std::fmt::Debug;
18use std::ops::Range;
19
20use crate::error::Result;
21use crate::memtable::partition_tree::data::{DataBatch, DataBufferReader, DataPartReader};
22use crate::memtable::partition_tree::PkIndex;
23
24/// Nodes of merger's heap.
25pub trait Node: Ord {
26    /// Returns true if current node is not exhausted.
27    fn is_valid(&self) -> bool;
28
29    /// Whether the other node is behind (exclusive) current node.
30    fn is_behind(&self, other: &Self) -> bool;
31
32    /// Advances `len` rows from current batch. If current batch is empty it fetches
33    /// next batch from the node.
34    ///
35    /// # Panics
36    /// If the node is invalid.
37    fn advance(&mut self, len: usize) -> Result<()>;
38
39    /// Length of current item.
40    fn current_item_len(&self) -> usize;
41
42    /// Searches first key of `other` in current item and returns the index.
43    fn search_key_in_current_item(&self, other: &Self) -> Result<usize, usize>;
44}
45
46pub struct Merger<T: Node> {
47    /// Heap to find node to read.
48    ///
49    /// Nodes in the heap are always valid.
50    heap: BinaryHeap<T>,
51    /// Current node to read.
52    ///
53    /// The node is always valid if it is not None.
54    current_node: Option<T>,
55    /// The number of rows in current node that are valid to read.
56    current_rows: usize,
57}
58
59impl<T: Node> Merger<T> {
60    pub(crate) fn try_new(nodes: Vec<T>) -> Result<Self> {
61        let mut heap = BinaryHeap::with_capacity(nodes.len());
62        for node in nodes {
63            if node.is_valid() {
64                heap.push(node);
65            }
66        }
67        let mut merger = Merger {
68            heap,
69            current_node: None,
70            current_rows: 0,
71        };
72        merger.next()?;
73        Ok(merger)
74    }
75
76    /// Returns true if current merger is still valid.
77    pub(crate) fn is_valid(&self) -> bool {
78        self.current_node.is_some()
79    }
80
81    /// Returns current node to read. Only [Self::current_rows] rows in current node
82    /// are valid to read.
83    ///
84    /// # Panics
85    /// Panics if the merger is invalid.
86    pub(crate) fn current_node(&self) -> &T {
87        self.current_node.as_ref().unwrap()
88    }
89
90    /// Returns rows of current node to read.
91    pub(crate) fn current_rows(&self) -> usize {
92        self.current_rows
93    }
94
95    /// Advances the merger to the next item.
96    pub(crate) fn next(&mut self) -> Result<()> {
97        self.maybe_advance_current_node()?;
98        debug_assert!(self.current_node.is_none());
99
100        // Finds node and range to read from the heap.
101        let Some(top_node) = self.heap.pop() else {
102            // Heap is empty.
103            return Ok(());
104        };
105        if let Some(next_node) = self.heap.peek() {
106            if next_node.is_behind(&top_node) {
107                // Does not overlap.
108                self.current_rows = top_node.current_item_len();
109            } else {
110                // Note that the heap ensures the top node always has the minimal row.
111                match top_node.search_key_in_current_item(next_node) {
112                    Ok(pos) => {
113                        if pos == 0 {
114                            // If the first item of top node has duplicate key with the next node,
115                            // we can simply return the first row in the top node as it must be the one
116                            // with max sequence.
117                            self.current_rows = 1;
118                        } else {
119                            // We don't know which one has the larger sequence so we use the range before
120                            // the duplicate pos.
121                            self.current_rows = pos;
122                        }
123                    }
124                    Err(pos) => {
125                        // No duplication. Output rows before pos.
126                        debug_assert!(pos > 0);
127                        self.current_rows = pos;
128                    }
129                }
130            }
131        } else {
132            // Top is the only node left. We can read all rows in it.
133            self.current_rows = top_node.current_item_len();
134        }
135        self.current_node = Some(top_node);
136
137        Ok(())
138    }
139
140    fn maybe_advance_current_node(&mut self) -> Result<()> {
141        let Some(mut node) = self.current_node.take() else {
142            return Ok(());
143        };
144
145        // Advances current node.
146        node.advance(self.current_rows)?;
147        self.current_rows = 0;
148        if !node.is_valid() {
149            return Ok(());
150        }
151
152        // Puts the node into the heap.
153        self.heap.push(node);
154        Ok(())
155    }
156}
157
158#[derive(Debug)]
159pub(crate) struct DataBatchKey {
160    pub(crate) pk_index: PkIndex,
161    pub(crate) timestamp: i64,
162}
163
164pub(crate) enum DataSource {
165    Buffer(DataBufferReader),
166    Part(DataPartReader),
167}
168
169impl DataSource {
170    fn current_data_batch(&self) -> DataBatch {
171        match self {
172            DataSource::Buffer(buffer) => buffer.current_data_batch(),
173            DataSource::Part(p) => p.current_data_batch(),
174        }
175    }
176
177    fn is_valid(&self) -> bool {
178        match self {
179            DataSource::Buffer(b) => b.is_valid(),
180            DataSource::Part(p) => p.is_valid(),
181        }
182    }
183
184    fn next(&mut self) -> Result<()> {
185        match self {
186            DataSource::Buffer(b) => b.next(),
187            DataSource::Part(p) => p.next(),
188        }
189    }
190}
191
192pub(crate) struct DataNode {
193    source: DataSource,
194    /// Current range of the batch in the source.
195    current_range: Option<Range<usize>>,
196}
197
198impl DataNode {
199    pub(crate) fn new(source: DataSource) -> Self {
200        let current_range = source
201            .is_valid()
202            .then(|| 0..source.current_data_batch().range().len());
203
204        Self {
205            source,
206            current_range,
207        }
208    }
209
210    pub(crate) fn current_data_batch(&self) -> DataBatch {
211        let range = self.current_range();
212        let batch = self.source.current_data_batch();
213        batch.slice(range.start, range.len())
214    }
215
216    fn current_range(&self) -> Range<usize> {
217        self.current_range.clone().unwrap()
218    }
219}
220
221impl Ord for DataNode {
222    fn cmp(&self, other: &Self) -> Ordering {
223        let weight = self.current_data_batch().pk_index();
224        let (ts_start, sequence) = self.current_data_batch().first_row();
225        let other_weight = other.current_data_batch().pk_index();
226        let (other_ts_start, other_sequence) = other.current_data_batch().first_row();
227        (weight, ts_start, Reverse(sequence))
228            .cmp(&(other_weight, other_ts_start, Reverse(other_sequence)))
229            .reverse()
230    }
231}
232
233impl Eq for DataNode {}
234
235impl PartialEq<Self> for DataNode {
236    fn eq(&self, other: &Self) -> bool {
237        self.current_data_batch()
238            .first_row()
239            .eq(&other.current_data_batch().first_row())
240    }
241}
242
243impl PartialOrd<Self> for DataNode {
244    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
245        Some(self.cmp(other))
246    }
247}
248
249impl Node for DataNode {
250    fn is_valid(&self) -> bool {
251        self.current_range.is_some()
252    }
253
254    fn is_behind(&self, other: &Self) -> bool {
255        let pk_weight = self.current_data_batch().pk_index();
256        let (start, seq) = self.current_data_batch().first_row();
257        let other_pk_weight = other.current_data_batch().pk_index();
258        let (other_end, other_seq) = other.current_data_batch().last_row();
259        (pk_weight, start, Reverse(seq)) > (other_pk_weight, other_end, Reverse(other_seq))
260    }
261
262    fn advance(&mut self, len: usize) -> Result<()> {
263        let mut range = self.current_range();
264        debug_assert!(range.len() >= len);
265
266        let remaining = range.len() - len;
267        if remaining == 0 {
268            // Nothing remains, we need to fetch next batch to ensure the current batch is not empty.
269            self.source.next()?;
270            if self.source.is_valid() {
271                self.current_range = Some(0..self.source.current_data_batch().range().len());
272            } else {
273                // The node is exhausted.
274                self.current_range = None;
275            }
276        } else {
277            range.start += len;
278            self.current_range = Some(range);
279        }
280
281        Ok(())
282    }
283
284    fn current_item_len(&self) -> usize {
285        self.current_range.clone().unwrap().len()
286    }
287
288    fn search_key_in_current_item(&self, other: &Self) -> Result<usize, usize> {
289        let key = other.current_data_batch().first_key();
290        self.current_data_batch().search_key(&key)
291    }
292}
293
294#[cfg(test)]
295mod tests {
296    use datatypes::arrow::array::UInt64Array;
297    use store_api::metadata::RegionMetadataRef;
298
299    use super::*;
300    use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBuffer};
301    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
302
303    fn write_rows_to_buffer(
304        buffer: &mut DataBuffer,
305        schema: &RegionMetadataRef,
306        pk_index: u16,
307        ts: Vec<i64>,
308        sequence: &mut u64,
309    ) {
310        let rows = ts.len() as u64;
311        let v0 = ts.iter().map(|v| Some(*v as f64)).collect::<Vec<_>>();
312        let kvs = build_key_values_with_ts_seq_values(
313            schema,
314            "whatever".to_string(),
315            1,
316            ts.into_iter(),
317            v0.into_iter(),
318            *sequence,
319        );
320
321        for kv in kvs.iter() {
322            buffer.write_row(pk_index, &kv);
323        }
324
325        *sequence += rows;
326    }
327
328    fn check_merger_read(nodes: Vec<DataNode>, expected: &[(u16, Vec<(i64, u64)>)]) {
329        let mut merger = Merger::try_new(nodes).unwrap();
330
331        let mut res = vec![];
332        while merger.is_valid() {
333            let data_batch = merger.current_node().current_data_batch();
334            let data_batch = data_batch.slice(0, merger.current_rows());
335            let batch = data_batch.slice_record_batch();
336            let ts_array = batch.column(1);
337            let ts_values: Vec<_> = timestamp_array_to_i64_slice(ts_array).to_vec();
338            let ts_and_seq = ts_values
339                .into_iter()
340                .zip(
341                    batch
342                        .column(2)
343                        .as_any()
344                        .downcast_ref::<UInt64Array>()
345                        .unwrap()
346                        .iter(),
347                )
348                .map(|(ts, seq)| (ts, seq.unwrap()))
349                .collect::<Vec<_>>();
350
351            res.push((data_batch.pk_index(), ts_and_seq));
352            merger.next().unwrap();
353        }
354        assert_eq!(expected, &res);
355    }
356
357    #[test]
358    fn test_merger() {
359        let metadata = metadata_for_test();
360        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
361        let weight = &[2, 1, 0];
362        let mut seq = 0;
363        write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
364        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq);
365        let node1 = DataNode::new(DataSource::Part(
366            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
367        ));
368
369        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
370        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
371        write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
372        let node2 = DataNode::new(DataSource::Part(
373            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
374        ));
375
376        check_merger_read(
377            vec![node1, node2],
378            &[
379                (1, vec![(2, 0)]),
380                (1, vec![(3, 4)]),
381                (1, vec![(3, 1)]),
382                (2, vec![(1, 5)]),
383                (2, vec![(1, 2), (2, 3)]),
384            ],
385        );
386    }
387
388    #[test]
389    fn test_merger2() {
390        let metadata = metadata_for_test();
391        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
392        let weight = &[2, 1, 0];
393        let mut seq = 0;
394        write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
395        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq);
396        let node1 = DataNode::new(DataSource::Part(
397            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
398        ));
399
400        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
401        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
402        let node2 = DataNode::new(DataSource::Part(
403            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
404        ));
405
406        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
407        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
408        let node3 = DataNode::new(DataSource::Part(
409            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
410        ));
411
412        check_merger_read(
413            vec![node1, node3, node2],
414            &[
415                (1, vec![(2, 0)]),
416                (1, vec![(3, 4)]),
417                (1, vec![(3, 1)]),
418                (2, vec![(1, 2)]),
419                (2, vec![(2, 5)]),
420                (2, vec![(2, 3)]),
421                (2, vec![(3, 6)]),
422            ],
423        );
424    }
425
426    #[test]
427    fn test_merger_overlapping() {
428        let metadata = metadata_for_test();
429        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
430        let weight = &[0, 1, 2];
431        let mut seq = 0;
432        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
433        let node1 = DataNode::new(DataSource::Part(
434            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
435        ));
436
437        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
438        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
439        let node2 = DataNode::new(DataSource::Part(
440            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
441        ));
442
443        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
444        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
445        let node3 = DataNode::new(DataSource::Part(
446            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
447        ));
448
449        check_merger_read(
450            vec![node1, node3, node2],
451            &[
452                (0, vec![(1, 0)]),
453                (0, vec![(2, 5)]),
454                (0, vec![(2, 1)]),
455                (0, vec![(3, 6)]),
456                (0, vec![(3, 2)]),
457                (1, vec![(2, 3), (3, 4)]),
458            ],
459        );
460    }
461
462    #[test]
463    fn test_merger_parts_and_buffer() {
464        let metadata = metadata_for_test();
465        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
466        let weight = &[0, 1, 2];
467        let mut seq = 0;
468        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
469        let node1 = DataNode::new(DataSource::Buffer(
470            buffer1.read().unwrap().build(Some(weight)).unwrap(),
471        ));
472
473        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
474        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
475        let node2 = DataNode::new(DataSource::Part(
476            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
477        ));
478
479        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
480        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
481        let node3 = DataNode::new(DataSource::Part(
482            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
483        ));
484
485        check_merger_read(
486            vec![node1, node3, node2],
487            &[
488                (0, vec![(1, 0)]),
489                (0, vec![(2, 5)]),
490                (0, vec![(2, 1)]),
491                (0, vec![(3, 6)]),
492                (0, vec![(3, 2)]),
493                (1, vec![(2, 3), (3, 4)]),
494            ],
495        );
496    }
497
498    #[test]
499    fn test_merger_overlapping_2() {
500        let metadata = metadata_for_test();
501        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
502        let weight = &[0, 1, 2];
503        let mut seq = 0;
504        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq);
505        let node1 = DataNode::new(DataSource::Part(
506            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
507        ));
508
509        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
510        write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![2], &mut seq);
511        let node2 = DataNode::new(DataSource::Part(
512            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
513        ));
514
515        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
516        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq);
517        let node3 = DataNode::new(DataSource::Part(
518            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
519        ));
520
521        check_merger_read(
522            vec![node1, node2, node3],
523            &[
524                (0, vec![(1, 0)]),
525                (0, vec![(2, 4)]),
526                (0, vec![(2, 3)]),
527                (0, vec![(2, 2)]),
528            ],
529        );
530    }
531
532    #[test]
533    fn test_merger_overlapping_3() {
534        let metadata = metadata_for_test();
535        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
536        let weight = &[0, 1, 2];
537        let mut seq = 0;
538        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![0, 1], &mut seq);
539        let node1 = DataNode::new(DataSource::Part(
540            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
541        ));
542
543        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
544        write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
545        let node2 = DataNode::new(DataSource::Part(
546            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
547        ));
548
549        check_merger_read(
550            vec![node1, node2],
551            &[(0, vec![(0, 0)]), (0, vec![(1, 2)]), (0, vec![(1, 1)])],
552        );
553    }
554}