index/fulltext_index/create/
bloom_filter.rs1use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use puffin::puffin_manager::{PuffinWriter, PutOptions};
22use snafu::{OptionExt, ResultExt};
23use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
24
25use crate::bloom_filter::creator::BloomFilterCreator;
26use crate::external_provider::ExternalTempFileProvider;
27use crate::fulltext_index::create::FulltextIndexCreator;
28use crate::fulltext_index::error::{
29 AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result,
30 SerializeToJsonSnafu,
31};
32use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
33use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
34
/// Capacity, in bytes, of the in-memory `tokio::io::duplex` pipe through which
/// the finished bloom filter is streamed into the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
36
37pub struct BloomFilterFulltextIndexCreator {
39 inner: Option<BloomFilterCreator>,
40 analyzer: Analyzer,
41 config: Config,
42}
43
44impl BloomFilterFulltextIndexCreator {
45 pub fn new(
46 config: Config,
47 rows_per_segment: usize,
48 false_positive_rate: f64,
49 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
50 global_memory_usage: Arc<AtomicUsize>,
51 global_memory_usage_threshold: Option<usize>,
52 ) -> Self {
53 let tokenizer = match config.analyzer {
54 crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _,
55 crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _,
56 };
57 let analyzer = Analyzer::new(tokenizer, config.case_sensitive);
58
59 let inner = BloomFilterCreator::new(
60 rows_per_segment,
61 false_positive_rate,
62 intermediate_provider,
63 global_memory_usage,
64 global_memory_usage_threshold,
65 );
66 Self {
67 inner: Some(inner),
68 analyzer,
69 config,
70 }
71 }
72}
73
74#[async_trait]
75impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
76 async fn push_text(&mut self, text: &str) -> Result<()> {
77 let tokens = self.analyzer.analyze_text(text)?;
78 self.inner
79 .as_mut()
80 .context(AbortedSnafu)?
81 .push_row_elems(tokens)
82 .await
83 .map_err(BoxedError::new)
84 .context(ExternalSnafu)?;
85 Ok(())
86 }
87
88 async fn finish(
89 &mut self,
90 puffin_writer: &mut (impl PuffinWriter + Send),
91 blob_key: &str,
92 put_options: PutOptions,
93 ) -> Result<u64> {
94 let creator = self.inner.as_mut().context(AbortedSnafu)?;
95
96 let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
97
98 let property_key = KEY_FULLTEXT_CONFIG.to_string();
99 let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
100
101 let (index_finish, puffin_add_blob) = futures::join!(
102 creator.finish(tx.compat_write()),
103 puffin_writer.put_blob(
104 blob_key,
105 rx.compat(),
106 put_options,
107 HashMap::from([(property_key, property_value)]),
108 )
109 );
110
111 match (
112 puffin_add_blob.context(PuffinAddBlobSnafu),
113 index_finish.context(BloomFilterFinishSnafu),
114 ) {
115 (Err(e1), Err(e2)) => BiErrorsSnafu {
116 first: Box::new(e1),
117 second: Box::new(e2),
118 }
119 .fail()?,
120
121 (Ok(_), e @ Err(_)) => e?,
122 (e @ Err(_), Ok(_)) => e.map(|_| ())?,
123 (Ok(written_bytes), Ok(_)) => {
124 return Ok(written_bytes);
125 }
126 }
127 Ok(0)
128 }
129
130 async fn abort(&mut self) -> Result<()> {
131 self.inner.take().context(AbortedSnafu)?;
132 Ok(())
133 }
134
135 fn memory_usage(&self) -> usize {
136 self.inner
137 .as_ref()
138 .map(|i| i.memory_usage())
139 .unwrap_or_default()
140 }
141}