// index/inverted_index/create/sort_create.rs
use std::collections::HashMap;
use std::num::NonZeroUsize;
use async_trait::async_trait;
use snafu::ensure;
use crate::bitmap::BitmapType;
use crate::inverted_index::create::sort::{SortOutput, Sorter};
use crate::inverted_index::create::InvertedIndexCreator;
use crate::inverted_index::error::{InconsistentRowCountSnafu, Result};
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::BytesRef;
/// Name of an inverted index; every distinct name is backed by its own sorter.
type IndexName = String;

/// Number of rows per segment, handed to each sorter when it is created.
type SegmentRowCount = NonZeroUsize;

/// Builds a boxed [`Sorter`] for an index, given the index name and the
/// number of rows per segment.
pub type SorterFactory = Box<dyn Fn(IndexName, SegmentRowCount) -> Box<dyn Sorter> + Send>;

/// [`InvertedIndexCreator`] that routes pushed values to one [`Sorter`] per
/// index name and writes every sorter's output when finished.
pub struct SortIndexCreator {
    /// Called to create a sorter the first time an index name is pushed.
    sorter_factory: SorterFactory,

    /// Live sorters, keyed by index name.
    sorters: HashMap<IndexName, Box<dyn Sorter>>,

    /// Rows per segment; passed to each new sorter and to the writer on finish.
    segment_row_count: SegmentRowCount,
}
#[async_trait]
impl InvertedIndexCreator for SortIndexCreator {
    /// Pushes `n` copies of `value` into the sorter registered for `index_name`.
    ///
    /// The first push for an unseen index name creates its sorter through the
    /// factory. This happens even when `n == 0`, so an index that never received
    /// rows is still emitted by `finish`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the sorter's `push_n`. If the very first push for
    /// a new index fails, the freshly built sorter is discarded and not registered.
    async fn push_with_name_n(
        &mut self,
        index_name: &str,
        value: Option<BytesRef<'_>>,
        n: usize,
    ) -> Result<()> {
        // Look up by `&str` first so the common path (index already known) does
        // not allocate an owned key.
        match self.sorters.get_mut(index_name) {
            Some(sorter) => sorter.push_n(value, n).await,
            None => {
                let index_name = index_name.to_string();
                let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count);
                sorter.push_n(value, n).await?;
                self.sorters.insert(index_name, sorter);
                Ok(())
            }
        }
    }

    /// Drains every registered sorter, writes its sorted output to `writer`, and
    /// then finalizes `writer` with the shared total row count and the segment
    /// row count.
    ///
    /// Indexes are written in ascending index-name order. Identical input
    /// therefore always yields the same sequence of `add_index` calls.
    ///
    /// # Errors
    ///
    /// Returns `InconsistentRowCount` when a sorter's `total_row_count` differs
    /// from that of the first sorter processed. In that case `writer.finish` is
    /// not called. Errors from the sorters or the writer are propagated.
    async fn finish(
        &mut self,
        writer: &mut dyn InvertedIndexWriter,
        bitmap_type: BitmapType,
    ) -> Result<()> {
        // `HashMap::drain` yields entries in an unspecified, per-process-randomized
        // order. Sorting by name makes the written layout reproducible. It also
        // makes the index blamed on a row-count mismatch deterministic.
        let mut sorters: Vec<_> = self.sorters.drain().collect();
        sorters.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));

        let mut output_row_count = None;
        for (index_name, mut sorter) in sorters {
            let SortOutput {
                segment_null_bitmap,
                sorted_stream,
                total_row_count,
            } = sorter.output().await?;

            // All sorters must agree with the first one on the total row count.
            let expected_row_count = *output_row_count.get_or_insert(total_row_count);
            ensure!(
                expected_row_count == total_row_count,
                InconsistentRowCountSnafu {
                    index_name,
                    total_row_count,
                    expected_row_count,
                }
            );

            writer
                .add_index(index_name, segment_null_bitmap, sorted_stream, bitmap_type)
                .await?;
        }

        // With no registered indexes the total row count defaults to 0.
        let total_row_count = output_row_count.unwrap_or_default() as _;
        writer.finish(total_row_count, self.segment_row_count).await
    }
}
impl SortIndexCreator {
    /// Creates a creator with no registered indexes.
    ///
    /// `sorter_factory` builds one sorter per distinct index name, on that name's
    /// first push. `segment_row_count` is given to every sorter the factory builds
    /// and is reported to the writer when the creator is finished.
    pub fn new(sorter_factory: SorterFactory, segment_row_count: NonZeroUsize) -> Self {
        let sorters = HashMap::default();
        Self {
            segment_row_count,
            sorters,
            sorter_factory,
        }
    }
}
#[cfg(test)]
mod tests {
// Unit tests for `SortIndexCreator`, driven by the in-memory `NaiveSorter`
// defined below and a mocked `InvertedIndexWriter`.
use std::collections::BTreeMap;
use common_base::BitVec;
use futures::{stream, StreamExt};
use super::*;
use crate::bitmap::Bitmap;
use crate::inverted_index::error::Error;
use crate::inverted_index::format::writer::{MockInvertedIndexWriter, ValueStream};
use crate::Bytes;
/// Three indexes with three non-null values each. Each index must reach the
/// writer exactly once with its values in ascending order and an empty null
/// bitmap. `finish` must then report 3 total rows and a segment row count of 1.
#[tokio::test]
async fn test_sort_index_creator_basic() {
let mut creator =
SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
let index_values = vec![
("a", vec![b"3", b"2", b"1"]),
("b", vec![b"6", b"5", b"4"]),
("c", vec![b"1", b"2", b"3"]),
];
for (index_name, values) in index_values {
for value in values {
creator
.push_with_name(index_name, Some(value))
.await
.unwrap();
}
}
let mut mock_writer = MockInvertedIndexWriter::new();
// Call order is not asserted; each closure invocation dispatches on the name.
mock_writer.expect_add_index().times(3).returning(
|name, null_bitmap, stream, bitmap_type| {
assert!(null_bitmap.is_empty());
assert_eq!(bitmap_type, BitmapType::Roaring);
match name.as_str() {
"a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
"b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
"c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
_ => panic!("unexpected index name: {}", name),
}
Ok(())
},
);
mock_writer
.expect_finish()
.times(1)
.returning(|total_row_count, segment_row_count| {
assert_eq!(total_row_count, 3);
assert_eq!(segment_row_count.get(), 1);
Ok(())
});
creator
.finish(&mut mock_writer, BitmapType::Roaring)
.await
.unwrap();
}
/// Index "c" receives only 2 rows while "a" and "b" receive 3. `finish` must
/// fail with `InconsistentRowCount` and must never call `writer.finish`.
#[tokio::test]
async fn test_sort_index_creator_inconsistent_row_count() {
let mut creator =
SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
let index_values = vec![
("a", vec![b"3", b"2", b"1"]),
("b", vec![b"6", b"5", b"4"]),
("c", vec![b"1", b"2"]),
];
for (index_name, values) in index_values {
for value in values {
creator
.push_with_name(index_name, Some(value))
.await
.unwrap();
}
}
let mut mock_writer = MockInvertedIndexWriter::new();
// No `times(..)`: how many indexes are written before the mismatch is
// detected depends on the order the sorters are drained in.
mock_writer
.expect_add_index()
.returning(|name, null_bitmap, stream, bitmap_type| {
assert!(null_bitmap.is_empty());
assert_eq!(bitmap_type, BitmapType::Roaring);
match name.as_str() {
"a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
"b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
"c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2"]),
_ => panic!("unexpected index name: {}", name),
}
Ok(())
});
mock_writer.expect_finish().never();
let res = creator.finish(&mut mock_writer, BitmapType::Roaring).await;
assert!(matches!(res, Err(Error::InconsistentRowCount { .. })));
}
/// Indexes registered only through zero-count pushes are still written, each
/// with an empty value stream and an empty null bitmap. `finish` then reports
/// 0 total rows.
#[tokio::test]
async fn test_sort_index_creator_create_indexes_without_data() {
let mut creator =
SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap());
creator.push_with_name_n("a", None, 0).await.unwrap();
creator.push_with_name_n("b", None, 0).await.unwrap();
creator.push_with_name_n("c", None, 0).await.unwrap();
let mut mock_writer = MockInvertedIndexWriter::new();
mock_writer
.expect_add_index()
.returning(|name, null_bitmap, stream, bitmap_type| {
assert!(null_bitmap.is_empty());
assert_eq!(bitmap_type, BitmapType::Roaring);
assert!(matches!(name.as_str(), "a" | "b" | "c"));
assert!(stream_to_values(stream).is_empty());
Ok(())
});
mock_writer
.expect_finish()
.times(1)
.returning(|total_row_count, segment_row_count| {
assert_eq!(total_row_count, 0);
assert_eq!(segment_row_count.get(), 1);
Ok(())
});
creator
.finish(&mut mock_writer, BitmapType::Roaring)
.await
.unwrap();
}
/// Sets bit `index` in `bit_vec`, first growing it with `false` bits when it
/// is too short.
fn set_bit(bit_vec: &mut BitVec, index: usize) {
if index >= bit_vec.len() {
bit_vec.resize(index + 1, false);
}
bit_vec.set(index, true);
}
/// Test-only `Sorter` that keeps every distinct value in memory.
/// Values are ordered by a `BTreeMap`, and each maps to a bitmap of the
/// segments it appears in. Null values are stored under the `None` key.
struct NaiveSorter {
// Number of rows pushed so far.
total_row_count: usize,
// Rows per segment; a row's segment index is `row / segment_row_count`.
segment_row_count: NonZeroUsize,
// Value (or `None` for null) -> bitmap of segments containing it.
values: BTreeMap<Option<Bytes>, BitVec>,
}
impl NaiveSorter {
/// Returns a `SorterFactory` that ignores the index name and builds an
/// empty `NaiveSorter` with the given segment row count.
fn factory() -> SorterFactory {
Box::new(|_index_name, segment_row_count| {
Box::new(NaiveSorter {
total_row_count: 0,
segment_row_count,
values: BTreeMap::new(),
})
})
}
}
#[async_trait]
impl Sorter for NaiveSorter {
/// Records one row by marking the current row's segment in the bitmap of
/// `value`, then advances the row counter.
async fn push(&mut self, value: Option<BytesRef<'_>>) -> Result<()> {
let segment_index = self.total_row_count / self.segment_row_count;
self.total_row_count += 1;
let bitmap = self.values.entry(value.map(Into::into)).or_default();
set_bit(bitmap, segment_index);
Ok(())
}
/// Records `n` rows of `value` by calling `push` `n` times.
async fn push_n(&mut self, value: Option<BytesRef<'_>>, n: usize) -> Result<()> {
for _ in 0..n {
self.push(value).await?;
}
Ok(())
}
/// Splits off the null-value bitmap and streams the remaining values in
/// ascending order. This empties `values`; `total_row_count` is left as is.
async fn output(&mut self) -> Result<SortOutput> {
let segment_null_bitmap = self.values.remove(&None).unwrap_or_default();
let segment_null_bitmap = Bitmap::BitVec(segment_null_bitmap);
Ok(SortOutput {
segment_null_bitmap,
sorted_stream: Box::new(stream::iter(
std::mem::take(&mut self.values)
.into_iter()
// The `None` key was removed above, so every remaining key is `Some`.
.map(|(v, b)| Ok((v.unwrap(), Bitmap::BitVec(b)))),
)),
total_row_count: self.total_row_count,
})
}
}
/// Blocks on `stream` and collects its values, discarding the bitmaps.
/// Panics if any item is an error.
fn stream_to_values(stream: ValueStream) -> Vec<Bytes> {
futures::executor::block_on(async {
stream.map(|r| r.unwrap().0).collect::<Vec<Bytes>>().await
})
}
}