mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
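
// Illustration (a sketch, not part of this module's API): the region metadata
// JSON is attached under this key via the parquet key-value metadata, as
// `test_read_large_binary` below does when it builds its writer properties:
//
//     let json = metadata.to_json().unwrap();
//     let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
//     let writer_props = WriterProperties::builder()
//         .set_key_value_metadata(Some(vec![key_value_meta]))
//         .build();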

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
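// With the defaults above, one row group holds 100 * 1024 = 102400 rows.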

/// Parquet write options.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: This is not a hard limit as we can only observe the file size when
    /// the ArrowWriter writes to the underlying writer.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}
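
// A minimal usage sketch (it mirrors the tests below): override only the
// fields of interest and take the defaults for the rest via struct update
// syntax.
//
//     let write_opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(1024 * 16),
//         ..Default::default()
//     };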

/// Parquet SST info returned by the writer.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// File metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
    /// Number of series.
    pub num_series: u64,
}
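
// Sketch of how an `SstInfo` is obtained (see the tests below for the concrete
// calls): a `ParquetWriter` drains a source and returns one `SstInfo` per file
// it wrote.
//
//     let mut writer = ParquetWriter::new_with_object_store(
//         object_store, metadata, IndexConfig::default(), indexer_builder,
//         path_provider, &mut metrics,
//     )
//     .await;
//     let infos = writer.write_all(source, None, &write_opts).await?;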

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::OpType;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::{BatchBuilder, BatchReader, FlatSource};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{TestEnv, check_reader_result};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
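        // With a row group size of 50, the reader returns batches split at row
        // group boundaries, so the 60-row and 100-row inputs come back in two
        // pieces each.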
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
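        // Collect the byte ranges of every column chunk in a row group; the
        // page cache key below is built from the file id, the row group index
        // and these ranges.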
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Cache 4 row groups.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // create test env
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // read the sst file metadata
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
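        // The first two ranges are empty (start == end), so only rows 200..230
        // are actually written and read back.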
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

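        // Build a schema like the SST schema but with Binary columns widened
        // to LargeBinary, so the file on disk stores large binary data that
        // the reader still has to handle.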
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // create test env
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };
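        // The small max_file_size makes the writer roll over to new files, so
        // the rows are spread across multiple SSTs (two, per the assertion
        // below) while the total row count is preserved.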

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_file_id: None,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
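        // The index result cache is queried below with (predicate key, file id);
        // `contains_row_group` tells whether an index result was cached for
        // that row group of this file.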
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // inverted index will search all row groups
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Remove the pred of `ts`, continue to use the pred of `tag_1`
        // to test if cache works.

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - bloom filter: filter out row groups 2..=3
        //
        // Rows:
        // - bloom filter: hit row group 0, hit 50 rows
        //                 hit row group 1, hit 10 rows
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    /// Creates a flat format RecordBatch for testing.
    /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Add primary key columns (tag_0, tag_1) as dictionary arrays
        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();

        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
            tag_1_builder.append_value(tags[1]);
        }

        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);

        // Add field column (field_0)
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Add time index column (ts)
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Add encoded primary key column
        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Add sequence column
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Add op_type column
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Creates a FlatSource from flat format RecordBatches.
    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Create batches with sequence 0 to trigger override functionality
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read without override sequence (should read sequence 0)
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read with override sequence using FileMeta.sequence
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Compare the results
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            // Create expected batch with override sequence
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            // Override batch should match expected batch
            assert_eq!(*override_batch, expected_batch);
        }
    }
}