index/fulltext_index/create/
bloom_filter.rs1use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use puffin::puffin_manager::{PuffinWriter, PutOptions};
22use snafu::{OptionExt, ResultExt};
23use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
24
25use crate::bloom_filter::creator::BloomFilterCreator;
26use crate::external_provider::ExternalTempFileProvider;
27use crate::fulltext_index::create::FulltextIndexCreator;
28use crate::fulltext_index::error::{
29 AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result,
30 SerializeToJsonSnafu,
31};
32use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
33use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
34
35const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
36
37pub struct BloomFilterFulltextIndexCreator {
39 inner: Option<BloomFilterCreator>,
40 analyzer: Analyzer,
41 config: Config,
42}
43
44impl BloomFilterFulltextIndexCreator {
45 pub fn new(
46 config: Config,
47 rows_per_segment: usize,
48 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
49 global_memory_usage: Arc<AtomicUsize>,
50 global_memory_usage_threshold: Option<usize>,
51 ) -> Self {
52 let tokenizer = match config.analyzer {
53 crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _,
54 crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _,
55 };
56 let analyzer = Analyzer::new(tokenizer, config.case_sensitive);
57
58 let inner = BloomFilterCreator::new(
59 rows_per_segment,
60 intermediate_provider,
61 global_memory_usage,
62 global_memory_usage_threshold,
63 );
64 Self {
65 inner: Some(inner),
66 analyzer,
67 config,
68 }
69 }
70}
71
72#[async_trait]
73impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
74 async fn push_text(&mut self, text: &str) -> Result<()> {
75 let tokens = self.analyzer.analyze_text(text)?;
76 self.inner
77 .as_mut()
78 .context(AbortedSnafu)?
79 .push_row_elems(tokens)
80 .await
81 .map_err(BoxedError::new)
82 .context(ExternalSnafu)?;
83 Ok(())
84 }
85
86 async fn finish(
87 &mut self,
88 puffin_writer: &mut (impl PuffinWriter + Send),
89 blob_key: &str,
90 put_options: PutOptions,
91 ) -> Result<u64> {
92 let creator = self.inner.as_mut().context(AbortedSnafu)?;
93
94 let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
95
96 let property_key = KEY_FULLTEXT_CONFIG.to_string();
97 let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
98
99 let (index_finish, puffin_add_blob) = futures::join!(
100 creator.finish(tx.compat_write()),
101 puffin_writer.put_blob(
102 blob_key,
103 rx.compat(),
104 put_options,
105 HashMap::from([(property_key, property_value)]),
106 )
107 );
108
109 match (
110 puffin_add_blob.context(PuffinAddBlobSnafu),
111 index_finish.context(BloomFilterFinishSnafu),
112 ) {
113 (Err(e1), Err(e2)) => BiErrorsSnafu {
114 first: Box::new(e1),
115 second: Box::new(e2),
116 }
117 .fail()?,
118
119 (Ok(_), e @ Err(_)) => e?,
120 (e @ Err(_), Ok(_)) => e.map(|_| ())?,
121 (Ok(written_bytes), Ok(_)) => {
122 return Ok(written_bytes);
123 }
124 }
125 Ok(0)
126 }
127
128 async fn abort(&mut self) -> Result<()> {
129 self.inner.take().context(AbortedSnafu)?;
130 Ok(())
131 }
132
133 fn memory_usage(&self) -> usize {
134 self.inner
135 .as_ref()
136 .map(|i| i.memory_usage())
137 .unwrap_or_default()
138 }
139}