mito2/sst/parquet.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST in parquet format.
16
17use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
27pub(crate) mod file_range;
28pub mod flat_format;
29pub mod format;
30pub(crate) mod helper;
31pub(crate) mod metadata;
32pub mod reader;
33pub mod row_group;
34pub mod row_selection;
35pub(crate) mod stats;
36pub mod writer;
37
38/// Key of metadata in parquet SST.
39pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
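// A minimal sketch of how this key is used (mirroring `test_read_large_binary` below):
// the region metadata is serialized to JSON and attached to the parquet footer as a
// key-value entry, so a reader can recover the region metadata from the file itself.
//
//     let json = metadata.to_json().unwrap();
//     let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
//     let props = WriterProperties::builder()
//         .set_key_value_metadata(Some(vec![key_value_meta]))
//         .build();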
40
41/// Default batch size to read parquet files.
42pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
43/// Default row group size for parquet files.
44pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
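// With the defaults above, a full row group holds 100 * 1024 = 102_400 rows; the tests
// below shrink `row_group_size` (e.g. to 50) so that small inputs still produce
// multiple row groups.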
45
46/// Parquet write options.
47#[derive(Debug, Clone)]
48pub struct WriteOptions {
49    /// Buffer size for async writer.
50    pub write_buffer_size: ReadableSize,
51    /// Row group size.
52    pub row_group_size: usize,
53    /// Maximum size of a single output file.
54    /// Note: This is not a hard limit as we can only observe the file size when
55    /// the ArrowWriter flushes data to the underlying writer.
56    pub max_file_size: Option<usize>,
57}
58
59impl Default for WriteOptions {
60    fn default() -> Self {
61        WriteOptions {
62            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
63            row_group_size: DEFAULT_ROW_GROUP_SIZE,
64            max_file_size: None,
65        }
66    }
67}
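// A minimal usage sketch (mirroring the tests below): callers usually override only the
// fields they care about and take the defaults for the rest, e.g. to force small row
// groups and cap the output file size during a flush:
//
//     let write_opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(1024 * 16),
//         ..Default::default()
//     };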
68
69/// Parquet SST info returned by the writer.
70#[derive(Debug, Default)]
71pub struct SstInfo {
72    /// SST file id.
73    pub file_id: FileId,
74    /// Time range of the SST. The timestamps have the same time unit as the
75    /// data in the SST.
76    pub time_range: FileTimeRange,
77    /// File size in bytes.
78    pub file_size: u64,
79    /// Maximum uncompressed row group size in bytes. 0 if unknown.
80    pub max_row_group_uncompressed_size: u64,
81    /// Number of rows.
82    pub num_rows: usize,
83    /// Number of row groups.
84    pub num_row_groups: u64,
85    /// Parquet file metadata.
86    pub file_metadata: Option<Arc<ParquetMetaData>>,
87    /// Index metadata.
88    pub index_metadata: IndexOutput,
89    /// Number of series.
90    pub num_series: u64,
91}
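// Sketch of how `SstInfo` is typically consumed (mirroring the tests below): the writer
// returns one `SstInfo` per output file, and its statistics are later copied into the
// `FileMeta` used to register the SST.
//
//     let info = writer.write_all(source, None, &write_opts).await.unwrap().remove(0);
//     assert_eq!(200, info.num_rows);
//     assert!(info.file_size > 0);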
92
93#[cfg(test)]
94mod tests {
95    use std::collections::HashSet;
96    use std::sync::Arc;
97
98    use api::v1::OpType;
99    use common_time::Timestamp;
100    use datafusion_common::{Column, ScalarValue};
101    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
102    use datatypes::arrow;
103    use datatypes::arrow::array::{
104        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
105        TimestampMillisecondArray, UInt8Array, UInt64Array,
106    };
107    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
108    use parquet::arrow::AsyncArrowWriter;
109    use parquet::basic::{Compression, Encoding, ZstdLevel};
110    use parquet::file::metadata::KeyValue;
111    use parquet::file::properties::WriterProperties;
112    use store_api::region_request::PathType;
113    use table::predicate::Predicate;
114    use tokio_util::compat::FuturesAsyncWriteCompatExt;
115
116    use super::*;
117    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
118    use crate::cache::{CacheManager, CacheStrategy, PageKey};
119    use crate::config::IndexConfig;
120    use crate::read::{BatchBuilder, BatchReader, FlatSource};
121    use crate::region::options::{IndexOptions, InvertedIndexOptions};
122    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
123    use crate::sst::file_purger::NoopFilePurger;
124    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
125    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
126    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
127    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
128    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
129    use crate::sst::parquet::writer::ParquetWriter;
130    use crate::sst::{
131        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
132    };
133    use crate::test_util::sst_util::{
134        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
135        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
136        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
137    };
138    use crate::test_util::{TestEnv, check_reader_result};
139
140    const FILE_DIR: &str = "/";
141
142    #[derive(Clone)]
143    struct FixedPathProvider {
144        region_file_id: RegionFileId,
145    }
146
147    impl FilePathProvider for FixedPathProvider {
148        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
149            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
150        }
151
152        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
153            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
154        }
155
156        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
157            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
158        }
159    }
160
161    struct NoopIndexBuilder;
162
163    #[async_trait::async_trait]
164    impl IndexerBuilder for NoopIndexBuilder {
165        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
166            Indexer::default()
167        }
168    }
169
170    #[tokio::test]
171    async fn test_write_read() {
172        let mut env = TestEnv::new().await;
173        let object_store = env.init_object_store_manager();
174        let handle = sst_file_handle(0, 1000);
175        let file_path = FixedPathProvider {
176            region_file_id: handle.file_id(),
177        };
178        let metadata = Arc::new(sst_region_metadata());
179        let source = new_source(&[
180            new_batch_by_range(&["a", "d"], 0, 60),
181            new_batch_by_range(&["b", "f"], 0, 40),
182            new_batch_by_range(&["b", "h"], 100, 200),
183        ]);
184        // Use a small row group size for test.
185        let write_opts = WriteOptions {
186            row_group_size: 50,
187            ..Default::default()
188        };
189
190        let mut metrics = Metrics::new(WriteType::Flush);
191        let mut writer = ParquetWriter::new_with_object_store(
192            object_store.clone(),
193            metadata.clone(),
194            IndexConfig::default(),
195            NoopIndexBuilder,
196            file_path,
197            &mut metrics,
198        )
199        .await;
200
201        let info = writer
202            .write_all(source, None, &write_opts)
203            .await
204            .unwrap()
205            .remove(0);
206        assert_eq!(200, info.num_rows);
207        assert!(info.file_size > 0);
208        assert_eq!(
209            (
210                Timestamp::new_millisecond(0),
211                Timestamp::new_millisecond(199)
212            ),
213            info.time_range
214        );
215
216        let builder = ParquetReaderBuilder::new(
217            FILE_DIR.to_string(),
218            PathType::Bare,
219            handle.clone(),
220            object_store,
221        );
222        let mut reader = builder.build().await.unwrap();
223        check_reader_result(
224            &mut reader,
225            &[
226                new_batch_by_range(&["a", "d"], 0, 50),
227                new_batch_by_range(&["a", "d"], 50, 60),
228                new_batch_by_range(&["b", "f"], 0, 40),
229                new_batch_by_range(&["b", "h"], 100, 150),
230                new_batch_by_range(&["b", "h"], 150, 200),
231            ],
232        )
233        .await;
234    }
235
236    #[tokio::test]
237    async fn test_read_with_cache() {
238        let mut env = TestEnv::new().await;
239        let object_store = env.init_object_store_manager();
240        let handle = sst_file_handle(0, 1000);
241        let metadata = Arc::new(sst_region_metadata());
242        let source = new_source(&[
243            new_batch_by_range(&["a", "d"], 0, 60),
244            new_batch_by_range(&["b", "f"], 0, 40),
245            new_batch_by_range(&["b", "h"], 100, 200),
246        ]);
247        // Use a small row group size for test.
248        let write_opts = WriteOptions {
249            row_group_size: 50,
250            ..Default::default()
251        };
252        // Prepare data.
253        let mut metrics = Metrics::new(WriteType::Flush);
254        let mut writer = ParquetWriter::new_with_object_store(
255            object_store.clone(),
256            metadata.clone(),
257            IndexConfig::default(),
258            NoopIndexBuilder,
259            FixedPathProvider {
260                region_file_id: handle.file_id(),
261            },
262            &mut metrics,
263        )
264        .await;
265
266        let sst_info = writer
267            .write_all(source, None, &write_opts)
268            .await
269            .unwrap()
270            .remove(0);
271
272        // Enable page cache.
273        let cache = CacheStrategy::EnableAll(Arc::new(
274            CacheManager::builder()
275                .page_cache_size(64 * 1024 * 1024)
276                .build(),
277        ));
278        let builder = ParquetReaderBuilder::new(
279            FILE_DIR.to_string(),
280            PathType::Bare,
281            handle.clone(),
282            object_store,
283        )
284        .cache(cache.clone());
285        for _ in 0..3 {
286            let mut reader = builder.build().await.unwrap();
287            check_reader_result(
288                &mut reader,
289                &[
290                    new_batch_by_range(&["a", "d"], 0, 50),
291                    new_batch_by_range(&["a", "d"], 50, 60),
292                    new_batch_by_range(&["b", "f"], 0, 40),
293                    new_batch_by_range(&["b", "h"], 100, 150),
294                    new_batch_by_range(&["b", "h"], 150, 200),
295                ],
296            )
297            .await;
298        }
299
300        let parquet_meta = sst_info.file_metadata.unwrap();
301        let get_ranges = |row_group_idx: usize| {
302            let row_group = parquet_meta.row_group(row_group_idx);
303            let mut ranges = Vec::with_capacity(row_group.num_columns());
304            for i in 0..row_group.num_columns() {
305                let (start, length) = row_group.column(i).byte_range();
306                ranges.push(start..start + length);
307            }
308
309            ranges
310        };
311
312        // Cache 4 row groups.
313        for i in 0..4 {
314            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
315            assert!(cache.get_pages(&page_key).is_some());
316        }
317        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
318        assert!(cache.get_pages(&page_key).is_none());
319    }
320
321    #[tokio::test]
322    async fn test_parquet_metadata_eq() {
323        // create test env
324        let mut env = crate::test_util::TestEnv::new().await;
325        let object_store = env.init_object_store_manager();
326        let handle = sst_file_handle(0, 1000);
327        let metadata = Arc::new(sst_region_metadata());
328        let source = new_source(&[
329            new_batch_by_range(&["a", "d"], 0, 60),
330            new_batch_by_range(&["b", "f"], 0, 40),
331            new_batch_by_range(&["b", "h"], 100, 200),
332        ]);
333        let write_opts = WriteOptions {
334            row_group_size: 50,
335            ..Default::default()
336        };
337
338        // Write the SST file and get the SST info.
339        // The SST info contains the parquet metadata, which is converted from FileMetaData.
340        let mut metrics = Metrics::new(WriteType::Flush);
341        let mut writer = ParquetWriter::new_with_object_store(
342            object_store.clone(),
343            metadata.clone(),
344            IndexConfig::default(),
345            NoopIndexBuilder,
346            FixedPathProvider {
347                region_file_id: handle.file_id(),
348            },
349            &mut metrics,
350        )
351        .await;
352
353        let sst_info = writer
354            .write_all(source, None, &write_opts)
355            .await
356            .unwrap()
357            .remove(0);
358        let writer_metadata = sst_info.file_metadata.unwrap();
359
360        // read the sst file metadata
361        let builder = ParquetReaderBuilder::new(
362            FILE_DIR.to_string(),
363            PathType::Bare,
364            handle.clone(),
365            object_store,
366        );
367        let reader = builder.build().await.unwrap();
368        let reader_metadata = reader.parquet_metadata();
369
370        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
371    }
372
373    #[tokio::test]
374    async fn test_read_with_tag_filter() {
375        let mut env = TestEnv::new().await;
376        let object_store = env.init_object_store_manager();
377        let handle = sst_file_handle(0, 1000);
378        let metadata = Arc::new(sst_region_metadata());
379        let source = new_source(&[
380            new_batch_by_range(&["a", "d"], 0, 60),
381            new_batch_by_range(&["b", "f"], 0, 40),
382            new_batch_by_range(&["b", "h"], 100, 200),
383        ]);
384        // Use a small row group size for test.
385        let write_opts = WriteOptions {
386            row_group_size: 50,
387            ..Default::default()
388        };
389        // Prepare data.
390        let mut metrics = Metrics::new(WriteType::Flush);
391        let mut writer = ParquetWriter::new_with_object_store(
392            object_store.clone(),
393            metadata.clone(),
394            IndexConfig::default(),
395            NoopIndexBuilder,
396            FixedPathProvider {
397                region_file_id: handle.file_id(),
398            },
399            &mut metrics,
400        )
401        .await;
402        writer
403            .write_all(source, None, &write_opts)
404            .await
405            .unwrap()
406            .remove(0);
407
408        // Predicate
409        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
410            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
411            op: Operator::Eq,
412            right: Box::new("a".lit()),
413        })]));
414
415        let builder = ParquetReaderBuilder::new(
416            FILE_DIR.to_string(),
417            PathType::Bare,
418            handle.clone(),
419            object_store,
420        )
421        .predicate(predicate);
422        let mut reader = builder.build().await.unwrap();
423        check_reader_result(
424            &mut reader,
425            &[
426                new_batch_by_range(&["a", "d"], 0, 50),
427                new_batch_by_range(&["a", "d"], 50, 60),
428            ],
429        )
430        .await;
431    }
432
433    #[tokio::test]
434    async fn test_read_empty_batch() {
435        let mut env = TestEnv::new().await;
436        let object_store = env.init_object_store_manager();
437        let handle = sst_file_handle(0, 1000);
438        let metadata = Arc::new(sst_region_metadata());
439        let source = new_source(&[
440            new_batch_by_range(&["a", "z"], 0, 0),
441            new_batch_by_range(&["a", "z"], 100, 100),
442            new_batch_by_range(&["a", "z"], 200, 230),
443        ]);
444        // Use a small row group size for test.
445        let write_opts = WriteOptions {
446            row_group_size: 50,
447            ..Default::default()
448        };
449        // Prepare data.
450        let mut metrics = Metrics::new(WriteType::Flush);
451        let mut writer = ParquetWriter::new_with_object_store(
452            object_store.clone(),
453            metadata.clone(),
454            IndexConfig::default(),
455            NoopIndexBuilder,
456            FixedPathProvider {
457                region_file_id: handle.file_id(),
458            },
459            &mut metrics,
460        )
461        .await;
462        writer
463            .write_all(source, None, &write_opts)
464            .await
465            .unwrap()
466            .remove(0);
467
468        let builder = ParquetReaderBuilder::new(
469            FILE_DIR.to_string(),
470            PathType::Bare,
471            handle.clone(),
472            object_store,
473        );
474        let mut reader = builder.build().await.unwrap();
475        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
476    }
477
478    #[tokio::test]
479    async fn test_read_with_field_filter() {
480        let mut env = TestEnv::new().await;
481        let object_store = env.init_object_store_manager();
482        let handle = sst_file_handle(0, 1000);
483        let metadata = Arc::new(sst_region_metadata());
484        let source = new_source(&[
485            new_batch_by_range(&["a", "d"], 0, 60),
486            new_batch_by_range(&["b", "f"], 0, 40),
487            new_batch_by_range(&["b", "h"], 100, 200),
488        ]);
489        // Use a small row group size for test.
490        let write_opts = WriteOptions {
491            row_group_size: 50,
492            ..Default::default()
493        };
494        // Prepare data.
495        let mut metrics = Metrics::new(WriteType::Flush);
496        let mut writer = ParquetWriter::new_with_object_store(
497            object_store.clone(),
498            metadata.clone(),
499            IndexConfig::default(),
500            NoopIndexBuilder,
501            FixedPathProvider {
502                region_file_id: handle.file_id(),
503            },
504            &mut metrics,
505        )
506        .await;
507
508        writer
509            .write_all(source, None, &write_opts)
510            .await
511            .unwrap()
512            .remove(0);
513
514        // Predicate
515        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
516            left: Box::new(Expr::Column(Column::from_name("field_0"))),
517            op: Operator::GtEq,
518            right: Box::new(150u64.lit()),
519        })]));
520
521        let builder = ParquetReaderBuilder::new(
522            FILE_DIR.to_string(),
523            PathType::Bare,
524            handle.clone(),
525            object_store,
526        )
527        .predicate(predicate);
528        let mut reader = builder.build().await.unwrap();
529        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
530    }
531
532    #[tokio::test]
533    async fn test_read_large_binary() {
534        let mut env = TestEnv::new().await;
535        let object_store = env.init_object_store_manager();
536        let handle = sst_file_handle(0, 1000);
537        let file_path = handle.file_path(FILE_DIR, PathType::Bare);
538
539        let write_opts = WriteOptions {
540            row_group_size: 50,
541            ..Default::default()
542        };
543
544        let metadata = build_test_binary_test_region_metadata();
545        let json = metadata.to_json().unwrap();
546        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
547
548        let props_builder = WriterProperties::builder()
549            .set_key_value_metadata(Some(vec![key_value_meta]))
550            .set_compression(Compression::ZSTD(ZstdLevel::default()))
551            .set_encoding(Encoding::PLAIN)
552            .set_max_row_group_size(write_opts.row_group_size);
553
554        let writer_props = props_builder.build();
555
556        let write_format = PrimaryKeyWriteFormat::new(metadata);
557        let fields: Vec<_> = write_format
558            .arrow_schema()
559            .fields()
560            .into_iter()
561            .map(|field| {
562                let data_type = field.data_type().clone();
563                if data_type == DataType::Binary {
564                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
565                } else {
566                    Field::new(field.name(), data_type, field.is_nullable())
567                }
568            })
569            .collect();
570
571        let arrow_schema = Arc::new(Schema::new(fields));
572
573        // Ensures field_0 has LargeBinary type.
574        assert_eq!(
575            &DataType::LargeBinary,
576            arrow_schema.field_with_name("field_0").unwrap().data_type()
577        );
578        let mut writer = AsyncArrowWriter::try_new(
579            object_store
580                .writer_with(&file_path)
581                .concurrent(DEFAULT_WRITE_CONCURRENCY)
582                .await
583                .map(|w| w.into_futures_async_write().compat_write())
584                .unwrap(),
585            arrow_schema.clone(),
586            Some(writer_props),
587        )
588        .unwrap();
589
590        let batch = new_batch_with_binary(&["a"], 0, 60);
591        let arrow_batch = write_format.convert_batch(&batch).unwrap();
592        let arrays: Vec<_> = arrow_batch
593            .columns()
594            .iter()
595            .map(|array| {
596                let data_type = array.data_type().clone();
597                if data_type == DataType::Binary {
598                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
599                } else {
600                    array.clone()
601                }
602            })
603            .collect();
604        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
605
606        writer.write(&result).await.unwrap();
607        writer.close().await.unwrap();
608
609        let builder = ParquetReaderBuilder::new(
610            FILE_DIR.to_string(),
611            PathType::Bare,
612            handle.clone(),
613            object_store,
614        );
615        let mut reader = builder.build().await.unwrap();
616        check_reader_result(
617            &mut reader,
618            &[
619                new_batch_with_binary(&["a"], 0, 50),
620                new_batch_with_binary(&["a"], 50, 60),
621            ],
622        )
623        .await;
624    }
625
626    #[tokio::test]
627    async fn test_write_multiple_files() {
628        common_telemetry::init_default_ut_logging();
629        // create test env
630        let mut env = TestEnv::new().await;
631        let object_store = env.init_object_store_manager();
632        let metadata = Arc::new(sst_region_metadata());
633        let batches = &[
634            new_batch_by_range(&["a", "d"], 0, 1000),
635            new_batch_by_range(&["b", "f"], 0, 1000),
636            new_batch_by_range(&["c", "g"], 0, 1000),
637            new_batch_by_range(&["b", "h"], 100, 200),
638            new_batch_by_range(&["b", "h"], 200, 300),
639            new_batch_by_range(&["b", "h"], 300, 1000),
640        ];
641        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
642
643        let source = new_source(batches);
644        let write_opts = WriteOptions {
645            row_group_size: 50,
646            max_file_size: Some(1024 * 16),
647            ..Default::default()
648        };
649
650        let path_provider = RegionFilePathFactory {
651            table_dir: "test".to_string(),
652            path_type: PathType::Bare,
653        };
654        let mut metrics = Metrics::new(WriteType::Flush);
655        let mut writer = ParquetWriter::new_with_object_store(
656            object_store.clone(),
657            metadata.clone(),
658            IndexConfig::default(),
659            NoopIndexBuilder,
660            path_provider,
661            &mut metrics,
662        )
663        .await;
664
665        let files = writer.write_all(source, None, &write_opts).await.unwrap();
666        assert_eq!(2, files.len());
667
668        let mut rows_read = 0;
669        for f in &files {
670            let file_handle = sst_file_handle_with_file_id(
671                f.file_id,
672                f.time_range.0.value(),
673                f.time_range.1.value(),
674            );
675            let builder = ParquetReaderBuilder::new(
676                "test".to_string(),
677                PathType::Bare,
678                file_handle,
679                object_store.clone(),
680            );
681            let mut reader = builder.build().await.unwrap();
682            while let Some(batch) = reader.next_batch().await.unwrap() {
683                rows_read += batch.num_rows();
684            }
685        }
686        assert_eq!(total_rows, rows_read);
687    }
688
689    #[tokio::test]
690    async fn test_write_read_with_index() {
691        let mut env = TestEnv::new().await;
692        let object_store = env.init_object_store_manager();
693        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
694        let metadata = Arc::new(sst_region_metadata());
695        let row_group_size = 50;
696
697        let source = new_source(&[
698            new_batch_by_range(&["a", "d"], 0, 20),
699            new_batch_by_range(&["b", "d"], 0, 20),
700            new_batch_by_range(&["c", "d"], 0, 20),
701            new_batch_by_range(&["c", "f"], 0, 40),
702            new_batch_by_range(&["c", "h"], 100, 200),
703        ]);
704        // Use a small row group size for test.
705        let write_opts = WriteOptions {
706            row_group_size,
707            ..Default::default()
708        };
709
710        let puffin_manager = env
711            .get_puffin_manager()
712            .build(object_store.clone(), file_path.clone());
713        let intermediate_manager = env.get_intermediate_manager();
714
715        let indexer_builder = IndexerBuilderImpl {
716            build_type: IndexBuildType::Flush,
717            metadata: metadata.clone(),
718            row_group_size,
719            puffin_manager,
720            write_cache_enabled: false,
721            intermediate_manager,
722            index_options: IndexOptions {
723                inverted_index: InvertedIndexOptions {
724                    segment_row_count: 1,
725                    ..Default::default()
726                },
727            },
728            inverted_index_config: Default::default(),
729            fulltext_index_config: Default::default(),
730            bloom_filter_index_config: Default::default(),
731        };
732
733        let mut metrics = Metrics::new(WriteType::Flush);
734        let mut writer = ParquetWriter::new_with_object_store(
735            object_store.clone(),
736            metadata.clone(),
737            IndexConfig::default(),
738            indexer_builder,
739            file_path.clone(),
740            &mut metrics,
741        )
742        .await;
743
744        let info = writer
745            .write_all(source, None, &write_opts)
746            .await
747            .unwrap()
748            .remove(0);
749        assert_eq!(200, info.num_rows);
750        assert!(info.file_size > 0);
751        assert!(info.index_metadata.file_size > 0);
752
753        assert!(info.index_metadata.inverted_index.index_size > 0);
754        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
755        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
756
757        assert!(info.index_metadata.bloom_filter.index_size > 0);
758        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
759        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
760
761        assert_eq!(
762            (
763                Timestamp::new_millisecond(0),
764                Timestamp::new_millisecond(199)
765            ),
766            info.time_range
767        );
768
769        let handle = FileHandle::new(
770            FileMeta {
771                region_id: metadata.region_id,
772                file_id: info.file_id,
773                time_range: info.time_range,
774                level: 0,
775                file_size: info.file_size,
776                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
777                available_indexes: info.index_metadata.build_available_indexes(),
778                indexes: info.index_metadata.build_indexes(),
779                index_file_size: info.index_metadata.file_size,
780                index_version: 0,
781                num_row_groups: info.num_row_groups,
782                num_rows: info.num_rows as u64,
783                sequence: None,
784                partition_expr: match &metadata.partition_expr {
785                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
786                        .expect("partition expression should be valid JSON"),
787                    None => None,
788                },
789                num_series: 0,
790            },
791            Arc::new(NoopFilePurger),
792        );
793
794        let cache = Arc::new(
795            CacheManager::builder()
796                .index_result_cache_size(1024 * 1024)
797                .index_metadata_size(1024 * 1024)
798                .index_content_page_size(1024 * 1024)
799                .index_content_size(1024 * 1024)
800                .puffin_metadata_size(1024 * 1024)
801                .build(),
802        );
803        let index_result_cache = cache.index_result_cache().unwrap();
804
805        let build_inverted_index_applier = |exprs: &[Expr]| {
806            InvertedIndexApplierBuilder::new(
807                FILE_DIR.to_string(),
808                PathType::Bare,
809                object_store.clone(),
810                &metadata,
811                HashSet::from_iter([0]),
812                env.get_puffin_manager(),
813            )
814            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
815            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
816            .build(exprs)
817            .unwrap()
818            .map(Arc::new)
819        };
820
821        let build_bloom_filter_applier = |exprs: &[Expr]| {
822            BloomFilterIndexApplierBuilder::new(
823                FILE_DIR.to_string(),
824                PathType::Bare,
825                object_store.clone(),
826                &metadata,
827                env.get_puffin_manager(),
828            )
829            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
830            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
831            .build(exprs)
832            .unwrap()
833            .map(Arc::new)
834        };
835
836        // Data: ts tag_0 tag_1
837        // Data: 0-20 [a, d]
838        //       0-20 [b, d]
839        //       0-20 [c, d]
840        //       0-40 [c, f]
841        //    100-200 [c, h]
842        //
843        // Pred: tag_0 = "b"
844        //
845        // Row groups & rows pruning:
846        //
847        // Row Groups:
848        // - min-max: filter out row groups 1..=3
849        //
850        // Rows:
851        // - inverted index: hit row group 0, hit 20 rows
852        let preds = vec![col("tag_0").eq(lit("b"))];
853        let inverted_index_applier = build_inverted_index_applier(&preds);
854        let bloom_filter_applier = build_bloom_filter_applier(&preds);
855
856        let builder = ParquetReaderBuilder::new(
857            FILE_DIR.to_string(),
858            PathType::Bare,
859            handle.clone(),
860            object_store.clone(),
861        )
862        .predicate(Some(Predicate::new(preds)))
863        .inverted_index_appliers([inverted_index_applier.clone(), None])
864        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
865        .cache(CacheStrategy::EnableAll(cache.clone()));
866
867        let mut metrics = ReaderMetrics::default();
868        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
869        let mut reader = ParquetReader::new(Arc::new(context), selection)
870            .await
871            .unwrap();
872        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;
873
874        assert_eq!(metrics.filter_metrics.rg_total, 4);
875        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
876        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
877        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
878        let cached = index_result_cache
879            .get(
880                inverted_index_applier.unwrap().predicate_key(),
881                handle.file_id().file_id(),
882            )
883            .unwrap();
884        // The inverted index searches all row groups, so results for all of them are cached.
885        assert!(cached.contains_row_group(0));
886        assert!(cached.contains_row_group(1));
887        assert!(cached.contains_row_group(2));
888        assert!(cached.contains_row_group(3));
889
890        // Data: ts tag_0 tag_1
891        // Data: 0-20 [a, d]
892        //       0-20 [b, d]
893        //       0-20 [c, d]
894        //       0-40 [c, f]
895        //    100-200 [c, h]
896        //
897        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
898        //
899        // Row groups & rows pruning:
900        //
901        // Row Groups:
902        // - min-max: filter out row groups 0..=1
903        // - bloom filter: filter out row groups 2..=3
904        let preds = vec![
905            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
906            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
907            col("tag_1").eq(lit("d")),
908        ];
909        let inverted_index_applier = build_inverted_index_applier(&preds);
910        let bloom_filter_applier = build_bloom_filter_applier(&preds);
911
912        let builder = ParquetReaderBuilder::new(
913            FILE_DIR.to_string(),
914            PathType::Bare,
915            handle.clone(),
916            object_store.clone(),
917        )
918        .predicate(Some(Predicate::new(preds)))
919        .inverted_index_appliers([inverted_index_applier.clone(), None])
920        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
921        .cache(CacheStrategy::EnableAll(cache.clone()));
922
923        let mut metrics = ReaderMetrics::default();
924        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
925        let mut reader = ParquetReader::new(Arc::new(context), selection)
926            .await
927            .unwrap();
928        check_reader_result(&mut reader, &[]).await;
929
930        assert_eq!(metrics.filter_metrics.rg_total, 4);
931        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
932        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
933        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
934        let cached = index_result_cache
935            .get(
936                bloom_filter_applier.unwrap().predicate_key(),
937                handle.file_id().file_id(),
938            )
939            .unwrap();
940        assert!(cached.contains_row_group(2));
941        assert!(cached.contains_row_group(3));
942        assert!(!cached.contains_row_group(0));
943        assert!(!cached.contains_row_group(1));
944
945        // Remove the `ts` predicate and keep the `tag_1` predicate
946        // to verify that the index result cache works.
947
948        // Data: ts tag_0 tag_1
949        // Data: 0-20 [a, d]
950        //       0-20 [b, d]
951        //       0-20 [c, d]
952        //       0-40 [c, f]
953        //    100-200 [c, h]
954        //
955        // Pred: tag_1 = "d"
956        //
957        // Row groups & rows pruning:
958        //
959        // Row Groups:
960        // - bloom filter: filter out row groups 2..=3
961        //
962        // Rows:
963        // - bloom filter: hit row group 0, hit 50 rows
964        //                 hit row group 1, hit 10 rows
965        let preds = vec![col("tag_1").eq(lit("d"))];
966        let inverted_index_applier = build_inverted_index_applier(&preds);
967        let bloom_filter_applier = build_bloom_filter_applier(&preds);
968
969        let builder = ParquetReaderBuilder::new(
970            FILE_DIR.to_string(),
971            PathType::Bare,
972            handle.clone(),
973            object_store.clone(),
974        )
975        .predicate(Some(Predicate::new(preds)))
976        .inverted_index_appliers([inverted_index_applier.clone(), None])
977        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
978        .cache(CacheStrategy::EnableAll(cache.clone()));
979
980        let mut metrics = ReaderMetrics::default();
981        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
982        let mut reader = ParquetReader::new(Arc::new(context), selection)
983            .await
984            .unwrap();
985        check_reader_result(
986            &mut reader,
987            &[
988                new_batch_by_range(&["a", "d"], 0, 20),
989                new_batch_by_range(&["b", "d"], 0, 20),
990                new_batch_by_range(&["c", "d"], 0, 10),
991                new_batch_by_range(&["c", "d"], 10, 20),
992            ],
993        )
994        .await;
995
996        assert_eq!(metrics.filter_metrics.rg_total, 4);
997        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
998        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
999        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1000        let cached = index_result_cache
1001            .get(
1002                bloom_filter_applier.unwrap().predicate_key(),
1003                handle.file_id().file_id(),
1004            )
1005            .unwrap();
1006        assert!(cached.contains_row_group(0));
1007        assert!(cached.contains_row_group(1));
1008        assert!(cached.contains_row_group(2));
1009        assert!(cached.contains_row_group(3));
1010    }
1011
1012    /// Creates a flat format RecordBatch for testing.
1013    /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
1014    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1015        assert!(end >= start);
1016        let metadata = Arc::new(sst_region_metadata());
1017        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1018
1019        let num_rows = end - start;
1020        let mut columns = Vec::new();
1021
1022        // Add primary key columns (tag_0, tag_1) as dictionary arrays
1023        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1024        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1025
1026        for _ in 0..num_rows {
1027            tag_0_builder.append_value(tags[0]);
1028            tag_1_builder.append_value(tags[1]);
1029        }
1030
1031        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1032        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
1033
1034        // Add field column (field_0)
1035        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1036        columns.push(Arc::new(UInt64Array::from(field_values)));
1037
1038        // Add time index column (ts)
1039        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1040        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1041
1042        // Add encoded primary key column
1043        let pk = new_primary_key(tags);
1044        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1045        for _ in 0..num_rows {
1046            pk_builder.append(&pk).unwrap();
1047        }
1048        columns.push(Arc::new(pk_builder.finish()));
1049
1050        // Add sequence column
1051        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1052
1053        // Add op_type column
1054        columns.push(Arc::new(UInt8Array::from_value(
1055            OpType::Put as u8,
1056            num_rows,
1057        )));
1058
1059        RecordBatch::try_new(flat_schema, columns).unwrap()
1060    }
1061
1062    /// Creates a FlatSource from flat format RecordBatches.
1063    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
1064        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
1065    }
1066
1067    #[tokio::test]
1068    async fn test_write_flat_with_index() {
1069        let mut env = TestEnv::new().await;
1070        let object_store = env.init_object_store_manager();
1071        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1072        let metadata = Arc::new(sst_region_metadata());
1073        let row_group_size = 50;
1074
1075        // Create flat format RecordBatches
1076        let flat_batches = vec![
1077            new_record_batch_by_range(&["a", "d"], 0, 20),
1078            new_record_batch_by_range(&["b", "d"], 0, 20),
1079            new_record_batch_by_range(&["c", "d"], 0, 20),
1080            new_record_batch_by_range(&["c", "f"], 0, 40),
1081            new_record_batch_by_range(&["c", "h"], 100, 200),
1082        ];
1083
1084        let flat_source = new_flat_source_from_record_batches(flat_batches);
1085
1086        let write_opts = WriteOptions {
1087            row_group_size,
1088            ..Default::default()
1089        };
1090
1091        let puffin_manager = env
1092            .get_puffin_manager()
1093            .build(object_store.clone(), file_path.clone());
1094        let intermediate_manager = env.get_intermediate_manager();
1095
1096        let indexer_builder = IndexerBuilderImpl {
1097            build_type: IndexBuildType::Flush,
1098            metadata: metadata.clone(),
1099            row_group_size,
1100            puffin_manager,
1101            write_cache_enabled: false,
1102            intermediate_manager,
1103            index_options: IndexOptions {
1104                inverted_index: InvertedIndexOptions {
1105                    segment_row_count: 1,
1106                    ..Default::default()
1107                },
1108            },
1109            inverted_index_config: Default::default(),
1110            fulltext_index_config: Default::default(),
1111            bloom_filter_index_config: Default::default(),
1112        };
1113
1114        let mut metrics = Metrics::new(WriteType::Flush);
1115        let mut writer = ParquetWriter::new_with_object_store(
1116            object_store.clone(),
1117            metadata.clone(),
1118            IndexConfig::default(),
1119            indexer_builder,
1120            file_path.clone(),
1121            &mut metrics,
1122        )
1123        .await;
1124
1125        let info = writer
1126            .write_all_flat(flat_source, &write_opts)
1127            .await
1128            .unwrap()
1129            .remove(0);
1130        assert_eq!(200, info.num_rows);
1131        assert!(info.file_size > 0);
1132        assert!(info.index_metadata.file_size > 0);
1133
1134        assert!(info.index_metadata.inverted_index.index_size > 0);
1135        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1136        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1137
1138        assert!(info.index_metadata.bloom_filter.index_size > 0);
1139        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1140        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1141
1142        assert_eq!(
1143            (
1144                Timestamp::new_millisecond(0),
1145                Timestamp::new_millisecond(199)
1146            ),
1147            info.time_range
1148        );
1149    }
1150
1151    #[tokio::test]
1152    async fn test_read_with_override_sequence() {
1153        let mut env = TestEnv::new().await;
1154        let object_store = env.init_object_store_manager();
1155        let handle = sst_file_handle(0, 1000);
1156        let file_path = FixedPathProvider {
1157            region_file_id: handle.file_id(),
1158        };
1159        let metadata = Arc::new(sst_region_metadata());
1160
1161        // Create batches with sequence 0 to trigger the override functionality.
1162        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
1163        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
1164        let source = new_source(&[batch1, batch2]);
1165
1166        let write_opts = WriteOptions {
1167            row_group_size: 50,
1168            ..Default::default()
1169        };
1170
1171        let mut metrics = Metrics::new(WriteType::Flush);
1172        let mut writer = ParquetWriter::new_with_object_store(
1173            object_store.clone(),
1174            metadata.clone(),
1175            IndexConfig::default(),
1176            NoopIndexBuilder,
1177            file_path,
1178            &mut metrics,
1179        )
1180        .await;
1181
1182        writer
1183            .write_all(source, None, &write_opts)
1184            .await
1185            .unwrap()
1186            .remove(0);
1187
1188        // Read without override sequence (should read sequence 0)
1189        let builder = ParquetReaderBuilder::new(
1190            FILE_DIR.to_string(),
1191            PathType::Bare,
1192            handle.clone(),
1193            object_store.clone(),
1194        );
1195        let mut reader = builder.build().await.unwrap();
1196        let mut normal_batches = Vec::new();
1197        while let Some(batch) = reader.next_batch().await.unwrap() {
1198            normal_batches.push(batch);
1199        }
1200
1201        // Read with override sequence using FileMeta.sequence
1202        let custom_sequence = 12345u64;
1203        let file_meta = handle.meta_ref();
1204        let mut override_file_meta = file_meta.clone();
1205        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1206        let override_handle = FileHandle::new(
1207            override_file_meta,
1208            Arc::new(crate::sst::file_purger::NoopFilePurger),
1209        );
1210
1211        let builder = ParquetReaderBuilder::new(
1212            FILE_DIR.to_string(),
1213            PathType::Bare,
1214            override_handle,
1215            object_store.clone(),
1216        );
1217        let mut reader = builder.build().await.unwrap();
1218        let mut override_batches = Vec::new();
1219        while let Some(batch) = reader.next_batch().await.unwrap() {
1220            override_batches.push(batch);
1221        }
1222
1223        // Compare the results
1224        assert_eq!(normal_batches.len(), override_batches.len());
1225        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1226            // Create expected batch with override sequence
1227            let expected_batch = {
1228                let num_rows = normal.num_rows();
1229                let mut builder = BatchBuilder::from(normal);
1230                builder
1231                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
1232                    .unwrap();
1233
1234                builder.build().unwrap()
1235            };
1236
1237            // Override batch should match expected batch
1238            assert_eq!(*override_batch, expected_batch);
1239        }
1240    }
1241}