index/fulltext_index/create/
bloom_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use puffin::puffin_manager::{PuffinWriter, PutOptions};
22use snafu::{OptionExt, ResultExt};
23use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
24
25use crate::bloom_filter::creator::BloomFilterCreator;
26use crate::external_provider::ExternalTempFileProvider;
27use crate::fulltext_index::create::FulltextIndexCreator;
28use crate::fulltext_index::error::{
29    AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result,
30    SerializeToJsonSnafu,
31};
32use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
33use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
34
/// Maximum buffer size, in bytes, handed to `tokio::io::duplex` when creating the
/// in-memory pipe that carries the finished bloom filter from the creator to the
/// puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
36
/// `BloomFilterFulltextIndexCreator` is for creating a fulltext index using a bloom filter.
///
/// Pushed text is tokenized by `analyzer` and the resulting tokens are forwarded to the
/// wrapped [`BloomFilterCreator`]. Once `abort` has been called the wrapped creator is
/// gone, and subsequent `push_text` / `finish` / `abort` calls fail with an `Aborted` error.
pub struct BloomFilterFulltextIndexCreator {
    /// The underlying bloom filter creator; `None` after `abort` has taken it.
    inner: Option<BloomFilterCreator>,
    /// Turns each pushed text into the tokens that are fed to `inner`.
    analyzer: Analyzer,
    /// The fulltext configuration; serialized to JSON and attached to the blob as a
    /// property under `KEY_FULLTEXT_CONFIG` when the index is finished.
    config: Config,
}
43
44impl BloomFilterFulltextIndexCreator {
45    pub fn new(
46        config: Config,
47        rows_per_segment: usize,
48        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
49        global_memory_usage: Arc<AtomicUsize>,
50        global_memory_usage_threshold: Option<usize>,
51    ) -> Self {
52        let tokenizer = match config.analyzer {
53            crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _,
54            crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _,
55        };
56        let analyzer = Analyzer::new(tokenizer, config.case_sensitive);
57
58        let inner = BloomFilterCreator::new(
59            rows_per_segment,
60            intermediate_provider,
61            global_memory_usage,
62            global_memory_usage_threshold,
63        );
64        Self {
65            inner: Some(inner),
66            analyzer,
67            config,
68        }
69    }
70}
71
72#[async_trait]
73impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
74    async fn push_text(&mut self, text: &str) -> Result<()> {
75        let tokens = self.analyzer.analyze_text(text)?;
76        self.inner
77            .as_mut()
78            .context(AbortedSnafu)?
79            .push_row_elems(tokens)
80            .await
81            .map_err(BoxedError::new)
82            .context(ExternalSnafu)?;
83        Ok(())
84    }
85
86    async fn finish(
87        &mut self,
88        puffin_writer: &mut (impl PuffinWriter + Send),
89        blob_key: &str,
90        put_options: PutOptions,
91    ) -> Result<u64> {
92        let creator = self.inner.as_mut().context(AbortedSnafu)?;
93
94        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
95
96        let property_key = KEY_FULLTEXT_CONFIG.to_string();
97        let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
98
99        let (index_finish, puffin_add_blob) = futures::join!(
100            creator.finish(tx.compat_write()),
101            puffin_writer.put_blob(
102                blob_key,
103                rx.compat(),
104                put_options,
105                HashMap::from([(property_key, property_value)]),
106            )
107        );
108
109        match (
110            puffin_add_blob.context(PuffinAddBlobSnafu),
111            index_finish.context(BloomFilterFinishSnafu),
112        ) {
113            (Err(e1), Err(e2)) => BiErrorsSnafu {
114                first: Box::new(e1),
115                second: Box::new(e2),
116            }
117            .fail()?,
118
119            (Ok(_), e @ Err(_)) => e?,
120            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
121            (Ok(written_bytes), Ok(_)) => {
122                return Ok(written_bytes);
123            }
124        }
125        Ok(0)
126    }
127
128    async fn abort(&mut self) -> Result<()> {
129        self.inner.take().context(AbortedSnafu)?;
130        Ok(())
131    }
132
133    fn memory_usage(&self) -> usize {
134        self.inner
135            .as_ref()
136            .map(|i| i.memory_usage())
137            .unwrap_or_default()
138    }
139}