index/fulltext_index/create/
bloom_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use puffin::puffin_manager::{PuffinWriter, PutOptions};
22use snafu::{OptionExt, ResultExt};
23use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
24
25use crate::bloom_filter::creator::BloomFilterCreator;
26use crate::external_provider::ExternalTempFileProvider;
27use crate::fulltext_index::create::FulltextIndexCreator;
28use crate::fulltext_index::error::{
29    AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result,
30    SerializeToJsonSnafu,
31};
32use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
33use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
34
/// Buffer size of the in-memory duplex pipe that carries the serialized bloom filter
/// from the creator to the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
36
37/// `BloomFilterFulltextIndexCreator` is for creating a fulltext index using a bloom filter.
38pub struct BloomFilterFulltextIndexCreator {
39    inner: Option<BloomFilterCreator>,
40    analyzer: Analyzer,
41    config: Config,
42}
43
44impl BloomFilterFulltextIndexCreator {
45    pub fn new(
46        config: Config,
47        rows_per_segment: usize,
48        false_positive_rate: f64,
49        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
50        global_memory_usage: Arc<AtomicUsize>,
51        global_memory_usage_threshold: Option<usize>,
52    ) -> Self {
53        let tokenizer = match config.analyzer {
54            crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _,
55            crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _,
56        };
57        let analyzer = Analyzer::new(tokenizer, config.case_sensitive);
58
59        let inner = BloomFilterCreator::new(
60            rows_per_segment,
61            false_positive_rate,
62            intermediate_provider,
63            global_memory_usage,
64            global_memory_usage_threshold,
65        );
66        Self {
67            inner: Some(inner),
68            analyzer,
69            config,
70        }
71    }
72}
73
74#[async_trait]
75impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
76    async fn push_text(&mut self, text: &str) -> Result<()> {
77        let tokens = self.analyzer.analyze_text(text)?;
78        self.inner
79            .as_mut()
80            .context(AbortedSnafu)?
81            .push_row_elems(tokens)
82            .await
83            .map_err(BoxedError::new)
84            .context(ExternalSnafu)?;
85        Ok(())
86    }
87
88    async fn finish(
89        &mut self,
90        puffin_writer: &mut (impl PuffinWriter + Send),
91        blob_key: &str,
92        mut put_options: PutOptions,
93    ) -> Result<u64> {
94        // Compressing the bloom filter doesn't reduce the size but hurts read performance.
95        // Always disable compression here.
96        put_options.compression = None;
97
98        let creator = self.inner.as_mut().context(AbortedSnafu)?;
99
100        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
101
102        let property_key = KEY_FULLTEXT_CONFIG.to_string();
103        let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
104
105        let (index_finish, puffin_add_blob) = futures::join!(
106            creator.finish(tx.compat_write()),
107            puffin_writer.put_blob(
108                blob_key,
109                rx.compat(),
110                put_options,
111                HashMap::from([(property_key, property_value)]),
112            )
113        );
114
115        match (
116            puffin_add_blob.context(PuffinAddBlobSnafu),
117            index_finish.context(BloomFilterFinishSnafu),
118        ) {
119            (Err(e1), Err(e2)) => BiErrorsSnafu {
120                first: Box::new(e1),
121                second: Box::new(e2),
122            }
123            .fail()?,
124
125            (Ok(_), e @ Err(_)) => e?,
126            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
127            (Ok(written_bytes), Ok(_)) => {
128                return Ok(written_bytes);
129            }
130        }
131        Ok(0)
132    }
133
134    async fn abort(&mut self) -> Result<()> {
135        self.inner.take().context(AbortedSnafu)?;
136        Ok(())
137    }
138
139    fn memory_usage(&self) -> usize {
140        self.inner
141            .as_ref()
142            .map(|i| i.memory_usage())
143            .unwrap_or_default()
144    }
145}