index/inverted_index/create/sort/
merge_stream.rs1use std::cmp::Ordering;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19use futures::{ready, Stream, StreamExt};
20use pin_project::pin_project;
21
22use crate::bitmap::Bitmap;
23use crate::inverted_index::create::sort::SortedStream;
24use crate::inverted_index::error::Result;
25use crate::Bytes;
26
/// Merges two [`SortedStream`]s into a single stream ordered by key.
///
/// Each input has a one-entry look-ahead buffer (`peek1` / `peek2`) so that the heads of
/// both inputs can be compared before one of them is emitted. None of the fields is marked
/// `#[pin]`, so `project()` yields plain `&mut` references to every field.
#[pin_project]
pub struct MergeSortedStream {
    /// First input; replaced with `None` once it has reported end-of-stream.
    stream1: Option<SortedStream>,
    /// Entry already read from `stream1` but not yet emitted.
    peek1: Option<(Bytes, Bitmap)>,

    /// Second input; replaced with `None` once it has reported end-of-stream.
    stream2: Option<SortedStream>,
    /// Entry already read from `stream2` but not yet emitted.
    peek2: Option<(Bytes, Bitmap)>,
}
36
37impl MergeSortedStream {
38 pub fn merge(stream1: SortedStream, stream2: SortedStream) -> SortedStream {
41 Box::new(MergeSortedStream {
42 stream1: Some(stream1),
43 peek1: None,
44
45 stream2: Some(stream2),
46 peek2: None,
47 })
48 }
49}
50
51impl Stream for MergeSortedStream {
52 type Item = Result<(Bytes, Bitmap)>;
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 let this = self.project();
58
59 if let (None, Some(stream1)) = (&this.peek1, this.stream1.as_mut()) {
60 match ready!(stream1.poll_next_unpin(cx)) {
61 Some(item) => *this.peek1 = Some(item?),
62 None => *this.stream1 = None, }
64 }
65
66 if let (None, Some(stream2)) = (&this.peek2, this.stream2.as_mut()) {
67 match ready!(stream2.poll_next_unpin(cx)) {
68 Some(item) => *this.peek2 = Some(item?),
69 None => *this.stream2 = None, }
71 }
72
73 Poll::Ready(match (this.peek1.take(), this.peek2.take()) {
74 (Some((v1, b1)), Some((v2, b2))) => match v1.cmp(&v2) {
75 Ordering::Less => {
76 *this.peek2 = Some((v2, b2)); Some(Ok((v1, b1)))
78 }
79 Ordering::Greater => {
80 *this.peek1 = Some((v1, b1)); Some(Ok((v2, b2)))
82 }
83 Ordering::Equal => Some(Ok((v1, merge_bitmaps(b1, b2)))),
84 },
85 (None, Some(item)) | (Some(item), None) => Some(Ok(item)),
86 (None, None) => None,
87 })
88 }
89}
90
91fn merge_bitmaps(mut bitmap1: Bitmap, bitmap2: Bitmap) -> Bitmap {
93 bitmap1.union(bitmap2);
94 bitmap1
95}
96
#[cfg(test)]
mod tests {
    use futures::stream;
    use greptime_proto::v1::index::BitmapType;

    use super::*;
    use crate::inverted_index::error::Error;

    /// Builds a roaring [`Bitmap`] from LSB0-ordered bytes.
    fn bitmap(bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
    }

    /// Wraps pre-sorted entries in a [`SortedStream`] that never yields an error.
    fn sorted_stream_from_vec(vec: Vec<(Bytes, Bitmap)>) -> SortedStream {
        Box::new(stream::iter(vec.into_iter().map(Ok::<_, Error>)))
    }

    /// Polls `merged` until it returns `None` and collects every entry in emission order.
    ///
    /// Panics if the stream yields an error. Because collection stops only at `None`,
    /// comparing the result against an expected `Vec` also checks that nothing is emitted
    /// after the last expected entry.
    async fn drain(mut merged: SortedStream) -> Vec<(Bytes, Bitmap)> {
        let mut entries = Vec::new();
        while let Some(entry) = merged.next().await {
            entries.push(entry.unwrap());
        }
        entries
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_non_overlapping() {
        let stream1 = sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b10101010])),
            (Bytes::from("orange"), bitmap(&[0b01010101])),
        ]);
        let stream2 = sorted_stream_from_vec(vec![
            (Bytes::from("banana"), bitmap(&[0b10101010])),
            (Bytes::from("peach"), bitmap(&[0b01010101])),
        ]);

        let merged = drain(MergeSortedStream::merge(stream1, stream2)).await;
        assert_eq!(
            merged,
            vec![
                (Bytes::from("apple"), bitmap(&[0b10101010])),
                (Bytes::from("banana"), bitmap(&[0b10101010])),
                (Bytes::from("orange"), bitmap(&[0b01010101])),
                (Bytes::from("peach"), bitmap(&[0b01010101])),
            ]
        );
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_overlapping() {
        let stream1 = sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b10101010])),
            (Bytes::from("orange"), bitmap(&[0b10101010])),
        ]);
        let stream2 = sorted_stream_from_vec(vec![
            (Bytes::from("apple"), bitmap(&[0b01010101])),
            (Bytes::from("peach"), bitmap(&[0b01010101])),
        ]);

        let merged = drain(MergeSortedStream::merge(stream1, stream2)).await;
        assert_eq!(
            merged,
            vec![
                // Shared key: bitmaps from both inputs are unioned.
                (Bytes::from("apple"), bitmap(&[0b11111111])),
                (Bytes::from("orange"), bitmap(&[0b10101010])),
                (Bytes::from("peach"), bitmap(&[0b01010101])),
            ]
        );
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_one_side_empty() {
        // Built by a closure rather than cloned, so no `Clone` bound on `Bitmap` is assumed.
        let entries = || {
            vec![
                (Bytes::from("apple"), bitmap(&[0b00000001])),
                (Bytes::from("banana"), bitmap(&[0b00000010])),
            ]
        };

        // First input empty: every entry comes from the second input.
        let merged = drain(MergeSortedStream::merge(
            sorted_stream_from_vec(vec![]),
            sorted_stream_from_vec(entries()),
        ))
        .await;
        assert_eq!(merged, entries());

        // Second input empty: every entry comes from the first input.
        let merged = drain(MergeSortedStream::merge(
            sorted_stream_from_vec(entries()),
            sorted_stream_from_vec(vec![]),
        ))
        .await;
        assert_eq!(merged, entries());
    }

    #[tokio::test]
    async fn test_merge_sorted_stream_empty_streams() {
        let stream1 = sorted_stream_from_vec(vec![]);
        let stream2 = sorted_stream_from_vec(vec![]);

        let mut merged_stream = MergeSortedStream::merge(stream1, stream2);
        assert!(merged_stream.next().await.is_none());
    }
}