1use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
23use index::bloom_filter::reader::BloomFilterReaderImpl;
24use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
25use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
26use index::fulltext_index::{Analyzer, Config};
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
30use snafu::ResultExt;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
35use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
36use crate::cache::index::bloom_filter_index::{
37 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
38};
39use crate::cache::index::result_cache::PredicateKey;
40use crate::error::{
41 ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
42 PuffinReadBlobSnafu, Result,
43};
44use crate::metrics::INDEX_APPLY_ELAPSED;
45use crate::sst::file::RegionFileId;
46use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
47use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
48use crate::sst::index::puffin_manager::{
49 PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
50};
51use crate::sst::index::TYPE_FULLTEXT_INDEX;
52
// Declares the `builder` submodule.
// NOTE(review): presumably this is the `applier::builder` module that `FulltextRequest` and
// `FulltextTerm` are imported from at the top of this file — confirm.
pub mod builder;
54
/// Applies full-text index requests to the index data of an SST file.
///
/// Two entry points exist: `apply_fine`, which yields matching row ids, and `apply_coarse`,
/// which prunes row-group ranges through bloom filters.
pub struct FulltextIndexApplier {
    /// Full-text requests to apply, keyed by the id of the column they target.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// Resolves index blobs and directories for a file (local file cache first, then the
    /// remote store).
    index_source: IndexSource,

    /// Optional cache handed to the bloom filter blob reader during coarse application.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Key built from `requests` via `PredicateKey::new_fulltext`; returned by
    /// `predicate_key()`.
    predicate_key: PredicateKey,
}
69
/// Reference-counted (`Arc`) handle to a [`FulltextIndexApplier`].
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
71
72impl FulltextIndexApplier {
73 pub fn new(
75 table_dir: String,
76 path_type: PathType,
77 store: ObjectStore,
78 requests: BTreeMap<ColumnId, FulltextRequest>,
79 puffin_manager_factory: PuffinManagerFactory,
80 ) -> Self {
81 let requests = Arc::new(requests);
82 let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);
83
84 Self {
85 predicate_key: PredicateKey::new_fulltext(requests.clone()),
86 requests,
87 index_source,
88 bloom_filter_index_cache: None,
89 }
90 }
91
92 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
94 self.index_source.set_file_cache(file_cache);
95 self
96 }
97
98 pub fn with_puffin_metadata_cache(
100 mut self,
101 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
102 ) -> Self {
103 self.index_source
104 .set_puffin_metadata_cache(puffin_metadata_cache);
105 self
106 }
107
108 pub fn with_bloom_filter_cache(
110 mut self,
111 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
112 ) -> Self {
113 self.bloom_filter_index_cache = bloom_filter_index_cache;
114 self
115 }
116
117 pub fn predicate_key(&self) -> &PredicateKey {
119 &self.predicate_key
120 }
121}
122
123impl FulltextIndexApplier {
124 pub async fn apply_fine(
127 &self,
128 file_id: RegionFileId,
129 file_size_hint: Option<u64>,
130 ) -> Result<Option<BTreeSet<RowId>>> {
131 let timer = INDEX_APPLY_ELAPSED
132 .with_label_values(&[TYPE_FULLTEXT_INDEX])
133 .start_timer();
134
135 let mut row_ids: Option<BTreeSet<RowId>> = None;
136 for (column_id, request) in self.requests.iter() {
137 if request.queries.is_empty() && request.terms.is_empty() {
138 continue;
139 }
140
141 let Some(result) = self
142 .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
143 .await?
144 else {
145 continue;
146 };
147
148 if let Some(ids) = row_ids.as_mut() {
149 ids.retain(|id| result.contains(id));
150 } else {
151 row_ids = Some(result);
152 }
153
154 if let Some(ids) = row_ids.as_ref() {
155 if ids.is_empty() {
156 break;
157 }
158 }
159 }
160
161 if row_ids.is_none() {
162 timer.stop_and_discard();
163 }
164 Ok(row_ids)
165 }
166
167 async fn apply_fine_one_column(
168 &self,
169 file_size_hint: Option<u64>,
170 file_id: RegionFileId,
171 column_id: ColumnId,
172 request: &FulltextRequest,
173 ) -> Result<Option<BTreeSet<RowId>>> {
174 let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
175 let dir = self
176 .index_source
177 .dir(file_id, &blob_key, file_size_hint)
178 .await?;
179
180 let dir = match &dir {
181 Some(dir) => dir,
182 None => {
183 return Ok(None);
184 }
185 };
186
187 let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
188 let path = dir.path();
189
190 let searcher =
191 TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
192 let mut row_ids: Option<BTreeSet<RowId>> = None;
193
194 for query in &request.queries {
196 let result = searcher
197 .search(&query.0)
198 .await
199 .context(ApplyFulltextIndexSnafu)?;
200
201 if let Some(ids) = row_ids.as_mut() {
202 ids.retain(|id| result.contains(id));
203 } else {
204 row_ids = Some(result);
205 }
206
207 if let Some(ids) = row_ids.as_ref() {
208 if ids.is_empty() {
209 break;
210 }
211 }
212 }
213
214 let query = request.terms_as_query(config.case_sensitive);
216 if !query.0.is_empty() {
217 let result = searcher
218 .search(&query.0)
219 .await
220 .context(ApplyFulltextIndexSnafu)?;
221
222 if let Some(ids) = row_ids.as_mut() {
223 ids.retain(|id| result.contains(id));
224 } else {
225 row_ids = Some(result);
226 }
227 }
228
229 Ok(row_ids)
230 }
231}
232
233impl FulltextIndexApplier {
234 pub async fn apply_coarse(
240 &self,
241 file_id: RegionFileId,
242 file_size_hint: Option<u64>,
243 row_groups: impl Iterator<Item = (usize, bool)>,
244 ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
245 let timer = INDEX_APPLY_ELAPSED
246 .with_label_values(&[TYPE_FULLTEXT_INDEX])
247 .start_timer();
248
249 let (input, mut output) = Self::init_coarse_output(row_groups);
250 let mut applied = false;
251
252 for (column_id, request) in self.requests.iter() {
253 if request.terms.is_empty() {
254 continue;
256 }
257
258 applied |= self
259 .apply_coarse_one_column(
260 file_id,
261 file_size_hint,
262 *column_id,
263 &request.terms,
264 &mut output,
265 )
266 .await?;
267 }
268
269 if !applied {
270 timer.stop_and_discard();
271 return Ok(None);
272 }
273
274 Self::adjust_coarse_output(input, &mut output);
275 Ok(Some(output))
276 }
277
278 async fn apply_coarse_one_column(
279 &self,
280 file_id: RegionFileId,
281 file_size_hint: Option<u64>,
282 column_id: ColumnId,
283 terms: &[FulltextTerm],
284 output: &mut [(usize, Vec<Range<usize>>)],
285 ) -> Result<bool> {
286 let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
287 let Some(reader) = self
288 .index_source
289 .blob(file_id, &blob_key, file_size_hint)
290 .await?
291 else {
292 return Ok(false);
293 };
294 let config =
295 Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
296
297 let predicates = Self::terms_to_predicates(terms, &config);
298 if predicates.is_empty() {
299 return Ok(false);
300 }
301
302 let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
303 let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
304 let blob_size = range_reader
305 .metadata()
306 .await
307 .context(MetadataSnafu)?
308 .content_length;
309 let reader = CachedBloomFilterIndexBlobReader::new(
310 file_id.file_id(),
311 column_id,
312 Tag::Fulltext,
313 blob_size,
314 BloomFilterReaderImpl::new(range_reader),
315 bloom_filter_cache.clone(),
316 );
317 Box::new(reader) as _
318 } else {
319 Box::new(BloomFilterReaderImpl::new(range_reader)) as _
320 };
321
322 let mut applier = BloomFilterApplier::new(reader)
323 .await
324 .context(ApplyBloomFilterIndexSnafu)?;
325 for (_, row_group_output) in output.iter_mut() {
326 if row_group_output.is_empty() {
328 continue;
329 }
330
331 *row_group_output = applier
332 .search(&predicates, row_group_output)
333 .await
334 .context(ApplyBloomFilterIndexSnafu)?;
335 }
336
337 Ok(true)
338 }
339
340 #[allow(clippy::type_complexity)]
348 fn init_coarse_output(
349 row_groups: impl Iterator<Item = (usize, bool)>,
350 ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
351 let mut input = Vec::with_capacity(row_groups.size_hint().0);
353 let mut start = 0;
354 for (i, (len, to_search)) in row_groups.enumerate() {
355 let end = start + len;
356 if to_search {
357 input.push((i, start..end));
358 }
359 start = end;
360 }
361
362 let output = input
365 .iter()
366 .map(|(i, range)| (*i, vec![range.clone()]))
367 .collect::<Vec<_>>();
368
369 (input, output)
370 }
371
372 fn adjust_coarse_output(
374 input: Vec<(usize, Range<usize>)>,
375 output: &mut [(usize, Vec<Range<usize>>)],
376 ) {
377 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
379 let start = input.start;
380 for range in output.iter_mut() {
381 range.start -= start;
382 range.end -= start;
383 }
384 }
385 }
386
387 fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
392 let mut probes = HashSet::new();
393 for term in terms {
394 if config.case_sensitive && term.col_lowered {
395 continue;
397 }
398 probes.extend(Self::term_to_probes(&term.term, config));
399 }
400
401 probes
402 .into_iter()
403 .map(|p| InListPredicate {
404 list: iter::once(p).collect(),
405 })
406 .collect::<Vec<_>>()
407 }
408
409 fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
410 let tokens = match config.analyzer {
411 Analyzer::English => EnglishTokenizer {}.tokenize(term),
412 Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
413 };
414
415 tokens.into_iter().map(|t| {
416 if !config.case_sensitive {
417 t.to_lowercase()
418 } else {
419 t.to_string()
420 }
421 .into_bytes()
422 })
423 }
424}
425
/// Locates puffin index data for a file, trying the local file cache before the remote
/// object store.
struct IndexSource {
    /// Table directory passed to `RegionFilePathFactory` when building a remote reader.
    table_dir: String,

    /// Path type passed to `RegionFilePathFactory` together with `table_dir`.
    path_type: PathType,

    /// Factory used to build puffin managers for both the cached and the remote store.
    puffin_manager_factory: PuffinManagerFactory,

    /// Object store used when reading the index file remotely.
    remote_store: ObjectStore,

    /// Optional file cache; when it holds the puffin file, readers are built on its local
    /// store.
    file_cache: Option<FileCacheRef>,

    /// Optional puffin metadata cache attached to every puffin manager that gets built.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
445
446impl IndexSource {
447 fn new(
448 table_dir: String,
449 path_type: PathType,
450 puffin_manager_factory: PuffinManagerFactory,
451 remote_store: ObjectStore,
452 ) -> Self {
453 Self {
454 table_dir,
455 path_type,
456 puffin_manager_factory,
457 remote_store,
458 file_cache: None,
459 puffin_metadata_cache: None,
460 }
461 }
462
463 fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
464 self.file_cache = file_cache;
465 }
466
467 fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
468 self.puffin_metadata_cache = puffin_metadata_cache;
469 }
470
471 async fn blob(
475 &self,
476 file_id: RegionFileId,
477 key: &str,
478 file_size_hint: Option<u64>,
479 ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
480 let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
481 let res = reader.blob(key).await;
482 match res {
483 Ok(blob) => Ok(Some(blob)),
484 Err(err) if err.is_blob_not_found() => Ok(None),
485 Err(err) => {
486 if fallbacked {
487 Err(err).context(PuffinReadBlobSnafu)
488 } else {
489 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
490 let reader = self.build_remote(file_id, file_size_hint).await?;
491 let res = reader.blob(key).await;
492 match res {
493 Ok(blob) => Ok(Some(blob)),
494 Err(err) if err.is_blob_not_found() => Ok(None),
495 Err(err) => Err(err).context(PuffinReadBlobSnafu),
496 }
497 }
498 }
499 }
500 }
501
502 async fn dir(
506 &self,
507 file_id: RegionFileId,
508 key: &str,
509 file_size_hint: Option<u64>,
510 ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
511 let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
512 let res = reader.dir(key).await;
513 match res {
514 Ok(dir) => Ok(Some(dir)),
515 Err(err) if err.is_blob_not_found() => Ok(None),
516 Err(err) => {
517 if fallbacked {
518 Err(err).context(PuffinReadBlobSnafu)
519 } else {
520 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
521 let reader = self.build_remote(file_id, file_size_hint).await?;
522 let res = reader.dir(key).await;
523 match res {
524 Ok(dir) => Ok(Some(dir)),
525 Err(err) if err.is_blob_not_found() => Ok(None),
526 Err(err) => Err(err).context(PuffinReadBlobSnafu),
527 }
528 }
529 }
530 }
531 }
532
533 async fn ensure_reader(
535 &self,
536 file_id: RegionFileId,
537 file_size_hint: Option<u64>,
538 ) -> Result<(SstPuffinReader, bool)> {
539 match self.build_local_cache(file_id, file_size_hint).await {
540 Ok(Some(r)) => Ok((r, false)),
541 Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
542 Err(err) => Err(err),
543 }
544 }
545
546 async fn build_local_cache(
547 &self,
548 file_id: RegionFileId,
549 file_size_hint: Option<u64>,
550 ) -> Result<Option<SstPuffinReader>> {
551 let Some(file_cache) = &self.file_cache else {
552 return Ok(None);
553 };
554
555 let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
556 if file_cache.get(index_key).await.is_none() {
557 return Ok(None);
558 };
559
560 let puffin_manager = self
561 .puffin_manager_factory
562 .build(
563 file_cache.local_store(),
564 WriteCachePathProvider::new(file_cache.clone()),
565 )
566 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
567 let reader = puffin_manager
568 .reader(&file_id)
569 .await
570 .context(PuffinBuildReaderSnafu)?
571 .with_file_size_hint(file_size_hint);
572 Ok(Some(reader))
573 }
574
575 async fn build_remote(
576 &self,
577 file_id: RegionFileId,
578 file_size_hint: Option<u64>,
579 ) -> Result<SstPuffinReader> {
580 let puffin_manager = self
581 .puffin_manager_factory
582 .build(
583 self.remote_store.clone(),
584 RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
585 )
586 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
587
588 let reader = puffin_manager
589 .reader(&file_id)
590 .await
591 .context(PuffinBuildReaderSnafu)?
592 .with_file_size_hint(file_size_hint);
593
594 Ok(reader)
595 }
596}