index/inverted_index/create/
sort_create.rs1use std::collections::HashMap;
16use std::num::NonZeroUsize;
17
18use async_trait::async_trait;
19use snafu::ensure;
20
21use crate::bitmap::BitmapType;
22use crate::inverted_index::create::sort::{SortOutput, Sorter};
23use crate::inverted_index::create::InvertedIndexCreator;
24use crate::inverted_index::error::{InconsistentRowCountSnafu, Result};
25use crate::inverted_index::format::writer::InvertedIndexWriter;
26use crate::BytesRef;
27
28type IndexName = String;
29type SegmentRowCount = NonZeroUsize;
30
31pub type SorterFactory = Box<dyn Fn(IndexName, SegmentRowCount) -> Box<dyn Sorter> + Send>;
33
34pub struct SortIndexCreator {
37 sorter_factory: SorterFactory,
39
40 sorters: HashMap<IndexName, Box<dyn Sorter>>,
42
43 segment_row_count: NonZeroUsize,
45}
46
47#[async_trait]
48impl InvertedIndexCreator for SortIndexCreator {
49 async fn push_with_name_n(
54 &mut self,
55 index_name: &str,
56 value: Option<BytesRef<'_>>,
57 n: usize,
58 ) -> Result<()> {
59 match self.sorters.get_mut(index_name) {
60 Some(sorter) => sorter.push_n(value, n).await,
61 None => {
62 let index_name = index_name.to_string();
63 let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count);
64 sorter.push_n(value, n).await?;
65 self.sorters.insert(index_name, sorter);
66 Ok(())
67 }
68 }
69 }
70
71 async fn finish(
73 &mut self,
74 writer: &mut dyn InvertedIndexWriter,
75 bitmap_type: BitmapType,
76 ) -> Result<()> {
77 let mut output_row_count = None;
78 for (index_name, mut sorter) in self.sorters.drain() {
79 let SortOutput {
80 segment_null_bitmap,
81 sorted_stream,
82 total_row_count,
83 } = sorter.output().await?;
84
85 let expected_row_count = *output_row_count.get_or_insert(total_row_count);
86 ensure!(
87 expected_row_count == total_row_count,
88 InconsistentRowCountSnafu {
89 index_name,
90 total_row_count,
91 expected_row_count,
92 }
93 );
94
95 writer
96 .add_index(index_name, segment_null_bitmap, sorted_stream, bitmap_type)
97 .await?;
98 }
99
100 let total_row_count = output_row_count.unwrap_or_default() as _;
101 let segment_row_count = self.segment_row_count as _;
102 writer.finish(total_row_count, segment_row_count).await
103 }
104}
105
106impl SortIndexCreator {
107 pub fn new(sorter_factory: SorterFactory, segment_row_count: NonZeroUsize) -> Self {
109 Self {
110 sorter_factory,
111 sorters: HashMap::new(),
112 segment_row_count,
113 }
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use std::collections::BTreeMap;
120
121 use common_base::BitVec;
122 use futures::{stream, StreamExt};
123
124 use super::*;
125 use crate::bitmap::Bitmap;
126 use crate::inverted_index::error::Error;
127 use crate::inverted_index::format::writer::{MockInvertedIndexWriter, ValueStream};
128 use crate::Bytes;
129
130 #[tokio::test]
131 async fn test_sort_index_creator_basic() {
132 let mut creator =
133 SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
134
135 let index_values = vec![
136 ("a", vec![b"3", b"2", b"1"]),
137 ("b", vec![b"6", b"5", b"4"]),
138 ("c", vec![b"1", b"2", b"3"]),
139 ];
140
141 for (index_name, values) in index_values {
142 for value in values {
143 creator
144 .push_with_name(index_name, Some(value))
145 .await
146 .unwrap();
147 }
148 }
149
150 let mut mock_writer = MockInvertedIndexWriter::new();
151 mock_writer.expect_add_index().times(3).returning(
152 |name, null_bitmap, stream, bitmap_type| {
153 assert!(null_bitmap.is_empty());
154 assert_eq!(bitmap_type, BitmapType::Roaring);
155 match name.as_str() {
156 "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
157 "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
158 "c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
159 _ => panic!("unexpected index name: {}", name),
160 }
161 Ok(())
162 },
163 );
164 mock_writer
165 .expect_finish()
166 .times(1)
167 .returning(|total_row_count, segment_row_count| {
168 assert_eq!(total_row_count, 3);
169 assert_eq!(segment_row_count.get(), 1);
170 Ok(())
171 });
172
173 creator
174 .finish(&mut mock_writer, BitmapType::Roaring)
175 .await
176 .unwrap();
177 }
178
179 #[tokio::test]
180 async fn test_sort_index_creator_inconsistent_row_count() {
181 let mut creator =
182 SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
183
184 let index_values = vec![
185 ("a", vec![b"3", b"2", b"1"]),
186 ("b", vec![b"6", b"5", b"4"]),
187 ("c", vec![b"1", b"2"]),
188 ];
189
190 for (index_name, values) in index_values {
191 for value in values {
192 creator
193 .push_with_name(index_name, Some(value))
194 .await
195 .unwrap();
196 }
197 }
198
199 let mut mock_writer = MockInvertedIndexWriter::new();
200 mock_writer
201 .expect_add_index()
202 .returning(|name, null_bitmap, stream, bitmap_type| {
203 assert!(null_bitmap.is_empty());
204 assert_eq!(bitmap_type, BitmapType::Roaring);
205 match name.as_str() {
206 "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
207 "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
208 "c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2"]),
209 _ => panic!("unexpected index name: {}", name),
210 }
211 Ok(())
212 });
213 mock_writer.expect_finish().never();
214
215 let res = creator.finish(&mut mock_writer, BitmapType::Roaring).await;
216 assert!(matches!(res, Err(Error::InconsistentRowCount { .. })));
217 }
218
219 #[tokio::test]
220 async fn test_sort_index_creator_create_indexes_without_data() {
221 let mut creator =
222 SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
223
224 creator.push_with_name_n("a", None, 0).await.unwrap();
225 creator.push_with_name_n("b", None, 0).await.unwrap();
226 creator.push_with_name_n("c", None, 0).await.unwrap();
227
228 let mut mock_writer = MockInvertedIndexWriter::new();
229 mock_writer
230 .expect_add_index()
231 .returning(|name, null_bitmap, stream, bitmap_type| {
232 assert!(null_bitmap.is_empty());
233 assert_eq!(bitmap_type, BitmapType::Roaring);
234 assert!(matches!(name.as_str(), "a" | "b" | "c"));
235 assert!(stream_to_values(stream).is_empty());
236 Ok(())
237 });
238 mock_writer
239 .expect_finish()
240 .times(1)
241 .returning(|total_row_count, segment_row_count| {
242 assert_eq!(total_row_count, 0);
243 assert_eq!(segment_row_count.get(), 1);
244 Ok(())
245 });
246
247 creator
248 .finish(&mut mock_writer, BitmapType::Roaring)
249 .await
250 .unwrap();
251 }
252
253 fn set_bit(bit_vec: &mut BitVec, index: usize) {
254 if index >= bit_vec.len() {
255 bit_vec.resize(index + 1, false);
256 }
257 bit_vec.set(index, true);
258 }
259
260 struct NaiveSorter {
261 total_row_count: usize,
262 segment_row_count: NonZeroUsize,
263 values: BTreeMap<Option<Bytes>, BitVec>,
264 }
265
266 impl NaiveSorter {
267 fn factory() -> SorterFactory {
268 Box::new(|_index_name, segment_row_count| {
269 Box::new(NaiveSorter {
270 total_row_count: 0,
271 segment_row_count,
272 values: BTreeMap::new(),
273 })
274 })
275 }
276 }
277
278 #[async_trait]
279 impl Sorter for NaiveSorter {
280 async fn push(&mut self, value: Option<BytesRef<'_>>) -> Result<()> {
281 let segment_index = self.total_row_count / self.segment_row_count;
282 self.total_row_count += 1;
283
284 let bitmap = self.values.entry(value.map(Into::into)).or_default();
285 set_bit(bitmap, segment_index);
286
287 Ok(())
288 }
289
290 async fn push_n(&mut self, value: Option<BytesRef<'_>>, n: usize) -> Result<()> {
291 for _ in 0..n {
292 self.push(value).await?;
293 }
294 Ok(())
295 }
296
297 async fn output(&mut self) -> Result<SortOutput> {
298 let segment_null_bitmap = self.values.remove(&None).unwrap_or_default();
299 let segment_null_bitmap = Bitmap::BitVec(segment_null_bitmap);
300
301 Ok(SortOutput {
302 segment_null_bitmap,
303 sorted_stream: Box::new(stream::iter(
304 std::mem::take(&mut self.values)
305 .into_iter()
306 .map(|(v, b)| Ok((v.unwrap(), Bitmap::BitVec(b)))),
307 )),
308 total_row_count: self.total_row_count,
309 })
310 }
311 }
312
313 fn stream_to_values(stream: ValueStream) -> Vec<Bytes> {
314 futures::executor::block_on(async {
315 stream.map(|r| r.unwrap().0).collect::<Vec<Bytes>>().await
316 })
317 }
318}