// index/inverted_index/format/writer/blob.rs
use std::num::NonZeroUsize;
use async_trait::async_trait;
use futures::{AsyncWrite, AsyncWriteExt};
use greptime_proto::v1::index::InvertedIndexMetas;
use prost::Message;
use snafu::ResultExt;
use crate::bitmap::{Bitmap, BitmapType};
use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
use crate::inverted_index::format::writer::single::SingleIndexWriter;
use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream};
/// Writes one or more inverted indexes into a single blob through the
/// [`InvertedIndexWriter`] interface, followed by a footer holding the
/// encoded [`InvertedIndexMetas`].
pub struct InvertedIndexBlobWriter<W> {
/// Destination that receives every index payload and, on `finish`, the footer.
blob_writer: W,
/// Running sum of `inverted_index_size` over all indexes added so far; handed
/// to the per-index writer when the next index is added (presumably as that
/// index's base offset inside the blob — confirm against `SingleIndexWriter`).
written_size: u64,
/// Per-index metadata keyed by index name, plus the row counts set in
/// `finish`; this is what gets encoded into the footer.
metas: InvertedIndexMetas,
}
#[async_trait]
impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWriter<W> {
    /// Appends the index called `name` to the blob.
    ///
    /// The actual encoding is delegated to [`SingleIndexWriter`], which is given
    /// the number of bytes already written as its starting position. On success
    /// the returned metadata is stored under `name` (replacing any previous
    /// entry with the same name) and the running byte count is advanced by the
    /// metadata's `inverted_index_size`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying single-index writer.
    async fn add_index(
        &mut self,
        name: String,
        null_bitmap: Bitmap,
        values: ValueStream,
        bitmap_type: BitmapType,
    ) -> Result<()> {
        // The new index starts right after everything written so far.
        let start = self.written_size;

        let index_meta = SingleIndexWriter::new(
            name.clone(),
            start,
            null_bitmap,
            values,
            &mut self.blob_writer,
            bitmap_type,
        )
        .write()
        .await?;

        self.written_size += index_meta.inverted_index_size;
        self.metas.metas.insert(name, index_meta);
        Ok(())
    }

    /// Finalizes the blob by writing the footer, then flushes and closes the
    /// underlying writer.
    ///
    /// The footer consists of the encoded [`InvertedIndexMetas`] (with the given
    /// row counts filled in) immediately followed by its byte length as a
    /// little-endian `u32`.
    ///
    /// # Errors
    ///
    /// Returns a write, flush or close error if the corresponding operation on
    /// the underlying writer fails.
    async fn finish(
        &mut self,
        total_row_count: u64,
        segment_row_count: NonZeroUsize,
    ) -> Result<()> {
        self.metas.total_row_count = total_row_count;
        self.metas.segment_row_count = segment_row_count.get() as _;

        let footer = self.metas.encode_to_vec();
        // NOTE(review): `as u32` silently truncates if the encoded metas ever
        // exceed `u32::MAX` bytes; a checked conversion would need an error
        // variant that is not visible from this file.
        let footer_len = (footer.len() as u32).to_le_bytes();

        let sink = &mut self.blob_writer;
        sink.write_all(&footer).await.context(WriteSnafu)?;
        sink.write_all(&footer_len).await.context(WriteSnafu)?;
        sink.flush().await.context(FlushSnafu)?;
        sink.close().await.context(CloseSnafu)?;
        Ok(())
    }
}
impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
    /// Creates a writer that emits into `blob_writer`, starting with zero bytes
    /// written and default (empty) metadata.
    pub fn new(blob_writer: W) -> Self {
        Self {
            metas: InvertedIndexMetas::default(),
            written_size: 0,
            blob_writer,
        }
    }
}
#[cfg(test)]
mod tests {
    use futures::stream;
    use greptime_proto::v1::index::BitmapType;

    use super::*;
    use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use crate::Bytes;

    /// Indexes written by the basic test: index name and its three keys, in order.
    const SAMPLE_INDEXES: [(&str, [&str; 3]); 2] =
        [("tag0", ["a", "b", "c"]), ("tag1", ["x", "y", "z"])];

    /// LSB0 bitmap byte attached to the first, second and third key of every
    /// sample index.
    const POSTINGS: [u8; 3] = [0b0000_0001, 0b0010_0000, 0b0000_0001];

    /// Splits a value stored in the FST into the two `u32`s packed inside it;
    /// the tests read them as `[offset, size]` of a bitmap.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast(fst_value)
    }

    /// Shorthand for a Roaring-typed bitmap built from LSB0 bytes.
    fn roaring(lsb0_bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(lsb0_bytes, BitmapType::Roaring)
    }

    /// Builds the value stream for one sample index: the three keys paired
    /// with the shared `POSTINGS` bytes.
    fn sample_values([k0, k1, k2]: [&'static str; 3]) -> ValueStream {
        let [b0, b1, b2] = POSTINGS;
        Box::new(stream::iter(vec![
            Ok((Bytes::from(k0), roaring(&[b0]))),
            Ok((Bytes::from(k1), roaring(&[b1]))),
            Ok((Bytes::from(k2), roaring(&[b2]))),
        ]))
    }

    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_empty() {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        // With no index added, only the row counts should round-trip.
        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 0);
    }

    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_basic() {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        for (tag, keys) in SAMPLE_INDEXES {
            writer
                .add_index(
                    tag.to_string(),
                    roaring(&[0b0000_0001, 0b0000_0000]),
                    sample_values(keys),
                    BitmapType::Roaring,
                )
                .await
                .unwrap();
        }
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);

        // Read every index back and compare it with what was written.
        for (tag, keys) in SAMPLE_INDEXES {
            let meta = metadata.metas.get(tag).unwrap();
            let stats = meta.stats.as_ref().unwrap();
            assert_eq!(stats.distinct_count, 3);
            assert_eq!(stats.null_count, 1);
            assert_eq!(stats.min_value, Bytes::from(keys[0]));
            assert_eq!(stats.max_value, Bytes::from(keys[2]));

            let fst = reader
                .fst(
                    meta.base_offset + meta.relative_fst_offset as u64,
                    meta.fst_size,
                )
                .await
                .unwrap();
            assert_eq!(fst.len(), 3);

            for (key, bits) in keys.iter().copied().zip(POSTINGS) {
                // The FST value locates the key's bitmap relative to the
                // index's base offset.
                let [offset, size] = unpack(fst.get(key.as_bytes()).unwrap());
                let bitmap = reader
                    .bitmap(meta.base_offset + offset as u64, size, BitmapType::Roaring)
                    .await
                    .unwrap();
                assert_eq!(bitmap, roaring(&[bits]));
            }
        }
    }
}