index/fulltext_index/create/
tantivy.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::path::{Path, PathBuf};
17
18use async_trait::async_trait;
19use common_error::ext::BoxedError;
20use puffin::puffin_manager::{PuffinWriter, PutOptions};
21use snafu::{OptionExt, ResultExt};
22use tantivy::indexer::NoMergePolicy;
23use tantivy::schema::{Schema, STORED, TEXT};
24use tantivy::store::{Compressor, ZstdCompressor};
25use tantivy::{doc, Index, IndexWriter};
26
27use crate::fulltext_index::create::FulltextIndexCreator;
28use crate::fulltext_index::error::{
29    ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, SerializeToJsonSnafu, TantivySnafu,
30};
31use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
32
33pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text";
34pub const ROWID_FIELD_NAME: &str = "greptime_fulltext_rowid";
35
36/// `TantivyFulltextIndexCreator` is a fulltext index creator using tantivy.
37pub struct TantivyFulltextIndexCreator {
38    /// The tantivy index writer.
39    writer: Option<IndexWriter>,
40
41    /// The field for the text.
42    text_field: tantivy::schema::Field,
43
44    /// The field for the row id.
45    rowid_field: tantivy::schema::Field,
46
47    /// The current max row id.
48    max_rowid: u64,
49
50    /// The directory path in filesystem to store the index.
51    path: PathBuf,
52
53    /// The configuration of the fulltext index.
54    config: Config,
55}
56
57impl TantivyFulltextIndexCreator {
58    /// Creates a new `TantivyFulltextIndexCreator`.
59    ///
60    /// The `path` is the directory path in filesystem to store the index.
61    pub async fn new(path: impl AsRef<Path>, config: Config, memory_limit: usize) -> Result<Self> {
62        tokio::fs::create_dir_all(path.as_ref())
63            .await
64            .context(IoSnafu)?;
65
66        let mut schema_builder = Schema::builder();
67        let text_field = schema_builder.add_text_field(TEXT_FIELD_NAME, TEXT);
68        let rowid_field = schema_builder.add_u64_field(ROWID_FIELD_NAME, STORED);
69        let schema = schema_builder.build();
70
71        let mut index = Index::create_in_dir(&path, schema).context(TantivySnafu)?;
72        index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default());
73        index.set_tokenizers(config.build_tantivy_tokenizer());
74
75        let memory_limit = Self::sanitize_memory_limit(memory_limit);
76
77        // Use one thread to keep order of the row id.
78        let writer = index
79            .writer_with_num_threads(1, memory_limit)
80            .context(TantivySnafu)?;
81        writer.set_merge_policy(Box::new(NoMergePolicy));
82
83        Ok(Self {
84            writer: Some(writer),
85            text_field,
86            rowid_field,
87            max_rowid: 0,
88            path: path.as_ref().to_path_buf(),
89            config,
90        })
91    }
92
93    fn sanitize_memory_limit(memory_limit: usize) -> usize {
94        // Port from tantivy::indexer::index_writer::{MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX}
95        const MARGIN_IN_BYTES: usize = 1_000_000;
96        const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize;
97        const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES - 1;
98
99        memory_limit.clamp(MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX)
100    }
101}
102
103#[async_trait]
104impl FulltextIndexCreator for TantivyFulltextIndexCreator {
105    async fn push_text(&mut self, text: &str) -> Result<()> {
106        let writer = self.writer.as_mut().context(FinishedSnafu)?;
107        let doc = doc!(self.text_field => text, self.rowid_field => self.max_rowid);
108        self.max_rowid += 1;
109        writer.add_document(doc).context(TantivySnafu)?;
110        Ok(())
111    }
112
113    async fn finish(
114        &mut self,
115        puffin_writer: &mut (impl PuffinWriter + Send),
116        blob_key: &str,
117        put_options: PutOptions,
118    ) -> Result<u64> {
119        let mut writer = self.writer.take().context(FinishedSnafu)?;
120        common_runtime::spawn_blocking_global(move || {
121            writer.commit().context(TantivySnafu)?;
122            writer.wait_merging_threads().context(TantivySnafu)
123        })
124        .await
125        .context(JoinSnafu)??;
126
127        let property_key = KEY_FULLTEXT_CONFIG.to_string();
128        let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
129
130        puffin_writer
131            .put_dir(
132                blob_key,
133                self.path.clone(),
134                put_options,
135                HashMap::from([(property_key, property_value)]),
136            )
137            .await
138            .map_err(BoxedError::new)
139            .context(ExternalSnafu)
140    }
141
142    async fn abort(&mut self) -> Result<()> {
143        let mut writer = self.writer.take().context(FinishedSnafu)?;
144        common_runtime::spawn_blocking_global(move || {
145            writer.commit().context(TantivySnafu)?;
146            writer.wait_merging_threads().context(TantivySnafu)
147        })
148        .await
149        .context(JoinSnafu)??;
150
151        tokio::fs::remove_dir_all(&self.path).await.context(IoSnafu)
152    }
153
154    fn memory_usage(&self) -> usize {
155        // Unable to get the memory usage of `IndexWriter`.
156        0
157    }
158}
159
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_test_util::temp_dir::create_temp_dir;
    use futures::AsyncRead;
    use tantivy::collector::DocSetCollector;
    use tantivy::query::QueryParser;
    use tantivy::schema::Value;
    use tantivy::TantivyDocument;

    use super::*;
    use crate::fulltext_index::Analyzer;

    /// Memory limits every test is run with: below tantivy's minimum, a typical
    /// value, and far above tantivy's maximum. Both extremes exercise the clamping.
    const MEMORY_LIMITS: [usize; 3] = [1, 64_000_000, usize::MAX];

    struct MockPuffinWriter;

    #[async_trait]
    impl PuffinWriter for MockPuffinWriter {
        async fn put_blob<R>(
            &mut self,
            _key: &str,
            _raw_data: R,
            _options: PutOptions,
            _properties: HashMap<String, String>,
        ) -> puffin::error::Result<u64>
        where
            R: AsyncRead + Send,
        {
            unreachable!()
        }

        async fn put_dir(
            &mut self,
            _key: &str,
            _dir: PathBuf,
            _options: PutOptions,
            _properties: HashMap<String, String>,
        ) -> puffin::error::Result<u64> {
            Ok(0)
        }

        fn set_footer_lz4_compressed(&mut self, _lz4_compressed: bool) {
            unreachable!()
        }

        async fn finish(self) -> puffin::error::Result<u64> {
            Ok(0)
        }
    }

    #[tokio::test]
    async fn test_creator_basic() {
        let cases = [
            ("hello", vec![0u32, 2]),
            ("world", vec![1, 2]),
            ("foo", vec![3]),
            ("bar", vec![4]),
        ];
        check_with_all_memory_limits(
            "test_creator_basic_",
            &["hello", "world", "hello, world", "foo!", "Bar"],
            Config::default(),
            &cases,
        )
        .await;
    }

    #[tokio::test]
    async fn test_creator_case_sensitive() {
        let config = Config {
            case_sensitive: true,
            ..Config::default()
        };
        let cases = [
            ("hello", vec![0u32, 2]),
            ("world", vec![1, 2]),
            ("foo", vec![3]),
            ("Foo", vec![]),
            ("FOO", vec![]),
            ("bar", vec![]),
            ("Bar", vec![4]),
            ("BAR", vec![]),
        ];
        check_with_all_memory_limits(
            "test_creator_case_sensitive_",
            &["hello", "world", "hello, world", "foo!", "Bar"],
            config,
            &cases,
        )
        .await;
    }

    #[tokio::test]
    async fn test_creator_chinese() {
        let config = Config {
            analyzer: Analyzer::Chinese,
            ..Config::default()
        };
        let cases = [
            ("你好", vec![0u32, 2, 3]),
            ("世界", vec![1, 2, 3]),
            ("foo", vec![4]),
            ("bar", vec![5]),
        ];
        check_with_all_memory_limits(
            "test_creator_chinese_",
            &["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"],
            config,
            &cases,
        )
        .await;
    }

    #[tokio::test]
    async fn test_creator_chinese_case_sensitive() {
        let config = Config {
            case_sensitive: true,
            analyzer: Analyzer::Chinese,
        };
        let cases = [
            ("你好", vec![0u32, 2, 3]),
            ("世界", vec![1, 2, 3]),
            ("foo", vec![4]),
            ("bar", vec![]),
            ("Foo", vec![]),
            ("FOO", vec![]),
            ("Bar", vec![5]),
            ("BAR", vec![]),
        ];
        check_with_all_memory_limits(
            "test_creator_chinese_case_sensitive_",
            &["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"],
            config,
            &cases,
        )
        .await;
    }

    /// Builds an index from `texts` in a fresh temp dir for every entry of
    /// `MEMORY_LIMITS`, then checks each `(query, expected row ids)` case.
    async fn check_with_all_memory_limits(
        temp_dir_prefix: &str,
        texts: &[&str],
        config: Config,
        cases: &[(&str, Vec<u32>)],
    ) {
        for memory_limit in MEMORY_LIMITS {
            let temp_dir = create_temp_dir(temp_dir_prefix);
            build_index(texts, temp_dir.path(), config, memory_limit).await;
            query_and_check(temp_dir.path(), config, cases).await;
        }
    }

    /// Pushes every text into a new creator at `path` and finishes it.
    async fn build_index(texts: &[&str], path: &Path, config: Config, memory_limit: usize) {
        let mut creator = TantivyFulltextIndexCreator::new(path, config, memory_limit)
            .await
            .unwrap();
        for text in texts {
            creator.push_text(text).await.unwrap();
        }
        creator
            .finish(&mut MockPuffinWriter, "", PutOptions::default())
            .await
            .unwrap();
    }

    /// Opens the index at `path` and asserts that each query matches exactly the
    /// expected row ids. It also asserts that every stored row id equals its
    /// tantivy doc id.
    async fn query_and_check(path: &Path, config: Config, cases: &[(&str, Vec<u32>)]) {
        let mut index = Index::open_in_dir(path).unwrap();
        index.set_tokenizers(config.build_tantivy_tokenizer());
        let reader = index.reader().unwrap();
        let searcher = reader.searcher();

        // Independent of the individual case, so resolve/build them only once.
        let text_field = index.schema().get_field(TEXT_FIELD_NAME).unwrap();
        let rowid_field = searcher.schema().get_field(ROWID_FIELD_NAME).unwrap();
        let query_parser = QueryParser::for_index(&index, vec![text_field]);

        for (query, expected) in cases {
            let query = query_parser.parse_query(query).unwrap();
            let docs = searcher.search(&query, &DocSetCollector).unwrap();

            let mut res = Vec::with_capacity(docs.len());
            for doc_addr in &docs {
                let doc: TantivyDocument = searcher.doc(*doc_addr).unwrap();
                let rowid = doc.get_first(rowid_field).unwrap().as_u64().unwrap();
                assert_eq!(rowid as u32, doc_addr.doc_id);
                res.push(rowid as u32);
            }

            res.sort_unstable();
            assert_eq!(res, *expected);
        }
    }
}