// index/inverted_index/format/reader/blob.rs
use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use common_base::range_read::RangeReader;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::{ensure, ResultExt};
use super::footer::DEFAULT_PREFETCH_SIZE;
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
use crate::inverted_index::format::reader::footer::InvertedIndexFooterReader;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::MIN_BLOB_SIZE;
/// Reader over an inverted index blob backed by a source `R`.
///
/// The [`InvertedIndexReader`] implementation for this type requires
/// `R: RangeReader + Sync`.
pub struct InvertedIndexBlobReader<R> {
    /// Underlying source that every read is issued against.
    source: R,
}
impl<R> InvertedIndexBlobReader<R> {
pub fn new(source: R) -> Self {
Self { source }
}
fn validate_blob_size(blob_size: u64) -> Result<()> {
ensure!(
blob_size >= MIN_BLOB_SIZE,
UnexpectedBlobSizeSnafu {
min_blob_size: MIN_BLOB_SIZE,
actual_blob_size: blob_size,
}
);
Ok(())
}
}
#[async_trait]
impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
    /// Reads `size` bytes starting at `offset` from the source.
    ///
    /// I/O failures from the source are wrapped with `CommonIoSnafu`.
    async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
        let end = offset + u64::from(size);
        let bytes = self.source.read(offset..end).await.context(CommonIoSnafu)?;
        Ok(bytes.into())
    }

    /// Forwards `ranges` to the source's `read_vec`, wrapping I/O failures
    /// with `CommonIoSnafu`.
    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        let chunks = self.source.read_vec(ranges).await;
        chunks.context(CommonIoSnafu)
    }

    /// Loads the [`InvertedIndexMetas`] of the blob.
    ///
    /// The blob size is taken from the source metadata's `content_length` and
    /// validated against the minimum blob size before the footer reader
    /// (configured with `DEFAULT_PREFETCH_SIZE`) is asked for the metas.
    async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
        let blob_size = self
            .source
            .metadata()
            .await
            .context(CommonIoSnafu)?
            .content_length;
        Self::validate_blob_size(blob_size)?;

        let mut footer = InvertedIndexFooterReader::new(&self.source, blob_size)
            .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
        let metas = footer.metadata().await?;
        Ok(Arc::new(metas))
    }
}
#[cfg(test)]
mod tests {
use fst::MapBuilder;
use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta, InvertedIndexMetas};
use prost::Message;
use super::*;
use crate::bitmap::Bitmap;
/// Builds the bytes of an FST map holding `key1 -> 1` and `key2 -> 2`.
fn mock_fst() -> Vec<u8> {
    let mut bytes = Vec::new();
    let mut builder = MapBuilder::new(&mut bytes).unwrap();
    // Keys are inserted in the same (ascending) order as listed here.
    for (key, value) in [("key1", 1), ("key2", 2)] {
        builder.insert(key.as_bytes(), value).unwrap();
    }
    builder.finish().unwrap();
    bytes
}
/// Builds the fixed Roaring bitmap shared by all tests in this module.
fn mock_bitmap() -> Bitmap {
    let lsb0_bytes: [u8; 2] = [0b10101010, 0b10000000];
    Bitmap::from_lsb0_bytes(&lsb0_bytes, BitmapType::Roaring)
}
/// Serializes [`mock_bitmap`] in the Roaring format and returns the bytes.
fn mock_bitmap_bytes() -> Vec<u8> {
    let mut serialized = Vec::new();
    let outcome = mock_bitmap().serialize_into(BitmapType::Roaring, &mut serialized);
    outcome.unwrap();
    serialized
}
/// Assembles a complete inverted index blob for the tests.
///
/// Layout: two identical index payloads (`tag0`, then `tag1`), the encoded
/// [`InvertedIndexMetas`], and finally the encoded metas length as a
/// little-endian `u32`.
fn create_inverted_index_blob() -> Vec<u8> {
    let bitmap_bytes = mock_bitmap_bytes();
    let fst_bytes = mock_fst();
    let bitmap_size = bitmap_bytes.len();
    let fst_size = fst_bytes.len();

    // One index payload: a leading bitmap, the null bitmap (located by
    // `relative_null_bitmap_offset`), then the FST.
    let inverted_index = [&bitmap_bytes[..], &bitmap_bytes[..], &fst_bytes[..]].concat();
    let index_size = inverted_index.len();

    // Both indexes share every field except their name and base offset.
    let make_meta = |name: &str, base_offset| InvertedIndexMeta {
        name: name.to_string(),
        base_offset,
        inverted_index_size: index_size as _,
        relative_null_bitmap_offset: bitmap_size as _,
        null_bitmap_size: bitmap_size as _,
        relative_fst_offset: (bitmap_size * 2) as _,
        fst_size: fst_size as _,
        bitmap_type: BitmapType::Roaring as _,
        ..Default::default()
    };
    let meta = make_meta("tag0", 0);
    // The second index starts right after the first one.
    let meta1 = make_meta("tag1", meta.inverted_index_size);

    let mut metas = InvertedIndexMetas {
        total_row_count: 10,
        segment_row_count: 1,
        ..Default::default()
    };
    for entry in [meta, meta1] {
        metas.metas.insert(entry.name.clone(), entry);
    }

    let mut meta_buf = Vec::new();
    metas.encode(&mut meta_buf).unwrap();
    let meta_len_le = (meta_buf.len() as u32).to_le_bytes();

    [
        &inverted_index[..],
        &inverted_index[..],
        &meta_buf[..],
        &meta_len_le[..],
    ]
    .concat()
}
/// The metas decoded from the mock blob must describe both indexes, which
/// differ only in their name and base offset.
#[tokio::test]
async fn test_inverted_index_blob_reader_metadata() {
    let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

    let metas = blob_reader.metadata().await.unwrap();
    assert_eq!(metas.metas.len(), 2);

    for (name, base_offset) in [("tag0", 0), ("tag1", 102)] {
        let meta = metas.metas.get(name).unwrap();
        assert_eq!(meta.name, name);
        assert_eq!(meta.base_offset, base_offset);
        assert_eq!(meta.inverted_index_size, 102);
        assert_eq!(meta.relative_null_bitmap_offset, 26);
        assert_eq!(meta.null_bitmap_size, 26);
        assert_eq!(meta.relative_fst_offset, 52);
        assert_eq!(meta.fst_size, 50);
    }
}
/// The FST of each index must be readable at the offset recorded in its meta
/// and contain exactly the two mock keys.
#[tokio::test]
async fn test_inverted_index_blob_reader_fst() {
    let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());
    let metas = blob_reader.metadata().await.unwrap();

    for tag in ["tag0", "tag1"] {
        let meta = metas.metas.get(tag).unwrap();
        let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;

        let fst_map = blob_reader.fst(fst_offset, meta.fst_size).await.unwrap();

        assert_eq!(fst_map.len(), 2);
        for (key, expected) in [("key1", 1), ("key2", 2)] {
            assert_eq!(fst_map.get(key.as_bytes()), Some(expected));
        }
    }
}
/// Both 26-byte bitmaps of each index must decode back to the mock bitmap.
#[tokio::test]
async fn test_inverted_index_blob_reader_bitmap() {
    let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

    for tag in ["tag0", "tag1"] {
        // The metas are re-read from the blob for every tag.
        let metas = blob_reader.metadata().await.unwrap();
        let meta = metas.metas.get(tag).unwrap();

        // The two bitmaps sit back to back at the start of the index.
        for relative_offset in [0, 26] {
            let bitmap = blob_reader
                .bitmap(meta.base_offset + relative_offset, 26, BitmapType::Roaring)
                .await
                .unwrap();
            assert_eq!(bitmap, mock_bitmap());
        }
    }
}
}