1use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19
20use common_telemetry::{debug, warn};
21use index::inverted_index::create::sort::external_sort::ExternalSorter;
22use index::inverted_index::create::sort_create::SortIndexCreator;
23use index::inverted_index::create::InvertedIndexCreator;
24use index::inverted_index::format::writer::InvertedIndexBlobWriter;
25use puffin::puffin_manager::{PuffinWriter, PutOptions};
26use snafu::{ensure, ResultExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::ColumnId;
29use tokio::io::duplex;
30use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
31
32use crate::error::{
33 BiErrorsSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
34 PushIndexValueSnafu, Result,
35};
36use crate::read::Batch;
37use crate::row_converter::SortField;
38use crate::sst::file::FileId;
39use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
40use crate::sst::index::intermediate::{
41 IntermediateLocation, IntermediateManager, TempFileProvider,
42};
43use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::SstPuffinWriter;
45use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
46use crate::sst::index::TYPE_INVERTED_INDEX;
47
/// Per-column minimum memory usage threshold (1 MiB), handed to the external
/// sorter factory when the index creator is built.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024;

/// Capacity, in bytes, of the in-memory duplex pipe through which the finished
/// index blob is streamed to the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
53
54pub struct InvertedIndexer {
56 index_creator: Box<dyn InvertedIndexCreator>,
58 temp_file_provider: Arc<TempFileProvider>,
60
61 codec: IndexValuesCodec,
63 value_buf: Vec<u8>,
65
66 stats: Statistics,
68 aborted: bool,
70
71 memory_usage: Arc<AtomicUsize>,
73
74 indexed_column_ids: Vec<(ColumnId, String)>,
76}
77
78impl InvertedIndexer {
79 pub fn new(
82 sst_file_id: FileId,
83 metadata: &RegionMetadataRef,
84 intermediate_manager: IntermediateManager,
85 memory_usage_threshold: Option<usize>,
86 segment_row_count: NonZeroUsize,
87 indexed_column_ids: HashSet<ColumnId>,
88 ) -> Self {
89 let temp_file_provider = Arc::new(TempFileProvider::new(
90 IntermediateLocation::new(&metadata.region_id, &sst_file_id),
91 intermediate_manager,
92 ));
93
94 let memory_usage = Arc::new(AtomicUsize::new(0));
95
96 let sorter = ExternalSorter::factory(
97 temp_file_provider.clone() as _,
98 Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
99 memory_usage.clone(),
100 memory_usage_threshold,
101 );
102 let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
103
104 let codec = IndexValuesCodec::from_tag_columns(
105 metadata.primary_key_encoding,
106 metadata.primary_key_columns(),
107 );
108 let indexed_column_ids = indexed_column_ids
109 .into_iter()
110 .map(|col_id| {
111 let col_id_str = col_id.to_string();
112 (col_id, col_id_str)
113 })
114 .collect();
115 Self {
116 codec,
117 index_creator,
118 temp_file_provider,
119 value_buf: vec![],
120 stats: Statistics::new(TYPE_INVERTED_INDEX),
121 aborted: false,
122 memory_usage,
123 indexed_column_ids,
124 }
125 }
126
127 pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
130 ensure!(!self.aborted, OperateAbortedIndexSnafu);
131
132 if batch.is_empty() {
133 return Ok(());
134 }
135
136 if let Err(update_err) = self.do_update(batch).await {
137 if let Err(err) = self.do_cleanup().await {
139 if cfg!(any(test, feature = "test")) {
140 panic!("Failed to clean up index creator, err: {err}",);
141 } else {
142 warn!(err; "Failed to clean up index creator");
143 }
144 }
145 return Err(update_err);
146 }
147
148 Ok(())
149 }
150
151 pub async fn finish(
154 &mut self,
155 puffin_writer: &mut SstPuffinWriter,
156 ) -> Result<(RowCount, ByteCount)> {
157 ensure!(!self.aborted, OperateAbortedIndexSnafu);
158
159 if self.stats.row_count() == 0 {
160 return Ok((0, 0));
162 }
163
164 let finish_res = self.do_finish(puffin_writer).await;
165 if let Err(err) = self.do_cleanup().await {
167 if cfg!(any(test, feature = "test")) {
168 panic!("Failed to clean up index creator, err: {err}",);
169 } else {
170 warn!(err; "Failed to clean up index creator");
171 }
172 }
173
174 finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
175 }
176
177 pub async fn abort(&mut self) -> Result<()> {
179 if self.aborted {
180 return Ok(());
181 }
182 self.aborted = true;
183
184 self.do_cleanup().await
185 }
186
187 async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
188 let mut guard = self.stats.record_update();
189
190 let n = batch.num_rows();
191 guard.inc_row_count(n);
192
193 for (col_id, col_id_str) in &self.indexed_column_ids {
194 match self.codec.pk_col_info(*col_id) {
195 Some(col_info) => {
197 let pk_idx = col_info.idx;
198 let field = &col_info.field;
199 let value = batch
200 .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
201 .filter(|v| !v.is_null())
202 .map(|v| {
203 self.value_buf.clear();
204 IndexValueCodec::encode_nonnull_value(
205 v.as_value_ref(),
206 field,
207 &mut self.value_buf,
208 )?;
209 Ok(self.value_buf.as_slice())
210 })
211 .transpose()?;
212
213 self.index_creator
214 .push_with_name_n(col_id_str, value, n)
215 .await
216 .context(PushIndexValueSnafu)?;
217 }
218 None => {
220 let Some(values) = batch.field_col_value(*col_id) else {
221 debug!(
222 "Column {} not found in the batch during building inverted index",
223 col_id
224 );
225 continue;
226 };
227 let sort_field = SortField::new(values.data.data_type());
228 for i in 0..n {
229 self.value_buf.clear();
230 let value = values.data.get_ref(i);
231 if value.is_null() {
232 self.index_creator
233 .push_with_name(col_id_str, None)
234 .await
235 .context(PushIndexValueSnafu)?;
236 } else {
237 IndexValueCodec::encode_nonnull_value(
238 value,
239 &sort_field,
240 &mut self.value_buf,
241 )?;
242 self.index_creator
243 .push_with_name(col_id_str, Some(&self.value_buf))
244 .await
245 .context(PushIndexValueSnafu)?;
246 }
247 }
248 }
249 }
250 }
251
252 Ok(())
253 }
254
255 async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
274 let mut guard = self.stats.record_finish();
275
276 let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
277 let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
278
279 let (index_finish, puffin_add_blob) = futures::join!(
280 self.index_creator
282 .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
283 puffin_writer.put_blob(
284 INDEX_BLOB_TYPE,
285 rx.compat(),
286 PutOptions::default(),
287 Default::default(),
288 )
289 );
290
291 match (
292 puffin_add_blob.context(PuffinAddBlobSnafu),
293 index_finish.context(IndexFinishSnafu),
294 ) {
295 (Err(e1), Err(e2)) => BiErrorsSnafu {
296 first: Box::new(e1),
297 second: Box::new(e2),
298 }
299 .fail()?,
300
301 (Ok(_), e @ Err(_)) => e?,
302 (e @ Err(_), Ok(_)) => e.map(|_| ())?,
303 (Ok(written_bytes), Ok(_)) => {
304 guard.inc_byte_count(written_bytes);
305 }
306 }
307
308 Ok(())
309 }
310
311 async fn do_cleanup(&mut self) -> Result<()> {
312 let _guard = self.stats.record_cleanup();
313
314 self.temp_file_provider.cleanup().await
315 }
316
317 pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
318 self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
319 }
320
321 pub fn memory_usage(&self) -> usize {
322 self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
323 }
324}
325
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use futures::future::BoxFuture;
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use puffin::puffin_manager::PuffinManager;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Builds an in-memory object store.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Builds an intermediate manager rooted at `path` on the local file system.
    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Builds region metadata with two tags (`tag_str`, `tag_i32`) forming the
    /// primary key, a timestamp column `ts` and a field column `field_u64`.
    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    /// Builds a batch whose primary key encodes `(str_tag, i32_tag)` and whose
    /// only field column (id 4) holds the values of `u64_field`, one per row.
    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    /// The fixture shared by all query tests: `(tag_str, tag_i32, field_u64)`.
    ///
    /// Every entry becomes one two-row batch, and the index is built with a
    /// segment row count of 2, so the i-th entry (in sorted order) corresponds
    /// to segment `i`.
    fn sample_rows() -> BTreeSet<(&'static str, i32, [u64; 2])> {
        BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ])
    }

    /// Builds an inverted index over `rows` and returns a closure that applies
    /// a predicate to that index and yields the ids of the matched segments.
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let region_dir = "region0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Capture the temporary directory so it lives as long as the closure.
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                region_dir.clone(),
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let rows = sample_rows();

        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let rows = sample_rows();

        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let rows = sample_rows();

        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", rows).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let rows = sample_rows();

        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}