1use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19
20use common_telemetry::{debug, warn};
21use index::inverted_index::create::sort::external_sort::ExternalSorter;
22use index::inverted_index::create::sort_create::SortIndexCreator;
23use index::inverted_index::create::InvertedIndexCreator;
24use index::inverted_index::format::writer::InvertedIndexBlobWriter;
25use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
26use mito_codec::row_converter::SortField;
27use puffin::puffin_manager::{PuffinWriter, PutOptions};
28use snafu::{ensure, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31use tokio::io::duplex;
32use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
33
34use crate::error::{
35 BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
36 PushIndexValueSnafu, Result,
37};
38use crate::read::Batch;
39use crate::sst::file::FileId;
40use crate::sst::index::intermediate::{
41 IntermediateLocation, IntermediateManager, TempFileProvider,
42};
43use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::SstPuffinWriter;
45use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
46use crate::sst::index::TYPE_INVERTED_INDEX;
47
/// Per-column minimum memory usage threshold handed to the external sorter factory.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1 << 20; // 1 MiB

/// Capacity, in bytes, of the in-memory pipe that streams the finished index blob
/// from the index writer into the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024; // 8 KiB
53
/// Builds an inverted index over the indexed columns of a single SST file.
///
/// Rows are fed in batch by batch through `update`. The accumulated index is then written
/// into a puffin blob by `finish`. `abort` marks the indexer unusable and cleans up its
/// intermediate files.
pub struct InvertedIndexer {
    /// Creator that accumulates `(index name, value)` pairs and serializes the final index.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// Provider of intermediate files.
    ///
    /// It is shared with the external sorter. It is cleaned up after a failed update,
    /// after `finish`, and on `abort`.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec used to look up primary-key column info and decode primary-key values.
    codec: IndexValuesCodec,
    /// Scratch buffer reused to hold the encoded bytes of each value before it is pushed.
    value_buf: Vec<u8>,

    /// Statistics (row count, byte count) recorded around update, finish and cleanup.
    stats: Statistics,
    /// Set by `abort`; once true, `update` and `finish` fail with `OperateAbortedIndex`.
    aborted: bool,

    /// Memory usage counter shared with the external sorter; read by `memory_usage()`.
    memory_usage: Arc<AtomicUsize>,

    /// Indexed column ids, each paired with its decimal string form.
    /// The string form is used as the index name when pushing values.
    indexed_column_ids: Vec<(ColumnId, String)>,
}
77
78impl InvertedIndexer {
79 pub fn new(
82 sst_file_id: FileId,
83 metadata: &RegionMetadataRef,
84 intermediate_manager: IntermediateManager,
85 memory_usage_threshold: Option<usize>,
86 segment_row_count: NonZeroUsize,
87 indexed_column_ids: HashSet<ColumnId>,
88 ) -> Self {
89 let temp_file_provider = Arc::new(TempFileProvider::new(
90 IntermediateLocation::new(&metadata.region_id, &sst_file_id),
91 intermediate_manager,
92 ));
93
94 let memory_usage = Arc::new(AtomicUsize::new(0));
95
96 let sorter = ExternalSorter::factory(
97 temp_file_provider.clone() as _,
98 Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
99 memory_usage.clone(),
100 memory_usage_threshold,
101 );
102 let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
103
104 let codec = IndexValuesCodec::from_tag_columns(
105 metadata.primary_key_encoding,
106 metadata.primary_key_columns(),
107 );
108 let indexed_column_ids = indexed_column_ids
109 .into_iter()
110 .map(|col_id| {
111 let col_id_str = col_id.to_string();
112 (col_id, col_id_str)
113 })
114 .collect();
115 Self {
116 codec,
117 index_creator,
118 temp_file_provider,
119 value_buf: vec![],
120 stats: Statistics::new(TYPE_INVERTED_INDEX),
121 aborted: false,
122 memory_usage,
123 indexed_column_ids,
124 }
125 }
126
127 pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
130 ensure!(!self.aborted, OperateAbortedIndexSnafu);
131
132 if batch.is_empty() {
133 return Ok(());
134 }
135
136 if let Err(update_err) = self.do_update(batch).await {
137 if let Err(err) = self.do_cleanup().await {
139 if cfg!(any(test, feature = "test")) {
140 panic!("Failed to clean up index creator, err: {err}",);
141 } else {
142 warn!(err; "Failed to clean up index creator");
143 }
144 }
145 return Err(update_err);
146 }
147
148 Ok(())
149 }
150
151 pub async fn finish(
154 &mut self,
155 puffin_writer: &mut SstPuffinWriter,
156 ) -> Result<(RowCount, ByteCount)> {
157 ensure!(!self.aborted, OperateAbortedIndexSnafu);
158
159 if self.stats.row_count() == 0 {
160 return Ok((0, 0));
162 }
163
164 let finish_res = self.do_finish(puffin_writer).await;
165 if let Err(err) = self.do_cleanup().await {
167 if cfg!(any(test, feature = "test")) {
168 panic!("Failed to clean up index creator, err: {err}",);
169 } else {
170 warn!(err; "Failed to clean up index creator");
171 }
172 }
173
174 finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
175 }
176
177 pub async fn abort(&mut self) -> Result<()> {
179 if self.aborted {
180 return Ok(());
181 }
182 self.aborted = true;
183
184 self.do_cleanup().await
185 }
186
187 async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
188 let mut guard = self.stats.record_update();
189
190 let n = batch.num_rows();
191 guard.inc_row_count(n);
192
193 for (col_id, col_id_str) in &self.indexed_column_ids {
194 match self.codec.pk_col_info(*col_id) {
195 Some(col_info) => {
197 let pk_idx = col_info.idx;
198 let field = &col_info.field;
199 let value = batch
200 .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
201 .filter(|v| !v.is_null())
202 .map(|v| {
203 self.value_buf.clear();
204 IndexValueCodec::encode_nonnull_value(
205 v.as_value_ref(),
206 field,
207 &mut self.value_buf,
208 )
209 .context(EncodeSnafu)?;
210 Ok(self.value_buf.as_slice())
211 })
212 .transpose()?;
213
214 self.index_creator
215 .push_with_name_n(col_id_str, value, n)
216 .await
217 .context(PushIndexValueSnafu)?;
218 }
219 None => {
221 let Some(values) = batch.field_col_value(*col_id) else {
222 debug!(
223 "Column {} not found in the batch during building inverted index",
224 col_id
225 );
226 continue;
227 };
228 let sort_field = SortField::new(values.data.data_type());
229 for i in 0..n {
230 self.value_buf.clear();
231 let value = values.data.get_ref(i);
232 if value.is_null() {
233 self.index_creator
234 .push_with_name(col_id_str, None)
235 .await
236 .context(PushIndexValueSnafu)?;
237 } else {
238 IndexValueCodec::encode_nonnull_value(
239 value,
240 &sort_field,
241 &mut self.value_buf,
242 )
243 .context(EncodeSnafu)?;
244 self.index_creator
245 .push_with_name(col_id_str, Some(&self.value_buf))
246 .await
247 .context(PushIndexValueSnafu)?;
248 }
249 }
250 }
251 }
252 }
253
254 Ok(())
255 }
256
257 async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
276 let mut guard = self.stats.record_finish();
277
278 let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
279 let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
280
281 let (index_finish, puffin_add_blob) = futures::join!(
282 self.index_creator
284 .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
285 puffin_writer.put_blob(
286 INDEX_BLOB_TYPE,
287 rx.compat(),
288 PutOptions::default(),
289 Default::default(),
290 )
291 );
292
293 match (
294 puffin_add_blob.context(PuffinAddBlobSnafu),
295 index_finish.context(IndexFinishSnafu),
296 ) {
297 (Err(e1), Err(e2)) => BiErrorsSnafu {
298 first: Box::new(e1),
299 second: Box::new(e2),
300 }
301 .fail()?,
302
303 (Ok(_), e @ Err(_)) => e?,
304 (e @ Err(_), Ok(_)) => e.map(|_| ())?,
305 (Ok(written_bytes), Ok(_)) => {
306 guard.inc_byte_count(written_bytes);
307 }
308 }
309
310 Ok(())
311 }
312
313 async fn do_cleanup(&mut self) -> Result<()> {
314 let _guard = self.stats.record_cleanup();
315
316 self.temp_file_provider.cleanup().await
317 }
318
319 pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
320 self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
321 }
322
323 pub fn memory_usage(&self) -> usize {
324 self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
325 }
326}
327
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use puffin::puffin_manager::PuffinManager;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// In-memory object store backing the puffin files written by the tests.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Intermediate-file manager rooted at `path`.
    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Region with tags `tag_str` (id 1) and `tag_i32` (id 2), timestamp `ts` (id 3), and
    /// field `field_u64` (id 4). The primary key is `[tag_str, tag_i32]`.
    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    /// Builds a batch with one dense-encoded primary key `(str_tag, i32_tag)`.
    /// It has one row per value of `u64_field`, stored in field column 4.
    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    /// Fixture shared by every test: `(tag_str, tag_i32, [field_u64; 2])` tuples.
    ///
    /// Each tuple becomes one two-row batch. With a segment row count of 2, each tuple
    /// therefore maps to exactly one segment. `BTreeSet` iteration order assigns segment
    /// ids 0..=8 in the order listed below.
    fn mock_rows() -> BTreeSet<(&'static str, i32, [u64; 2])> {
        BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ])
    }

    /// Indexes `rows` into a fresh SST puffin file.
    ///
    /// Returns a closure that builds an applier for a filter expression and resolves to
    /// the ids of the matched segments.
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let region_dir = "region0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Capture the temp-dir handle so it lives as long as the returned closure.
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                region_dir.clone(),
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let applier_factory =
            build_applier_factory("test_create_and_query_get_key_", mock_rows()).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let applier_factory =
            build_applier_factory("test_create_and_query_range_", mock_rows()).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", mock_rows()).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let applier_factory =
            build_applier_factory("test_create_and_query_regex_", mock_rows()).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}