index/fulltext_index/create/
bloom_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use puffin::puffin_manager::{PuffinWriter, PutOptions};
22use snafu::{OptionExt, ResultExt};
23use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
24
25use crate::bloom_filter::creator::BloomFilterCreator;
26use crate::external_provider::ExternalTempFileProvider;
27use crate::fulltext_index::create::FulltextIndexCreator;
28use crate::fulltext_index::error::{
29    AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result,
30    SerializeToJsonSnafu,
31};
32use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
33use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
34
/// Capacity, in bytes, of the in-memory duplex pipe that carries the serialized
/// bloom filter from the index creator to the puffin blob writer (8 KiB).
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
36
37/// `BloomFilterFulltextIndexCreator` is for creating a fulltext index using a bloom filter.
38pub struct BloomFilterFulltextIndexCreator {
39    inner: Option<BloomFilterCreator>,
40    analyzer: Analyzer,
41    config: Config,
42}
43
44impl BloomFilterFulltextIndexCreator {
45    pub fn new(
46        config: Config,
47        rows_per_segment: usize,
48        false_positive_rate: f64,
49        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
50        global_memory_usage: Arc<AtomicUsize>,
51        global_memory_usage_threshold: Option<usize>,
52    ) -> Self {
53        let tokenizer = match config.analyzer {
54            crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _,
55            crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _,
56        };
57        let analyzer = Analyzer::new(tokenizer, config.case_sensitive);
58
59        let inner = BloomFilterCreator::new(
60            rows_per_segment,
61            false_positive_rate,
62            intermediate_provider,
63            global_memory_usage,
64            global_memory_usage_threshold,
65        );
66        Self {
67            inner: Some(inner),
68            analyzer,
69            config,
70        }
71    }
72}
73
74#[async_trait]
75impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
76    async fn push_text(&mut self, text: &str) -> Result<()> {
77        let tokens = self.analyzer.analyze_text(text)?;
78        self.inner
79            .as_mut()
80            .context(AbortedSnafu)?
81            .push_row_elems(tokens)
82            .await
83            .map_err(BoxedError::new)
84            .context(ExternalSnafu)?;
85        Ok(())
86    }
87
88    async fn finish(
89        &mut self,
90        puffin_writer: &mut (impl PuffinWriter + Send),
91        blob_key: &str,
92        put_options: PutOptions,
93    ) -> Result<u64> {
94        let creator = self.inner.as_mut().context(AbortedSnafu)?;
95
96        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
97
98        let property_key = KEY_FULLTEXT_CONFIG.to_string();
99        let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
100
101        let (index_finish, puffin_add_blob) = futures::join!(
102            creator.finish(tx.compat_write()),
103            puffin_writer.put_blob(
104                blob_key,
105                rx.compat(),
106                put_options,
107                HashMap::from([(property_key, property_value)]),
108            )
109        );
110
111        match (
112            puffin_add_blob.context(PuffinAddBlobSnafu),
113            index_finish.context(BloomFilterFinishSnafu),
114        ) {
115            (Err(e1), Err(e2)) => BiErrorsSnafu {
116                first: Box::new(e1),
117                second: Box::new(e2),
118            }
119            .fail()?,
120
121            (Ok(_), e @ Err(_)) => e?,
122            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
123            (Ok(written_bytes), Ok(_)) => {
124                return Ok(written_bytes);
125            }
126        }
127        Ok(0)
128    }
129
130    async fn abort(&mut self) -> Result<()> {
131        self.inner.take().context(AbortedSnafu)?;
132        Ok(())
133    }
134
135    fn memory_usage(&self) -> usize {
136        self.inner
137            .as_ref()
138            .map(|i| i.memory_usage())
139            .unwrap_or_default()
140    }
141}