mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
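///
/// A minimal sketch of how this key is attached as parquet key-value metadata,
/// mirroring `test_read_large_binary` below; `metadata_json` is a placeholder
/// for the serialized region metadata, not a name used elsewhere in this crate:
///
/// ```ignore
/// let key_value = KeyValue::new(PARQUET_METADATA_KEY.to_string(), metadata_json);
/// let props = WriterProperties::builder()
///     .set_key_value_metadata(Some(vec![key_value]))
///     .build();
/// ```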
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
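/// With the default read batch size of 1024 rows, this works out to
/// 100 * 1024 = 102,400 rows per row group.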
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
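///
/// A minimal construction sketch; the row group size below is an arbitrary
/// value (the unit tests in this file use the same pattern), and the remaining
/// fields fall back to [`Default`]:
///
/// ```ignore
/// let opts = WriteOptions {
///     row_group_size: 50,
///     ..Default::default()
/// };
/// ```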
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: This is not a hard limit as we can only observe the file size when
    /// the ArrowWriter writes to underlying writers.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// File metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::{RecordBatch, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{
        FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
    };
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::{BatchBuilder, BatchReader};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_source, sst_file_handle,
        sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        // The compressed page is not cached.
        let page_key =
            PageKey::new_compressed(metadata.region_id, handle.file_id().file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        // Cache 4 row groups.
        for i in 0..4 {
            let page_key =
                PageKey::new_uncompressed(metadata.region_id, handle.file_id().file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key =
            PageKey::new_uncompressed(metadata.region_id, handle.file_id().file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // create test env
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // read the sst file metadata
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // create test env
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // inverted index will search all row groups
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Remove the `ts` predicate and keep the `tag_1` predicate
        // to verify that the index result cache works.

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - bloom filter: filter out row groups 2..=3
        //
        // Rows:
        // - bloom filter: hit row group 0, hit 50 rows
        //                 hit row group 1, hit 10 rows
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Create batches with sequence 0 to trigger override functionality
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read without override sequence (should read sequence 0)
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read with override sequence using FileMeta.sequence
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Compare the results
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            // Create expected batch with override sequence
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            // Override batch should match expected batch
            assert_eq!(*override_batch, expected_batch);
        }
    }
}