use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_telemetry::{debug, warn};
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
    PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::TYPE_INVERTED_INDEX;

/// Minimum memory usage threshold applied per column by the external sorter.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024;

/// Buffer size of the in-memory pipe used to stream the index blob to the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

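/// Builds an inverted index over the indexed tag and field columns of an SST file
/// and streams the finished index into a puffin blob.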
pub struct InvertedIndexer {
    /// The underlying inverted index creator.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// Provider of intermediate files used by external sorting.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary key values.
    codec: IndexValuesCodec,
    /// Reusable buffer for encoding index values.
    value_buf: Vec<u8>,

    /// Statistics of index creation.
    stats: Statistics,
    /// Whether index creation has been aborted.
    aborted: bool,

    /// Tracks the memory usage of index creation.
    memory_usage: Arc<AtomicUsize>,

    /// Ids of the indexed columns and their string representations (used as index names).
    indexed_column_ids: Vec<(ColumnId, String)>,
}

impl InvertedIndexer {
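    /// Creates a new `InvertedIndexer`.
    ///
    /// Sorting of index values is delegated to an `ExternalSorter` that uses
    /// `intermediate_manager` for temporary files and is configured with
    /// `memory_usage_threshold`.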
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
        segment_row_count: NonZeroUsize,
        indexed_column_ids: HashSet<ColumnId>,
    ) -> Self {
        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));

        let memory_usage = Arc::new(AtomicUsize::new(0));

        let sorter = ExternalSorter::factory(
            temp_file_provider.clone() as _,
            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
            memory_usage.clone(),
            memory_usage_threshold,
        );
        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexed_column_ids = indexed_column_ids
            .into_iter()
            .map(|col_id| {
                let col_id_str = col_id.to_string();
                (col_id, col_id_str)
            })
            .collect();
        Self {
            codec,
            index_creator,
            temp_file_provider,
            value_buf: vec![],
            stats: Statistics::new(TYPE_INVERTED_INDEX),
            aborted: false,
            memory_usage,
            indexed_column_ids,
        }
    }

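    /// Updates the index with a batch of rows.
    ///
    /// Intermediate files are cleaned up if the update fails.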
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if batch.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

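    /// Finishes index creation and writes the index blob to the puffin writer.
    ///
    /// Returns the number of indexed rows and the number of bytes written.
    /// Intermediate files are cleaned up whether finishing succeeds or not.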
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err}");
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

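    /// Aborts index creation and cleans up intermediate files.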
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

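    /// Pushes the values of all indexed columns in `batch` to the index creator,
    /// reading tag columns from the primary key and field columns from the batch fields.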
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, col_id_str) in &self.indexed_column_ids {
            match self.codec.pk_col_info(*col_id) {
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let value = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            self.value_buf.clear();
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(self.value_buf.as_slice())
                        })
                        .transpose()?;

                    self.index_creator
                        .push_with_name_n(col_id_str, value, n)
                        .await
                        .context(PushIndexValueSnafu)?;
                }
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building inverted index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        self.value_buf.clear();
                        let value = values.data.get_ref(i);
                        if value.is_null() {
                            self.index_creator
                                .push_with_name(col_id_str, None)
                                .await
                                .context(PushIndexValueSnafu)?;
                        } else {
                            IndexValueCodec::encode_nonnull_value(
                                value,
                                &sort_field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            self.index_creator
                                .push_with_name(col_id_str, Some(&self.value_buf))
                                .await
                                .context(PushIndexValueSnafu)?;
                        }
                    }
                }
            }
        }

        Ok(())
    }

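    /// Finishes the index creator and pipes the resulting blob into the puffin writer.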
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
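        // `tx`/`rx` form an in-memory duplex pipe: the index creator writes the finished
        // blob into `tx` while `puffin_writer` concurrently reads it from `rx`, so the
        // blob is streamed instead of being buffered in full.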
        let mut guard = self.stats.record_finish();

        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());

        let (index_finish, puffin_add_blob) = futures::join!(
            self.index_creator
                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
            puffin_writer.put_blob(
                INDEX_BLOB_TYPE,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(IndexFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                guard.inc_byte_count(written_bytes);
            }
        }

        Ok(())
    }

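    /// Removes the intermediate files produced during index creation.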
    async fn do_cleanup(&mut self) -> Result<()> {
        let _guard = self.stats.record_cleanup();

        self.temp_file_provider.cleanup().await
    }

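    /// Returns the ids of the columns being indexed.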
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
    }

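    /// Returns the current memory usage of index creation.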
    pub fn memory_usage(&self) -> usize {
        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use puffin::puffin_manager::PuffinManager;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

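    /// Builds an inverted index from `rows` (each entry becomes a two-row batch) and
    /// returns a closure that applies a DataFusion expression to the index and collects
    /// the matched segment ids. With `segment_row_count` = 2, each entry of the sorted
    /// `rows` set corresponds to exactly one segment.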
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", rows).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}