// index/fulltext_index/create/bloom_filter.rs
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{OptionExt, ResultExt};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::bloom_filter::creator::BloomFilterCreator;
use crate::external_provider::ExternalTempFileProvider;
use crate::fulltext_index::create::FulltextIndexCreator;
use crate::fulltext_index::error::{
AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result,
SerializeToJsonSnafu,
};
use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
use crate::fulltext_index::Config;
/// Capacity (8 KiB) of the in-memory duplex pipe that carries the serialized
/// bloom filter from the creator to the puffin writer in `finish`.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;

/// Blob property key under which the JSON-serialized fulltext `Config` is stored.
pub const KEY_FULLTEXT_CONFIG: &str = "fulltext_config";
/// Fulltext index creator that tokenizes each pushed text and records the
/// resulting tokens as one row of elements in a bloom filter.
pub struct BloomFilterFulltextIndexCreator {
    /// Underlying bloom filter builder; set to `None` by `abort`, after which
    /// `push_text`/`finish` fail with an `Aborted` error.
    inner: Option<BloomFilterCreator>,
    /// Tokenizer plus case-sensitivity setting, both derived from `config`.
    analyzer: Analyzer,
    /// Fulltext configuration; serialized to JSON and attached to the output
    /// blob's properties when finishing.
    config: Config,
}
impl BloomFilterFulltextIndexCreator {
    /// Builds a creator whose tokenizer and case sensitivity are taken from
    /// `config`.
    ///
    /// `rows_per_segment`, `intermediate_provider`, `global_memory_usage` and
    /// `global_memory_usage_threshold` are handed unchanged to
    /// `BloomFilterCreator::new`.
    pub fn new(
        config: Config,
        rows_per_segment: usize,
        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
        global_memory_usage: Arc<AtomicUsize>,
        global_memory_usage_threshold: Option<usize>,
    ) -> Self {
        // Choose the tokenizer implementation for the configured language; the
        // concrete boxed type is inferred from `Analyzer::new`'s parameter.
        let boxed_tokenizer = match config.analyzer {
            crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _,
            crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _,
        };
        let text_analyzer = Analyzer::new(boxed_tokenizer, config.case_sensitive);

        Self {
            inner: Some(BloomFilterCreator::new(
                rows_per_segment,
                intermediate_provider,
                global_memory_usage,
                global_memory_usage_threshold,
            )),
            analyzer: text_analyzer,
            config,
        }
    }
}
#[async_trait]
impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
    /// Tokenizes `text` with the configured analyzer and pushes the tokens as
    /// the elements of one row into the bloom filter.
    ///
    /// # Errors
    /// Propagates analyzer errors; returns an `Aborted` error if `abort` was
    /// already called; wraps bloom filter creator failures as `External`.
    async fn push_text(&mut self, text: &str) -> Result<()> {
        let tokens = self.analyzer.analyze_text(text)?;
        self.inner
            .as_mut()
            .context(AbortedSnafu)?
            .push_row_elems(tokens)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;
        Ok(())
    }

    /// Finalizes the bloom filter and streams it into `puffin_writer` as blob
    /// `blob_key`. The JSON-serialized config is attached under the
    /// `KEY_FULLTEXT_CONFIG` property.
    ///
    /// Returns the `u64` reported by `put_blob` (the written size).
    ///
    /// # Errors
    /// - `Aborted` if `abort` was already called.
    /// - `SerializeToJson` if the config cannot be serialized.
    /// - `PuffinAddBlob` or `BloomFilterFinish` when exactly one side fails.
    /// - `BiErrors` carrying both errors (puffin first) when both sides fail.
    async fn finish(
        &mut self,
        puffin_writer: &mut (impl PuffinWriter + Send),
        blob_key: &str,
        put_options: PutOptions,
    ) -> Result<u64> {
        let creator = self.inner.as_mut().context(AbortedSnafu)?;

        let property_key = KEY_FULLTEXT_CONFIG.to_string();
        let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;

        // The creator writes into one end of a bounded in-memory pipe while the
        // puffin writer reads from the other. Both futures must be polled
        // concurrently: awaiting `creator.finish` alone would stall once the
        // pipe buffer filled with nobody draining it.
        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(tx.compat_write()),
            puffin_writer.put_blob(
                blob_key,
                rx.compat(),
                put_options,
                HashMap::from([(property_key, property_value)]),
            )
        );

        // Every combination of outcomes is handled here and the match is the
        // function's result, so there is no dead fallback return value.
        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(BloomFilterFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail(),
            (Ok(_), Err(e)) | (Err(e), Ok(_)) => Err(e),
            (Ok(written_bytes), Ok(_)) => Ok(written_bytes),
        }
    }

    /// Drops the underlying bloom filter creator so later calls fail with
    /// `Aborted`.
    ///
    /// # Errors
    /// Returns an `Aborted` error if the creator was already aborted.
    async fn abort(&mut self) -> Result<()> {
        self.inner.take().context(AbortedSnafu)?;
        Ok(())
    }

    /// Memory usage reported by the underlying bloom filter creator, or `0`
    /// once the creator has been aborted.
    fn memory_usage(&self) -> usize {
        self.inner
            .as_ref()
            .map(|i| i.memory_usage())
            .unwrap_or_default()
    }
}