1pub mod builder;
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use datatypes::data_type::ConcreteDataType;
24use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
25use index::inverted_index::search::index_apply::{
26 ApplyOutput, IndexApplier, IndexNotFoundStrategy, PredicatesIndexApplier, SearchContext,
27};
28use index::inverted_index::search::predicate::Predicate;
29use index::target::IndexTarget;
30use object_store::ObjectStore;
31use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
32use puffin::puffin_manager::{PuffinManager, PuffinReader};
33use snafu::ResultExt;
34use store_api::metadata::RegionMetadataRef;
35use store_api::region_request::PathType;
36use store_api::storage::ColumnId;
37
38use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
39use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
40use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43 ApplyInvertedIndexSnafu, BuildIndexApplierSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
44 PuffinReadBlobSnafu, Result,
45};
46use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
49use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
50use crate::sst::index::{TYPE_INVERTED_INDEX, trigger_index_background_download};
51
52#[derive(Default, Clone)]
54pub struct InvertedIndexApplyMetrics {
55 pub apply_elapsed: std::time::Duration,
57 pub blob_cache_miss: usize,
59 pub blob_read_bytes: u64,
61 pub inverted_index_read_metrics: InvertedIndexReadMetrics,
63}
64
65impl std::fmt::Debug for InvertedIndexApplyMetrics {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 let Self {
68 apply_elapsed,
69 blob_cache_miss,
70 blob_read_bytes,
71 inverted_index_read_metrics,
72 } = self;
73
74 if self.is_empty() {
75 return write!(f, "{{}}");
76 }
77 write!(f, "{{")?;
78
79 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
80
81 if *blob_cache_miss > 0 {
82 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
83 }
84 if *blob_read_bytes > 0 {
85 write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
86 }
87 write!(
88 f,
89 ", \"inverted_index_read_metrics\":{:?}",
90 inverted_index_read_metrics
91 )?;
92
93 write!(f, "}}")
94 }
95}
96
97impl InvertedIndexApplyMetrics {
98 pub fn is_empty(&self) -> bool {
100 self.apply_elapsed.is_zero()
101 }
102
103 pub fn merge_from(&mut self, other: &Self) {
105 self.apply_elapsed += other.apply_elapsed;
106 self.blob_cache_miss += other.blob_cache_miss;
107 self.blob_read_bytes += other.blob_read_bytes;
108 self.inverted_index_read_metrics
109 .merge_from(&other.inverted_index_read_metrics);
110 }
111}
112
113pub(crate) struct InvertedIndexApplier {
116 table_dir: String,
118
119 path_type: PathType,
121
122 store: ObjectStore,
124
125 file_cache: Option<FileCacheRef>,
127
128 puffin_manager_factory: PuffinManagerFactory,
130
131 inverted_index_cache: Option<InvertedIndexCacheRef>,
133
134 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
136
137 predicates: BTreeMap<ColumnId, Vec<Predicate>>,
139
140 default_plan: SstApplyPlan,
142
143 expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
145}
146
147pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
148
149#[derive(Clone)]
150pub(crate) struct SstApplyPlan {
151 pub predicate_key: PredicateKey,
152 pub index_applier: Arc<PredicatesIndexApplier>,
153}
154
155impl InvertedIndexApplier {
156 pub fn new(
158 table_dir: String,
159 path_type: PathType,
160 store: ObjectStore,
161 puffin_manager_factory: PuffinManagerFactory,
162 predicates: BTreeMap<ColumnId, Vec<Predicate>>,
163 expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
164 ) -> Result<Self> {
165 let default_plan = Self::build_apply_plan(&predicates)?;
166 INDEX_APPLY_MEMORY_USAGE.add(default_plan.index_applier.memory_usage() as i64);
167
168 Ok(Self {
169 table_dir,
170 path_type,
171 store,
172 file_cache: None,
173 puffin_manager_factory,
174 inverted_index_cache: None,
175 puffin_metadata_cache: None,
176 predicates,
177 default_plan,
178 expected_predicate_col_types,
179 })
180 }
181
182 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
184 self.file_cache = file_cache;
185 self
186 }
187
188 pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
190 self.inverted_index_cache = index_cache;
191 self
192 }
193
194 pub fn with_puffin_metadata_cache(
196 mut self,
197 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
198 ) -> Self {
199 self.puffin_metadata_cache = puffin_metadata_cache;
200 self
201 }
202
203 #[tracing::instrument(
211 skip_all,
212 fields(file_id = %file_id)
213 )]
214 pub async fn apply(
215 &self,
216 file_id: RegionIndexId,
217 file_size_hint: Option<u64>,
218 index_applier: &PredicatesIndexApplier,
219 mut metrics: Option<&mut InvertedIndexApplyMetrics>,
220 ) -> Result<ApplyOutput> {
221 let start = Instant::now();
222
223 let context = SearchContext {
224 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
226 };
227
228 let mut cache_miss = 0;
229 let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
230 Ok(Some(puffin_reader)) => puffin_reader,
231 other => {
232 cache_miss += 1;
233 if let Err(err) = other {
234 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
235 }
236 self.remote_blob_reader(file_id, file_size_hint).await?
237 }
238 };
239
240 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
241
242 let result = if let Some(index_cache) = &self.inverted_index_cache {
243 let mut index_reader = CachedInvertedIndexBlobReader::new(
244 file_id.file_id(),
245 file_id.version,
246 blob_size,
247 InvertedIndexBlobReader::new(blob),
248 index_cache.clone(),
249 );
250 index_applier
251 .apply(
252 context,
253 &mut index_reader,
254 metrics
255 .as_deref_mut()
256 .map(|m| &mut m.inverted_index_read_metrics),
257 )
258 .await
259 .context(ApplyInvertedIndexSnafu)
260 } else {
261 let mut index_reader = InvertedIndexBlobReader::new(blob);
262 index_applier
263 .apply(
264 context,
265 &mut index_reader,
266 metrics
267 .as_deref_mut()
268 .map(|m| &mut m.inverted_index_read_metrics),
269 )
270 .await
271 .context(ApplyInvertedIndexSnafu)
272 };
273
274 let elapsed = start.elapsed();
276 INDEX_APPLY_ELAPSED
277 .with_label_values(&[TYPE_INVERTED_INDEX])
278 .observe(elapsed.as_secs_f64());
279
280 if let Some(metrics) = metrics {
281 metrics.apply_elapsed = elapsed;
282 metrics.blob_cache_miss = cache_miss;
283 metrics.blob_read_bytes = blob_size;
284 }
285
286 result
287 }
288
289 async fn cached_blob_reader(
291 &self,
292 file_id: RegionIndexId,
293 file_size_hint: Option<u64>,
294 ) -> Result<Option<BlobReader>> {
295 let Some(file_cache) = &self.file_cache else {
296 return Ok(None);
297 };
298
299 let index_key = IndexKey::new(
300 file_id.region_id(),
301 file_id.file_id(),
302 FileType::Puffin(file_id.version),
303 );
304 if file_cache.get(index_key).await.is_none() {
305 return Ok(None);
306 };
307
308 let puffin_manager = self.puffin_manager_factory.build(
309 file_cache.local_store(),
310 WriteCachePathProvider::new(file_cache.clone()),
311 );
312
313 let reader = puffin_manager
315 .reader(&file_id)
316 .await
317 .context(PuffinBuildReaderSnafu)?
318 .with_file_size_hint(file_size_hint)
319 .blob(INDEX_BLOB_TYPE)
320 .await
321 .context(PuffinReadBlobSnafu)?
322 .reader()
323 .await
324 .context(PuffinBuildReaderSnafu)?;
325 Ok(Some(reader))
326 }
327
328 async fn remote_blob_reader(
330 &self,
331 file_id: RegionIndexId,
332 file_size_hint: Option<u64>,
333 ) -> Result<BlobReader> {
334 let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
335
336 trigger_index_background_download(
338 self.file_cache.as_ref(),
339 &file_id,
340 file_size_hint,
341 &path_factory,
342 &self.store,
343 );
344
345 let puffin_manager = self
346 .puffin_manager_factory
347 .build(self.store.clone(), path_factory)
348 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
349
350 puffin_manager
351 .reader(&file_id)
352 .await
353 .context(PuffinBuildReaderSnafu)?
354 .with_file_size_hint(file_size_hint)
355 .blob(INDEX_BLOB_TYPE)
356 .await
357 .context(PuffinReadBlobSnafu)?
358 .reader()
359 .await
360 .context(PuffinBuildReaderSnafu)
361 }
362
363 pub fn plan_for_sst(&self, sst_metadata: &RegionMetadataRef) -> Result<Option<SstApplyPlan>> {
367 let mut compatible_predicates = BTreeMap::new();
368 let mut has_type_mismatch = false;
369
370 for (col_id, expected) in &self.expected_predicate_col_types {
371 if let Some(sst_col) = sst_metadata.column_by_id(*col_id)
372 && sst_col.column_schema.data_type != *expected
373 {
374 has_type_mismatch = true;
375 continue;
376 }
377
378 if let Some(predicates) = self.predicates.get(col_id) {
379 compatible_predicates.insert(*col_id, predicates.clone());
380 }
381 }
382
383 if compatible_predicates.is_empty() {
384 return Ok(None);
385 }
386
387 if !has_type_mismatch {
388 return Ok(Some(self.default_plan.clone()));
389 }
390
391 let plan = Self::build_apply_plan(&compatible_predicates)?;
392 Ok(Some(plan))
393 }
394
395 fn build_apply_plan(
396 predicates_by_col: &BTreeMap<ColumnId, Vec<Predicate>>,
397 ) -> Result<SstApplyPlan> {
398 let predicates = predicates_by_col
399 .iter()
400 .map(|(col_id, preds)| (format!("{}", IndexTarget::ColumnId(*col_id)), preds.clone()))
401 .collect();
402
403 let index_applier =
404 PredicatesIndexApplier::try_from(predicates).context(BuildIndexApplierSnafu)?;
405
406 let predicate_key = PredicateKey::new_inverted(Arc::new(predicates_by_col.clone()));
407 Ok(SstApplyPlan {
408 predicate_key,
409 index_applier: Arc::new(index_applier),
410 })
411 }
412}
413
414impl Drop for InvertedIndexApplier {
415 fn drop(&mut self) {
416 INDEX_APPLY_MEMORY_USAGE.sub(self.default_plan.index_applier.memory_usage() as i64);
417 }
418}
419
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use futures::io::Cursor;
    use index::inverted_index::search::predicate::RegexMatchPredicate;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::sst::index::RegionFileId;

    /// Table directory shared by all tests.
    const TABLE_DIR: &str = "table_dir";

    /// Predicates used by every test: a single regex match on column 1.
    fn foo_regex_predicates() -> BTreeMap<ColumnId, Vec<Predicate>> {
        let regex = Predicate::RegexMatch(RegexMatchPredicate {
            pattern: "foo".to_string(),
        });
        BTreeMap::from([(1, vec![regex])])
    }

    /// Builds an applier over `foo_regex_predicates` that expects column 1 to
    /// have `predicate_col_type`.
    fn new_applier(
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        predicate_col_type: ConcreteDataType,
    ) -> InvertedIndexApplier {
        InvertedIndexApplier::new(
            TABLE_DIR.to_string(),
            PathType::Bare,
            object_store,
            puffin_manager_factory,
            foo_regex_predicates(),
            BTreeMap::from([(1, predicate_col_type)]),
        )
        .unwrap()
    }

    /// A plan is produced when the SST column type matches the expected type.
    #[tokio::test]
    async fn test_plan_for_sst() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();

        let applier = new_applier(
            object_store,
            puffin_manager_factory,
            ConcreteDataType::string_datatype(),
        );

        let plan = applier.plan_for_sst(&mock_region_metadata()).unwrap();
        assert!(plan.is_some());
    }

    /// No plan is produced when the only predicate column has a mismatched type.
    #[tokio::test]
    async fn test_plan_for_sst_type_mismatch() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();

        // The mock SST stores column 1 as a string, so expecting int64 mismatches.
        let applier = new_applier(
            object_store,
            puffin_manager_factory,
            ConcreteDataType::int64_datatype(),
        );

        let plan = applier.plan_for_sst(&mock_region_metadata()).unwrap();
        assert!(plan.is_none());
    }

    /// Applying fails with "Blob not found" when the puffin file lacks the
    /// inverted index blob.
    #[tokio::test]
    async fn test_index_applier_apply_invalid_blob_type() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
                .await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_id = RegionFileId::new(0.into(), FileId::random());
        let index_id = RegionIndexId::new(file_id, 0);

        // Write a puffin file that only contains a blob of an unrelated type.
        let puffin_manager = puffin_manager_factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(TABLE_DIR.to_string(), PathType::Bare),
        );
        let mut writer = puffin_manager.writer(&index_id).await.unwrap();
        writer
            .put_blob(
                "invalid_blob_type",
                Cursor::new(vec![]),
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();

        let applier = new_applier(
            object_store,
            puffin_manager_factory,
            ConcreteDataType::string_datatype(),
        );
        let plan = applier
            .plan_for_sst(&mock_region_metadata())
            .unwrap()
            .unwrap();

        let res = applier
            .apply(index_id, None, &plan.index_applier, None)
            .await;
        let err = res.unwrap_err();
        assert!(format!("{:?}", err).contains("Blob not found"));
    }

    /// Region metadata with a string tag column (id 1, primary key) and a
    /// millisecond timestamp column (id 2).
    fn mock_region_metadata() -> RegionMetadataRef {
        let tag_column = ColumnMetadata {
            column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        };
        let ts_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 2,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
        builder
            .push_column_metadata(tag_column)
            .push_column_metadata(ts_column)
            .primary_key(vec![1]);
        Arc::new(builder.build().unwrap())
    }
}