use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter;
use std::ops::Range;
use std::sync::Arc;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::BloomFilterReaderImpl;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::Config;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
    PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
use crate::sst::index::puffin_manager::{
    PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
};
use crate::sst::index::TYPE_FULLTEXT_INDEX;

pub mod builder;

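/// `FulltextIndexApplier` applies fulltext index predicates to a single SST file, either at row
/// granularity ([`Self::apply_fine`], backed by the Tantivy index) or at row-group granularity
/// ([`Self::apply_coarse`], backed by the bloom filter index).
///
/// A minimal usage sketch; the surrounding bindings (`region_dir`, `region_id`, `store`,
/// `requests`, `factory`, the caches, `file_id`, `file_size`) are illustrative placeholders,
/// not items defined in this crate:
///
/// ```ignore
/// let applier = FulltextIndexApplier::new(region_dir, region_id, store, requests, factory)
///     .with_file_cache(Some(file_cache))
///     .with_puffin_metadata_cache(Some(metadata_cache))
///     .with_bloom_filter_cache(Some(bloom_filter_cache));
///
/// // Row-level matches from the Tantivy index; `None` means the index was not applied.
/// let row_ids = applier.apply_fine(file_id, Some(file_size)).await?;
/// ```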
pub struct FulltextIndexApplier {
    /// Fulltext index requests, keyed by column id.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// Source to read index data from (local file cache with remote fallback).
    index_source: IndexSource,

    /// Optional cache for bloom filter index blobs.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Key identifying the predicates, used to look up cached apply results.
    predicate_key: PredicateKey,
}

pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;

impl FulltextIndexApplier {
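    /// Creates a new `FulltextIndexApplier` that reads indexes from `store` under `region_dir`.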
    pub fn new(
        region_dir: String,
        region_id: RegionId,
        store: ObjectStore,
        requests: BTreeMap<ColumnId, FulltextRequest>,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> Self {
        let requests = Arc::new(requests);
        let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);

        Self {
            predicate_key: PredicateKey::new_fulltext(requests.clone()),
            requests,
            index_source,
            bloom_filter_index_cache: None,
        }
    }

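    /// Sets the file cache used to read locally cached index files.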
    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.index_source.set_file_cache(file_cache);
        self
    }

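    /// Sets the puffin metadata cache.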
    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.index_source
            .set_puffin_metadata_cache(puffin_metadata_cache);
        self
    }

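    /// Sets the bloom filter index cache used by the coarse-grained path.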
    pub fn with_bloom_filter_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

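    /// Returns the key identifying this applier's predicates, used for result caching.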
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
}

impl FulltextIndexApplier {
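    /// Applies the fine-grained fulltext index (Tantivy) to the SST file, intersecting the
    /// matched row ids across all requested columns.
    ///
    /// Returns `None` if the index was not applied (no usable request or no index found).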
    pub async fn apply_fine(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let mut row_ids: Option<BTreeSet<RowId>> = None;
        for (column_id, request) in self.requests.iter() {
            if request.queries.is_empty() && request.terms.is_empty() {
                continue;
            }

            let Some(result) = self
                .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
                .await?
            else {
                continue;
            };

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref() {
                if ids.is_empty() {
                    break;
                }
            }
        }

        if row_ids.is_none() {
            timer.stop_and_discard();
        }
        Ok(row_ids)
    }

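    /// Applies the queries and terms of a single column against its Tantivy index directory.
    ///
    /// Returns `None` if the index directory for the column does not exist.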
    async fn apply_fine_one_column(
        &self,
        file_size_hint: Option<u64>,
        file_id: FileId,
        column_id: ColumnId,
        request: &FulltextRequest,
    ) -> Result<Option<BTreeSet<RowId>>> {
        let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
        let dir = self
            .index_source
            .dir(file_id, &blob_key, file_size_hint)
            .await?;

        let dir = match &dir {
            Some(dir) => dir,
            None => {
                return Ok(None);
            }
        };

        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
        let path = dir.path();

        let searcher =
            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
        let mut row_ids: Option<BTreeSet<RowId>> = None;

        // Apply each query and intersect the results.
        for query in &request.queries {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }

            if let Some(ids) = row_ids.as_ref() {
                if ids.is_empty() {
                    break;
                }
            }
        }

        // Apply the terms as a single combined query and intersect with previous results.
        let query = request.terms_as_query(config.case_sensitive);
        if !query.0.is_empty() {
            let result = searcher
                .search(&query.0)
                .await
                .context(ApplyFulltextIndexSnafu)?;

            if let Some(ids) = row_ids.as_mut() {
                ids.retain(|id| result.contains(id));
            } else {
                row_ids = Some(result);
            }
        }

        Ok(row_ids)
    }
}

impl FulltextIndexApplier {
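    /// Applies the coarse-grained fulltext index (bloom filter) to the SST file.
    ///
    /// `row_groups` yields `(row_group_length, should_search)` pairs in file order.
    ///
    /// Returns, for each searched row group, the row ranges that may contain matches, expressed
    /// relative to the start of that row group. Returns `None` if no bloom filter index was
    /// applied.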
    pub async fn apply_coarse(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let (input, mut output) = Self::init_coarse_output(row_groups);
        let mut applied = false;

        for (column_id, request) in self.requests.iter() {
            if request.terms.is_empty() {
                // Only terms are handled by the coarse-grained (bloom filter) path.
                continue;
            }

            applied |= self
                .apply_coarse_one_column(
                    file_id,
                    file_size_hint,
                    *column_id,
                    &request.terms,
                    &mut output,
                )
                .await?;
        }

        if !applied {
            timer.stop_and_discard();
            return Ok(None);
        }

        Self::adjust_coarse_output(input, &mut output);
        Ok(Some(output))
    }

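    /// Applies the terms of one column against its bloom filter index blob, narrowing the ranges
    /// in `output` in place.
    ///
    /// Returns `false` if the blob is missing or the terms yield no predicates.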
    async fn apply_coarse_one_column(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        column_id: ColumnId,
        terms: &[FulltextTerm],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> Result<bool> {
        let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
        let Some(reader) = self
            .index_source
            .blob(file_id, &blob_key, file_size_hint)
            .await?
        else {
            return Ok(false);
        };
        let config =
            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;

        let predicates = Self::terms_to_predicates(terms, &config);
        if predicates.is_empty() {
            return Ok(false);
        }

        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
            let blob_size = range_reader
                .metadata()
                .await
                .context(MetadataSnafu)?
                .content_length;
            let reader = CachedBloomFilterIndexBlobReader::new(
                file_id,
                column_id,
                Tag::Fulltext,
                blob_size,
                BloomFilterReaderImpl::new(range_reader),
                bloom_filter_cache.clone(),
            );
            Box::new(reader) as _
        } else {
            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
        };

        let mut applier = BloomFilterApplier::new(reader)
            .await
            .context(ApplyBloomFilterIndexSnafu)?;
        for (_, row_group_output) in output.iter_mut() {
            // Already filtered out, skip this row group.
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier
                .search(&predicates, row_group_output)
                .await
                .context(ApplyBloomFilterIndexSnafu)?;
        }

        Ok(true)
    }

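    /// Builds the initial coarse search state from `(row_group_length, should_search)` pairs.
    ///
    /// Returns `input`, the absolute row range of each row group to search, and `output`, the
    /// same ranges as the initial per-row-group search ranges (still in absolute coordinates).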
    #[allow(clippy::type_complexity)]
    fn init_coarse_output(
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
        // Compute the absolute row range of each row group to search.
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }

        // The initial output covers the full range of every selected row group.
        let output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        (input, output)
    }

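    /// Rebases the output ranges from absolute (file-level) row coordinates onto the start of
    /// their respective row groups.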
    fn adjust_coarse_output(
        input: Vec<(usize, Range<usize>)>,
        output: &mut [(usize, Vec<Range<usize>>)],
    ) {
        // `input` and `output` are built from the same row groups, so zipping keeps them aligned.
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }
    }

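    /// Converts fulltext terms into bloom filter `InListPredicate`s, one per distinct token.
    ///
    /// Terms are split on non-alphanumeric characters and lowercased unless the index is case
    /// sensitive. Terms whose column was lowercased in the query (`col_lowered`) are skipped
    /// when the index is case sensitive, since they cannot be matched exactly.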
    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
        let mut probes = HashSet::new();
        for term in terms {
            if config.case_sensitive && term.col_lowered {
                // A case-sensitive index cannot serve a lowercased-column comparison.
                continue;
            }

            let ts = term
                .term
                .split(|c: char| !c.is_alphanumeric())
                .filter(|&t| !t.is_empty())
                .map(|t| {
                    if !config.case_sensitive {
                        t.to_lowercase()
                    } else {
                        t.to_string()
                    }
                    .into_bytes()
                });

            probes.extend(ts);
        }

        probes
            .into_iter()
            .map(|p| InListPredicate {
                list: iter::once(p).collect(),
            })
            .collect::<Vec<_>>()
    }
}

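/// `IndexSource` resolves where to read a puffin index file from: the local write cache when the
/// file is present there, falling back to the remote object store otherwise.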
struct IndexSource {
    /// Directory of the region that owns the SST files.
    region_dir: String,
    region_id: RegionId,

    /// Factory for building puffin managers over a store.
    puffin_manager_factory: PuffinManagerFactory,

    /// The remote object store holding SST and index files.
    remote_store: ObjectStore,

    /// Local file cache; when present it is tried before the remote store.
    file_cache: Option<FileCacheRef>,

    /// Cache for puffin file metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

impl IndexSource {
    fn new(
        region_dir: String,
        region_id: RegionId,
        puffin_manager_factory: PuffinManagerFactory,
        remote_store: ObjectStore,
    ) -> Self {
        Self {
            region_dir,
            region_id,
            puffin_manager_factory,
            remote_store,
            file_cache: None,
            puffin_metadata_cache: None,
        }
    }

    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
        self.file_cache = file_cache;
    }

    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
        self.puffin_metadata_cache = puffin_metadata_cache;
    }

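    /// Returns the blob with the given key, reading the locally cached index file when available
    /// and falling back to the remote index file on unexpected errors.
    ///
    /// Returns `None` if the blob is not found.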
    async fn blob(
        &self,
        file_id: FileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.blob(key).await;
        match res {
            Ok(blob) => Ok(Some(blob)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.blob(key).await;
                    match res {
                        Ok(blob) => Ok(Some(blob)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

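    /// Returns the directory with the given key, reading the locally cached index file when
    /// available and falling back to the remote index file on unexpected errors.
    ///
    /// Returns `None` if the directory is not found.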
    async fn dir(
        &self,
        file_id: FileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.dir(key).await;
        match res {
            Ok(dir) => Ok(Some(dir)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.dir(key).await;
                    match res {
                        Ok(dir) => Ok(Some(dir)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

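    /// Builds a reader for the puffin index file, preferring the locally cached copy.
    ///
    /// The returned flag is `true` when the reader already points at the remote store,
    /// i.e. no further fallback is possible.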
    async fn ensure_reader(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<(SstPuffinReader, bool)> {
        match self.build_local_cache(file_id, file_size_hint).await {
            Ok(Some(r)) => Ok((r, false)),
            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
            Err(err) => Err(err),
        }
    }

    async fn build_local_cache(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<SstPuffinReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self
            .puffin_manager_factory
            .build(
                file_cache.local_store(),
                WriteCachePathProvider::new(self.region_id, file_cache.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);
        Ok(Some(reader))
    }

    async fn build_remote(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<SstPuffinReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.remote_store.clone(),
                RegionFilePathFactory::new(self.region_dir.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);

        Ok(reader)
    }
}