// mito2/sst/index/fulltext_index/applier.rs
use std::collections::BTreeSet;
use std::sync::Arc;
use common_telemetry::warn;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
pub mod builder;
/// Applies fulltext queries to the tantivy fulltext indexes stored in an SST file's
/// puffin index file.
///
/// Each query targets a single column; the per-column matches are intersected by `apply`.
pub struct FulltextIndexApplier {
    /// Region directory; used to locate the remote index file via `RegionFilePathFactory`.
    region_dir: String,
    /// Region id; used to build file-cache keys and the write-cache path provider.
    region_id: RegionId,
    /// `(column id, query string)` pairs; each query is searched in that column's index.
    queries: Vec<(ColumnId, String)>,
    /// Factory for the puffin managers that open index files.
    puffin_manager_factory: PuffinManagerFactory,
    /// Object store holding the remote index files.
    store: ObjectStore,
    /// Optional local file cache, consulted before falling back to `store`.
    file_cache: Option<FileCacheRef>,
    /// Optional puffin metadata cache, configured via `with_puffin_metadata_cache`.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

/// Shared, reference-counted handle to a [`FulltextIndexApplier`].
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
impl FulltextIndexApplier {
pub fn new(
region_dir: String,
region_id: RegionId,
store: ObjectStore,
queries: Vec<(ColumnId, String)>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
region_id,
store,
queries,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
}
}
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
pub async fn apply(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<BTreeSet<RowId>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let mut inited = false;
let mut row_ids = BTreeSet::new();
for (column_id, query) in &self.queries {
let dir = self
.index_dir_path(file_id, *column_id, file_size_hint)
.await?;
let path = match &dir {
Some(dir) => dir.path(),
None => {
return Ok(BTreeSet::new());
}
};
let searcher =
TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?;
let result = searcher
.search(query)
.await
.context(ApplyFulltextIndexSnafu)?;
if !inited {
row_ids = result;
inited = true;
continue;
}
row_ids.retain(|id| result.contains(id));
if row_ids.is_empty() {
break;
}
}
Ok(row_ids)
}
async fn index_dir_path(
&self,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinDir>> {
let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
if let Some(file_cache) = &self.file_cache {
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
if file_cache.get(index_key).await.is_some() {
match self
.get_index_from_file_cache(file_cache, file_id, file_size_hint, &blob_key)
.await
{
Ok(dir) => return Ok(dir),
Err(err) => {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
}
}
}
self.get_index_from_remote_file(file_id, file_size_hint, &blob_key)
.await
}
async fn get_index_from_file_cache(
&self,
file_cache: &FileCacheRef,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
.puffin_manager_factory
.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),
Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}
}
async fn get_index_from_remote_file(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
.puffin_manager_factory
.build(
self.store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),
Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}
}
}