1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::iter;
17use std::ops::Range;
18use std::sync::Arc;
19
20use common_base::range_read::RangeReader;
21use common_telemetry::warn;
22use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
23use index::bloom_filter::reader::BloomFilterReaderImpl;
24use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
25use index::fulltext_index::Config;
26use object_store::ObjectStore;
27use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
28use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
29use snafu::ResultExt;
30use store_api::storage::{ColumnId, RegionId};
31
32use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
33use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
34use crate::cache::index::bloom_filter_index::{
35 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::error::{
38 ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
39 PuffinReadBlobSnafu, Result,
40};
41use crate::metrics::INDEX_APPLY_ELAPSED;
42use crate::sst::file::FileId;
43use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
44use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
45use crate::sst::index::puffin_manager::{
46 PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
47};
48use crate::sst::index::TYPE_FULLTEXT_INDEX;
49
/// Request types consumed by the applier, such as `FulltextRequest` and `FulltextTerm`.
pub mod builder;
51
/// Applies fulltext index requests to SST files.
///
/// Offers a fine-grained path backed by per-column tantivy index directories
/// (`apply_fine`) and a coarse-grained path backed by per-column bloom filter
/// blobs (`apply_coarse`).
pub struct FulltextIndexApplier {
    /// Fulltext requests to apply, keyed by the id of the column they target.
    requests: HashMap<ColumnId, FulltextRequest>,

    /// Where the puffin index files are read from (local file cache or remote store).
    index_source: IndexSource,

    /// Optional cache for bloom filter index blobs, used by the coarse-grained path.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
}
63
/// Shared, reference-counted handle to a [`FulltextIndexApplier`].
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
65
66impl FulltextIndexApplier {
67 pub fn new(
69 region_dir: String,
70 region_id: RegionId,
71 store: ObjectStore,
72 requests: HashMap<ColumnId, FulltextRequest>,
73 puffin_manager_factory: PuffinManagerFactory,
74 ) -> Self {
75 let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);
76
77 Self {
78 requests,
79 index_source,
80 bloom_filter_index_cache: None,
81 }
82 }
83
84 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
86 self.index_source.set_file_cache(file_cache);
87 self
88 }
89
90 pub fn with_puffin_metadata_cache(
92 mut self,
93 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
94 ) -> Self {
95 self.index_source
96 .set_puffin_metadata_cache(puffin_metadata_cache);
97 self
98 }
99
100 pub fn with_bloom_filter_cache(
102 mut self,
103 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
104 ) -> Self {
105 self.bloom_filter_index_cache = bloom_filter_index_cache;
106 self
107 }
108}
109
110impl FulltextIndexApplier {
111 pub async fn apply_fine(
114 &self,
115 file_id: FileId,
116 file_size_hint: Option<u64>,
117 ) -> Result<Option<BTreeSet<RowId>>> {
118 let timer = INDEX_APPLY_ELAPSED
119 .with_label_values(&[TYPE_FULLTEXT_INDEX])
120 .start_timer();
121
122 let mut row_ids: Option<BTreeSet<RowId>> = None;
123 for (column_id, request) in &self.requests {
124 if request.queries.is_empty() && request.terms.is_empty() {
125 continue;
126 }
127
128 let Some(result) = self
129 .apply_fine_one_column(file_size_hint, file_id, *column_id, request)
130 .await?
131 else {
132 continue;
133 };
134
135 if let Some(ids) = row_ids.as_mut() {
136 ids.retain(|id| result.contains(id));
137 } else {
138 row_ids = Some(result);
139 }
140
141 if let Some(ids) = row_ids.as_ref() {
142 if ids.is_empty() {
143 break;
144 }
145 }
146 }
147
148 if row_ids.is_none() {
149 timer.stop_and_discard();
150 }
151 Ok(row_ids)
152 }
153
154 async fn apply_fine_one_column(
155 &self,
156 file_size_hint: Option<u64>,
157 file_id: FileId,
158 column_id: ColumnId,
159 request: &FulltextRequest,
160 ) -> Result<Option<BTreeSet<RowId>>> {
161 let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
162 let dir = self
163 .index_source
164 .dir(file_id, &blob_key, file_size_hint)
165 .await?;
166
167 let dir = match &dir {
168 Some(dir) => dir,
169 None => {
170 return Ok(None);
171 }
172 };
173
174 let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
175 let path = dir.path();
176
177 let searcher =
178 TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
179 let mut row_ids: Option<BTreeSet<RowId>> = None;
180
181 for query in &request.queries {
183 let result = searcher
184 .search(&query.0)
185 .await
186 .context(ApplyFulltextIndexSnafu)?;
187
188 if let Some(ids) = row_ids.as_mut() {
189 ids.retain(|id| result.contains(id));
190 } else {
191 row_ids = Some(result);
192 }
193
194 if let Some(ids) = row_ids.as_ref() {
195 if ids.is_empty() {
196 break;
197 }
198 }
199 }
200
201 let query = request.terms_as_query(config.case_sensitive);
203 if !query.0.is_empty() {
204 let result = searcher
205 .search(&query.0)
206 .await
207 .context(ApplyFulltextIndexSnafu)?;
208
209 if let Some(ids) = row_ids.as_mut() {
210 ids.retain(|id| result.contains(id));
211 } else {
212 row_ids = Some(result);
213 }
214 }
215
216 Ok(row_ids)
217 }
218}
219
220impl FulltextIndexApplier {
221 pub async fn apply_coarse(
224 &self,
225 file_id: FileId,
226 file_size_hint: Option<u64>,
227 row_groups: impl Iterator<Item = (usize, bool)>,
228 ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
229 let timer = INDEX_APPLY_ELAPSED
230 .with_label_values(&[TYPE_FULLTEXT_INDEX])
231 .start_timer();
232
233 let (input, mut output) = Self::init_coarse_output(row_groups);
234 let mut applied = false;
235
236 for (column_id, request) in &self.requests {
237 if request.terms.is_empty() {
238 continue;
240 }
241
242 applied |= self
243 .apply_coarse_one_column(
244 file_id,
245 file_size_hint,
246 *column_id,
247 &request.terms,
248 &mut output,
249 )
250 .await?;
251 }
252
253 if !applied {
254 timer.stop_and_discard();
255 return Ok(None);
256 }
257
258 Self::adjust_coarse_output(input, &mut output);
259 Ok(Some(output))
260 }
261
262 async fn apply_coarse_one_column(
263 &self,
264 file_id: FileId,
265 file_size_hint: Option<u64>,
266 column_id: ColumnId,
267 terms: &[FulltextTerm],
268 output: &mut [(usize, Vec<Range<usize>>)],
269 ) -> Result<bool> {
270 let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
271 let Some(reader) = self
272 .index_source
273 .blob(file_id, &blob_key, file_size_hint)
274 .await?
275 else {
276 return Ok(false);
277 };
278 let config =
279 Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
280
281 let predicates = Self::terms_to_predicates(terms, &config);
282 if predicates.is_empty() {
283 return Ok(false);
284 }
285
286 let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
287 let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
288 let blob_size = range_reader
289 .metadata()
290 .await
291 .context(MetadataSnafu)?
292 .content_length;
293 let reader = CachedBloomFilterIndexBlobReader::new(
294 file_id,
295 column_id,
296 Tag::Fulltext,
297 blob_size,
298 BloomFilterReaderImpl::new(range_reader),
299 bloom_filter_cache.clone(),
300 );
301 Box::new(reader) as _
302 } else {
303 Box::new(BloomFilterReaderImpl::new(range_reader)) as _
304 };
305
306 let mut applier = BloomFilterApplier::new(reader)
307 .await
308 .context(ApplyBloomFilterIndexSnafu)?;
309 for (_, row_group_output) in output.iter_mut() {
310 if row_group_output.is_empty() {
312 continue;
313 }
314
315 *row_group_output = applier
316 .search(&predicates, row_group_output)
317 .await
318 .context(ApplyBloomFilterIndexSnafu)?;
319 }
320
321 Ok(true)
322 }
323
324 #[allow(clippy::type_complexity)]
332 fn init_coarse_output(
333 row_groups: impl Iterator<Item = (usize, bool)>,
334 ) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
335 let mut input = Vec::with_capacity(row_groups.size_hint().0);
337 let mut start = 0;
338 for (i, (len, to_search)) in row_groups.enumerate() {
339 let end = start + len;
340 if to_search {
341 input.push((i, start..end));
342 }
343 start = end;
344 }
345
346 let output = input
349 .iter()
350 .map(|(i, range)| (*i, vec![range.clone()]))
351 .collect::<Vec<_>>();
352
353 (input, output)
354 }
355
356 fn adjust_coarse_output(
358 input: Vec<(usize, Range<usize>)>,
359 output: &mut Vec<(usize, Vec<Range<usize>>)>,
360 ) {
361 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
363 let start = input.start;
364 for range in output.iter_mut() {
365 range.start -= start;
366 range.end -= start;
367 }
368 }
369 output.retain(|(_, ranges)| !ranges.is_empty());
370 }
371
372 fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
377 let mut probes = HashSet::new();
378 for term in terms {
379 if config.case_sensitive && term.col_lowered {
380 continue;
382 }
383
384 let ts = term
385 .term
386 .split(|c: char| !c.is_alphanumeric())
387 .filter(|&t| !t.is_empty())
388 .map(|t| {
389 if !config.case_sensitive {
390 t.to_lowercase()
391 } else {
392 t.to_string()
393 }
394 .into_bytes()
395 });
396
397 probes.extend(ts);
398 }
399
400 probes
401 .into_iter()
402 .map(|p| InListPredicate {
403 list: iter::once(p).collect(),
404 })
405 .collect::<Vec<_>>()
406 }
407}
408
/// Opens the puffin index files of a region, preferring a local file cache over
/// the remote object store.
struct IndexSource {
    /// Directory of the region; used to build file paths in `remote_store`.
    region_dir: String,
    /// Id of the region; used to build file cache keys and cache paths.
    region_id: RegionId,

    /// Factory for puffin managers over either the local cache store or the remote store.
    puffin_manager_factory: PuffinManagerFactory,

    /// Remote object store holding the index files.
    remote_store: ObjectStore,

    /// Optional local file cache, consulted before `remote_store`.
    file_cache: Option<FileCacheRef>,

    /// Optional puffin metadata cache, passed to both local and remote puffin managers.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
426
427impl IndexSource {
428 fn new(
429 region_dir: String,
430 region_id: RegionId,
431 puffin_manager_factory: PuffinManagerFactory,
432 remote_store: ObjectStore,
433 ) -> Self {
434 Self {
435 region_dir,
436 region_id,
437 puffin_manager_factory,
438 remote_store,
439 file_cache: None,
440 puffin_metadata_cache: None,
441 }
442 }
443
444 fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
445 self.file_cache = file_cache;
446 }
447
448 fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
449 self.puffin_metadata_cache = puffin_metadata_cache;
450 }
451
452 #[allow(unused)]
456 async fn blob(
457 &self,
458 file_id: FileId,
459 key: &str,
460 file_size_hint: Option<u64>,
461 ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
462 let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
463 let res = reader.blob(key).await;
464 match res {
465 Ok(blob) => Ok(Some(blob)),
466 Err(err) if err.is_blob_not_found() => Ok(None),
467 Err(err) => {
468 if fallbacked {
469 Err(err).context(PuffinReadBlobSnafu)
470 } else {
471 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
472 let reader = self.build_remote(file_id, file_size_hint).await?;
473 let res = reader.blob(key).await;
474 match res {
475 Ok(blob) => Ok(Some(blob)),
476 Err(err) if err.is_blob_not_found() => Ok(None),
477 Err(err) => Err(err).context(PuffinReadBlobSnafu),
478 }
479 }
480 }
481 }
482 }
483
484 async fn dir(
488 &self,
489 file_id: FileId,
490 key: &str,
491 file_size_hint: Option<u64>,
492 ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
493 let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
494 let res = reader.dir(key).await;
495 match res {
496 Ok(dir) => Ok(Some(dir)),
497 Err(err) if err.is_blob_not_found() => Ok(None),
498 Err(err) => {
499 if fallbacked {
500 Err(err).context(PuffinReadBlobSnafu)
501 } else {
502 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
503 let reader = self.build_remote(file_id, file_size_hint).await?;
504 let res = reader.dir(key).await;
505 match res {
506 Ok(dir) => Ok(Some(dir)),
507 Err(err) if err.is_blob_not_found() => Ok(None),
508 Err(err) => Err(err).context(PuffinReadBlobSnafu),
509 }
510 }
511 }
512 }
513 }
514
515 async fn ensure_reader(
517 &self,
518 file_id: FileId,
519 file_size_hint: Option<u64>,
520 ) -> Result<(SstPuffinReader, bool)> {
521 match self.build_local_cache(file_id, file_size_hint).await {
522 Ok(Some(r)) => Ok((r, false)),
523 Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
524 Err(err) => Err(err),
525 }
526 }
527
528 async fn build_local_cache(
529 &self,
530 file_id: FileId,
531 file_size_hint: Option<u64>,
532 ) -> Result<Option<SstPuffinReader>> {
533 let Some(file_cache) = &self.file_cache else {
534 return Ok(None);
535 };
536
537 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
538 if file_cache.get(index_key).await.is_none() {
539 return Ok(None);
540 };
541
542 let puffin_manager = self
543 .puffin_manager_factory
544 .build(
545 file_cache.local_store(),
546 WriteCachePathProvider::new(self.region_id, file_cache.clone()),
547 )
548 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
549 let reader = puffin_manager
550 .reader(&file_id)
551 .await
552 .context(PuffinBuildReaderSnafu)?
553 .with_file_size_hint(file_size_hint);
554 Ok(Some(reader))
555 }
556
557 async fn build_remote(
558 &self,
559 file_id: FileId,
560 file_size_hint: Option<u64>,
561 ) -> Result<SstPuffinReader> {
562 let puffin_manager = self
563 .puffin_manager_factory
564 .build(
565 self.remote_store.clone(),
566 RegionFilePathFactory::new(self.region_dir.clone()),
567 )
568 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
569
570 let reader = puffin_manager
571 .reader(&file_id)
572 .await
573 .context(PuffinBuildReaderSnafu)?
574 .with_file_size_hint(file_size_hint);
575
576 Ok(reader)
577 }
578}