index/fulltext_index/search/
tantivy.rs1use std::collections::{BTreeSet, HashMap};
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Instant;
19
20use async_trait::async_trait;
21use common_telemetry::debug;
22use snafu::{OptionExt, ResultExt};
23use tantivy::collector::DocSetCollector;
24use tantivy::query::QueryParser;
25use tantivy::schema::{Field, Value};
26use tantivy::{Index, IndexReader, ReloadPolicy, TantivyDocument};
27
28use crate::fulltext_index::Config;
29use crate::fulltext_index::create::{ROWID_FIELD_NAME, TEXT_FIELD_NAME};
30use crate::fulltext_index::error::{
31 JoinSnafu, Result, TantivyDocNotFoundSnafu, TantivyParserSnafu, TantivySnafu,
32};
33use crate::fulltext_index::search::{FulltextIndexSearcher, RowId};
34
35pub struct TantivyFulltextIndexSearcher {
37 inner: Arc<TantivySearcherInner>,
38}
39
40struct TantivySearcherInner {
41 index: Index,
43 reader: IndexReader,
45 default_field: Field,
47}
48
49impl TantivyFulltextIndexSearcher {
50 pub fn new(path: impl AsRef<Path>, config: Config) -> Result<Self> {
52 let now = Instant::now();
53
54 let mut index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?;
55 index.set_tokenizers(config.build_tantivy_tokenizer());
56 let reader = index
57 .reader_builder()
58 .reload_policy(ReloadPolicy::Manual)
59 .num_warming_threads(0)
60 .try_into()
61 .context(TantivySnafu)?;
62 let default_field = index
63 .schema()
64 .get_field(TEXT_FIELD_NAME)
65 .context(TantivySnafu)?;
66
67 debug!(
68 "Opened tantivy index on {:?} in {:?}",
69 path.as_ref(),
70 now.elapsed()
71 );
72
73 Ok(Self {
74 inner: Arc::new(TantivySearcherInner {
75 index,
76 reader,
77 default_field,
78 }),
79 })
80 }
81}
82
83fn search_sync(inner: &TantivySearcherInner, query: &str) -> Result<BTreeSet<RowId>> {
84 let searcher = inner.reader.searcher();
85 let query_parser = QueryParser::for_index(&inner.index, vec![inner.default_field]);
86 let query = query_parser
87 .parse_query(query)
88 .context(TantivyParserSnafu)?;
89 let doc_addrs = searcher
90 .search(&query, &DocSetCollector)
91 .context(TantivySnafu)?;
92
93 let seg_metas = inner
94 .index
95 .searchable_segment_metas()
96 .context(TantivySnafu)?;
97
98 if seg_metas.len() == 1 {
101 return Ok(doc_addrs.into_iter().map(|d| d.doc_id).collect());
102 }
103
104 let rowid_field = searcher
106 .schema()
107 .get_field(ROWID_FIELD_NAME)
108 .context(TantivySnafu)?;
109 let mut seg_offsets = HashMap::with_capacity(seg_metas.len());
110 let mut res = BTreeSet::new();
111 for doc_addr in doc_addrs {
112 let offset = if let Some(offset) = seg_offsets.get(&doc_addr.segment_ord) {
113 *offset
114 } else {
115 let doc: TantivyDocument = searcher.doc(doc_addr).context(TantivySnafu)?;
118 let rowid = doc
119 .get_first(rowid_field)
120 .and_then(|v| v.as_u64())
121 .context(TantivyDocNotFoundSnafu { doc_addr })?;
122
123 let offset = rowid as u32 - doc_addr.doc_id;
124 seg_offsets.insert(doc_addr.segment_ord, offset);
125 offset
126 };
127
128 res.insert(doc_addr.doc_id + offset);
129 }
130
131 Ok(res)
132}
133
134#[async_trait]
135impl FulltextIndexSearcher for TantivyFulltextIndexSearcher {
136 async fn search(&self, query: &str) -> Result<BTreeSet<RowId>> {
137 let inner = self.inner.clone();
138 let query = query.to_string();
139 common_runtime::spawn_blocking_global(move || search_sync(&inner, &query))
140 .await
141 .context(JoinSnafu)?
142 }
143}