use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;

use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
    PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
use crate::sst::index::puffin_manager::{
    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
};

pub mod builder;

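/// Metrics gathered while applying the fulltext index to a single SST file.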
#[derive(Default, Clone)]
pub struct FulltextIndexApplyMetrics {
    /// Total time spent applying the index.
    pub apply_elapsed: std::time::Duration,
    /// Number of reads that fell back to the remote index file because the local cache missed.
    pub blob_cache_miss: usize,
    /// Number of puffin directory cache hits.
    pub dir_cache_hit: usize,
    /// Number of puffin directory cache misses.
    pub dir_cache_miss: usize,
    /// Time spent initializing puffin directories.
    pub dir_init_elapsed: std::time::Duration,
    /// Metrics reported by the bloom filter reader.
    pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}

impl std::fmt::Debug for FulltextIndexApplyMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            apply_elapsed,
            blob_cache_miss,
            dir_cache_hit,
            dir_cache_miss,
            dir_init_elapsed,
            bloom_filter_read_metrics,
        } = self;

        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;

        if *blob_cache_miss > 0 {
            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
        }
        if *dir_cache_hit > 0 {
            write!(f, ", \"dir_cache_hit\":{}", dir_cache_hit)?;
        }
        if *dir_cache_miss > 0 {
            write!(f, ", \"dir_cache_miss\":{}", dir_cache_miss)?;
        }
        if !dir_init_elapsed.is_zero() {
            write!(f, ", \"dir_init_elapsed\":\"{:?}\"", dir_init_elapsed)?;
        }
        write!(
            f,
            ", \"bloom_filter_read_metrics\":{:?}",
            bloom_filter_read_metrics
        )?;

        write!(f, "}}")
    }
}

impl FulltextIndexApplyMetrics {
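    /// Returns `true` if no apply call has recorded any elapsed time yet.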
    pub fn is_empty(&self) -> bool {
        self.apply_elapsed.is_zero()
    }

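    /// Records the elapsed time and cache hit/miss outcome of one puffin directory initialization.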
    pub fn collect_dir_metrics(
        &mut self,
        elapsed: std::time::Duration,
        dir_metrics: puffin::puffin_manager::DirMetrics,
    ) {
        self.dir_init_elapsed += elapsed;
        if dir_metrics.cache_hit {
            self.dir_cache_hit += 1;
        } else {
            self.dir_cache_miss += 1;
        }
    }

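    /// Accumulates the metrics from `other` into `self`.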
    pub fn merge_from(&mut self, other: &Self) {
        self.apply_elapsed += other.apply_elapsed;
        self.blob_cache_miss += other.blob_cache_miss;
        self.dir_cache_hit += other.dir_cache_hit;
        self.dir_cache_miss += other.dir_cache_miss;
        self.dir_init_elapsed += other.dir_init_elapsed;
        self.bloom_filter_read_metrics
            .merge_from(&other.bloom_filter_read_metrics);
    }
}

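/// `FulltextIndexApplier` applies fulltext predicates to the fulltext index blobs
/// (Tantivy directories and bloom filters) stored in SST puffin files.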
pub struct FulltextIndexApplier {
    /// Fulltext requests to apply, keyed by column id.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// Source of the index files (local write cache or remote store).
    index_source: IndexSource,

    /// Cache for bloom filter index blobs.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Key identifying this set of predicates; used by the index result cache.
    predicate_key: PredicateKey,
}

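/// Shared reference to a [`FulltextIndexApplier`].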
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;

impl FulltextIndexApplier {
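    /// Creates a new `FulltextIndexApplier` from per-column fulltext requests.
    ///
    /// A minimal construction sketch; `table_dir`, `path_type`, `object_store`,
    /// `requests`, and `factory` are hypothetical values supplied by the caller,
    /// not names defined in this module:
    ///
    /// ```ignore
    /// let applier = FulltextIndexApplier::new(
    ///     table_dir,
    ///     path_type,
    ///     object_store,
    ///     requests,
    ///     factory,
    /// )
    /// .with_file_cache(None)
    /// .with_bloom_filter_cache(None);
    /// ```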
    pub fn new(
        table_dir: String,
        path_type: PathType,
        store: ObjectStore,
        requests: BTreeMap<ColumnId, FulltextRequest>,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> Self {
        let requests = Arc::new(requests);
        let index_source = IndexSource::new(table_dir, path_type, puffin_manager_factory, store);

        Self {
            predicate_key: PredicateKey::new_fulltext(requests.clone()),
            requests,
            index_source,
            bloom_filter_index_cache: None,
        }
    }

    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.index_source.set_file_cache(file_cache);
        self
    }

    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.index_source
            .set_puffin_metadata_cache(puffin_metadata_cache);
        self
    }

    pub fn with_bloom_filter_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
}

impl FulltextIndexApplier {
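    /// Applies the fine-grained (Tantivy) fulltext index to the given file and
    /// returns the matching row ids, intersected across all requested columns.
    ///
    /// Returns `None` if no column carried a query or term to apply.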
    pub async fn apply_fine(
        &self,
        file_id: RegionIndexId,
        file_size_hint: Option<u64>,
        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let apply_start = Instant::now();

        let mut row_ids: Option<BTreeSet<RowId>> = None;
        for (column_id, request) in self.requests.iter() {
            if request.queries.is_empty() && request.terms.is_empty() {
                continue;
            }

            let Some(result) = self
                .apply_fine_one_column(
                    file_size_hint,
                    file_id,
                    *column_id,
                    request,
                    metrics.as_deref_mut(),
                )
                .await?
            else {
                continue;
            };

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref()
                && ids.is_empty()
            {
                break;
            }
        }

        let elapsed = apply_start.elapsed();
        INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .observe(elapsed.as_secs_f64());

        if let Some(m) = metrics {
            m.apply_elapsed += elapsed;
        }

        Ok(row_ids)
    }

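    /// Applies the Tantivy index of a single column and returns the row ids matching
    /// all queries and terms of the request, or `None` if the column has no index blob.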
    async fn apply_fine_one_column(
        &self,
        file_size_hint: Option<u64>,
        file_id: RegionIndexId,
        column_id: ColumnId,
        request: &FulltextRequest,
        metrics: Option<&mut FulltextIndexApplyMetrics>,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let blob_key = format!(
            "{INDEX_BLOB_TYPE_TANTIVY}-{}",
            IndexTarget::ColumnId(column_id)
        );
        let dir = self
            .index_source
            .dir(file_id, &blob_key, file_size_hint, metrics)
            .await?;

        let dir = match &dir {
            Some(dir) => dir,
            None => {
                return Ok(None);
            }
        };

        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
        let path = dir.path();

        let searcher =
            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
        let mut row_ids: Option<BTreeSet<RowId>> = None;

        for query in &request.queries {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref()
                && ids.is_empty()
            {
                break;
            }
        }

        let query = request.terms_as_query(config.case_sensitive);
        if !query.0.is_empty() {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }
        }

        Ok(row_ids)
    }
}

impl FulltextIndexApplier {
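    /// Applies the coarse-grained (bloom filter) fulltext index to the given file.
    ///
    /// `row_groups` yields `(row_count, should_search)` for each row group in order.
    /// Returns `None` if no bloom filter blob was applied, otherwise one entry per
    /// searched row group: `(row_group_index, ranges)` where the ranges are relative
    /// to the start of that row group.
    ///
    /// A usage sketch; `applier`, `index_id`, `file_size`, and `parquet_meta` are
    /// hypothetical values provided by the caller:
    ///
    /// ```ignore
    /// let row_groups = parquet_meta
    ///     .row_groups()
    ///     .iter()
    ///     .map(|rg| (rg.num_rows() as usize, true));
    /// let ranges = applier
    ///     .apply_coarse(index_id, Some(file_size), row_groups, None)
    ///     .await?;
    /// ```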
    pub async fn apply_coarse(
        &self,
        file_id: RegionIndexId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
        let apply_start = Instant::now();

        let (input, mut output) = Self::init_coarse_output(row_groups);
        let mut applied = false;

        for (column_id, request) in self.requests.iter() {
            if request.terms.is_empty() {
                continue;
            }

            applied |= self
                .apply_coarse_one_column(
                    file_id,
                    file_size_hint,
                    *column_id,
                    &request.terms,
                    &mut output,
                    metrics.as_deref_mut(),
                )
                .await?;
        }

        if !applied {
            return Ok(None);
        }

        Self::adjust_coarse_output(input, &mut output);

        let elapsed = apply_start.elapsed();
        INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .observe(elapsed.as_secs_f64());

        if let Some(m) = metrics {
            m.apply_elapsed += elapsed;
        }

        Ok(Some(output))
    }

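    /// Applies the bloom filter blob of a single column to `output` in place.
    ///
    /// Returns `true` if the blob was applied, or `false` if the column has no
    /// bloom filter blob or no usable predicates.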
    async fn apply_coarse_one_column(
        &self,
        file_id: RegionIndexId,
        file_size_hint: Option<u64>,
        column_id: ColumnId,
        terms: &[FulltextTerm],
        output: &mut [(usize, Vec<Range<usize>>)],
        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
    ) -> Result<bool> {
        let blob_key = format!(
            "{INDEX_BLOB_TYPE_BLOOM}-{}",
            IndexTarget::ColumnId(column_id)
        );
        let Some(reader) = self
            .index_source
            .blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
            .await?
        else {
            return Ok(false);
        };
        let config =
            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;

        let predicates = Self::terms_to_predicates(terms, &config);
        if predicates.is_empty() {
            return Ok(false);
        }

        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
            let blob_size = range_reader
                .metadata()
                .await
                .context(MetadataSnafu)?
                .content_length;
            let reader = CachedBloomFilterIndexBlobReader::new(
                file_id.file_id(),
                file_id.version,
                column_id,
                Tag::Fulltext,
                blob_size,
                BloomFilterReaderImpl::new(range_reader),
                bloom_filter_cache.clone(),
            );
            Box::new(reader) as _
        } else {
            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
        };

        let mut applier = BloomFilterApplier::new(reader)
            .await
            .context(ApplyBloomFilterIndexSnafu)?;
        for (_, row_group_output) in output.iter_mut() {
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier
                .search(
                    &predicates,
                    row_group_output,
                    metrics
                        .as_deref_mut()
                        .map(|m| &mut m.bloom_filter_read_metrics),
                )
                .await
                .context(ApplyBloomFilterIndexSnafu)?;
        }

        Ok(true)
    }

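    /// Builds the initial coarse search state from `(row_count, should_search)` pairs.
    ///
    /// Returns the absolute row range of every row group to search (`input`) and the
    /// output seeded with that full range for each of them.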
    #[allow(clippy::type_complexity)]
    fn init_coarse_output(
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }

        let output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        (input, output)
    }

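    /// Rebases the absolute row ranges in `output` so they are relative to the start
    /// of their row group.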
    fn adjust_coarse_output(
        input: Vec<(usize, Range<usize>)>,
        output: &mut [(usize, Vec<Range<usize>>)],
    ) {
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }
    }

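    /// Converts fulltext terms into bloom filter `InListPredicate`s, one per distinct
    /// token probe. Terms whose column value was lowercased are skipped when the index
    /// is case sensitive, since their probes could not match.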
    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
        let mut probes = HashSet::new();
        for term in terms {
            if config.case_sensitive && term.col_lowered {
                continue;
            }
            probes.extend(Self::term_to_probes(&term.term, config));
        }

        probes
            .into_iter()
            .map(|p| InListPredicate {
                list: iter::once(p).collect(),
            })
            .collect::<Vec<_>>()
    }

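    /// Tokenizes a term with the analyzer configured for the index and yields the byte
    /// probes to look up, lowercased when the index is case insensitive.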
    fn term_to_probes<'a>(term: &'a str, config: &'a Config) -> impl Iterator<Item = Vec<u8>> + 'a {
        let tokens = match config.analyzer {
            Analyzer::English => EnglishTokenizer {}.tokenize(term),
            Analyzer::Chinese => ChineseTokenizer {}.tokenize(term),
        };

        tokens.into_iter().map(|t| {
            if !config.case_sensitive {
                t.to_lowercase()
            } else {
                t.to_string()
            }
            .into_bytes()
        })
    }
}

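/// `IndexSource` locates puffin index files, preferring the local write cache and
/// falling back to the remote object store.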
struct IndexSource {
    /// Table directory of the SST and index files.
    table_dir: String,

    /// Path type used to build file paths.
    path_type: PathType,

    /// Factory for building puffin managers.
    puffin_manager_factory: PuffinManagerFactory,

    /// Object store holding the remote index files.
    remote_store: ObjectStore,

    /// Optional local file cache (write cache).
    file_cache: Option<FileCacheRef>,

    /// Optional cache for puffin file metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

impl IndexSource {
    fn new(
        table_dir: String,
        path_type: PathType,
        puffin_manager_factory: PuffinManagerFactory,
        remote_store: ObjectStore,
    ) -> Self {
        Self {
            table_dir,
            path_type,
            puffin_manager_factory,
            remote_store,
            file_cache: None,
            puffin_metadata_cache: None,
        }
    }

    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
        self.file_cache = file_cache;
    }

    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
        self.puffin_metadata_cache = puffin_metadata_cache;
    }

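    /// Returns the blob stored under `key`, or `None` if it does not exist.
    ///
    /// Reads from the cached index file when possible and falls back to the remote
    /// file on unexpected errors.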
    async fn blob(
        &self,
        file_id: RegionIndexId,
        key: &str,
        file_size_hint: Option<u64>,
        metrics: Option<&mut FulltextIndexApplyMetrics>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;

        if fallbacked && let Some(m) = metrics {
            m.blob_cache_miss += 1;
        }

        let res = reader.blob(key).await;
        match res {
            Ok(blob) => Ok(Some(blob)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.blob(key).await;
                    match res {
                        Ok(blob) => Ok(Some(blob)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

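    /// Returns the directory stored under `key`, or `None` if it does not exist.
    ///
    /// Behaves like [`Self::blob`] but also records directory initialization metrics.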
    async fn dir(
        &self,
        file_id: RegionIndexId,
        key: &str,
        file_size_hint: Option<u64>,
        mut metrics: Option<&mut FulltextIndexApplyMetrics>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;

        if fallbacked && let Some(m) = &mut metrics {
            m.blob_cache_miss += 1;
        }

        let start = metrics.as_ref().map(|_| Instant::now());
        let res = reader.dir(key).await;
        match res {
            Ok((dir, dir_metrics)) => {
                if let Some(m) = metrics {
                    m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
                }
                Ok(Some(dir))
            }
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let start = metrics.as_ref().map(|_| Instant::now());
                    let res = reader.dir(key).await;
                    match res {
                        Ok((dir, dir_metrics)) => {
                            if let Some(m) = metrics {
                                m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
                            }
                            Ok(Some(dir))
                        }
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

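    /// Returns a puffin reader, plus a flag indicating whether it fell back to the
    /// remote store because the file was not in the local cache.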
    async fn ensure_reader(
        &self,
        file_id: RegionIndexId,
        file_size_hint: Option<u64>,
    ) -> Result<(SstPuffinReader, bool)> {
        match self.build_local_cache(file_id, file_size_hint).await {
            Ok(Some(r)) => Ok((r, false)),
            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
            Err(err) => Err(err),
        }
    }

    async fn build_local_cache(
        &self,
        file_id: RegionIndexId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<SstPuffinReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(
            file_id.region_id(),
            file_id.file_id(),
            FileType::Puffin(file_id.version),
        );
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self
            .puffin_manager_factory
            .build(
                file_cache.local_store(),
                WriteCachePathProvider::new(file_cache.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);
        Ok(Some(reader))
    }

    async fn build_remote(
        &self,
        file_id: RegionIndexId,
        file_size_hint: Option<u64>,
    ) -> Result<SstPuffinReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.remote_store.clone(),
                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);

        Ok(reader)
    }
}