1use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::{tracing, warn};
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
25use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
26use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
27use index::fulltext_index::{Analyzer, Config};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43 ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
44 PuffinReadBlobSnafu, Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
49use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
50use crate::sst::index::puffin_manager::{
51 PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
52};
53use crate::sst::index::{TYPE_FULLTEXT_INDEX, trigger_index_background_download};
54
55pub mod builder;
56
/// Metrics collected while applying a fulltext index to SST files.
#[derive(Default, Clone)]
pub struct FulltextIndexApplyMetrics {
    /// Accumulated elapsed time of index application calls.
    pub apply_elapsed: std::time::Duration,
    /// Number of index file reads that had to use the remote store instead of the local
    /// file cache.
    pub blob_cache_miss: usize,
    /// Number of index directories opened whose puffin `DirMetrics` reported a cache hit.
    pub dir_cache_hit: usize,
    /// Number of index directories opened whose puffin `DirMetrics` reported a cache miss.
    pub dir_cache_miss: usize,
    /// Accumulated time spent opening index directories.
    pub dir_init_elapsed: std::time::Duration,
    /// Read metrics reported by the bloom filter index reader.
    pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
73
74impl std::fmt::Debug for FulltextIndexApplyMetrics {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 let Self {
77 apply_elapsed,
78 blob_cache_miss,
79 dir_cache_hit,
80 dir_cache_miss,
81 dir_init_elapsed,
82 bloom_filter_read_metrics,
83 } = self;
84
85 if self.is_empty() {
86 return write!(f, "{{}}");
87 }
88 write!(f, "{{")?;
89
90 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
91
92 if *blob_cache_miss > 0 {
93 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
94 }
95 if *dir_cache_hit > 0 {
96 write!(f, ", \"dir_cache_hit\":{}", dir_cache_hit)?;
97 }
98 if *dir_cache_miss > 0 {
99 write!(f, ", \"dir_cache_miss\":{}", dir_cache_miss)?;
100 }
101 if !dir_init_elapsed.is_zero() {
102 write!(f, ", \"dir_init_elapsed\":\"{:?}\"", dir_init_elapsed)?;
103 }
104 write!(
105 f,
106 ", \"bloom_filter_read_metrics\":{:?}",
107 bloom_filter_read_metrics
108 )?;
109
110 write!(f, "}}")
111 }
112}
113
114impl FulltextIndexApplyMetrics {
115 pub fn is_empty(&self) -> bool {
117 self.apply_elapsed.is_zero()
118 }
119
120 pub fn collect_dir_metrics(
122 &mut self,
123 elapsed: std::time::Duration,
124 dir_metrics: puffin::puffin_manager::DirMetrics,
125 ) {
126 self.dir_init_elapsed += elapsed;
127 if dir_metrics.cache_hit {
128 self.dir_cache_hit += 1;
129 } else {
130 self.dir_cache_miss += 1;
131 }
132 }
133
134 pub fn merge_from(&mut self, other: &Self) {
136 self.apply_elapsed += other.apply_elapsed;
137 self.blob_cache_miss += other.blob_cache_miss;
138 self.dir_cache_hit += other.dir_cache_hit;
139 self.dir_cache_miss += other.dir_cache_miss;
140 self.dir_init_elapsed += other.dir_init_elapsed;
141 self.bloom_filter_read_metrics
142 .merge_from(&other.bloom_filter_read_metrics);
143 }
144}
145
/// Applies fulltext index requests to SST files.
///
/// It uses either the fine-grained Tantivy index (`apply_fine`) or the coarse-grained
/// bloom filter index (`apply_coarse`).
pub struct FulltextIndexApplier {
    /// Fulltext requests, keyed by the column they target.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// Locates and opens the puffin index files (local file cache or remote store).
    index_source: IndexSource,

    /// Optional cache for bloom filter index contents, used by the coarse apply.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Key built from `requests`; presumably used to look up cached index results —
    /// TODO confirm against `result_cache` callers.
    predicate_key: PredicateKey,
}

/// Shared, reference-counted handle to a [`FulltextIndexApplier`].
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
162
163impl FulltextIndexApplier {
164 pub fn new(
166 table_dir: String,
167 path_type: PathType,
168 store: ObjectStore,
169 requests: BTreeMap<ColumnId, FulltextRequest>,
170 puffin_manager_factory: PuffinManagerFactory,
171 ) -> Self {
172 let requests = Arc::new(requests);
173 let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);
174
175 Self {
176 predicate_key: PredicateKey::new_fulltext(requests.clone()),
177 requests,
178 index_source,
179 bloom_filter_index_cache: None,
180 }
181 }
182
183 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
185 self.index_source.set_file_cache(file_cache);
186 self
187 }
188
189 pub fn with_puffin_metadata_cache(
191 mut self,
192 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
193 ) -> Self {
194 self.index_source
195 .set_puffin_metadata_cache(puffin_metadata_cache);
196 self
197 }
198
199 pub fn with_bloom_filter_cache(
201 mut self,
202 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
203 ) -> Self {
204 self.bloom_filter_index_cache = bloom_filter_index_cache;
205 self
206 }
207
208 pub fn predicate_key(&self) -> &PredicateKey {
210 &self.predicate_key
211 }
212}
213
214impl FulltextIndexApplier {
215 #[tracing::instrument(
223 skip_all,
224 fields(file_id = %file_id)
225 )]
226 pub async fn apply_fine(
227 &self,
228 file_id: RegionIndexId,
229 file_size_hint: Option<u64>,
230 mut metrics: Option<&mut FulltextIndexApplyMetrics>,
231 ) -> Result<Option<BTreeSet<RowId>>> {
232 let apply_start = Instant::now();
233
234 let mut row_ids: Option<BTreeSet<RowId>> = None;
235 for (column_id, request) in self.requests.iter() {
236 if request.queries.is_empty() && request.terms.is_empty() {
237 continue;
238 }
239
240 let Some(result) = self
241 .apply_fine_one_column(
242 file_size_hint,
243 file_id,
244 *column_id,
245 request,
246 metrics.as_deref_mut(),
247 )
248 .await?
249 else {
250 continue;
251 };
252
253 if let Some(ids) = row_ids.as_mut() {
254 ids.retain(|id| result.contains(id));
255 } else {
256 row_ids = Some(result);
257 }
258
259 if let Some(ids) = row_ids.as_ref()
260 && ids.is_empty()
261 {
262 break;
263 }
264 }
265
266 let elapsed = apply_start.elapsed();
268 INDEX_APPLY_ELAPSED
269 .with_label_values(&[TYPE_FULLTEXT_INDEX])
270 .observe(elapsed.as_secs_f64());
271
272 if let Some(m) = metrics {
273 m.apply_elapsed += elapsed;
274 }
275
276 Ok(row_ids)
277 }
278
279 async fn apply_fine_one_column(
280 &self,
281 file_size_hint: Option<u64>,
282 file_id: RegionIndexId,
283 column_id: ColumnId,
284 request: &FulltextRequest,
285 metrics: Option<&mut FulltextIndexApplyMetrics>,
286 ) -> Result<Option<BTreeSet<RowId>>> {
287 let blob_key = format!(
288 "{INDEX_BLOB_TYPE_TANTIVY}-{}",
289 IndexTarget::ColumnId(column_id)
290 );
291 let dir = self
292 .index_source
293 .dir(file_id, &blob_key, file_size_hint, metrics)
294 .await?;
295
296 let dir = match &dir {
297 Some(dir) => dir,
298 None => {
299 return Ok(None);
300 }
301 };
302
303 let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
304 let path = dir.path();
305
306 let searcher =
307 TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
308 let mut row_ids: Option<BTreeSet<RowId>> = None;
309
310 for query in &request.queries {
312 let result = searcher
313 .search(&query.0)
314 .await
315 .context(ApplyFulltextIndexSnafu)?;
316
317 if let Some(ids) = row_ids.as_mut() {
318 ids.retain(|id| result.contains(id));
319 } else {
320 row_ids = Some(result);
321 }
322
323 if let Some(ids) = row_ids.as_ref()
324 && ids.is_empty()
325 {
326 break;
327 }
328 }
329
330 let query = request.terms_as_query(config.case_sensitive);
332 if !query.0.is_empty() {
333 let result = searcher
334 .search(&query.0)
335 .await
336 .context(ApplyFulltextIndexSnafu)?;
337
338 if let Some(ids) = row_ids.as_mut() {
339 ids.retain(|id| result.contains(id));
340 } else {
341 row_ids = Some(result);
342 }
343 }
344
345 Ok(row_ids)
346 }
347}
348
349impl FulltextIndexApplier {
350 #[allow(clippy::type_complexity)]
362 #[tracing::instrument(
363 skip_all,
364 fields(file_id = %file_id)
365 )]
366 pub async fn apply_coarse(
367 &self,
368 file_id: RegionIndexId,
369 file_size_hint: Option<u64>,
370 row_groups: impl Iterator<Item = (usize, bool)>,
371 mut metrics: Option<&mut FulltextIndexApplyMetrics>,
372 ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
373 let apply_start = Instant::now();
374
375 let (input, mut output) = Self::init_coarse_output(row_groups);
376 let mut applied = false;
377
378 for (column_id, request) in self.requests.iter() {
379 if request.terms.is_empty() {
380 continue;
382 }
383
384 applied |= self
385 .apply_coarse_one_column(
386 file_id,
387 file_size_hint,
388 *column_id,
389 &request.terms,
390 &mut output,
391 metrics.as_deref_mut(),
392 )
393 .await?;
394 }
395
396 if !applied {
397 return Ok(None);
398 }
399
400 Self::adjust_coarse_output(input, &mut output);
401
402 let elapsed = apply_start.elapsed();
404 INDEX_APPLY_ELAPSED
405 .with_label_values(&[TYPE_FULLTEXT_INDEX])
406 .observe(elapsed.as_secs_f64());
407
408 if let Some(m) = metrics {
409 m.apply_elapsed += elapsed;
410 }
411
412 Ok(Some(output))
413 }
414
415 async fn apply_coarse_one_column(
416 &self,
417 file_id: RegionIndexId,
418 file_size_hint: Option<u64>,
419 column_id: ColumnId,
420 terms: &[FulltextTerm],
421 output: &mut [(usize, Vec<Range<usize>>)],
422 mut metrics: Option<&mut FulltextIndexApplyMetrics>,
423 ) -> Result<bool> {
424 let blob_key = format!(
425 "{INDEX_BLOB_TYPE_BLOOM}-{}",
426 IndexTarget::ColumnId(column_id)
427 );
428 let Some(reader) = self
429 .index_source
430 .blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
431 .await?
432 else {
433 return Ok(false);
434 };
435 let config =
436 Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
437
438 let predicates = Self::terms_to_predicates(terms, &config);
439 if predicates.is_empty() {
440 return Ok(false);
441 }
442
443 let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
444 let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
445 let blob_size = range_reader
446 .metadata()
447 .await
448 .context(MetadataSnafu)?
449 .content_length;
450 let reader = CachedBloomFilterIndexBlobReader::new(
451 file_id.file_id(),
452 file_id.version,
453 column_id,
454 Tag::Fulltext,
455 blob_size,
456 BloomFilterReaderImpl::new(range_reader),
457 bloom_filter_cache.clone(),
458 );
459 Box::new(reader) as _
460 } else {
461 Box::new(BloomFilterReaderImpl::new(range_reader)) as _
462 };
463
464 let mut applier = BloomFilterApplier::new(reader)
465 .await
466 .context(ApplyBloomFilterIndexSnafu)?;
467 for (_, row_group_output) in output.iter_mut() {
468 if row_group_output.is_empty() {
470 continue;
471 }
472
473 *row_group_output = applier
474 .search(
475 &predicates,
476 row_group_output,
477 metrics
478 .as_deref_mut()
479 .map(|m| &mut m.bloom_filter_read_metrics),
480 )
481 .await
482 .context(ApplyBloomFilterIndexSnafu)?;
483 }
484
485 Ok(true)
486 }
487
488 #[allow(clippy::type_complexity)]
496 fn init_coarse_output(
497 row_groups: impl Iterator<Item = (usize, bool)>,
498 ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
499 let mut input = Vec::with_capacity(row_groups.size_hint().0);
501 let mut start = 0;
502 for (i, (len, to_search)) in row_groups.enumerate() {
503 let end = start + len;
504 if to_search {
505 input.push((i, start..end));
506 }
507 start = end;
508 }
509
510 let output = input
513 .iter()
514 .map(|(i, range)| (*i, vec![range.clone()]))
515 .collect::<Vec<_>>();
516
517 (input, output)
518 }
519
520 fn adjust_coarse_output(
522 input: Vec<(usize, Range<usize>)>,
523 output: &mut [(usize, Vec<Range<usize>>)],
524 ) {
525 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
527 let start = input.start;
528 for range in output.iter_mut() {
529 range.start -= start;
530 range.end -= start;
531 }
532 }
533 }
534
535 fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
540 let mut probes = HashSet::new();
541 for term in terms {
542 if config.case_sensitive && term.col_lowered {
543 continue;
545 }
546 probes.extend(Self::term_to_probes(&term.term, config));
547 }
548
549 probes
550 .into_iter()
551 .map(|p| InListPredicate {
552 list: iter::once(p).collect(),
553 })
554 .collect::<Vec<_>>()
555 }
556
557 fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
558 let tokens = match config.analyzer {
559 Analyzer::English => EnglishTokenizer {}.tokenize(term),
560 Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
561 };
562
563 tokens.into_iter().map(|t| {
564 if !config.case_sensitive {
565 t.to_lowercase()
566 } else {
567 t.to_string()
568 }
569 .into_bytes()
570 })
571 }
572}
573
/// Opens puffin index files.
///
/// A reader over the local file cache is preferred; otherwise the remote object store is
/// used.
struct IndexSource {
    /// Table directory, used to build the remote index file paths.
    table_dir: String,

    /// Path layout passed to `RegionFilePathFactory` when building remote paths.
    path_type: PathType,

    /// Factory for the puffin managers that create the readers.
    puffin_manager_factory: PuffinManagerFactory,

    /// Object store used when the local file cache cannot serve a read.
    remote_store: ObjectStore,

    /// Optional local file cache; when `None`, every read goes to `remote_store`.
    file_cache: Option<FileCacheRef>,

    /// Optional puffin metadata cache, passed to both the local and the remote managers.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
593
594impl IndexSource {
595 fn new(
596 table_dir: String,
597 path_type: PathType,
598 puffin_manager_factory: PuffinManagerFactory,
599 remote_store: ObjectStore,
600 ) -> Self {
601 Self {
602 table_dir,
603 path_type,
604 puffin_manager_factory,
605 remote_store,
606 file_cache: None,
607 puffin_metadata_cache: None,
608 }
609 }
610
611 fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
612 self.file_cache = file_cache;
613 }
614
615 fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
616 self.puffin_metadata_cache = puffin_metadata_cache;
617 }
618
619 async fn blob(
623 &self,
624 file_id: RegionIndexId,
625 key: &str,
626 file_size_hint: Option<u64>,
627 metrics: Option<&mut FulltextIndexApplyMetrics>,
628 ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
629 let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
630
631 if fallbacked && let Some(m) = metrics {
633 m.blob_cache_miss += 1;
634 }
635
636 let res = reader.blob(key).await;
637 match res {
638 Ok(blob) => Ok(Some(blob)),
639 Err(err) if err.is_blob_not_found() => Ok(None),
640 Err(err) => {
641 if fallbacked {
642 Err(err).context(PuffinReadBlobSnafu)
643 } else {
644 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
645 let reader = self.build_remote(file_id, file_size_hint).await?;
646 let res = reader.blob(key).await;
647 match res {
648 Ok(blob) => Ok(Some(blob)),
649 Err(err) if err.is_blob_not_found() => Ok(None),
650 Err(err) => Err(err).context(PuffinReadBlobSnafu),
651 }
652 }
653 }
654 }
655 }
656
657 async fn dir(
661 &self,
662 file_id: RegionIndexId,
663 key: &str,
664 file_size_hint: Option<u64>,
665 mut metrics: Option<&mut FulltextIndexApplyMetrics>,
666 ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
667 let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
668
669 if fallbacked && let Some(m) = &mut metrics {
671 m.blob_cache_miss += 1;
672 }
673
674 let start = metrics.as_ref().map(|_| Instant::now());
675 let res = reader.dir(key).await;
676 match res {
677 Ok((dir, dir_metrics)) => {
678 if let Some(m) = metrics {
679 m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
681 }
682 Ok(Some(dir))
683 }
684 Err(err) if err.is_blob_not_found() => Ok(None),
685 Err(err) => {
686 if fallbacked {
687 Err(err).context(PuffinReadBlobSnafu)
688 } else {
689 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
690 let reader = self.build_remote(file_id, file_size_hint).await?;
691 let start = metrics.as_ref().map(|_| Instant::now());
692 let res = reader.dir(key).await;
693 match res {
694 Ok((dir, dir_metrics)) => {
695 if let Some(m) = metrics {
696 m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
698 }
699 Ok(Some(dir))
700 }
701 Err(err) if err.is_blob_not_found() => Ok(None),
702 Err(err) => Err(err).context(PuffinReadBlobSnafu),
703 }
704 }
705 }
706 }
707 }
708
709 async fn ensure_reader(
711 &self,
712 file_id: RegionIndexId,
713 file_size_hint: Option<u64>,
714 ) -> Result<(SstPuffinReader, bool)> {
715 match self.build_local_cache(file_id, file_size_hint).await {
716 Ok(Some(r)) => Ok((r, false)),
717 Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
718 Err(err) => Err(err),
719 }
720 }
721
722 async fn build_local_cache(
723 &self,
724 file_id: RegionIndexId,
725 file_size_hint: Option<u64>,
726 ) -> Result<Option<SstPuffinReader>> {
727 let Some(file_cache) = &self.file_cache else {
728 return Ok(None);
729 };
730
731 let index_key = IndexKey::new(
732 file_id.region_id(),
733 file_id.file_id(),
734 FileType::Puffin(file_id.version),
735 );
736 if file_cache.get(index_key).await.is_none() {
737 return Ok(None);
738 };
739
740 let puffin_manager = self
741 .puffin_manager_factory
742 .build(
743 file_cache.local_store(),
744 WriteCachePathProvider::new(file_cache.clone()),
745 )
746 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
747 let reader = puffin_manager
748 .reader(&file_id)
749 .await
750 .context(PuffinBuildReaderSnafu)?
751 .with_file_size_hint(file_size_hint);
752 Ok(Some(reader))
753 }
754
755 async fn build_remote(
756 &self,
757 file_id: RegionIndexId,
758 file_size_hint: Option<u64>,
759 ) -> Result<SstPuffinReader> {
760 let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
761
762 trigger_index_background_download(
764 self.file_cache.as_ref(),
765 &file_id,
766 file_size_hint,
767 &path_factory,
768 &self.remote_store,
769 );
770
771 let puffin_manager = self
772 .puffin_manager_factory
773 .build(self.remote_store.clone(), path_factory)
774 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
775
776 let reader = puffin_manager
777 .reader(&file_id)
778 .await
779 .context(PuffinBuildReaderSnafu)?
780 .with_file_size_hint(file_size_hint);
781
782 Ok(reader)
783 }
784}