// mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
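// Note: the tests below attach the region metadata (serialized to JSON) under this
// key via `KeyValue::new(PARQUET_METADATA_KEY.to_string(), json)` when writing a
// parquet file by hand; see `test_read_large_binary`.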

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
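// With `DEFAULT_READ_BATCH_SIZE` = 1024, a row group holds 100 * 1024 = 102_400 rows by default.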

/// Parquet write options.
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max size of a single output file.
    /// Note: This is not a hard limit, as we can only observe the file size when
    /// the `ArrowWriter` writes to the underlying writers.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}
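// A minimal usage sketch (illustrative only, mirroring the tests below): callers
// usually override a few fields and keep the defaults for the rest, e.g.
//
//     let write_opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(1024 * 16),
//         ..Default::default()
//     };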

/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet file metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
}
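// Illustrative sketch (taken from `test_write_read_with_index` below): an
// `SstInfo` can be turned into a `FileMeta` for the newly written SST, e.g.
//
//     let meta = FileMeta {
//         region_id: metadata.region_id,
//         file_id: info.file_id,
//         time_range: info.time_range,
//         level: 0,
//         file_size: info.file_size,
//         available_indexes: info.index_metadata.build_available_indexes(),
//         index_file_size: info.index_metadata.file_size,
//         num_row_groups: info.num_row_groups,
//         num_rows: info.num_rows as u64,
//         sequence: None,
//     };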

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::OpType;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit, BinaryExpr, Expr, Literal, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt64Array, UInt8Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{
        FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
    };
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::{BatchBuilder, BatchReader, FlatSource};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        location, to_flat_sst_arrow_schema, FlatSchemaOptions, DEFAULT_WRITE_CONCURRENCY,
    };
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Cache 4 row groups.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // create test env
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // read the sst file metadata
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // create test env
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // inverted index will search all row groups
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Remove the `ts` predicate but keep the `tag_1` predicate to check
        // that the index result cache works.

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - bloom filter: filter out row groups 2..=3
        //
        // Rows:
        // - bloom filter: hit row group 0, hit 50 rows
        //                 hit row group 1, hit 10 rows
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    /// Creates a flat format RecordBatch for testing.
    /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Add primary key columns (tag_0, tag_1) as dictionary arrays
        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();

        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
            tag_1_builder.append_value(tags[1]);
        }

        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);

        // Add field column (field_0)
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Add time index column (ts)
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Add encoded primary key column
        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Add sequence column
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Add op_type column
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Creates a FlatSource from flat format RecordBatches.
    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Create batches with sequence 0 to trigger override functionality
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read without override sequence (should read sequence 0)
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read with override sequence using FileMeta.sequence
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Compare the results
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            // Create expected batch with override sequence
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            // Override batch should match expected batch
            assert_eq!(*override_batch, expected_batch);
        }
    }
}