1use std::collections::{BTreeMap, BTreeSet, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
23use index::bloom_filter::reader::BloomFilterReaderImpl;
24use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
25use index::fulltext_index::Config;
26use object_store::ObjectStore;
27use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
28use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
29use snafu::ResultExt;
30use store_api::storage::{ColumnId, RegionId};
31
32use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
33use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
34use crate::cache::index::bloom_filter_index::{
35 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::cache::index::result_cache::PredicateKey;
38use crate::error::{
39 ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
40 PuffinReadBlobSnafu, Result,
41};
42use crate::metrics::INDEX_APPLY_ELAPSED;
43use crate::sst::file::FileId;
44use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
45use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
46use crate::sst::index::puffin_manager::{
47 PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
48};
49use crate::sst::index::TYPE_FULLTEXT_INDEX;
50
51pub mod builder;
52
/// Applies fulltext index predicates to SST files to narrow down the scan scope.
pub struct FulltextIndexApplier {
    /// Fulltext requests (queries and terms) to apply, keyed by column id.
    requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,

    /// Source to read index blobs from: local file cache when available,
    /// otherwise the remote object store.
    index_source: IndexSource,

    /// Optional cache for bloom filter index blobs (used by the coarse path).
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Key identifying this predicate set; used by the index result cache.
    predicate_key: PredicateKey,
}
67
68pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
69
70impl FulltextIndexApplier {
71 pub fn new(
73 region_dir: String,
74 region_id: RegionId,
75 store: ObjectStore,
76 requests: BTreeMap<ColumnId, FulltextRequest>,
77 puffin_manager_factory: PuffinManagerFactory,
78 ) -> Self {
79 let requests = Arc::new(requests);
80 let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);
81
82 Self {
83 predicate_key: PredicateKey::new_fulltext(requests.clone()),
84 requests,
85 index_source,
86 bloom_filter_index_cache: None,
87 }
88 }
89
90 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
92 self.index_source.set_file_cache(file_cache);
93 self
94 }
95
96 pub fn with_puffin_metadata_cache(
98 mut self,
99 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
100 ) -> Self {
101 self.index_source
102 .set_puffin_metadata_cache(puffin_metadata_cache);
103 self
104 }
105
106 pub fn with_bloom_filter_cache(
108 mut self,
109 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
110 ) -> Self {
111 self.bloom_filter_index_cache = bloom_filter_index_cache;
112 self
113 }
114
115 pub fn predicate_key(&self) -> &PredicateKey {
117 &self.predicate_key
118 }
119}
120
121impl FulltextIndexApplier {
122 pub async fn apply_fine(
125 &self,
126 file_id: FileId,
127 file_size_hint: Option<u64>,
128 ) -> Result<Option<BTreeSet<RowId>>> {
129 let timer = INDEX_APPLY_ELAPSED
130 .with_label_values(&[TYPE_FULLTEXT_INDEX])
131 .start_timer();
132
133 let mut row_ids: Option<BTreeSet<RowId>> = None;
134 for (column_id, request) in self.requests.iter() {
135 if request.queries.is_empty() && request.terms.is_empty() {
136 continue;
137 }
138
139 let Some(result) = self
140 .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
141 .await?
142 else {
143 continue;
144 };
145
146 if let Some(ids) = row_ids.as_mut() {
147 ids.retain(|id| result.contains(id));
148 } else {
149 row_ids = Some(result);
150 }
151
152 if let Some(ids) = row_ids.as_ref() {
153 if ids.is_empty() {
154 break;
155 }
156 }
157 }
158
159 if row_ids.is_none() {
160 timer.stop_and_discard();
161 }
162 Ok(row_ids)
163 }
164
165 async fn apply_fine_one_column(
166 &self,
167 file_size_hint: Option<u64>,
168 file_id: FileId,
169 column_id: ColumnId,
170 request: &FulltextRequest,
171 ) -> Result<Option<BTreeSet<RowId>>> {
172 let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
173 let dir = self
174 .index_source
175 .dir(file_id, &blob_key, file_size_hint)
176 .await?;
177
178 let dir = match &dir {
179 Some(dir) => dir,
180 None => {
181 return Ok(None);
182 }
183 };
184
185 let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
186 let path = dir.path();
187
188 let searcher =
189 TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
190 let mut row_ids: Option<BTreeSet<RowId>> = None;
191
192 for query in &request.queries {
194 let result = searcher
195 .search(&query.0)
196 .await
197 .context(ApplyFulltextIndexSnafu)?;
198
199 if let Some(ids) = row_ids.as_mut() {
200 ids.retain(|id| result.contains(id));
201 } else {
202 row_ids = Some(result);
203 }
204
205 if let Some(ids) = row_ids.as_ref() {
206 if ids.is_empty() {
207 break;
208 }
209 }
210 }
211
212 let query = request.terms_as_query(config.case_sensitive);
214 if !query.0.is_empty() {
215 let result = searcher
216 .search(&query.0)
217 .await
218 .context(ApplyFulltextIndexSnafu)?;
219
220 if let Some(ids) = row_ids.as_mut() {
221 ids.retain(|id| result.contains(id));
222 } else {
223 row_ids = Some(result);
224 }
225 }
226
227 Ok(row_ids)
228 }
229}
230
impl FulltextIndexApplier {
    /// Applies the coarse-grained fulltext index (bloom filter) to the file.
    ///
    /// `row_groups` yields `(row_count, to_search)` per row group. Returns,
    /// for each searched row group that still has candidates, ranges relative
    /// to the row group's start that may contain matches. Returns `None`
    /// when no bloom filter index was applicable at all (timer is discarded).
    pub async fn apply_coarse(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
        let timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_FULLTEXT_INDEX])
            .start_timer();

        let (input, mut output) = Self::init_coarse_output(row_groups);
        let mut applied = false;

        for (column_id, request) in self.requests.iter() {
            // Only terms can be checked against bloom filters; queries need
            // the fine-grained index.
            if request.terms.is_empty() {
                continue;
            }

            applied |= self
                .apply_coarse_one_column(
                    file_id,
                    file_size_hint,
                    *column_id,
                    &request.terms,
                    &mut output,
                )
                .await?;
        }

        if !applied {
            // No column had an applicable bloom filter index; don't record
            // the elapsed time.
            timer.stop_and_discard();
            return Ok(None);
        }

        Self::adjust_coarse_output(input, &mut output);
        Ok(Some(output))
    }

    /// Narrows `output` ranges for one column using its bloom filter blob.
    ///
    /// Returns `false` (without touching `output`) when the blob is missing
    /// or no predicate can be built from `terms`; `true` when the filter was
    /// actually applied.
    async fn apply_coarse_one_column(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        column_id: ColumnId,
        terms: &[FulltextTerm],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> Result<bool> {
        let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
        let Some(reader) = self
            .index_source
            .blob(file_id, &blob_key, file_size_hint)
            .await?
        else {
            return Ok(false);
        };
        let config =
            Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;

        let predicates = Self::terms_to_predicates(terms, &config);
        if predicates.is_empty() {
            return Ok(false);
        }

        let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
        // Wrap the raw blob reader with the cache when one is configured;
        // both sides coerce to the same boxed reader type.
        let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
            let blob_size = range_reader
                .metadata()
                .await
                .context(MetadataSnafu)?
                .content_length;
            let reader = CachedBloomFilterIndexBlobReader::new(
                file_id,
                column_id,
                Tag::Fulltext,
                blob_size,
                BloomFilterReaderImpl::new(range_reader),
                bloom_filter_cache.clone(),
            );
            Box::new(reader) as _
        } else {
            Box::new(BloomFilterReaderImpl::new(range_reader)) as _
        };

        let mut applier = BloomFilterApplier::new(reader)
            .await
            .context(ApplyBloomFilterIndexSnafu)?;
        for (_, row_group_output) in output.iter_mut() {
            // Already pruned to nothing by an earlier column; skip.
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier
                .search(&predicates, row_group_output)
                .await
                .context(ApplyBloomFilterIndexSnafu)?;
        }

        Ok(true)
    }

    /// Builds the initial search state from `row_groups`.
    ///
    /// `input` holds, per searched row group, its index and absolute row
    /// range within the file; `output` starts as one full-range candidate per
    /// searched row group and is narrowed in place by the appliers.
    #[allow(clippy::type_complexity)]
    fn init_coarse_output(
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }

        let output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        (input, output)
    }

    /// Converts absolute row ranges in `output` to ranges relative to each
    /// row group's start, and drops row groups with no remaining candidates.
    fn adjust_coarse_output(
        input: Vec<(usize, Range<usize>)>,
        output: &mut Vec<(usize, Vec<Range<usize>>)>,
    ) {
        // `output` was built from `input`, so they pair up one-to-one.
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }
        output.retain(|(_, ranges)| !ranges.is_empty());
    }

    /// Tokenizes `terms` into bloom filter probes: splits each term on
    /// non-alphanumeric characters, lowercasing tokens when the index is
    /// case-insensitive, and builds one single-element predicate per distinct
    /// token.
    fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
        let mut probes = HashSet::new();
        for term in terms {
            // A term derived from a lowercased column can't be matched against
            // a case-sensitive index, so skip it.
            if config.case_sensitive && term.col_lowered {
                continue;
            }

            let ts = term
                .term
                .split(|c: char| !c.is_alphanumeric())
                .filter(|&t| !t.is_empty())
                .map(|t| {
                    if !config.case_sensitive {
                        t.to_lowercase()
                    } else {
                        t.to_string()
                    }
                    .into_bytes()
                });

            probes.extend(ts);
        }

        probes
            .into_iter()
            .map(|p| InListPredicate {
                list: iter::once(p).collect(),
            })
            .collect::<Vec<_>>()
    }
}
419
/// Source of fulltext index blobs: prefers the locally cached puffin file
/// when present, falling back to the remote object store.
struct IndexSource {
    // Directory of the region holding the index files on the remote store.
    region_dir: String,
    // Id of the region the index files belong to.
    region_id: RegionId,

    /// Factory used to build puffin managers over a given store.
    puffin_manager_factory: PuffinManagerFactory,

    /// Remote object store holding the index files.
    remote_store: ObjectStore,

    /// Optional local file cache; consulted first when set.
    file_cache: Option<FileCacheRef>,

    /// Optional cache for puffin file metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
437
impl IndexSource {
    /// Creates a new source reading from `remote_store`; caches are unset
    /// until configured via the setters below.
    fn new(
        region_dir: String,
        region_id: RegionId,
        puffin_manager_factory: PuffinManagerFactory,
        remote_store: ObjectStore,
    ) -> Self {
        Self {
            region_dir,
            region_id,
            puffin_manager_factory,
            remote_store,
            file_cache: None,
            puffin_metadata_cache: None,
        }
    }

    /// Sets the local file cache consulted before the remote store.
    fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
        self.file_cache = file_cache;
    }

    /// Sets the cache for puffin file metadata.
    fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
        self.puffin_metadata_cache = puffin_metadata_cache;
    }

    /// Returns the blob with the given key, or `None` if it doesn't exist.
    ///
    /// If reading from the locally cached puffin file fails unexpectedly,
    /// logs a warning and retries once against the remote store.
    #[allow(unused)]
    async fn blob(
        &self,
        file_id: FileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.blob(key).await;
        match res {
            Ok(blob) => Ok(Some(blob)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    // Already reading from remote; nothing left to fall back to.
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.blob(key).await;
                    match res {
                        Ok(blob) => Ok(Some(blob)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

    /// Returns the directory blob with the given key, or `None` if it doesn't
    /// exist.
    ///
    /// Same fallback behavior as [`Self::blob`]: unexpected local-cache errors
    /// trigger one retry against the remote store.
    async fn dir(
        &self,
        file_id: FileId,
        key: &str,
        file_size_hint: Option<u64>,
    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
        let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
        let res = reader.dir(key).await;
        match res {
            Ok(dir) => Ok(Some(dir)),
            Err(err) if err.is_blob_not_found() => Ok(None),
            Err(err) => {
                if fallbacked {
                    // Already reading from remote; nothing left to fall back to.
                    Err(err).context(PuffinReadBlobSnafu)
                } else {
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
                    let reader = self.build_remote(file_id, file_size_hint).await?;
                    let res = reader.dir(key).await;
                    match res {
                        Ok(dir) => Ok(Some(dir)),
                        Err(err) if err.is_blob_not_found() => Ok(None),
                        Err(err) => Err(err).context(PuffinReadBlobSnafu),
                    }
                }
            }
        }
    }

    /// Returns a puffin reader, preferring the local cache.
    ///
    /// The second element is `true` when the reader already points at the
    /// remote store (i.e. the fallback has been used), so callers know not to
    /// fall back again on failure.
    async fn ensure_reader(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<(SstPuffinReader, bool)> {
        match self.build_local_cache(file_id, file_size_hint).await {
            Ok(Some(r)) => Ok((r, false)),
            Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
            Err(err) => Err(err),
        }
    }

    /// Builds a reader over the locally cached puffin file; `None` when there
    /// is no file cache or the file isn't cached.
    async fn build_local_cache(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<SstPuffinReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self
            .puffin_manager_factory
            .build(
                file_cache.local_store(),
                WriteCachePathProvider::new(self.region_id, file_cache.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);
        Ok(Some(reader))
    }

    /// Builds a reader over the puffin file on the remote store.
    async fn build_remote(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
    ) -> Result<SstPuffinReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.remote_store.clone(),
                RegionFilePathFactory::new(self.region_dir.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint);

        Ok(reader)
    }
}