index/inverted_index/create/sort/merge_stream.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19use futures::{ready, Stream, StreamExt};
20use pin_project::pin_project;
21
22use crate::bitmap::Bitmap;
23use crate::inverted_index::create::sort::SortedStream;
24use crate::inverted_index::error::Result;
25use crate::Bytes;
26
27/// A [`Stream`] implementation that merges two sorted streams into a single sorted stream
28#[pin_project]
29pub struct MergeSortedStream {
30    stream1: Option<SortedStream>,
31    peek1: Option<(Bytes, Bitmap)>,
32
33    stream2: Option<SortedStream>,
34    peek2: Option<(Bytes, Bitmap)>,
35}
36
37impl MergeSortedStream {
38    /// Creates a new `MergeSortedStream` that will return elements from `stream1` and `stream2`
39    /// in sorted order, merging duplicate items by unioning their bitmaps
40    pub fn merge(stream1: SortedStream, stream2: SortedStream) -> SortedStream {
41        Box::new(MergeSortedStream {
42            stream1: Some(stream1),
43            peek1: None,
44
45            stream2: Some(stream2),
46            peek2: None,
47        })
48    }
49}
50
51impl Stream for MergeSortedStream {
52    type Item = Result<(Bytes, Bitmap)>;
53
54    /// Polls both streams and returns the next item from the stream that has the smaller next item.
55    /// If both streams have the same next item, the bitmaps are unioned together.
56    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        let this = self.project();
58
59        if let (None, Some(stream1)) = (&this.peek1, this.stream1.as_mut()) {
60            match ready!(stream1.poll_next_unpin(cx)) {
61                Some(item) => *this.peek1 = Some(item?),
62                None => *this.stream1 = None, // `stream1` is exhausted, don't poll it next time
63            }
64        }
65
66        if let (None, Some(stream2)) = (&this.peek2, this.stream2.as_mut()) {
67            match ready!(stream2.poll_next_unpin(cx)) {
68                Some(item) => *this.peek2 = Some(item?),
69                None => *this.stream2 = None, // `stream2` is exhausted, don't poll it next time
70            }
71        }
72
73        Poll::Ready(match (this.peek1.take(), this.peek2.take()) {
74            (Some((v1, b1)), Some((v2, b2))) => match v1.cmp(&v2) {
75                Ordering::Less => {
76                    *this.peek2 = Some((v2, b2)); // Preserve the rest of `stream2`
77                    Some(Ok((v1, b1)))
78                }
79                Ordering::Greater => {
80                    *this.peek1 = Some((v1, b1)); // Preserve the rest of `stream1`
81                    Some(Ok((v2, b2)))
82                }
83                Ordering::Equal => Some(Ok((v1, merge_bitmaps(b1, b2)))),
84            },
85            (None, Some(item)) | (Some(item), None) => Some(Ok(item)),
86            (None, None) => None,
87        })
88    }
89}
90
91/// Merges two bitmaps by bit-wise OR'ing them together, preserving all bits from both
92fn merge_bitmaps(mut bitmap1: Bitmap, bitmap2: Bitmap) -> Bitmap {
93    bitmap1.union(bitmap2);
94    bitmap1
95}
96
#[cfg(test)]
mod tests {
    use futures::stream;
    use greptime_proto::v1::index::BitmapType;

    use super::*;
    use crate::inverted_index::error::Error;

    fn bitmap(bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
    }

    fn sorted_stream_from_vec(vec: Vec<(Bytes, Bitmap)>) -> SortedStream {
        Box::new(stream::iter(vec.into_iter().map(Ok::<_, Error>)))
    }

    /// Like `sorted_stream_from_vec`, but each item is preceded by a `tokio::task::yield_now`,
    /// so the stream returns `Poll::Pending` before every item. This forces the merge to keep
    /// an already-buffered head across polls.
    fn yielding_sorted_stream_from_vec(vec: Vec<(Bytes, Bitmap)>) -> SortedStream {
        let delayed = stream::iter(vec).then(|item| async move {
            tokio::task::yield_now().await;
            Ok::<_, Error>(item)
        });
        Box::new(Box::pin(delayed))
    }

    /// Drains `input` to completion, panicking on the first error.
    async fn collect_all(mut input: SortedStream) -> Vec<(Bytes, Bitmap)> {
        let mut items = Vec::new();
        while let Some(item) = input.next().await {
            items.push(item.unwrap());
        }
        items
    }

    fn fruits() -> Vec<(Bytes, Bitmap)> {
        vec![
            (Bytes::from("apple"), bitmap(&[0b10101010])),
            (Bytes::from("peach"), bitmap(&[0b01010101])),
        ]
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_non_overlapping() {
        let stream1 = sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b10101010])),
            (Bytes::from("orange"), bitmap(&[0b01010101])),
        ]);
        let stream2 = sorted_stream_from_vec(vec![
            (Bytes::from("banana"), bitmap(&[0b10101010])),
            (Bytes::from("peach"), bitmap(&[0b01010101])),
        ]);

        let mut merged_stream = MergeSortedStream::merge(stream1, stream2);

        let item = merged_stream.next().await.unwrap().unwrap();
        assert_eq!(item.0, Bytes::from("apple"));
        assert_eq!(item.1, bitmap(&[0b10101010]));
        let item = merged_stream.next().await.unwrap().unwrap();
        assert_eq!(item.0, Bytes::from("banana"));
        assert_eq!(item.1, bitmap(&[0b10101010]));
        let item = merged_stream.next().await.unwrap().unwrap();
        assert_eq!(item.0, Bytes::from("orange"));
        assert_eq!(item.1, bitmap(&[0b01010101]));
        let item = merged_stream.next().await.unwrap().unwrap();
        assert_eq!(item.0, Bytes::from("peach"));
        assert_eq!(item.1, bitmap(&[0b01010101]));
        assert!(merged_stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_overlapping() {
        let stream1 = sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b10101010])),
            (Bytes::from("orange"), bitmap(&[0b10101010])),
        ]);
        let stream2 = sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b01010101])),
            (Bytes::from("peach"), bitmap(&[0b01010101])),
        ]);

        let mut merged_stream = MergeSortedStream::merge(stream1, stream2);

        let item = merged_stream.next().await.unwrap().unwrap();
        assert_eq!(item.0, Bytes::from("apple"));
        assert_eq!(item.1, bitmap(&[0b11111111]));
        let item = merged_stream.next().await.unwrap().unwrap();
        assert_eq!(item.0, Bytes::from("orange"));
        assert_eq!(item.1, bitmap(&[0b10101010]));
        let item = merged_stream.next().await.unwrap().unwrap();
        assert_eq!(item.0, Bytes::from("peach"));
        assert_eq!(item.1, bitmap(&[0b01010101]));
        assert!(merged_stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_empty_streams() {
        let stream1 = sorted_stream_from_vec(vec![]);
        let stream2 = sorted_stream_from_vec(vec![]);

        let mut merged_stream = MergeSortedStream::merge(stream1, stream2);
        assert!(merged_stream.next().await.is_none());
        // Polling an exhausted merge must keep reporting the end of the stream.
        assert!(merged_stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_one_side_empty() {
        let merged = MergeSortedStream::merge(
            sorted_stream_from_vec(vec![]),
            sorted_stream_from_vec(fruits()),
        );
        assert_eq!(collect_all(merged).await, fruits());

        let merged = MergeSortedStream::merge(
            sorted_stream_from_vec(fruits()),
            sorted_stream_from_vec(vec![]),
        );
        assert_eq!(collect_all(merged).await, fruits());
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_pending_inputs() {
        let stream1 = yielding_sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b10101010])),
            (Bytes::from("orange"), bitmap(&[0b10101010])),
        ]);
        let stream2 = yielding_sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b01010101])),
            (Bytes::from("peach"), bitmap(&[0b01010101])),
        ]);

        let merged = collect_all(MergeSortedStream::merge(stream1, stream2)).await;
        assert_eq!(
            merged,
            vec![
                (Bytes::from("apple"), bitmap(&[0b11111111])),
                (Bytes::from("orange"), bitmap(&[0b10101010])),
                (Bytes::from("peach"), bitmap(&[0b01010101])),
            ]
        );
    }
}