use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter;
use std::ops::Range;
use std::sync::Arc;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::BloomFilterReaderImpl;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;

use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
    PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionFileId;
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
use crate::sst::index::puffin_manager::{
    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
};

pub mod builder;

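/// Applies fulltext index predicates to SST files to prune the rows to read.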
pub struct FulltextIndexApplier {
    /// Fulltext index requests to apply, keyed by column id.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// Source from which index blobs and directories are read.
    index_source: IndexSource,

    /// Optional cache for bloom filter index blobs.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Key identifying the predicates of this applier.
    predicate_key: PredicateKey,
}

pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;

impl FulltextIndexApplier {
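    /// Creates a new `FulltextIndexApplier`.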
    pub fn new(
        table_dir: String,
        path_type: PathType,
        store: ObjectStore,
        requests: BTreeMap<ColumnId, FulltextRequest>,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> Self {
        let requests = Arc::new(requests);
        let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);

        Self {
            predicate_key: PredicateKey::new_fulltext(requests.clone()),
            requests,
            index_source,
            bloom_filter_index_cache: None,
        }
    }

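    /// Sets the file cache used to read locally cached index files.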
    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.index_source.set_file_cache(file_cache);
        self
    }

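    /// Sets the cache for puffin metadata.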
    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.index_source
            .set_puffin_metadata_cache(puffin_metadata_cache);
        self
    }

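    /// Sets the cache for bloom filter index blobs used by coarse-grained applies.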
    pub fn with_bloom_filter_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

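    /// Returns the predicate key identifying the requests of this applier, used as the cache
    /// key for index application results.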
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
}

impl FulltextIndexApplier {
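    /// Applies the fulltext requests to the fine-grained (Tantivy) index of the given SST file
    /// and returns the matched row ids, or `None` if no index was applied.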
    pub async fn apply_fine(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let mut row_ids: Option<BTreeSet<RowId>> = None;
        for (column_id, request) in self.requests.iter() {
            if request.queries.is_empty() && request.terms.is_empty() {
                continue;
            }

            let Some(result) = self
                .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
                .await?
            else {
                continue;
            };

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref()
                && ids.is_empty()
            {
                break;
            }
        }

        if row_ids.is_none() {
            timer.stop_and_discard();
        }
        Ok(row_ids)
    }

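    /// Applies the request of a single column to its Tantivy index and returns the matched
    /// row ids, or `None` if the index is absent or nothing was applied.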
    async fn apply_fine_one_column(
        &self,
        file_size_hint: Option<u64>,
        file_id: RegionFileId,
        column_id: ColumnId,
        request: &FulltextRequest,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let blob_key = format!(
            "{INDEX_BLOB_TYPE_TANTIVY}-{}",
            IndexTarget::ColumnId(column_id)
        );
        let dir = self
            .index_source
            .dir(file_id, &blob_key, file_size_hint)
            .await?;

        let dir = match &dir {
            Some(dir) => dir,
            None => {
                return Ok(None);
            }
        };

        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
        let path = dir.path();

        let searcher =
            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
        let mut row_ids: Option<BTreeSet<RowId>> = None;

        // Apply the queries first, intersecting the results.
        for query in &request.queries {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref()
                && ids.is_empty()
            {
                break;
            }
        }

        // Then apply the terms as one combined query.
        let query = request.terms_as_query(config.case_sensitive);
        if !query.0.is_empty() {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }
        }

        Ok(row_ids)
    }
}

impl FulltextIndexApplier {
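    /// Applies the fulltext term requests to the coarse-grained (bloom filter) index of the
    /// given SST file.
    ///
    /// `row_groups` yields `(row count, should search)` for each row group. Returns, for every
    /// searched row group, the ranges of rows (relative to the row group) that may match, or
    /// `None` if no index was applied.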
    pub async fn apply_coarse(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let (input, mut output) = Self::init_coarse_output(row_groups);
        let mut applied = false;

        for (column_id, request) in self.requests.iter() {
            // Queries are not supported by the coarse-grained path; only terms are applied.
            if request.terms.is_empty() {
                continue;
            }

            applied |= self
                .apply_coarse_one_column(
                    file_id,
                    file_size_hint,
                    *column_id,
                    &request.terms,
                    &mut output,
                )
                .await?;
        }

        if !applied {
            timer.stop_and_discard();
            return Ok(None);
        }

        Self::adjust_coarse_output(input, &mut output);
        Ok(Some(output))
    }

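    /// Applies the terms of a single column to its bloom filter index, narrowing the ranges in
    /// `output` in place. Returns `false` if the column has no bloom filter index or no usable
    /// predicate, `true` otherwise.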
    async fn apply_coarse_one_column(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
        column_id: ColumnId,
        terms: &[FulltextTerm],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> Result<bool> {
        let blob_key = format!(
            "{INDEX_BLOB_TYPE_BLOOM}-{}",
            IndexTarget::ColumnId(column_id)
        );
        let Some(reader) = self
            .index_source
            .blob(file_id, &blob_key, file_size_hint)
            .await?
        else {
            return Ok(false);
        };
        let config =
            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;

        let predicates = Self::terms_to_predicates(terms, &config);
        if predicates.is_empty() {
            return Ok(false);
        }

        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
            let blob_size = range_reader
                .metadata()
                .await
                .context(MetadataSnafu)?
                .content_length;
            let reader = CachedBloomFilterIndexBlobReader::new(
                file_id.file_id(),
                column_id,
                Tag::Fulltext,
                blob_size,
                BloomFilterReaderImpl::new(range_reader),
                bloom_filter_cache.clone(),
            );
            Box::new(reader) as _
        } else {
            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
        };

        let mut applier = BloomFilterApplier::new(reader)
            .await
            .context(ApplyBloomFilterIndexSnafu)?;
        for (_, row_group_output) in output.iter_mut() {
            // Already pruned to nothing, skip.
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier
                .search(&predicates, row_group_output)
                .await
                .context(ApplyBloomFilterIndexSnafu)?;
        }

        Ok(true)
    }

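    /// Builds the input/output of a coarse-grained apply: `input` holds the absolute row range
    /// of every row group to search, and `output` starts with that full range as the only
    /// candidate range of each such row group. Ranges stay absolute until
    /// `adjust_coarse_output` converts them back to row-group-relative offsets.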
    #[allow(clippy::type_complexity)]
    fn init_coarse_output(
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }

        let output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        (input, output)
    }

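    /// Converts the absolute row ranges in `output` back to ranges relative to the start of
    /// each row group.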
    fn adjust_coarse_output(
        input: Vec<(usize, Range<usize>)>,
        output: &mut [(usize, Vec<Range<usize>>)],
    ) {
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }
    }

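    /// Converts fulltext terms to bloom filter predicates: each term is tokenized with the
    /// configured analyzer and every token becomes a single-element `InListPredicate`.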
    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
        let mut probes = HashSet::new();
        for term in terms {
            // A lowercased term cannot be probed against a case-sensitive index, so skip it.
            if config.case_sensitive && term.col_lowered {
                continue;
            }
            probes.extend(Self::term_to_probes(&term.term, config));
        }

        probes
            .into_iter()
            .map(|p| InListPredicate {
                list: iter::once(p).collect(),
            })
            .collect::<Vec<_>>()
    }

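    /// Tokenizes a term with the configured analyzer and yields the probe bytes of each token,
    /// lowercased when the index is case-insensitive.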
    fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
        let tokens = match config.analyzer {
            Analyzer::English => EnglishTokenizer {}.tokenize(term),
            Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
        };

        tokens.into_iter().map(|t| {
            if !config.case_sensitive {
                t.to_lowercase()
            } else {
                t.to_string()
            }
            .into_bytes()
        })
    }
}

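/// Source of index blobs and directories, reading from the local file cache when available and
/// falling back to the remote store otherwise.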
struct IndexSource {
    /// Table directory of the SST files.
    table_dir: String,

    /// Path type used to build index file paths.
    path_type: PathType,

    /// Factory for building puffin managers.
    puffin_manager_factory: PuffinManagerFactory,

    /// Store to read remote index files from.
    remote_store: ObjectStore,

    /// Local file cache; preferred over the remote store when set.
    file_cache: Option<FileCacheRef>,

    /// Cache for puffin metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

impl IndexSource {
    fn new(
        table_dir: String,
        path_type: PathType,
        puffin_manager_factory: PuffinManagerFactory,
        remote_store: ObjectStore,
    ) -> Self {
        Self {
            table_dir,
            path_type,
            puffin_manager_factory,
            remote_store,
            file_cache: None,
            puffin_metadata_cache: None,
        }
    }

    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
        self.file_cache = file_cache;
    }

    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
        self.puffin_metadata_cache = puffin_metadata_cache;
    }

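    /// Returns the blob with the given key from the puffin file of the SST, or `None` if it is
    /// not found. Falls back to the remote index file if reading the cached copy fails
    /// unexpectedly.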
    async fn blob(
        &self,
        file_id: RegionFileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.blob(key).await;
        match res {
            Ok(blob) => Ok(Some(blob)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Falling back to the remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.blob(key).await;
                    match res {
                        Ok(blob) => Ok(Some(blob)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

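    /// Returns the directory with the given key from the puffin file of the SST, or `None` if
    /// it is not found. Falls back to the remote index file if reading the cached copy fails
    /// unexpectedly.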
    async fn dir(
        &self,
        file_id: RegionFileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.dir(key).await;
        match res {
            Ok(dir) => Ok(Some(dir)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Falling back to the remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.dir(key).await;
                    match res {
                        Ok(dir) => Ok(Some(dir)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

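    /// Returns a puffin reader, preferring the locally cached index file. The returned flag is
    /// `true` when the reader already reads from the remote store, i.e. no further fallback is
    /// possible.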
    async fn ensure_reader(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<(SstPuffinReader, bool)> {
        match self.build_local_cache(file_id, file_size_hint).await {
            Ok(Some(r)) => Ok((r, false)),
            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
            Err(err) => Err(err),
        }
    }

    async fn build_local_cache(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<SstPuffinReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self
            .puffin_manager_factory
            .build(
                file_cache.local_store(),
                WriteCachePathProvider::new(file_cache.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);
        Ok(Some(reader))
    }

    async fn build_remote(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
    ) -> Result<SstPuffinReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.remote_store.clone(),
                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);

        Ok(reader)
    }
}