index/inverted_index/create/
sort_create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::num::NonZeroUsize;
17
18use async_trait::async_trait;
19use snafu::ensure;
20
21use crate::bitmap::BitmapType;
22use crate::inverted_index::create::sort::{SortOutput, Sorter};
23use crate::inverted_index::create::InvertedIndexCreator;
24use crate::inverted_index::error::{InconsistentRowCountSnafu, Result};
25use crate::inverted_index::format::writer::InvertedIndexWriter;
26use crate::BytesRef;
27
/// Name identifying one index; used as the key under which that index's sorter is stored.
type IndexName = String;
/// Number of rows per segment; non-zero by type.
type SegmentRowCount = NonZeroUsize;
30
/// Factory type to produce `Sorter` instances associated with an index name and segment row count
///
/// The factory is a boxed, `Send` closure. `SortIndexCreator` calls it when a value is pushed
/// for an index name that has no sorter yet.
pub type SorterFactory = Box<dyn Fn(IndexName, SegmentRowCount) -> Box<dyn Sorter> + Send>;
33
/// `SortIndexCreator` orchestrates indexing by sorting input data for each named index
/// and writing to an inverted index writer
///
/// Pushed values are routed by index name to a per-index `Sorter`, which is created through
/// `sorter_factory` the first time a name is seen. When the creator is finished, every sorter
/// is drained, its output is handed to the writer, and the total row counts reported by the
/// sorters are required to be equal.
pub struct SortIndexCreator {
    /// Factory for producing `Sorter` instances
    sorter_factory: SorterFactory,

    /// Map of index names to sorters
    sorters: HashMap<IndexName, Box<dyn Sorter>>,

    /// Number of rows in each segment, used to produce sorters
    segment_row_count: NonZeroUsize,
}
46
47#[async_trait]
48impl InvertedIndexCreator for SortIndexCreator {
49    /// Inserts `n` values or nulls into the sorter for the specified index.
50    ///
51    /// If the index does not exist, a new index is created even if `n` is 0.
52    /// Caller may leverage this behavior to create indexes with no data.
53    async fn push_with_name_n(
54        &mut self,
55        index_name: &str,
56        value: Option<BytesRef<'_>>,
57        n: usize,
58    ) -> Result<()> {
59        match self.sorters.get_mut(index_name) {
60            Some(sorter) => sorter.push_n(value, n).await,
61            None => {
62                let index_name = index_name.to_string();
63                let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count);
64                sorter.push_n(value, n).await?;
65                self.sorters.insert(index_name, sorter);
66                Ok(())
67            }
68        }
69    }
70
71    /// Finalizes the sorting for all indexes and writes them using the inverted index writer
72    async fn finish(
73        &mut self,
74        writer: &mut dyn InvertedIndexWriter,
75        bitmap_type: BitmapType,
76    ) -> Result<()> {
77        let mut output_row_count = None;
78        for (index_name, mut sorter) in self.sorters.drain() {
79            let SortOutput {
80                segment_null_bitmap,
81                sorted_stream,
82                total_row_count,
83            } = sorter.output().await?;
84
85            let expected_row_count = *output_row_count.get_or_insert(total_row_count);
86            ensure!(
87                expected_row_count == total_row_count,
88                InconsistentRowCountSnafu {
89                    index_name,
90                    total_row_count,
91                    expected_row_count,
92                }
93            );
94
95            writer
96                .add_index(index_name, segment_null_bitmap, sorted_stream, bitmap_type)
97                .await?;
98        }
99
100        let total_row_count = output_row_count.unwrap_or_default() as _;
101        let segment_row_count = self.segment_row_count as _;
102        writer.finish(total_row_count, segment_row_count).await
103    }
104}
105
106impl SortIndexCreator {
107    /// Creates a new `SortIndexCreator` with the given sorter factory and index writer
108    pub fn new(sorter_factory: SorterFactory, segment_row_count: NonZeroUsize) -> Self {
109        Self {
110            sorter_factory,
111            sorters: HashMap::new(),
112            segment_row_count,
113        }
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use std::collections::BTreeMap;
120
121    use common_base::BitVec;
122    use futures::{stream, StreamExt};
123
124    use super::*;
125    use crate::bitmap::Bitmap;
126    use crate::inverted_index::error::Error;
127    use crate::inverted_index::format::writer::{MockInvertedIndexWriter, ValueStream};
128    use crate::Bytes;
129
130    #[tokio::test]
131    async fn test_sort_index_creator_basic() {
132        let mut creator =
133            SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
134
135        let index_values = vec![
136            ("a", vec![b"3", b"2", b"1"]),
137            ("b", vec![b"6", b"5", b"4"]),
138            ("c", vec![b"1", b"2", b"3"]),
139        ];
140
141        for (index_name, values) in index_values {
142            for value in values {
143                creator
144                    .push_with_name(index_name, Some(value))
145                    .await
146                    .unwrap();
147            }
148        }
149
150        let mut mock_writer = MockInvertedIndexWriter::new();
151        mock_writer.expect_add_index().times(3).returning(
152            |name, null_bitmap, stream, bitmap_type| {
153                assert!(null_bitmap.is_empty());
154                assert_eq!(bitmap_type, BitmapType::Roaring);
155                match name.as_str() {
156                    "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
157                    "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
158                    "c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
159                    _ => panic!("unexpected index name: {}", name),
160                }
161                Ok(())
162            },
163        );
164        mock_writer
165            .expect_finish()
166            .times(1)
167            .returning(|total_row_count, segment_row_count| {
168                assert_eq!(total_row_count, 3);
169                assert_eq!(segment_row_count.get(), 1);
170                Ok(())
171            });
172
173        creator
174            .finish(&mut mock_writer, BitmapType::Roaring)
175            .await
176            .unwrap();
177    }
178
179    #[tokio::test]
180    async fn test_sort_index_creator_inconsistent_row_count() {
181        let mut creator =
182            SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
183
184        let index_values = vec![
185            ("a", vec![b"3", b"2", b"1"]),
186            ("b", vec![b"6", b"5", b"4"]),
187            ("c", vec![b"1", b"2"]),
188        ];
189
190        for (index_name, values) in index_values {
191            for value in values {
192                creator
193                    .push_with_name(index_name, Some(value))
194                    .await
195                    .unwrap();
196            }
197        }
198
199        let mut mock_writer = MockInvertedIndexWriter::new();
200        mock_writer
201            .expect_add_index()
202            .returning(|name, null_bitmap, stream, bitmap_type| {
203                assert!(null_bitmap.is_empty());
204                assert_eq!(bitmap_type, BitmapType::Roaring);
205                match name.as_str() {
206                    "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
207                    "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
208                    "c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2"]),
209                    _ => panic!("unexpected index name: {}", name),
210                }
211                Ok(())
212            });
213        mock_writer.expect_finish().never();
214
215        let res = creator.finish(&mut mock_writer, BitmapType::Roaring).await;
216        assert!(matches!(res, Err(Error::InconsistentRowCount { .. })));
217    }
218
219    #[tokio::test]
220    async fn test_sort_index_creator_create_indexes_without_data() {
221        let mut creator =
222            SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
223
224        creator.push_with_name_n("a", None, 0).await.unwrap();
225        creator.push_with_name_n("b", None, 0).await.unwrap();
226        creator.push_with_name_n("c", None, 0).await.unwrap();
227
228        let mut mock_writer = MockInvertedIndexWriter::new();
229        mock_writer
230            .expect_add_index()
231            .returning(|name, null_bitmap, stream, bitmap_type| {
232                assert!(null_bitmap.is_empty());
233                assert_eq!(bitmap_type, BitmapType::Roaring);
234                assert!(matches!(name.as_str(), "a" | "b" | "c"));
235                assert!(stream_to_values(stream).is_empty());
236                Ok(())
237            });
238        mock_writer
239            .expect_finish()
240            .times(1)
241            .returning(|total_row_count, segment_row_count| {
242                assert_eq!(total_row_count, 0);
243                assert_eq!(segment_row_count.get(), 1);
244                Ok(())
245            });
246
247        creator
248            .finish(&mut mock_writer, BitmapType::Roaring)
249            .await
250            .unwrap();
251    }
252
253    fn set_bit(bit_vec: &mut BitVec, index: usize) {
254        if index >= bit_vec.len() {
255            bit_vec.resize(index + 1, false);
256        }
257        bit_vec.set(index, true);
258    }
259
    /// Minimal in-memory `Sorter` used by the tests in this module.
    ///
    /// Values are kept in a `BTreeMap`, so iterating the map yields them in key order.
    struct NaiveSorter {
        /// Number of values pushed so far, nulls included.
        total_row_count: usize,
        /// Rows per segment; a row's segment is its row index divided by this count.
        segment_row_count: NonZeroUsize,
        /// For each distinct value (`None` stands for null), a bitmap with one bit per
        /// segment in which the value was pushed.
        values: BTreeMap<Option<Bytes>, BitVec>,
    }
265
266    impl NaiveSorter {
267        fn factory() -> SorterFactory {
268            Box::new(|_index_name, segment_row_count| {
269                Box::new(NaiveSorter {
270                    total_row_count: 0,
271                    segment_row_count,
272                    values: BTreeMap::new(),
273                })
274            })
275        }
276    }
277
278    #[async_trait]
279    impl Sorter for NaiveSorter {
280        async fn push(&mut self, value: Option<BytesRef<'_>>) -> Result<()> {
281            let segment_index = self.total_row_count / self.segment_row_count;
282            self.total_row_count += 1;
283
284            let bitmap = self.values.entry(value.map(Into::into)).or_default();
285            set_bit(bitmap, segment_index);
286
287            Ok(())
288        }
289
290        async fn push_n(&mut self, value: Option<BytesRef<'_>>, n: usize) -> Result<()> {
291            for _ in 0..n {
292                self.push(value).await?;
293            }
294            Ok(())
295        }
296
297        async fn output(&mut self) -> Result<SortOutput> {
298            let segment_null_bitmap = self.values.remove(&None).unwrap_or_default();
299            let segment_null_bitmap = Bitmap::BitVec(segment_null_bitmap);
300
301            Ok(SortOutput {
302                segment_null_bitmap,
303                sorted_stream: Box::new(stream::iter(
304                    std::mem::take(&mut self.values)
305                        .into_iter()
306                        .map(|(v, b)| Ok((v.unwrap(), Bitmap::BitVec(b)))),
307                )),
308                total_row_count: self.total_row_count,
309            })
310        }
311    }
312
313    fn stream_to_values(stream: ValueStream) -> Vec<Bytes> {
314        futures::executor::block_on(async {
315            stream.map(|r| r.unwrap().0).collect::<Vec<Bytes>>().await
316        })
317    }
318}