Skip to main content

mito2/sst/index/bloom_filter/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::Instant;
21
22use common_base::range_read::RangeReader;
23use common_telemetry::{tracing, warn};
24use datatypes::data_type::ConcreteDataType;
25use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
26use index::bloom_filter::reader::{
27    BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
28};
29use index::target::IndexTarget;
30use object_store::ObjectStore;
31use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
32use puffin::puffin_manager::{PuffinManager, PuffinReader};
33use snafu::ResultExt;
34use store_api::metadata::RegionMetadataRef;
35use store_api::region_request::PathType;
36use store_api::storage::ColumnId;
37
38use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
39use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
40use crate::cache::index::bloom_filter_index::{
41    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
42};
43use crate::error::{
44    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
45    Result,
46};
47use crate::metrics::INDEX_APPLY_ELAPSED;
48use crate::sst::file::RegionIndexId;
49use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
50pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
51use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
52use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, trigger_index_background_download};
53
54/// Metrics for tracking bloom filter index apply operations.
55#[derive(Default, Clone)]
56pub struct BloomFilterIndexApplyMetrics {
57    /// Total time spent applying the index.
58    pub apply_elapsed: std::time::Duration,
59    /// Number of blob cache misses.
60    pub blob_cache_miss: usize,
61    /// Total size of blobs read (in bytes).
62    pub blob_read_bytes: u64,
63    /// Metrics for bloom filter read operations.
64    pub read_metrics: BloomFilterReadMetrics,
65}
66
67impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let Self {
70            apply_elapsed,
71            blob_cache_miss,
72            blob_read_bytes,
73            read_metrics,
74        } = self;
75
76        if self.is_empty() {
77            return write!(f, "{{}}");
78        }
79        write!(f, "{{")?;
80
81        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
82
83        if *blob_cache_miss > 0 {
84            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
85        }
86        if *blob_read_bytes > 0 {
87            write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
88        }
89        write!(f, ", \"read_metrics\":{:?}", read_metrics)?;
90
91        write!(f, "}}")
92    }
93}
94
95impl BloomFilterIndexApplyMetrics {
96    /// Returns true if the metrics are empty (contain no meaningful data).
97    pub fn is_empty(&self) -> bool {
98        self.apply_elapsed.is_zero()
99    }
100
101    /// Merges another metrics into this one.
102    pub fn merge_from(&mut self, other: &Self) {
103        self.apply_elapsed += other.apply_elapsed;
104        self.blob_cache_miss += other.blob_cache_miss;
105        self.blob_read_bytes += other.blob_read_bytes;
106        self.read_metrics.merge_from(&other.read_metrics);
107    }
108}
109
110pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
111
112/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
113pub struct BloomFilterIndexApplier {
114    /// Directory of the table.
115    table_dir: String,
116
117    /// Path type for generating file paths.
118    path_type: PathType,
119
120    /// Object store to read the index file.
121    object_store: ObjectStore,
122
123    /// File cache to read the index file.
124    file_cache: Option<FileCacheRef>,
125
126    /// Factory to create puffin manager.
127    puffin_manager_factory: PuffinManagerFactory,
128
129    /// Cache for puffin metadata.
130    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
131
132    /// Cache for bloom filter index.
133    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
134
135    /// Bloom filter predicates.
136    /// For each column, the value will be retained only if it contains __all__ predicates.
137    default_predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
138
139    /// Expected predicate column types from the latest region metadata.
140    expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
141}
142
143impl BloomFilterIndexApplier {
144    /// Creates a new `BloomFilterIndexApplier`.
145    ///
146    /// For each column, the value will be retained only if it contains __all__ predicates.
147    pub fn new(
148        table_dir: String,
149        path_type: PathType,
150        object_store: ObjectStore,
151        puffin_manager_factory: PuffinManagerFactory,
152        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
153        expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
154    ) -> Self {
155        let default_predicates = Arc::new(predicates);
156        Self {
157            table_dir,
158            path_type,
159            object_store,
160            file_cache: None,
161            puffin_manager_factory,
162            puffin_metadata_cache: None,
163            bloom_filter_index_cache: None,
164            default_predicates,
165            expected_predicate_col_types,
166        }
167    }
168
169    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
170        self.file_cache = file_cache;
171        self
172    }
173
174    pub fn with_puffin_metadata_cache(
175        mut self,
176        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
177    ) -> Self {
178        self.puffin_metadata_cache = puffin_metadata_cache;
179        self
180    }
181
182    pub fn with_bloom_filter_cache(
183        mut self,
184        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
185    ) -> Self {
186        self.bloom_filter_index_cache = bloom_filter_index_cache;
187        self
188    }
189
190    /// Applies bloom filter predicates to the provided SST file and returns a
191    /// list of row group ranges that match the predicates.
192    ///
193    /// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
194    ///
195    /// Row group id existing in the returned result means that the row group is searched.
196    /// Empty ranges means that the row group is searched but no rows are found.
197    ///
198    ///
199    /// # Arguments
200    /// * `file_id` - The region file ID to apply predicates to
201    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
202    /// * `row_groups` - Iterator of row group lengths and whether to search in the row group
203    /// * `metrics` - Optional mutable reference to collect metrics on demand
204    #[tracing::instrument(
205        skip_all,
206        fields(file_id = %file_id)
207    )]
208    pub async fn apply(
209        &self,
210        file_id: RegionIndexId,
211        file_size_hint: Option<u64>,
212        predicates: &BTreeMap<ColumnId, Vec<InListPredicate>>,
213        row_groups: impl Iterator<Item = (usize, bool)>,
214        mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
215    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
216        let apply_start = Instant::now();
217
218        // Calculates row groups' ranges based on start of the file.
219        let mut input = Vec::with_capacity(row_groups.size_hint().0);
220        let mut start = 0;
221        for (i, (len, to_search)) in row_groups.enumerate() {
222            let end = start + len;
223            if to_search {
224                input.push((i, start..end));
225            }
226            start = end;
227        }
228
229        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
230        // so we need to adjust them later.
231        let mut output = input
232            .iter()
233            .map(|(i, range)| (*i, vec![range.clone()]))
234            .collect::<Vec<_>>();
235
236        for (column_id, predicates) in predicates {
237            let blob = match self
238                .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
239                .await?
240            {
241                Some(blob) => blob,
242                None => continue,
243            };
244
245            // Create appropriate reader based on whether we have caching enabled
246            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
247                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
248                if let Some(m) = &mut metrics {
249                    m.blob_read_bytes += blob_size;
250                }
251                let reader = CachedBloomFilterIndexBlobReader::new(
252                    file_id.file_id(),
253                    file_id.version,
254                    *column_id,
255                    Tag::Skipping,
256                    blob_size,
257                    BloomFilterReaderImpl::new(blob),
258                    bloom_filter_cache.clone(),
259                );
260                self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
261                    .await
262                    .context(ApplyBloomFilterIndexSnafu)?;
263            } else {
264                let reader = BloomFilterReaderImpl::new(blob);
265                self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
266                    .await
267                    .context(ApplyBloomFilterIndexSnafu)?;
268            }
269        }
270
271        // adjust ranges to be based on row group
272        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
273            let start = input.start;
274            for range in output.iter_mut() {
275                range.start -= start;
276                range.end -= start;
277            }
278        }
279
280        // Record elapsed time to histogram and collect metrics if requested
281        let elapsed = apply_start.elapsed();
282        INDEX_APPLY_ELAPSED
283            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
284            .observe(elapsed.as_secs_f64());
285
286        if let Some(m) = metrics {
287            m.apply_elapsed += elapsed;
288        }
289
290        Ok(output)
291    }
292
293    /// Creates a blob reader from the cached or remote index file.
294    ///
295    /// Returus `None` if the column does not have an index.
296    async fn blob_reader(
297        &self,
298        file_id: RegionIndexId,
299        column_id: ColumnId,
300        file_size_hint: Option<u64>,
301        metrics: Option<&mut BloomFilterIndexApplyMetrics>,
302    ) -> Result<Option<BlobReader>> {
303        let reader = match self
304            .cached_blob_reader(file_id, column_id, file_size_hint)
305            .await
306        {
307            Ok(Some(puffin_reader)) => puffin_reader,
308            other => {
309                if let Some(m) = metrics {
310                    m.blob_cache_miss += 1;
311                }
312                if let Err(err) = other {
313                    // Blob not found means no index for this column
314                    if is_blob_not_found(&err) {
315                        return Ok(None);
316                    }
317                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
318                }
319                let res = self
320                    .remote_blob_reader(file_id, column_id, file_size_hint)
321                    .await;
322                if let Err(err) = res {
323                    // Blob not found means no index for this column
324                    if is_blob_not_found(&err) {
325                        return Ok(None);
326                    }
327                    return Err(err);
328                }
329
330                res?
331            }
332        };
333
334        Ok(Some(reader))
335    }
336
337    /// Creates a blob reader from the cached index file
338    async fn cached_blob_reader(
339        &self,
340        file_id: RegionIndexId,
341        column_id: ColumnId,
342        file_size_hint: Option<u64>,
343    ) -> Result<Option<BlobReader>> {
344        let Some(file_cache) = &self.file_cache else {
345            return Ok(None);
346        };
347
348        let index_key = IndexKey::new(
349            file_id.region_id(),
350            file_id.file_id(),
351            FileType::Puffin(file_id.version),
352        );
353        if file_cache.get(index_key).await.is_none() {
354            return Ok(None);
355        };
356
357        let puffin_manager = self.puffin_manager_factory.build(
358            file_cache.local_store(),
359            WriteCachePathProvider::new(file_cache.clone()),
360        );
361        let blob_name = Self::column_blob_name(column_id);
362
363        let reader = puffin_manager
364            .reader(&file_id)
365            .await
366            .context(PuffinBuildReaderSnafu)?
367            .with_file_size_hint(file_size_hint)
368            .blob(&blob_name)
369            .await
370            .context(PuffinReadBlobSnafu)?
371            .reader()
372            .await
373            .context(PuffinBuildReaderSnafu)?;
374        Ok(Some(reader))
375    }
376
377    // TODO(ruihang): use the same util with the code in creator
378    fn column_blob_name(column_id: ColumnId) -> String {
379        format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
380    }
381
382    /// Creates a blob reader from the remote index file
383    async fn remote_blob_reader(
384        &self,
385        file_id: RegionIndexId,
386        column_id: ColumnId,
387        file_size_hint: Option<u64>,
388    ) -> Result<BlobReader> {
389        let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
390
391        // Trigger background download if file cache and file size are available
392        trigger_index_background_download(
393            self.file_cache.as_ref(),
394            &file_id,
395            file_size_hint,
396            &path_factory,
397            &self.object_store,
398        );
399
400        let puffin_manager = self
401            .puffin_manager_factory
402            .build(self.object_store.clone(), path_factory)
403            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
404
405        let blob_name = Self::column_blob_name(column_id);
406
407        puffin_manager
408            .reader(&file_id)
409            .await
410            .context(PuffinBuildReaderSnafu)?
411            .with_file_size_hint(file_size_hint)
412            .blob(&blob_name)
413            .await
414            .context(PuffinReadBlobSnafu)?
415            .reader()
416            .await
417            .context(PuffinBuildReaderSnafu)
418    }
419
420    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
421        &self,
422        reader: R,
423        predicates: &[InListPredicate],
424        output: &mut [(usize, Vec<Range<usize>>)],
425        mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
426    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
427        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
428
429        for (_, row_group_output) in output.iter_mut() {
430            // All rows are filtered out, skip the search
431            if row_group_output.is_empty() {
432                continue;
433            }
434
435            let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
436            *row_group_output = applier
437                .search(predicates, row_group_output, read_metrics)
438                .await?;
439        }
440
441        Ok(())
442    }
443
444    /// Returns compatible bloom filter predicates with the given SST metadata.
445    ///
446    /// Returns `None` when no compatible predicate remains for this SST.
447    pub fn compatible_predicate_for_sst(
448        &self,
449        sst_metadata: &RegionMetadataRef,
450    ) -> Option<Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>> {
451        let mut has_type_mismatch = false;
452        let mut compatible_col_ids = Vec::new();
453
454        for (col_id, expected) in &self.expected_predicate_col_types {
455            let Some(sst_col) = sst_metadata.column_by_id(*col_id) else {
456                has_type_mismatch = true;
457                continue;
458            };
459
460            if sst_col.column_schema.data_type != *expected {
461                has_type_mismatch = true;
462                continue;
463            }
464
465            compatible_col_ids.push(*col_id);
466        }
467
468        if compatible_col_ids.is_empty() {
469            return None;
470        }
471
472        if !has_type_mismatch {
473            return Some(self.default_predicates.clone());
474        }
475
476        let mut compatible_predicates = BTreeMap::new();
477        for col_id in compatible_col_ids {
478            if let Some(predicates) = self.default_predicates.get(&col_id) {
479                compatible_predicates.insert(col_id, predicates.clone());
480            }
481        }
482
483        Some(Arc::new(compatible_predicates))
484    }
485}
486
487fn is_blob_not_found(err: &Error) -> bool {
488    matches!(
489        err,
490        Error::PuffinReadBlob {
491            source: puffin::error::Error::BlobNotFound { .. },
492            ..
493        }
494    )
495}
496
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use datafusion_expr::{Expr, col, lit};
    use futures::future::BoxFuture;
    use index::Bytes;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };

    /// Builds a predicate list holding one in-list predicate with the single `value`.
    fn single_value_predicates(value: &'static str) -> Vec<InListPredicate> {
        vec![InListPredicate {
            list: BTreeSet::from_iter([Bytes::from(value)]),
        }]
    }

    /// Creates an applier backed by a fresh in-memory object store.
    fn in_memory_applier(
        puffin_manager_factory: PuffinManagerFactory,
        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
        expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
    ) -> BloomFilterIndexApplier {
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        BloomFilterIndexApplier::new(
            "table_dir".to_string(),
            PathType::Bare,
            object_store,
            puffin_manager_factory,
            predicates,
            expected_predicate_col_types,
        )
    }

    #[tokio::test]
    async fn test_compatible_predicate_for_sst() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;

        let applier = in_memory_applier(
            puffin_manager_factory,
            BTreeMap::from_iter([(1, single_value_predicates("foo"))]),
            BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]),
        );

        let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
        assert!(predicates.is_some());
    }

    #[tokio::test]
    async fn test_compatible_predicate_for_sst_type_mismatch() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;

        let applier = in_memory_applier(
            puffin_manager_factory,
            BTreeMap::from_iter([(1, single_value_predicates("foo"))]),
            BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]),
        );

        let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
        assert!(predicates.is_none());
    }

    #[tokio::test]
    async fn test_compatible_predicate_for_sst_partial_type_mismatch() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_partial_mismatch_").await;

        // Column 1 (tag_str): expected string, same as the SST (compatible).
        // Column 3 (field_u64): expected int64, but the SST has uint64 (mismatched).
        let predicates = BTreeMap::from_iter([
            (1, single_value_predicates("foo")),
            (3, single_value_predicates("bar")),
        ]);
        let expected_predicate_col_types = BTreeMap::from_iter([
            (1, ConcreteDataType::string_datatype()),
            (3, ConcreteDataType::int64_datatype()), // intentional mismatch
        ]);

        let applier = in_memory_applier(
            puffin_manager_factory,
            predicates,
            expected_predicate_col_types,
        );

        // Only the subset made of the compatible column must come back.
        let compatible = applier
            .compatible_predicate_for_sst(&mock_region_metadata())
            .expect("expected Some with compatible subset");
        assert!(
            compatible.contains_key(&1),
            "compatible column 1 must be present"
        );
        assert!(
            !compatible.contains_key(&3),
            "mismatched column 3 must be absent"
        );
        assert_eq!(
            compatible.len(),
            1,
            "only the compatible predicate must remain"
        );
    }

    /// Returns a closure that builds an applier from the given expressions,
    /// applies it to `file_id` for the given row groups, and yields only the
    /// row groups whose resulting ranges are non-empty.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionIndexId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            // The future must be `'static`, so it owns copies of everything it needs.
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let metadata = metadata.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let applier = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                )
                .build(&exprs)
                .unwrap()
                .unwrap();

                let sst_metadata = Arc::new(metadata.clone());
                let predicates = applier
                    .compatible_predicate_for_sst(&sst_metadata)
                    .unwrap();

                let searched = applier
                    .apply(file_id, None, &predicates, row_groups.into_iter(), None)
                    .await
                    .unwrap();

                searched
                    .into_iter()
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }

    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        // tag_str:
        //   - type: string
        //   - index: bloom filter
        //   - granularity: 2
        //   - column_id: 1
        //
        // ts:
        //   - type: timestamp
        //   - index: time index
        //   - column_id: 2
        //
        // field_u64:
        //   - type: uint64
        //   - index: bloom filter
        //   - granularity: 4
        //   - column_id: 3
        let region_metadata = mock_region_metadata();
        let (d, factory) =
            PuffinManagerFactory::new_for_test_async("test_bloom_filter_applier_").await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let file_id = RegionIndexId::new(
            RegionFileId::new(region_metadata.region_id, FileId::random()),
            0,
        );
        let table_dir = "table_dir".to_string();

        let mut indexer =
            BloomFilterIndexer::new(file_id.file_id(), &region_metadata, intm_mgr, Some(1024))
                .unwrap()
                .unwrap();

        // push 20 rows: rows 0..10 carry "tag1", rows 10..20 carry "tag2"
        for (tag, rows) in [("tag1", 0..10), ("tag2", 10..20)] {
            let mut batch = new_batch(tag, rows);
            indexer.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let apply_exprs = tester(table_dir, object_store, &region_metadata, factory, file_id);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let matched = apply_exprs(&[col("tag_str").eq(lit("tag1"))], vec![(5, true); 4]).await;
        assert_eq!(matched, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  x row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let matched = apply_exprs(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(matched, vec![(0, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
        let matched = apply_exprs(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true); 4],
        )
        .await;
        assert_eq!(matched, vec![(0, vec![0..4])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | x  row group     |  o row group     |
        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
        let matched = apply_exprs(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(matched, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}