1use std::collections::HashMap;
16use std::path::{Path, PathBuf};
17
18use async_trait::async_trait;
19use common_error::ext::BoxedError;
20use puffin::puffin_manager::{PuffinWriter, PutOptions};
21use snafu::{OptionExt, ResultExt};
22use tantivy::indexer::NoMergePolicy;
23use tantivy::schema::{Schema, STORED, TEXT};
24use tantivy::store::{Compressor, ZstdCompressor};
25use tantivy::{doc, Index, IndexWriter};
26
27use crate::fulltext_index::create::FulltextIndexCreator;
28use crate::fulltext_index::error::{
29 ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, SerializeToJsonSnafu, TantivySnafu,
30};
31use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
32
/// Name of the tantivy schema field that holds the indexed text.
pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text";
/// Name of the tantivy schema field that stores the row id of each document.
pub const ROWID_FIELD_NAME: &str = "greptime_fulltext_rowid";
35
/// Creates a fulltext index on disk using tantivy.
///
/// Texts are pushed one at a time; each becomes a document carrying the text and a
/// row id. The index directory is later either handed to a puffin writer (`finish`)
/// or removed (`abort`).
pub struct TantivyFulltextIndexCreator {
    /// The tantivy index writer. It is `None` once `finish` or `abort` has taken it.
    writer: Option<IndexWriter>,

    /// Schema field that holds the indexed text (named by `TEXT_FIELD_NAME`).
    text_field: tantivy::schema::Field,

    /// Schema field that stores the row id (named by `ROWID_FIELD_NAME`).
    rowid_field: tantivy::schema::Field,

    /// Row id given to the next pushed text. Starts at 0 and is incremented by one on
    /// every push, so it equals the number of pushes made so far.
    max_rowid: u64,

    /// Directory in which the tantivy index is created.
    path: PathBuf,

    /// Fulltext index configuration. It is used to build the tokenizers and is
    /// serialized to JSON as a blob property when the index is finished.
    config: Config,
}
56
57impl TantivyFulltextIndexCreator {
58 pub async fn new(path: impl AsRef<Path>, config: Config, memory_limit: usize) -> Result<Self> {
62 tokio::fs::create_dir_all(path.as_ref())
63 .await
64 .context(IoSnafu)?;
65
66 let mut schema_builder = Schema::builder();
67 let text_field = schema_builder.add_text_field(TEXT_FIELD_NAME, TEXT);
68 let rowid_field = schema_builder.add_u64_field(ROWID_FIELD_NAME, STORED);
69 let schema = schema_builder.build();
70
71 let mut index = Index::create_in_dir(&path, schema).context(TantivySnafu)?;
72 index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default());
73 index.set_tokenizers(config.build_tantivy_tokenizer());
74
75 let memory_limit = Self::sanitize_memory_limit(memory_limit);
76
77 let writer = index
79 .writer_with_num_threads(1, memory_limit)
80 .context(TantivySnafu)?;
81 writer.set_merge_policy(Box::new(NoMergePolicy));
82
83 Ok(Self {
84 writer: Some(writer),
85 text_field,
86 rowid_field,
87 max_rowid: 0,
88 path: path.as_ref().to_path_buf(),
89 config,
90 })
91 }
92
93 fn sanitize_memory_limit(memory_limit: usize) -> usize {
94 const MARGIN_IN_BYTES: usize = 1_000_000;
96 const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize;
97 const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES - 1;
98
99 memory_limit.clamp(MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX)
100 }
101}
102
103#[async_trait]
104impl FulltextIndexCreator for TantivyFulltextIndexCreator {
105 async fn push_text(&mut self, text: &str) -> Result<()> {
106 let writer = self.writer.as_mut().context(FinishedSnafu)?;
107 let doc = doc!(self.text_field => text, self.rowid_field => self.max_rowid);
108 self.max_rowid += 1;
109 writer.add_document(doc).context(TantivySnafu)?;
110 Ok(())
111 }
112
113 async fn finish(
114 &mut self,
115 puffin_writer: &mut (impl PuffinWriter + Send),
116 blob_key: &str,
117 put_options: PutOptions,
118 ) -> Result<u64> {
119 let mut writer = self.writer.take().context(FinishedSnafu)?;
120 common_runtime::spawn_blocking_global(move || {
121 writer.commit().context(TantivySnafu)?;
122 writer.wait_merging_threads().context(TantivySnafu)
123 })
124 .await
125 .context(JoinSnafu)??;
126
127 let property_key = KEY_FULLTEXT_CONFIG.to_string();
128 let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
129
130 puffin_writer
131 .put_dir(
132 blob_key,
133 self.path.clone(),
134 put_options,
135 HashMap::from([(property_key, property_value)]),
136 )
137 .await
138 .map_err(BoxedError::new)
139 .context(ExternalSnafu)
140 }
141
142 async fn abort(&mut self) -> Result<()> {
143 let mut writer = self.writer.take().context(FinishedSnafu)?;
144 common_runtime::spawn_blocking_global(move || {
145 writer.commit().context(TantivySnafu)?;
146 writer.wait_merging_threads().context(TantivySnafu)
147 })
148 .await
149 .context(JoinSnafu)??;
150
151 tokio::fs::remove_dir_all(&self.path).await.context(IoSnafu)
152 }
153
154 fn memory_usage(&self) -> usize {
155 0
157 }
158}
159
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_test_util::temp_dir::create_temp_dir;
    use futures::AsyncRead;
    use tantivy::collector::DocSetCollector;
    use tantivy::query::QueryParser;
    use tantivy::schema::Value;
    use tantivy::TantivyDocument;

    use super::*;
    use crate::fulltext_index::Analyzer;

    /// Memory limits every test runs with: one below the sanitized minimum, one inside
    /// the accepted range, and one above the sanitized maximum.
    const MEMORY_LIMITS: [usize; 3] = [1, 64_000_000, usize::MAX];

    /// Texts indexed by the English analyzer tests; row ids follow the array order.
    const ENGLISH_TEXTS: [&str; 5] = ["hello", "world", "hello, world", "foo!", "Bar"];

    /// Texts indexed by the Chinese analyzer tests; row ids follow the array order.
    const CHINESE_TEXTS: [&str; 6] = ["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"];

    /// Puffin writer stub: only `put_dir` and `finish` are expected to be called.
    struct MockPuffinWriter;

    #[async_trait]
    impl PuffinWriter for MockPuffinWriter {
        async fn put_blob<R>(
            &mut self,
            _key: &str,
            _raw_data: R,
            _options: PutOptions,
            _properties: HashMap<String, String>,
        ) -> puffin::error::Result<u64>
        where
            R: AsyncRead + Send,
        {
            unreachable!()
        }

        async fn put_dir(
            &mut self,
            _key: &str,
            _dir: PathBuf,
            _options: PutOptions,
            _properties: HashMap<String, String>,
        ) -> puffin::error::Result<u64> {
            Ok(0)
        }

        fn set_footer_lz4_compressed(&mut self, _lz4_compressed: bool) {
            unreachable!()
        }

        async fn finish(self) -> puffin::error::Result<u64> {
            Ok(0)
        }
    }

    #[tokio::test]
    async fn test_creator_basic() {
        let expectations = [
            ("hello", vec![0u32, 2]),
            ("world", vec![1, 2]),
            ("foo", vec![3]),
            ("bar", vec![4]),
        ];
        check_with_all_limits(
            "test_creator_basic_",
            &ENGLISH_TEXTS,
            Config::default(),
            &expectations,
        )
        .await;
    }

    #[tokio::test]
    async fn test_creator_case_sensitive() {
        let config = Config {
            case_sensitive: true,
            ..Config::default()
        };
        let expectations = [
            ("hello", vec![0u32, 2]),
            ("world", vec![1, 2]),
            ("foo", vec![3]),
            ("Foo", vec![]),
            ("FOO", vec![]),
            ("bar", vec![]),
            ("Bar", vec![4]),
            ("BAR", vec![]),
        ];
        check_with_all_limits(
            "test_creator_case_sensitive_",
            &ENGLISH_TEXTS,
            config,
            &expectations,
        )
        .await;
    }

    #[tokio::test]
    async fn test_creator_chinese() {
        let config = Config {
            analyzer: Analyzer::Chinese,
            ..Config::default()
        };
        let expectations = [
            ("你好", vec![0u32, 2, 3]),
            ("世界", vec![1, 2, 3]),
            ("foo", vec![4]),
            ("bar", vec![5]),
        ];
        check_with_all_limits(
            "test_creator_chinese_",
            &CHINESE_TEXTS,
            config,
            &expectations,
        )
        .await;
    }

    #[tokio::test]
    async fn test_creator_chinese_case_sensitive() {
        let config = Config {
            case_sensitive: true,
            analyzer: Analyzer::Chinese,
        };
        let expectations = [
            ("你好", vec![0u32, 2, 3]),
            ("世界", vec![1, 2, 3]),
            ("foo", vec![4]),
            ("bar", vec![]),
            ("Foo", vec![]),
            ("FOO", vec![]),
            ("Bar", vec![5]),
            ("BAR", vec![]),
        ];
        check_with_all_limits(
            "test_creator_chinese_case_sensitive_",
            &CHINESE_TEXTS,
            config,
            &expectations,
        )
        .await;
    }

    /// For each memory limit, builds an index of `texts` in a fresh temp directory and
    /// verifies that every query in `cases` returns the expected row ids.
    async fn check_with_all_limits(
        dir_prefix: &str,
        texts: &[&str],
        config: Config,
        cases: &[(&str, Vec<u32>)],
    ) {
        for memory_limit in MEMORY_LIMITS {
            let temp_dir = create_temp_dir(dir_prefix);
            build_index(texts, temp_dir.path(), config, memory_limit).await;
            query_and_check(temp_dir.path(), config, cases).await;
        }
    }

    /// Pushes all `texts` into a new creator rooted at `path` and finishes it.
    async fn build_index(texts: &[&str], path: &Path, config: Config, memory_limit: usize) {
        let mut creator = TantivyFulltextIndexCreator::new(path, config, memory_limit)
            .await
            .unwrap();
        for &text in texts {
            creator.push_text(text).await.unwrap();
        }

        let mut puffin_writer = MockPuffinWriter;
        creator
            .finish(&mut puffin_writer, "", PutOptions::default())
            .await
            .unwrap();
    }

    /// Opens the index at `path` and checks, for every `(query, expected)` pair, that
    /// the sorted row ids of the matching documents equal `expected`.
    async fn query_and_check(path: &Path, config: Config, cases: &[(&str, Vec<u32>)]) {
        let mut index = Index::open_in_dir(path).unwrap();
        index.set_tokenizers(config.build_tantivy_tokenizer());
        let reader = index.reader().unwrap();
        let searcher = reader.searcher();

        let text_field = index.schema().get_field(TEXT_FIELD_NAME).unwrap();
        let rowid_field = searcher.schema().get_field(ROWID_FIELD_NAME).unwrap();
        let query_parser = QueryParser::for_index(&index, vec![text_field]);

        for (query, expected) in cases {
            let parsed = query_parser.parse_query(query).unwrap();
            let hits = searcher.search(&parsed, &DocSetCollector).unwrap();

            let mut rowids: Vec<u32> = hits
                .iter()
                .map(|addr| {
                    let doc: TantivyDocument = searcher.doc(*addr).unwrap();
                    let rowid = doc.get_first(rowid_field).unwrap().as_u64().unwrap();
                    // The stored row id must match the doc id tantivy assigned.
                    assert_eq!(rowid as u32, addr.doc_id);
                    rowid as u32
                })
                .collect();

            rowids.sort();
            assert_eq!(rowids, *expected);
        }
    }
}