mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.
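//!
//! A minimal write-then-read flow, distilled from `test_write_read` in this
//! module's tests. Names such as `NoopIndexBuilder`, `file_path`, `handle`, and
//! `FILE_DIR` are test fixtures, so treat this as an illustrative sketch rather
//! than a canonical example of the API:
//!
//! ```ignore
//! let mut writer = ParquetWriter::new_with_object_store(
//!     object_store.clone(),
//!     metadata.clone(),
//!     IndexConfig::default(),
//!     NoopIndexBuilder,
//!     file_path,
//!     Metrics::new(WriteType::Flush),
//! )
//! .await;
//! let info = writer
//!     .write_all(source, None, &write_opts)
//!     .await
//!     .unwrap()
//!     .remove(0);
//!
//! let builder = ParquetReaderBuilder::new(
//!     FILE_DIR.to_string(),
//!     PathType::Bare,
//!     handle.clone(),
//!     object_store,
//! );
//! let mut reader = builder.build().await.unwrap();
//! ```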

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
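///
/// The tests below attach the JSON-encoded region metadata under this key
/// (see `test_read_large_binary`, which builds the `KeyValue` entry by hand).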
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
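/// Equals 100 read batches, i.e. 102,400 rows per row group.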
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
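///
/// A minimal usage sketch mirroring the tests in this module: override only the
/// fields of interest and keep the defaults for the rest.
///
/// ```ignore
/// let write_opts = WriteOptions {
///     row_group_size: 50,
///     max_file_size: Some(1024 * 16),
///     ..Default::default()
/// };
/// ```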
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: This is not a hard limit as we can only observe the file size when
    /// the ArrowWriter writes to underlying writers.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Parquet file metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::OpType;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::{BatchBuilder, BatchReader, FlatSource};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{TestEnv, check_reader_result};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Cache 4 row groups.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // create test env
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // read the sst file metadata
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // create test env
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // inverted index will search all row groups
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Remove the pred of `ts`, continue to use the pred of `tag_1`
        // to test if cache works.

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - bloom filter: filter out row groups 2..=3
        //
        // Rows:
        // - bloom filter: hit row group 0, hit 50 rows
        //                 hit row group 1, hit 10 rows
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    /// Creates a flat format RecordBatch for testing.
    /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Add primary key columns (tag_0, tag_1) as dictionary arrays
        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();

        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
            tag_1_builder.append_value(tags[1]);
        }

        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);

        // Add field column (field_0)
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Add time index column (ts)
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Add encoded primary key column
        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Add sequence column
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Add op_type column
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Creates a FlatSource from flat format RecordBatches.
    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Create batches with sequence 0 to trigger override functionality
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read without override sequence (should read sequence 0)
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read with override sequence using FileMeta.sequence
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Compare the results
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            // Create expected batch with override sequence
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            // Override batch should match expected batch
            assert_eq!(*override_batch, expected_batch);
        }
    }
1217}