mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
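// With the defaults above, a row group holds 100 read batches, i.e. 102_400 rows.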

/// Parquet write options.
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: This is not a hard limit as we can only observe the file size when
    /// the ArrowWriter writes to the underlying writer.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}
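
// A caller usually overrides only the fields it needs and keeps the rest of
// the defaults, e.g. (an illustrative sketch, mirroring how the tests below
// configure the writer):
//
//     let opts = WriteOptions {
//         row_group_size: 50,
//         ..Default::default()
//     };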

/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet file metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::RecordBatch;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, OperationType, RegionFilePathFactory};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::BatchReader;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

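    // Path provider that pins every generated path to a single known file id,
    // so tests can read back exactly the file they wrote.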
    #[derive(Clone)]
    struct FixedPathProvider {
        file_id: FileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            location::index_file_path(FILE_DIR, self.file_id)
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            location::sst_file_path(FILE_DIR, self.file_id)
        }
    }

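    // Indexer builder that returns a no-op indexer, for tests that do not
    // exercise index creation.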
    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

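    // Writes three batches into a single SST with a small row group size, then
    // reads them back and checks that batches split at row group boundaries.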
    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

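    // Reads the same SST several times with the page cache enabled and checks
    // that uncompressed pages of all row groups are cached while compressed
    // pages are not.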
    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        // Compressed pages are not cached.
        let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        // Uncompressed pages of the 4 row groups are cached.
        for i in 0..4 {
            let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

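    // Verifies that the parquet metadata returned by the writer equals the
    // metadata the reader loads from the written file.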
    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // Create the test env.
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // Write the SST file and get the SST info.
        // The SST info contains the parquet metadata, which is converted from FileMetaData.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // Read the metadata of the SST file.
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

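    // Reads with a predicate on the first tag column (tag_0 = "a") and expects
    // only the matching batches to be returned.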
    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: tag_0 = "a".
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

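    // Writes a source that contains empty batches and checks that only the
    // non-empty data is read back.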
    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

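    // Reads with a predicate on a field column (field_0 >= 150) and expects
    // only the matching rows to be returned.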
    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: field_0 >= 150.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

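    // Writes a parquet file whose binary column uses the Arrow LargeBinary type
    // (via a hand-built AsyncArrowWriter) and checks that the reader still
    // decodes it.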
    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

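    // Sets a small max_file_size so the writer rolls over to a new file, then
    // checks that two files are produced and that no rows are lost.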
    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // Create the test env.
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            region_dir: "test".to_string(),
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder =
                ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

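    // Writes an SST together with inverted and bloom filter indexes, then reads
    // it back under different predicates to exercise row group and row pruning
    // and the index result cache.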
    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string());
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
            },
            Arc::new(NoopFilePurger),
        );

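        // Cache manager with all index-related caches enabled so pruning
        // results can be cached and reused by later readers.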
        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
                .predicate(Some(Predicate::new(preds)))
                .inverted_index_applier(inverted_index_applier.clone())
                .bloom_filter_index_applier(bloom_filter_applier.clone())
                .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id(),
            )
            .unwrap();
        // The inverted index searches all row groups, so all of them are present in the cache.
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
                .predicate(Some(Predicate::new(preds)))
                .inverted_index_applier(inverted_index_applier.clone())
                .bloom_filter_index_applier(bloom_filter_applier.clone())
                .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Drop the `ts` predicates and keep the `tag_1` predicate to check that
        // the cached index result is reused.

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - bloom filter: filter out row groups 2..=3
        //
        // Rows:
        // - bloom filter: hit row group 0, hit 50 rows
        //                 hit row group 1, hit 10 rows
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
                .predicate(Some(Predicate::new(preds)))
                .inverted_index_applier(inverted_index_applier.clone())
                .bloom_filter_index_applier(bloom_filter_applier.clone())
                .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }
}