mito2/sst/parquet.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST in parquet format.
16
17use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
27pub(crate) mod file_range;
28pub mod flat_format;
29pub mod format;
30pub(crate) mod helper;
31pub(crate) mod metadata;
32pub mod reader;
33pub mod row_group;
34pub mod row_selection;
35pub(crate) mod stats;
36pub mod writer;
37
/// Key under which the region metadata (serialized as JSON) is stored in the
/// key-value metadata of a parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
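
// A minimal sketch of how this key is attached when writing a parquet file directly,
// mirroring the raw writer set-up in `test_read_large_binary` below (`metadata_json`
// stands in for the JSON produced by `RegionMetadata::to_json`):
//
//     let key_value = parquet::file::metadata::KeyValue::new(
//         PARQUET_METADATA_KEY.to_string(),
//         metadata_json,
//     );
//     let props = parquet::file::properties::WriterProperties::builder()
//         .set_key_value_metadata(Some(vec![key_value]))
//         .build();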
40
41/// Default batch size to read parquet files.
42pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
43/// Default row group size for parquet files.
44pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
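// With the defaults above, a full row group holds 100 * 1024 = 102_400 rows.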
45
46/// Parquet write options.
47#[derive(Debug, Clone)]
48pub struct WriteOptions {
49    /// Buffer size for async writer.
50    pub write_buffer_size: ReadableSize,
51    /// Row group size.
52    pub row_group_size: usize,
    /// Maximum size of a single output file.
    /// Note: this is not a hard limit, since the file size can only be observed
    /// after the `ArrowWriter` flushes data to the underlying writer.
56    pub max_file_size: Option<usize>,
57}
58
59impl Default for WriteOptions {
60    fn default() -> Self {
61        WriteOptions {
62            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
63            row_group_size: DEFAULT_ROW_GROUP_SIZE,
64            max_file_size: None,
65        }
66    }
67}
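
// A minimal usage sketch (mirroring how the tests in this file build options, not a
// prescribed pattern): override the fields you care about and take the defaults for
// the rest.
//
//     let write_opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(16 * 1024),
//         ..Default::default()
//     };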
68
69/// Parquet SST info returned by the writer.
70#[derive(Debug, Default)]
71pub struct SstInfo {
72    /// SST file id.
73    pub file_id: FileId,
74    /// Time range of the SST. The timestamps have the same time unit as the
75    /// data in the SST.
76    pub time_range: FileTimeRange,
77    /// File size in bytes.
78    pub file_size: u64,
79    /// Maximum uncompressed row group size in bytes. 0 if unknown.
80    pub max_row_group_uncompressed_size: u64,
81    /// Number of rows.
82    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet file metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
    /// Number of series.
    pub num_series: u64,
91}
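
// When registering the written SST, callers typically fold an `SstInfo` into a
// `FileMeta`, copying `file_id`, `time_range`, `file_size`, the index metadata, and
// the row / row-group counts; `test_write_read_with_index` below shows this end to end.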
92
93#[cfg(test)]
94mod tests {
95    use std::collections::HashSet;
96    use std::sync::Arc;
97
98    use api::v1::{OpType, SemanticType};
99    use common_function::function::FunctionRef;
100    use common_function::function_factory::ScalarFunctionFactory;
101    use common_function::scalars::matches::MatchesFunction;
102    use common_function::scalars::matches_term::MatchesTermFunction;
103    use common_time::Timestamp;
104    use datafusion_common::{Column, ScalarValue};
105    use datafusion_expr::expr::ScalarFunction;
106    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
107    use datatypes::arrow;
108    use datatypes::arrow::array::{
109        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
110        TimestampMillisecondArray, UInt8Array, UInt64Array,
111    };
112    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
113    use datatypes::prelude::ConcreteDataType;
114    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
115    use object_store::ObjectStore;
116    use parquet::arrow::AsyncArrowWriter;
117    use parquet::basic::{Compression, Encoding, ZstdLevel};
118    use parquet::file::metadata::KeyValue;
119    use parquet::file::properties::WriterProperties;
120    use store_api::codec::PrimaryKeyEncoding;
121    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
122    use store_api::region_request::PathType;
123    use store_api::storage::{ColumnSchema, RegionId};
124    use table::predicate::Predicate;
125    use tokio_util::compat::FuturesAsyncWriteCompatExt;
126
127    use super::*;
128    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
129    use crate::cache::{CacheManager, CacheStrategy, PageKey};
130    use crate::config::IndexConfig;
131    use crate::read::{BatchBuilder, BatchReader, FlatSource};
132    use crate::region::options::{IndexOptions, InvertedIndexOptions};
133    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
134    use crate::sst::file_purger::NoopFilePurger;
135    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
136    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
137    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
138    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
139    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
140    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
141    use crate::sst::parquet::writer::ParquetWriter;
142    use crate::sst::{
143        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
144    };
145    use crate::test_util::sst_util::{
146        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
147        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
148        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
149        sst_region_metadata_with_encoding,
150    };
151    use crate::test_util::{TestEnv, check_reader_result};
152
153    const FILE_DIR: &str = "/";
154    const REGION_ID: RegionId = RegionId::new(0, 0);
155
156    #[derive(Clone)]
157    struct FixedPathProvider {
158        region_file_id: RegionFileId,
159    }
160
161    impl FilePathProvider for FixedPathProvider {
162        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
163            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
164        }
165
166        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
167            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
168        }
169
170        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
171            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
172        }
173    }
174
175    struct NoopIndexBuilder;
176
177    #[async_trait::async_trait]
178    impl IndexerBuilder for NoopIndexBuilder {
179        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
180            Indexer::default()
181        }
182    }
183
184    #[tokio::test]
185    async fn test_write_read() {
186        let mut env = TestEnv::new().await;
187        let object_store = env.init_object_store_manager();
188        let handle = sst_file_handle(0, 1000);
189        let file_path = FixedPathProvider {
190            region_file_id: handle.file_id(),
191        };
192        let metadata = Arc::new(sst_region_metadata());
193        let source = new_source(&[
194            new_batch_by_range(&["a", "d"], 0, 60),
195            new_batch_by_range(&["b", "f"], 0, 40),
196            new_batch_by_range(&["b", "h"], 100, 200),
197        ]);
198        // Use a small row group size for test.
199        let write_opts = WriteOptions {
200            row_group_size: 50,
201            ..Default::default()
202        };
203
204        let mut metrics = Metrics::new(WriteType::Flush);
205        let mut writer = ParquetWriter::new_with_object_store(
206            object_store.clone(),
207            metadata.clone(),
208            IndexConfig::default(),
209            NoopIndexBuilder,
210            file_path,
211            &mut metrics,
212        )
213        .await;
214
215        let info = writer
216            .write_all(source, None, &write_opts)
217            .await
218            .unwrap()
219            .remove(0);
220        assert_eq!(200, info.num_rows);
221        assert!(info.file_size > 0);
222        assert_eq!(
223            (
224                Timestamp::new_millisecond(0),
225                Timestamp::new_millisecond(199)
226            ),
227            info.time_range
228        );
229
230        let builder = ParquetReaderBuilder::new(
231            FILE_DIR.to_string(),
232            PathType::Bare,
233            handle.clone(),
234            object_store,
235        );
236        let mut reader = builder.build().await.unwrap();
237        check_reader_result(
238            &mut reader,
239            &[
240                new_batch_by_range(&["a", "d"], 0, 50),
241                new_batch_by_range(&["a", "d"], 50, 60),
242                new_batch_by_range(&["b", "f"], 0, 40),
243                new_batch_by_range(&["b", "h"], 100, 150),
244                new_batch_by_range(&["b", "h"], 150, 200),
245            ],
246        )
247        .await;
248    }
249
250    #[tokio::test]
251    async fn test_read_with_cache() {
252        let mut env = TestEnv::new().await;
253        let object_store = env.init_object_store_manager();
254        let handle = sst_file_handle(0, 1000);
255        let metadata = Arc::new(sst_region_metadata());
256        let source = new_source(&[
257            new_batch_by_range(&["a", "d"], 0, 60),
258            new_batch_by_range(&["b", "f"], 0, 40),
259            new_batch_by_range(&["b", "h"], 100, 200),
260        ]);
261        // Use a small row group size for test.
262        let write_opts = WriteOptions {
263            row_group_size: 50,
264            ..Default::default()
265        };
266        // Prepare data.
267        let mut metrics = Metrics::new(WriteType::Flush);
268        let mut writer = ParquetWriter::new_with_object_store(
269            object_store.clone(),
270            metadata.clone(),
271            IndexConfig::default(),
272            NoopIndexBuilder,
273            FixedPathProvider {
274                region_file_id: handle.file_id(),
275            },
276            &mut metrics,
277        )
278        .await;
279
280        let sst_info = writer
281            .write_all(source, None, &write_opts)
282            .await
283            .unwrap()
284            .remove(0);
285
286        // Enable page cache.
287        let cache = CacheStrategy::EnableAll(Arc::new(
288            CacheManager::builder()
289                .page_cache_size(64 * 1024 * 1024)
290                .build(),
291        ));
292        let builder = ParquetReaderBuilder::new(
293            FILE_DIR.to_string(),
294            PathType::Bare,
295            handle.clone(),
296            object_store,
297        )
298        .cache(cache.clone());
299        for _ in 0..3 {
300            let mut reader = builder.build().await.unwrap();
301            check_reader_result(
302                &mut reader,
303                &[
304                    new_batch_by_range(&["a", "d"], 0, 50),
305                    new_batch_by_range(&["a", "d"], 50, 60),
306                    new_batch_by_range(&["b", "f"], 0, 40),
307                    new_batch_by_range(&["b", "h"], 100, 150),
308                    new_batch_by_range(&["b", "h"], 150, 200),
309                ],
310            )
311            .await;
312        }
313
314        let parquet_meta = sst_info.file_metadata.unwrap();
315        let get_ranges = |row_group_idx: usize| {
316            let row_group = parquet_meta.row_group(row_group_idx);
317            let mut ranges = Vec::with_capacity(row_group.num_columns());
318            for i in 0..row_group.num_columns() {
319                let (start, length) = row_group.column(i).byte_range();
320                ranges.push(start..start + length);
321            }
322
323            ranges
324        };
325
326        // Cache 4 row groups.
327        for i in 0..4 {
328            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
329            assert!(cache.get_pages(&page_key).is_some());
330        }
331        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
332        assert!(cache.get_pages(&page_key).is_none());
333    }
334
335    #[tokio::test]
336    async fn test_parquet_metadata_eq() {
337        // create test env
338        let mut env = crate::test_util::TestEnv::new().await;
339        let object_store = env.init_object_store_manager();
340        let handle = sst_file_handle(0, 1000);
341        let metadata = Arc::new(sst_region_metadata());
342        let source = new_source(&[
343            new_batch_by_range(&["a", "d"], 0, 60),
344            new_batch_by_range(&["b", "f"], 0, 40),
345            new_batch_by_range(&["b", "h"], 100, 200),
346        ]);
347        let write_opts = WriteOptions {
348            row_group_size: 50,
349            ..Default::default()
350        };
351
352        // write the sst file and get sst info
353        // sst info contains the parquet metadata, which is converted from FileMetaData
354        let mut metrics = Metrics::new(WriteType::Flush);
355        let mut writer = ParquetWriter::new_with_object_store(
356            object_store.clone(),
357            metadata.clone(),
358            IndexConfig::default(),
359            NoopIndexBuilder,
360            FixedPathProvider {
361                region_file_id: handle.file_id(),
362            },
363            &mut metrics,
364        )
365        .await;
366
367        let sst_info = writer
368            .write_all(source, None, &write_opts)
369            .await
370            .unwrap()
371            .remove(0);
372        let writer_metadata = sst_info.file_metadata.unwrap();
373
374        // read the sst file metadata
375        let builder = ParquetReaderBuilder::new(
376            FILE_DIR.to_string(),
377            PathType::Bare,
378            handle.clone(),
379            object_store,
380        );
381        let reader = builder.build().await.unwrap();
382        let reader_metadata = reader.parquet_metadata();
383
384        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
385    }
386
387    #[tokio::test]
388    async fn test_read_with_tag_filter() {
389        let mut env = TestEnv::new().await;
390        let object_store = env.init_object_store_manager();
391        let handle = sst_file_handle(0, 1000);
392        let metadata = Arc::new(sst_region_metadata());
393        let source = new_source(&[
394            new_batch_by_range(&["a", "d"], 0, 60),
395            new_batch_by_range(&["b", "f"], 0, 40),
396            new_batch_by_range(&["b", "h"], 100, 200),
397        ]);
398        // Use a small row group size for test.
399        let write_opts = WriteOptions {
400            row_group_size: 50,
401            ..Default::default()
402        };
403        // Prepare data.
404        let mut metrics = Metrics::new(WriteType::Flush);
405        let mut writer = ParquetWriter::new_with_object_store(
406            object_store.clone(),
407            metadata.clone(),
408            IndexConfig::default(),
409            NoopIndexBuilder,
410            FixedPathProvider {
411                region_file_id: handle.file_id(),
412            },
413            &mut metrics,
414        )
415        .await;
416        writer
417            .write_all(source, None, &write_opts)
418            .await
419            .unwrap()
420            .remove(0);
421
422        // Predicate
423        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
424            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
425            op: Operator::Eq,
426            right: Box::new("a".lit()),
427        })]));
428
429        let builder = ParquetReaderBuilder::new(
430            FILE_DIR.to_string(),
431            PathType::Bare,
432            handle.clone(),
433            object_store,
434        )
435        .predicate(predicate);
436        let mut reader = builder.build().await.unwrap();
437        check_reader_result(
438            &mut reader,
439            &[
440                new_batch_by_range(&["a", "d"], 0, 50),
441                new_batch_by_range(&["a", "d"], 50, 60),
442            ],
443        )
444        .await;
445    }
446
447    #[tokio::test]
448    async fn test_read_empty_batch() {
449        let mut env = TestEnv::new().await;
450        let object_store = env.init_object_store_manager();
451        let handle = sst_file_handle(0, 1000);
452        let metadata = Arc::new(sst_region_metadata());
453        let source = new_source(&[
454            new_batch_by_range(&["a", "z"], 0, 0),
455            new_batch_by_range(&["a", "z"], 100, 100),
456            new_batch_by_range(&["a", "z"], 200, 230),
457        ]);
458        // Use a small row group size for test.
459        let write_opts = WriteOptions {
460            row_group_size: 50,
461            ..Default::default()
462        };
463        // Prepare data.
464        let mut metrics = Metrics::new(WriteType::Flush);
465        let mut writer = ParquetWriter::new_with_object_store(
466            object_store.clone(),
467            metadata.clone(),
468            IndexConfig::default(),
469            NoopIndexBuilder,
470            FixedPathProvider {
471                region_file_id: handle.file_id(),
472            },
473            &mut metrics,
474        )
475        .await;
476        writer
477            .write_all(source, None, &write_opts)
478            .await
479            .unwrap()
480            .remove(0);
481
482        let builder = ParquetReaderBuilder::new(
483            FILE_DIR.to_string(),
484            PathType::Bare,
485            handle.clone(),
486            object_store,
487        );
488        let mut reader = builder.build().await.unwrap();
489        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
490    }
491
492    #[tokio::test]
493    async fn test_read_with_field_filter() {
494        let mut env = TestEnv::new().await;
495        let object_store = env.init_object_store_manager();
496        let handle = sst_file_handle(0, 1000);
497        let metadata = Arc::new(sst_region_metadata());
498        let source = new_source(&[
499            new_batch_by_range(&["a", "d"], 0, 60),
500            new_batch_by_range(&["b", "f"], 0, 40),
501            new_batch_by_range(&["b", "h"], 100, 200),
502        ]);
503        // Use a small row group size for test.
504        let write_opts = WriteOptions {
505            row_group_size: 50,
506            ..Default::default()
507        };
508        // Prepare data.
509        let mut metrics = Metrics::new(WriteType::Flush);
510        let mut writer = ParquetWriter::new_with_object_store(
511            object_store.clone(),
512            metadata.clone(),
513            IndexConfig::default(),
514            NoopIndexBuilder,
515            FixedPathProvider {
516                region_file_id: handle.file_id(),
517            },
518            &mut metrics,
519        )
520        .await;
521
522        writer
523            .write_all(source, None, &write_opts)
524            .await
525            .unwrap()
526            .remove(0);
527
528        // Predicate
529        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
530            left: Box::new(Expr::Column(Column::from_name("field_0"))),
531            op: Operator::GtEq,
532            right: Box::new(150u64.lit()),
533        })]));
534
535        let builder = ParquetReaderBuilder::new(
536            FILE_DIR.to_string(),
537            PathType::Bare,
538            handle.clone(),
539            object_store,
540        )
541        .predicate(predicate);
542        let mut reader = builder.build().await.unwrap();
543        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
544    }
545
546    #[tokio::test]
547    async fn test_read_large_binary() {
548        let mut env = TestEnv::new().await;
549        let object_store = env.init_object_store_manager();
550        let handle = sst_file_handle(0, 1000);
551        let file_path = handle.file_path(FILE_DIR, PathType::Bare);
552
553        let write_opts = WriteOptions {
554            row_group_size: 50,
555            ..Default::default()
556        };
557
558        let metadata = build_test_binary_test_region_metadata();
559        let json = metadata.to_json().unwrap();
560        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
561
562        let props_builder = WriterProperties::builder()
563            .set_key_value_metadata(Some(vec![key_value_meta]))
564            .set_compression(Compression::ZSTD(ZstdLevel::default()))
565            .set_encoding(Encoding::PLAIN)
566            .set_max_row_group_size(write_opts.row_group_size);
567
568        let writer_props = props_builder.build();
569
570        let write_format = PrimaryKeyWriteFormat::new(metadata);
571        let fields: Vec<_> = write_format
572            .arrow_schema()
573            .fields()
574            .into_iter()
575            .map(|field| {
576                let data_type = field.data_type().clone();
577                if data_type == DataType::Binary {
578                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
579                } else {
580                    Field::new(field.name(), data_type, field.is_nullable())
581                }
582            })
583            .collect();
584
585        let arrow_schema = Arc::new(Schema::new(fields));
586
587        // Ensures field_0 has LargeBinary type.
588        assert_eq!(
589            &DataType::LargeBinary,
590            arrow_schema.field_with_name("field_0").unwrap().data_type()
591        );
592        let mut writer = AsyncArrowWriter::try_new(
593            object_store
594                .writer_with(&file_path)
595                .concurrent(DEFAULT_WRITE_CONCURRENCY)
596                .await
597                .map(|w| w.into_futures_async_write().compat_write())
598                .unwrap(),
599            arrow_schema.clone(),
600            Some(writer_props),
601        )
602        .unwrap();
603
604        let batch = new_batch_with_binary(&["a"], 0, 60);
605        let arrow_batch = write_format.convert_batch(&batch).unwrap();
606        let arrays: Vec<_> = arrow_batch
607            .columns()
608            .iter()
609            .map(|array| {
610                let data_type = array.data_type().clone();
611                if data_type == DataType::Binary {
612                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
613                } else {
614                    array.clone()
615                }
616            })
617            .collect();
618        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
619
620        writer.write(&result).await.unwrap();
621        writer.close().await.unwrap();
622
623        let builder = ParquetReaderBuilder::new(
624            FILE_DIR.to_string(),
625            PathType::Bare,
626            handle.clone(),
627            object_store,
628        );
629        let mut reader = builder.build().await.unwrap();
630        check_reader_result(
631            &mut reader,
632            &[
633                new_batch_with_binary(&["a"], 0, 50),
634                new_batch_with_binary(&["a"], 50, 60),
635            ],
636        )
637        .await;
638    }
639
640    #[tokio::test]
641    async fn test_write_multiple_files() {
642        common_telemetry::init_default_ut_logging();
643        // create test env
644        let mut env = TestEnv::new().await;
645        let object_store = env.init_object_store_manager();
646        let metadata = Arc::new(sst_region_metadata());
647        let batches = &[
648            new_batch_by_range(&["a", "d"], 0, 1000),
649            new_batch_by_range(&["b", "f"], 0, 1000),
650            new_batch_by_range(&["c", "g"], 0, 1000),
651            new_batch_by_range(&["b", "h"], 100, 200),
652            new_batch_by_range(&["b", "h"], 200, 300),
653            new_batch_by_range(&["b", "h"], 300, 1000),
654        ];
655        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
656
657        let source = new_source(batches);
658        let write_opts = WriteOptions {
659            row_group_size: 50,
660            max_file_size: Some(1024 * 16),
661            ..Default::default()
662        };
663
664        let path_provider = RegionFilePathFactory {
665            table_dir: "test".to_string(),
666            path_type: PathType::Bare,
667        };
668        let mut metrics = Metrics::new(WriteType::Flush);
669        let mut writer = ParquetWriter::new_with_object_store(
670            object_store.clone(),
671            metadata.clone(),
672            IndexConfig::default(),
673            NoopIndexBuilder,
674            path_provider,
675            &mut metrics,
676        )
677        .await;
678
679        let files = writer.write_all(source, None, &write_opts).await.unwrap();
680        assert_eq!(2, files.len());
681
682        let mut rows_read = 0;
683        for f in &files {
684            let file_handle = sst_file_handle_with_file_id(
685                f.file_id,
686                f.time_range.0.value(),
687                f.time_range.1.value(),
688            );
689            let builder = ParquetReaderBuilder::new(
690                "test".to_string(),
691                PathType::Bare,
692                file_handle,
693                object_store.clone(),
694            );
695            let mut reader = builder.build().await.unwrap();
696            while let Some(batch) = reader.next_batch().await.unwrap() {
697                rows_read += batch.num_rows();
698            }
699        }
700        assert_eq!(total_rows, rows_read);
701    }
702
703    #[tokio::test]
704    async fn test_write_read_with_index() {
705        let mut env = TestEnv::new().await;
706        let object_store = env.init_object_store_manager();
707        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
708        let metadata = Arc::new(sst_region_metadata());
709        let row_group_size = 50;
710
711        let source = new_source(&[
712            new_batch_by_range(&["a", "d"], 0, 20),
713            new_batch_by_range(&["b", "d"], 0, 20),
714            new_batch_by_range(&["c", "d"], 0, 20),
715            new_batch_by_range(&["c", "f"], 0, 40),
716            new_batch_by_range(&["c", "h"], 100, 200),
717        ]);
718        // Use a small row group size for test.
719        let write_opts = WriteOptions {
720            row_group_size,
721            ..Default::default()
722        };
723
724        let puffin_manager = env
725            .get_puffin_manager()
726            .build(object_store.clone(), file_path.clone());
727        let intermediate_manager = env.get_intermediate_manager();
728
729        let indexer_builder = IndexerBuilderImpl {
730            build_type: IndexBuildType::Flush,
731            metadata: metadata.clone(),
732            row_group_size,
733            puffin_manager,
734            write_cache_enabled: false,
735            intermediate_manager,
736            index_options: IndexOptions {
737                inverted_index: InvertedIndexOptions {
738                    segment_row_count: 1,
739                    ..Default::default()
740                },
741            },
742            inverted_index_config: Default::default(),
743            fulltext_index_config: Default::default(),
744            bloom_filter_index_config: Default::default(),
745            #[cfg(feature = "vector_index")]
746            vector_index_config: Default::default(),
747        };
748
749        let mut metrics = Metrics::new(WriteType::Flush);
750        let mut writer = ParquetWriter::new_with_object_store(
751            object_store.clone(),
752            metadata.clone(),
753            IndexConfig::default(),
754            indexer_builder,
755            file_path.clone(),
756            &mut metrics,
757        )
758        .await;
759
760        let info = writer
761            .write_all(source, None, &write_opts)
762            .await
763            .unwrap()
764            .remove(0);
765        assert_eq!(200, info.num_rows);
766        assert!(info.file_size > 0);
767        assert!(info.index_metadata.file_size > 0);
768
769        assert!(info.index_metadata.inverted_index.index_size > 0);
770        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
771        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
772
773        assert!(info.index_metadata.bloom_filter.index_size > 0);
774        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
775        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
776
777        assert_eq!(
778            (
779                Timestamp::new_millisecond(0),
780                Timestamp::new_millisecond(199)
781            ),
782            info.time_range
783        );
784
785        let handle = FileHandle::new(
786            FileMeta {
787                region_id: metadata.region_id,
788                file_id: info.file_id,
789                time_range: info.time_range,
790                level: 0,
791                file_size: info.file_size,
792                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
793                available_indexes: info.index_metadata.build_available_indexes(),
794                indexes: info.index_metadata.build_indexes(),
795                index_file_size: info.index_metadata.file_size,
796                index_version: 0,
797                num_row_groups: info.num_row_groups,
798                num_rows: info.num_rows as u64,
799                sequence: None,
800                partition_expr: match &metadata.partition_expr {
801                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
802                        .expect("partition expression should be valid JSON"),
803                    None => None,
804                },
805                num_series: 0,
806            },
807            Arc::new(NoopFilePurger),
808        );
809
810        let cache = Arc::new(
811            CacheManager::builder()
812                .index_result_cache_size(1024 * 1024)
813                .index_metadata_size(1024 * 1024)
814                .index_content_page_size(1024 * 1024)
815                .index_content_size(1024 * 1024)
816                .puffin_metadata_size(1024 * 1024)
817                .build(),
818        );
819        let index_result_cache = cache.index_result_cache().unwrap();
820
821        let build_inverted_index_applier = |exprs: &[Expr]| {
822            InvertedIndexApplierBuilder::new(
823                FILE_DIR.to_string(),
824                PathType::Bare,
825                object_store.clone(),
826                &metadata,
827                HashSet::from_iter([0]),
828                env.get_puffin_manager(),
829            )
830            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
831            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
832            .build(exprs)
833            .unwrap()
834            .map(Arc::new)
835        };
836
837        let build_bloom_filter_applier = |exprs: &[Expr]| {
838            BloomFilterIndexApplierBuilder::new(
839                FILE_DIR.to_string(),
840                PathType::Bare,
841                object_store.clone(),
842                &metadata,
843                env.get_puffin_manager(),
844            )
845            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
846            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
847            .build(exprs)
848            .unwrap()
849            .map(Arc::new)
850        };
851
852        // Data: ts tag_0 tag_1
853        // Data: 0-20 [a, d]
854        //       0-20 [b, d]
855        //       0-20 [c, d]
856        //       0-40 [c, f]
857        //    100-200 [c, h]
858        //
859        // Pred: tag_0 = "b"
860        //
861        // Row groups & rows pruning:
862        //
863        // Row Groups:
864        // - min-max: filter out row groups 1..=3
865        //
866        // Rows:
867        // - inverted index: hit row group 0, hit 20 rows
868        let preds = vec![col("tag_0").eq(lit("b"))];
869        let inverted_index_applier = build_inverted_index_applier(&preds);
870        let bloom_filter_applier = build_bloom_filter_applier(&preds);
871
872        let builder = ParquetReaderBuilder::new(
873            FILE_DIR.to_string(),
874            PathType::Bare,
875            handle.clone(),
876            object_store.clone(),
877        )
878        .predicate(Some(Predicate::new(preds)))
879        .inverted_index_appliers([inverted_index_applier.clone(), None])
880        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
881        .cache(CacheStrategy::EnableAll(cache.clone()));
882
883        let mut metrics = ReaderMetrics::default();
884        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
885        let mut reader = ParquetReader::new(Arc::new(context), selection)
886            .await
887            .unwrap();
888        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;
889
890        assert_eq!(metrics.filter_metrics.rg_total, 4);
891        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
892        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
893        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
894        let cached = index_result_cache
895            .get(
896                inverted_index_applier.unwrap().predicate_key(),
897                handle.file_id().file_id(),
898            )
899            .unwrap();
900        // inverted index will search all row groups
901        assert!(cached.contains_row_group(0));
902        assert!(cached.contains_row_group(1));
903        assert!(cached.contains_row_group(2));
904        assert!(cached.contains_row_group(3));
905
906        // Data: ts tag_0 tag_1
907        // Data: 0-20 [a, d]
908        //       0-20 [b, d]
909        //       0-20 [c, d]
910        //       0-40 [c, f]
911        //    100-200 [c, h]
912        //
913        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
914        //
915        // Row groups & rows pruning:
916        //
917        // Row Groups:
918        // - min-max: filter out row groups 0..=1
919        // - bloom filter: filter out row groups 2..=3
920        let preds = vec![
921            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
922            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
923            col("tag_1").eq(lit("d")),
924        ];
925        let inverted_index_applier = build_inverted_index_applier(&preds);
926        let bloom_filter_applier = build_bloom_filter_applier(&preds);
927
928        let builder = ParquetReaderBuilder::new(
929            FILE_DIR.to_string(),
930            PathType::Bare,
931            handle.clone(),
932            object_store.clone(),
933        )
934        .predicate(Some(Predicate::new(preds)))
935        .inverted_index_appliers([inverted_index_applier.clone(), None])
936        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
937        .cache(CacheStrategy::EnableAll(cache.clone()));
938
939        let mut metrics = ReaderMetrics::default();
940        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
941        let mut reader = ParquetReader::new(Arc::new(context), selection)
942            .await
943            .unwrap();
944        check_reader_result(&mut reader, &[]).await;
945
946        assert_eq!(metrics.filter_metrics.rg_total, 4);
947        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
948        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
949        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
950        let cached = index_result_cache
951            .get(
952                bloom_filter_applier.unwrap().predicate_key(),
953                handle.file_id().file_id(),
954            )
955            .unwrap();
956        assert!(cached.contains_row_group(2));
957        assert!(cached.contains_row_group(3));
958        assert!(!cached.contains_row_group(0));
959        assert!(!cached.contains_row_group(1));
960
        // Drop the `ts` predicate but keep the `tag_1` predicate to check that
        // the index result cache works.
963
964        // Data: ts tag_0 tag_1
965        // Data: 0-20 [a, d]
966        //       0-20 [b, d]
967        //       0-20 [c, d]
968        //       0-40 [c, f]
969        //    100-200 [c, h]
970        //
971        // Pred: tag_1 = "d"
972        //
973        // Row groups & rows pruning:
974        //
975        // Row Groups:
976        // - bloom filter: filter out row groups 2..=3
977        //
978        // Rows:
979        // - bloom filter: hit row group 0, hit 50 rows
980        //                 hit row group 1, hit 10 rows
981        let preds = vec![col("tag_1").eq(lit("d"))];
982        let inverted_index_applier = build_inverted_index_applier(&preds);
983        let bloom_filter_applier = build_bloom_filter_applier(&preds);
984
985        let builder = ParquetReaderBuilder::new(
986            FILE_DIR.to_string(),
987            PathType::Bare,
988            handle.clone(),
989            object_store.clone(),
990        )
991        .predicate(Some(Predicate::new(preds)))
992        .inverted_index_appliers([inverted_index_applier.clone(), None])
993        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
994        .cache(CacheStrategy::EnableAll(cache.clone()));
995
996        let mut metrics = ReaderMetrics::default();
997        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
998        let mut reader = ParquetReader::new(Arc::new(context), selection)
999            .await
1000            .unwrap();
1001        check_reader_result(
1002            &mut reader,
1003            &[
1004                new_batch_by_range(&["a", "d"], 0, 20),
1005                new_batch_by_range(&["b", "d"], 0, 20),
1006                new_batch_by_range(&["c", "d"], 0, 10),
1007                new_batch_by_range(&["c", "d"], 10, 20),
1008            ],
1009        )
1010        .await;
1011
1012        assert_eq!(metrics.filter_metrics.rg_total, 4);
1013        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1014        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1015        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1016        let cached = index_result_cache
1017            .get(
1018                bloom_filter_applier.unwrap().predicate_key(),
1019                handle.file_id().file_id(),
1020            )
1021            .unwrap();
1022        assert!(cached.contains_row_group(0));
1023        assert!(cached.contains_row_group(1));
1024        assert!(cached.contains_row_group(2));
1025        assert!(cached.contains_row_group(3));
1026    }
1027
1028    /// Creates a flat format RecordBatch for testing.
1029    /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
1030    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1031        assert!(end >= start);
1032        let metadata = Arc::new(sst_region_metadata());
1033        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1034
1035        let num_rows = end - start;
1036        let mut columns = Vec::new();
1037
1038        // Add primary key columns (tag_0, tag_1) as dictionary arrays
1039        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1040        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1041
1042        for _ in 0..num_rows {
1043            tag_0_builder.append_value(tags[0]);
1044            tag_1_builder.append_value(tags[1]);
1045        }
1046
1047        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1048        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
1049
1050        // Add field column (field_0)
1051        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1052        columns.push(Arc::new(UInt64Array::from(field_values)));
1053
1054        // Add time index column (ts)
1055        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1056        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1057
1058        // Add encoded primary key column
1059        let pk = new_primary_key(tags);
1060        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1061        for _ in 0..num_rows {
1062            pk_builder.append(&pk).unwrap();
1063        }
1064        columns.push(Arc::new(pk_builder.finish()));
1065
1066        // Add sequence column
1067        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1068
1069        // Add op_type column
1070        columns.push(Arc::new(UInt8Array::from_value(
1071            OpType::Put as u8,
1072            num_rows,
1073        )));
1074
1075        RecordBatch::try_new(flat_schema, columns).unwrap()
1076    }
1077
1078    /// Creates a FlatSource from flat format RecordBatches.
1079    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
1080        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
1081    }
1082
1083    /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
1084    /// Similar to `new_record_batch_by_range` but without individual primary key columns.
1085    fn new_record_batch_by_range_sparse(
1086        tags: &[&str],
1087        start: usize,
1088        end: usize,
1089        metadata: &Arc<RegionMetadata>,
1090    ) -> RecordBatch {
1091        assert!(end >= start);
1092        let flat_schema = to_flat_sst_arrow_schema(
1093            metadata,
1094            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1095        );
1096
1097        let num_rows = end - start;
1098        let mut columns: Vec<ArrayRef> = Vec::new();
1099
1100        // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format
1101
1102        // Add field column (field_0)
1103        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1104        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1105
1106        // Add time index column (ts)
1107        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1108        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1109
1110        // Add encoded primary key column using sparse encoding
1111        let table_id = 1u32; // Test table ID
1112        let tsid = 100u64; // Base TSID
1113        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1114
1115        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1116        for _ in 0..num_rows {
1117            pk_builder.append(&pk).unwrap();
1118        }
1119        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1120
1121        // Add sequence column
1122        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1123
1124        // Add op_type column
1125        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1126
1127        RecordBatch::try_new(flat_schema, columns).unwrap()
1128    }
1129
1130    /// Helper function to create IndexerBuilderImpl for tests.
1131    fn create_test_indexer_builder(
1132        env: &TestEnv,
1133        object_store: ObjectStore,
1134        file_path: RegionFilePathFactory,
1135        metadata: Arc<RegionMetadata>,
1136        row_group_size: usize,
1137    ) -> IndexerBuilderImpl {
1138        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1139        let intermediate_manager = env.get_intermediate_manager();
1140
1141        IndexerBuilderImpl {
1142            build_type: IndexBuildType::Flush,
1143            metadata,
1144            row_group_size,
1145            puffin_manager,
1146            write_cache_enabled: false,
1147            intermediate_manager,
1148            index_options: IndexOptions {
1149                inverted_index: InvertedIndexOptions {
1150                    segment_row_count: 1,
1151                    ..Default::default()
1152                },
1153            },
1154            inverted_index_config: Default::default(),
1155            fulltext_index_config: Default::default(),
1156            bloom_filter_index_config: Default::default(),
1157            #[cfg(feature = "vector_index")]
1158            vector_index_config: Default::default(),
1159        }
1160    }
1161
1162    /// Helper function to write flat SST and return SstInfo.
1163    async fn write_flat_sst(
1164        object_store: ObjectStore,
1165        metadata: Arc<RegionMetadata>,
1166        indexer_builder: IndexerBuilderImpl,
1167        file_path: RegionFilePathFactory,
1168        flat_source: FlatSource,
1169        write_opts: &WriteOptions,
1170    ) -> SstInfo {
1171        let mut metrics = Metrics::new(WriteType::Flush);
1172        let mut writer = ParquetWriter::new_with_object_store(
1173            object_store,
1174            metadata,
1175            IndexConfig::default(),
1176            indexer_builder,
1177            file_path,
1178            &mut metrics,
1179        )
1180        .await;
1181
1182        writer
1183            .write_all_flat(flat_source, write_opts)
1184            .await
1185            .unwrap()
1186            .remove(0)
1187    }
1188
1189    /// Helper function to create FileHandle from SstInfo.
1190    fn create_file_handle_from_sst_info(
1191        info: &SstInfo,
1192        metadata: &Arc<RegionMetadata>,
1193    ) -> FileHandle {
1194        FileHandle::new(
1195            FileMeta {
1196                region_id: metadata.region_id,
1197                file_id: info.file_id,
1198                time_range: info.time_range,
1199                level: 0,
1200                file_size: info.file_size,
1201                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1202                available_indexes: info.index_metadata.build_available_indexes(),
1203                indexes: info.index_metadata.build_indexes(),
1204                index_file_size: info.index_metadata.file_size,
1205                index_version: 0,
1206                num_row_groups: info.num_row_groups,
1207                num_rows: info.num_rows as u64,
1208                sequence: None,
1209                partition_expr: match &metadata.partition_expr {
1210                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1211                        .expect("partition expression should be valid JSON"),
1212                    None => None,
1213                },
1214                num_series: 0,
1215            },
1216            Arc::new(NoopFilePurger),
1217        )
1218    }
1219
1220    /// Helper function to create test cache with standard settings.
1221    fn create_test_cache() -> Arc<CacheManager> {
1222        Arc::new(
1223            CacheManager::builder()
1224                .index_result_cache_size(1024 * 1024)
1225                .index_metadata_size(1024 * 1024)
1226                .index_content_page_size(1024 * 1024)
1227                .index_content_size(1024 * 1024)
1228                .puffin_metadata_size(1024 * 1024)
1229                .build(),
1230        )
1231    }
1232
1233    #[tokio::test]
1234    async fn test_write_flat_with_index() {
1235        let mut env = TestEnv::new().await;
1236        let object_store = env.init_object_store_manager();
1237        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1238        let metadata = Arc::new(sst_region_metadata());
1239        let row_group_size = 50;
1240
1241        // Create flat format RecordBatches
1242        let flat_batches = vec![
1243            new_record_batch_by_range(&["a", "d"], 0, 20),
1244            new_record_batch_by_range(&["b", "d"], 0, 20),
1245            new_record_batch_by_range(&["c", "d"], 0, 20),
1246            new_record_batch_by_range(&["c", "f"], 0, 40),
1247            new_record_batch_by_range(&["c", "h"], 100, 200),
1248        ];
1249
1250        let flat_source = new_flat_source_from_record_batches(flat_batches);
1251
1252        let write_opts = WriteOptions {
1253            row_group_size,
1254            ..Default::default()
1255        };
1256
1257        let puffin_manager = env
1258            .get_puffin_manager()
1259            .build(object_store.clone(), file_path.clone());
1260        let intermediate_manager = env.get_intermediate_manager();
1261
1262        let indexer_builder = IndexerBuilderImpl {
1263            build_type: IndexBuildType::Flush,
1264            metadata: metadata.clone(),
1265            row_group_size,
1266            puffin_manager,
1267            write_cache_enabled: false,
1268            intermediate_manager,
1269            index_options: IndexOptions {
1270                inverted_index: InvertedIndexOptions {
1271                    segment_row_count: 1,
1272                    ..Default::default()
1273                },
1274            },
1275            inverted_index_config: Default::default(),
1276            fulltext_index_config: Default::default(),
1277            bloom_filter_index_config: Default::default(),
1278            #[cfg(feature = "vector_index")]
1279            vector_index_config: Default::default(),
1280        };
1281
1282        let mut metrics = Metrics::new(WriteType::Flush);
1283        let mut writer = ParquetWriter::new_with_object_store(
1284            object_store.clone(),
1285            metadata.clone(),
1286            IndexConfig::default(),
1287            indexer_builder,
1288            file_path.clone(),
1289            &mut metrics,
1290        )
1291        .await;
1292
1293        let info = writer
1294            .write_all_flat(flat_source, &write_opts)
1295            .await
1296            .unwrap()
1297            .remove(0);
1298        assert_eq!(200, info.num_rows);
1299        assert!(info.file_size > 0);
1300        assert!(info.index_metadata.file_size > 0);
1301
1302        assert!(info.index_metadata.inverted_index.index_size > 0);
1303        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1304        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1305
1306        assert!(info.index_metadata.bloom_filter.index_size > 0);
1307        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1308        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1309
1310        assert_eq!(
1311            (
1312                Timestamp::new_millisecond(0),
1313                Timestamp::new_millisecond(199)
1314            ),
1315            info.time_range
1316        );
1317    }
1318
1319    #[tokio::test]
1320    async fn test_read_with_override_sequence() {
1321        let mut env = TestEnv::new().await;
1322        let object_store = env.init_object_store_manager();
1323        let handle = sst_file_handle(0, 1000);
1324        let file_path = FixedPathProvider {
1325            region_file_id: handle.file_id(),
1326        };
1327        let metadata = Arc::new(sst_region_metadata());
1328
1329        // Create batches with sequence 0 to trigger override functionality
1330        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
1331        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
1332        let source = new_source(&[batch1, batch2]);
1333
1334        let write_opts = WriteOptions {
1335            row_group_size: 50,
1336            ..Default::default()
1337        };
1338
1339        let mut metrics = Metrics::new(WriteType::Flush);
1340        let mut writer = ParquetWriter::new_with_object_store(
1341            object_store.clone(),
1342            metadata.clone(),
1343            IndexConfig::default(),
1344            NoopIndexBuilder,
1345            file_path,
1346            &mut metrics,
1347        )
1348        .await;
1349
1350        writer
1351            .write_all(source, None, &write_opts)
1352            .await
1353            .unwrap()
1354            .remove(0);
1355
1356        // Read without override sequence (should read sequence 0)
1357        let builder = ParquetReaderBuilder::new(
1358            FILE_DIR.to_string(),
1359            PathType::Bare,
1360            handle.clone(),
1361            object_store.clone(),
1362        );
1363        let mut reader = builder.build().await.unwrap();
1364        let mut normal_batches = Vec::new();
1365        while let Some(batch) = reader.next_batch().await.unwrap() {
1366            normal_batches.push(batch);
1367        }
1368
1369        // Read with override sequence using FileMeta.sequence
1370        let custom_sequence = 12345u64;
1371        let file_meta = handle.meta_ref();
1372        let mut override_file_meta = file_meta.clone();
1373        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1374        let override_handle = FileHandle::new(
1375            override_file_meta,
1376            Arc::new(crate::sst::file_purger::NoopFilePurger),
1377        );
1378
1379        let builder = ParquetReaderBuilder::new(
1380            FILE_DIR.to_string(),
1381            PathType::Bare,
1382            override_handle,
1383            object_store.clone(),
1384        );
1385        let mut reader = builder.build().await.unwrap();
1386        let mut override_batches = Vec::new();
1387        while let Some(batch) = reader.next_batch().await.unwrap() {
1388            override_batches.push(batch);
1389        }
1390
1391        // Compare the results
1392        assert_eq!(normal_batches.len(), override_batches.len());
1393        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1394            // Create expected batch with override sequence
1395            let expected_batch = {
1396                let num_rows = normal.num_rows();
1397                let mut builder = BatchBuilder::from(normal);
1398                builder
1399                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
1400                    .unwrap();
1401
1402                builder.build().unwrap()
1403            };
1404
1405            // Override batch should match expected batch
1406            assert_eq!(*override_batch, expected_batch);
1407        }
1408    }
1409
1410    #[tokio::test]
1411    async fn test_write_flat_read_with_inverted_index() {
1412        let mut env = TestEnv::new().await;
1413        let object_store = env.init_object_store_manager();
1414        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1415        let metadata = Arc::new(sst_region_metadata());
1416        let row_group_size = 100;
1417
1418        // Create flat format RecordBatches with non-overlapping timestamp ranges
        // Two 50-row batches form each row group (row_group_size = 100)
1420        // Data: ts tag_0 tag_1
1421        // RG 0:   0-50  [a, d]
1422        // RG 0:  50-100 [b, d]
1423        // RG 1: 100-150 [c, d]
1424        // RG 1: 150-200 [c, f]
1425        let flat_batches = vec![
1426            new_record_batch_by_range(&["a", "d"], 0, 50),
1427            new_record_batch_by_range(&["b", "d"], 50, 100),
1428            new_record_batch_by_range(&["c", "d"], 100, 150),
1429            new_record_batch_by_range(&["c", "f"], 150, 200),
1430        ];
1431
1432        let flat_source = new_flat_source_from_record_batches(flat_batches);
1433
1434        let write_opts = WriteOptions {
1435            row_group_size,
1436            ..Default::default()
1437        };
1438
1439        let indexer_builder = create_test_indexer_builder(
1440            &env,
1441            object_store.clone(),
1442            file_path.clone(),
1443            metadata.clone(),
1444            row_group_size,
1445        );
1446
1447        let info = write_flat_sst(
1448            object_store.clone(),
1449            metadata.clone(),
1450            indexer_builder,
1451            file_path.clone(),
1452            flat_source,
1453            &write_opts,
1454        )
1455        .await;
1456        assert_eq!(200, info.num_rows);
1457        assert!(info.file_size > 0);
1458        assert!(info.index_metadata.file_size > 0);
1459
1460        let handle = create_file_handle_from_sst_info(&info, &metadata);
1461
1462        let cache = create_test_cache();
1463
1464        // Test 1: Filter by tag_0 = "b"
1465        // Expected: only the 50 rows with tag_0 = "b" remain
1466        let preds = vec![col("tag_0").eq(lit("b"))];
1467        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1468            FILE_DIR.to_string(),
1469            PathType::Bare,
1470            object_store.clone(),
1471            &metadata,
1472            HashSet::from_iter([0]),
1473            env.get_puffin_manager(),
1474        )
1475        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1476        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1477        .build(&preds)
1478        .unwrap()
1479        .map(Arc::new);
1480
1481        let builder = ParquetReaderBuilder::new(
1482            FILE_DIR.to_string(),
1483            PathType::Bare,
1484            handle.clone(),
1485            object_store.clone(),
1486        )
1487        .flat_format(true)
1488        .predicate(Some(Predicate::new(preds)))
1489        .inverted_index_appliers([inverted_index_applier.clone(), None])
1490        .cache(CacheStrategy::EnableAll(cache.clone()));
1491
1492        let mut metrics = ReaderMetrics::default();
1493        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1494
1495        // Verify only RG 0 (ts 0-100) is selected, keeping just the 50 rows with tag_0 = "b"
1496        assert_eq!(selection.row_group_count(), 1);
1497        assert_eq!(50, selection.get(0).unwrap().row_count());
1498
1499        // Verify filtering metrics
1500        assert_eq!(metrics.filter_metrics.rg_total, 2);
1501        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
1502        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
1503        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
1504    }
1505
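    // Same flat-format write path, but pruning comes from the bloom filter index on tag_1:
    // both row groups stay selected and 100 rows are pruned inside them.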
1506    #[tokio::test]
1507    async fn test_write_flat_read_with_bloom_filter() {
1508        let mut env = TestEnv::new().await;
1509        let object_store = env.init_object_store_manager();
1510        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1511        let metadata = Arc::new(sst_region_metadata());
1512        let row_group_size = 100;
1513
1514        // Create flat format RecordBatches with non-overlapping timestamp ranges.
1515        // Two 50-row batches fill each row group (row_group_size = 100).
1516        // Data layout: ts range, [tag_0, tag_1]
1517        // RG 0:   0-50  [a, d]
1518        // RG 0:  50-100 [b, e]
1519        // RG 1: 100-150 [c, d]
1520        // RG 1: 150-200 [c, f]
1521        let flat_batches = vec![
1522            new_record_batch_by_range(&["a", "d"], 0, 50),
1523            new_record_batch_by_range(&["b", "e"], 50, 100),
1524            new_record_batch_by_range(&["c", "d"], 100, 150),
1525            new_record_batch_by_range(&["c", "f"], 150, 200),
1526        ];
1527
1528        let flat_source = new_flat_source_from_record_batches(flat_batches);
1529
1530        let write_opts = WriteOptions {
1531            row_group_size,
1532            ..Default::default()
1533        };
1534
1535        let indexer_builder = create_test_indexer_builder(
1536            &env,
1537            object_store.clone(),
1538            file_path.clone(),
1539            metadata.clone(),
1540            row_group_size,
1541        );
1542
1543        let info = write_flat_sst(
1544            object_store.clone(),
1545            metadata.clone(),
1546            indexer_builder,
1547            file_path.clone(),
1548            flat_source,
1549            &write_opts,
1550        )
1551        .await;
1552        assert_eq!(200, info.num_rows);
1553        assert!(info.file_size > 0);
1554        assert!(info.index_metadata.file_size > 0);
1555
1556        let handle = create_file_handle_from_sst_info(&info, &metadata);
1557
1558        let cache = create_test_cache();
1559
1560        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1561        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200) both contain rows with tag_1 = "d", so rows are pruned within each row group rather than whole row groups
1562        let preds = vec![
1563            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1564            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1565            col("tag_1").eq(lit("d")),
1566        ];
1567        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1568            FILE_DIR.to_string(),
1569            PathType::Bare,
1570            object_store.clone(),
1571            &metadata,
1572            env.get_puffin_manager(),
1573        )
1574        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1575        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1576        .build(&preds)
1577        .unwrap()
1578        .map(Arc::new);
1579
1580        let builder = ParquetReaderBuilder::new(
1581            FILE_DIR.to_string(),
1582            PathType::Bare,
1583            handle.clone(),
1584            object_store.clone(),
1585        )
1586        .flat_format(true)
1587        .predicate(Some(Predicate::new(preds)))
1588        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1589        .cache(CacheStrategy::EnableAll(cache.clone()));
1590
1591        let mut metrics = ReaderMetrics::default();
1592        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1593
1594        // Verify selection contains RG 0 and RG 1
1595        assert_eq!(selection.row_group_count(), 2);
1596        assert_eq!(50, selection.get(0).unwrap().row_count());
1597        assert_eq!(50, selection.get(1).unwrap().row_count());
1598
1599        // Verify filtering metrics
1600        assert_eq!(metrics.filter_metrics.rg_total, 2);
1601        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1602        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1603        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1604    }
1605
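    // Repeats the inverted index test with sparse primary key encoding: tags have no per-column
    // min/max stats, so the inverted index alone prunes one whole row group plus 50 rows of the
    // other (150 rows in total).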
1606    #[tokio::test]
1607    async fn test_write_flat_read_with_inverted_index_sparse() {
1608        common_telemetry::init_default_ut_logging();
1609
1610        let mut env = TestEnv::new().await;
1611        let object_store = env.init_object_store_manager();
1612        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1613        let metadata = Arc::new(sst_region_metadata_with_encoding(
1614            PrimaryKeyEncoding::Sparse,
1615        ));
1616        let row_group_size = 100;
1617
1618        // Create flat format RecordBatches with non-overlapping timestamp ranges.
1619        // Two 50-row batches fill each row group (row_group_size = 100).
1620        // Data layout: ts range, [tag_0, tag_1]
1621        // RG 0:   0-50  [a, d]
1622        // RG 0:  50-100 [b, d]
1623        // RG 1: 100-150 [c, d]
1624        // RG 1: 150-200 [c, f]
1625        let flat_batches = vec![
1626            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1627            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1628            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1629            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1630        ];
1631
1632        let flat_source = new_flat_source_from_record_batches(flat_batches);
1633
1634        let write_opts = WriteOptions {
1635            row_group_size,
1636            ..Default::default()
1637        };
1638
1639        let indexer_builder = create_test_indexer_builder(
1640            &env,
1641            object_store.clone(),
1642            file_path.clone(),
1643            metadata.clone(),
1644            row_group_size,
1645        );
1646
1647        let info = write_flat_sst(
1648            object_store.clone(),
1649            metadata.clone(),
1650            indexer_builder,
1651            file_path.clone(),
1652            flat_source,
1653            &write_opts,
1654        )
1655        .await;
1656        assert_eq!(200, info.num_rows);
1657        assert!(info.file_size > 0);
1658        assert!(info.index_metadata.file_size > 0);
1659
1660        let handle = create_file_handle_from_sst_info(&info, &metadata);
1661
1662        let cache = create_test_cache();
1663
1664        // Test 1: Filter by tag_0 = "b"
1665        // Expected: only the 50 rows with tag_0 = "b" remain
1666        let preds = vec![col("tag_0").eq(lit("b"))];
1667        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1668            FILE_DIR.to_string(),
1669            PathType::Bare,
1670            object_store.clone(),
1671            &metadata,
1672            HashSet::from_iter([0]),
1673            env.get_puffin_manager(),
1674        )
1675        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1676        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1677        .build(&preds)
1678        .unwrap()
1679        .map(Arc::new);
1680
1681        let builder = ParquetReaderBuilder::new(
1682            FILE_DIR.to_string(),
1683            PathType::Bare,
1684            handle.clone(),
1685            object_store.clone(),
1686        )
1687        .flat_format(true)
1688        .predicate(Some(Predicate::new(preds)))
1689        .inverted_index_appliers([inverted_index_applier.clone(), None])
1690        .cache(CacheStrategy::EnableAll(cache.clone()));
1691
1692        let mut metrics = ReaderMetrics::default();
1693        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1694
1695        // RG 0 has 50 matching rows (tag_0="b")
1696        assert_eq!(selection.row_group_count(), 1);
1697        assert_eq!(50, selection.get(0).unwrap().row_count());
1698
1699        // Verify filtering metrics
1700        // Note: with sparse encoding, tag columns aren't stored as separate columns,
1701        // so min/max filtering on tags can't prune row groups; only the inverted index applies
1702        assert_eq!(metrics.filter_metrics.rg_total, 2);
1703        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
1704        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1705        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1706    }
1707
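    // Repeats the bloom filter test with sparse primary key encoding; the expected pruning is
    // the same as in the non-sparse bloom filter test above.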
1708    #[tokio::test]
1709    async fn test_write_flat_read_with_bloom_filter_sparse() {
1710        let mut env = TestEnv::new().await;
1711        let object_store = env.init_object_store_manager();
1712        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1713        let metadata = Arc::new(sst_region_metadata_with_encoding(
1714            PrimaryKeyEncoding::Sparse,
1715        ));
1716        let row_group_size = 100;
1717
1718        // Create flat format RecordBatches with non-overlapping timestamp ranges.
1719        // Two 50-row batches fill each row group (row_group_size = 100).
1720        // Data layout: ts range, [tag_0, tag_1]
1721        // RG 0:   0-50  [a, d]
1722        // RG 0:  50-100 [b, e]
1723        // RG 1: 100-150 [c, d]
1724        // RG 1: 150-200 [c, f]
1725        let flat_batches = vec![
1726            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1727            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
1728            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1729            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1730        ];
1731
1732        let flat_source = new_flat_source_from_record_batches(flat_batches);
1733
1734        let write_opts = WriteOptions {
1735            row_group_size,
1736            ..Default::default()
1737        };
1738
1739        let indexer_builder = create_test_indexer_builder(
1740            &env,
1741            object_store.clone(),
1742            file_path.clone(),
1743            metadata.clone(),
1744            row_group_size,
1745        );
1746
1747        let info = write_flat_sst(
1748            object_store.clone(),
1749            metadata.clone(),
1750            indexer_builder,
1751            file_path.clone(),
1752            flat_source,
1753            &write_opts,
1754        )
1755        .await;
1756        assert_eq!(200, info.num_rows);
1757        assert!(info.file_size > 0);
1758        assert!(info.index_metadata.file_size > 0);
1759
1760        let handle = create_file_handle_from_sst_info(&info, &metadata);
1761
1762        let cache = create_test_cache();
1763
1764        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1765        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200) both contain rows with tag_1 = "d", so rows are pruned within each row group rather than whole row groups
1766        let preds = vec![
1767            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1768            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1769            col("tag_1").eq(lit("d")),
1770        ];
1771        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1772            FILE_DIR.to_string(),
1773            PathType::Bare,
1774            object_store.clone(),
1775            &metadata,
1776            env.get_puffin_manager(),
1777        )
1778        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1779        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1780        .build(&preds)
1781        .unwrap()
1782        .map(Arc::new);
1783
1784        let builder = ParquetReaderBuilder::new(
1785            FILE_DIR.to_string(),
1786            PathType::Bare,
1787            handle.clone(),
1788            object_store.clone(),
1789        )
1790        .flat_format(true)
1791        .predicate(Some(Predicate::new(preds)))
1792        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1793        .cache(CacheStrategy::EnableAll(cache.clone()));
1794
1795        let mut metrics = ReaderMetrics::default();
1796        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1797
1798        // Verify selection contains RG 0 and RG 1
1799        assert_eq!(selection.row_group_count(), 2);
1800        assert_eq!(50, selection.get(0).unwrap().row_count());
1801        assert_eq!(50, selection.get(1).unwrap().row_count());
1802
1803        // Verify filtering metrics
1804        assert_eq!(metrics.filter_metrics.rg_total, 2);
1805        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1806        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1807        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1808    }
1809
1810    /// Creates region metadata for testing fulltext indexes.
1811    /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
1812    fn fulltext_region_metadata() -> RegionMetadata {
1813        let mut builder = RegionMetadataBuilder::new(REGION_ID);
1814        builder
1815            .push_column_metadata(ColumnMetadata {
1816                column_schema: ColumnSchema::new(
1817                    "tag_0".to_string(),
1818                    ConcreteDataType::string_datatype(),
1819                    true,
1820                ),
1821                semantic_type: SemanticType::Tag,
1822                column_id: 0,
1823            })
1824            .push_column_metadata(ColumnMetadata {
1825                column_schema: ColumnSchema::new(
1826                    "text_bloom".to_string(),
1827                    ConcreteDataType::string_datatype(),
1828                    true,
1829                )
1830                .with_fulltext_options(FulltextOptions {
1831                    enable: true,
1832                    analyzer: FulltextAnalyzer::English,
1833                    case_sensitive: false,
1834                    backend: FulltextBackend::Bloom,
1835                    granularity: 1,
1836                    false_positive_rate_in_10000: 50,
1837                })
1838                .unwrap(),
1839                semantic_type: SemanticType::Field,
1840                column_id: 1,
1841            })
1842            .push_column_metadata(ColumnMetadata {
1843                column_schema: ColumnSchema::new(
1844                    "text_tantivy".to_string(),
1845                    ConcreteDataType::string_datatype(),
1846                    true,
1847                )
1848                .with_fulltext_options(FulltextOptions {
1849                    enable: true,
1850                    analyzer: FulltextAnalyzer::English,
1851                    case_sensitive: false,
1852                    backend: FulltextBackend::Tantivy,
1853                    granularity: 1,
1854                    false_positive_rate_in_10000: 50,
1855                })
1856                .unwrap(),
1857                semantic_type: SemanticType::Field,
1858                column_id: 2,
1859            })
1860            .push_column_metadata(ColumnMetadata {
1861                column_schema: ColumnSchema::new(
1862                    "field_0".to_string(),
1863                    ConcreteDataType::uint64_datatype(),
1864                    true,
1865                ),
1866                semantic_type: SemanticType::Field,
1867                column_id: 3,
1868            })
1869            .push_column_metadata(ColumnMetadata {
1870                column_schema: ColumnSchema::new(
1871                    "ts".to_string(),
1872                    ConcreteDataType::timestamp_millisecond_datatype(),
1873                    false,
1874                ),
1875                semantic_type: SemanticType::Timestamp,
1876                column_id: 4,
1877            })
1878            .primary_key(vec![0]);
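        // Only tag_0 (column id 0) forms the primary key; the fulltext columns are plain fields.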
1879        builder.build().unwrap()
1880    }
1881
1882    /// Creates a flat format RecordBatch with string fields for fulltext testing.
1883    fn new_fulltext_record_batch_by_range(
1884        tag: &str,
1885        text_bloom: &str,
1886        text_tantivy: &str,
1887        start: usize,
1888        end: usize,
1889    ) -> RecordBatch {
1890        assert!(end >= start);
1891        let metadata = Arc::new(fulltext_region_metadata());
1892        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1893
1894        let num_rows = end - start;
1895        let mut columns = Vec::new();
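        // Column order below must match the flat SST schema: the user columns (tag_0, text_bloom,
        // text_tantivy, field_0, ts) followed by the internal encoded primary key, sequence, and
        // op_type columns.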
1896
1897        // Add primary key column (tag_0) as dictionary array
1898        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1899        for _ in 0..num_rows {
1900            tag_builder.append_value(tag);
1901        }
1902        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
1903
1904        // Add text_bloom field (fulltext with bloom backend)
1905        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
1906        columns.push(Arc::new(StringArray::from(text_bloom_values)));
1907
1908        // Add text_tantivy field (fulltext with tantivy backend)
1909        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
1910        columns.push(Arc::new(StringArray::from(text_tantivy_values)));
1911
1912        // Add field column (field_0)
1913        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1914        columns.push(Arc::new(UInt64Array::from(field_values)));
1915
1916        // Add time index column (ts)
1917        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1918        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1919
1920        // Add encoded primary key column
1921        let pk = new_primary_key(&[tag]);
1922        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1923        for _ in 0..num_rows {
1924            pk_builder.append(&pk).unwrap();
1925        }
1926        columns.push(Arc::new(pk_builder.finish()));
1927
1928        // Add sequence column
1929        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1930
1931        // Add op_type column
1932        columns.push(Arc::new(UInt8Array::from_value(
1933            OpType::Put as u8,
1934            num_rows,
1935        )));
1936
1937        RecordBatch::try_new(flat_schema, columns).unwrap()
1938    }
1939
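    // Writes an SST whose schema has two fulltext-indexed string fields (text_bloom with the
    // bloom backend, text_tantivy with the tantivy backend) and verifies that matches_term and
    // matches predicates prune the expected row groups.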
1940    #[tokio::test]
1941    async fn test_write_flat_read_with_fulltext_index() {
1942        let mut env = TestEnv::new().await;
1943        let object_store = env.init_object_store_manager();
1944        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1945        let metadata = Arc::new(fulltext_region_metadata());
1946        let row_group_size = 50;
1947
1948        // Create flat format RecordBatches with different text content; each 50-row batch becomes one row group (row_group_size = 50)
1949        // RG 0:   0-50  tag="a", bloom="hello world", tantivy="quick brown fox"
1950        // RG 1:  50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
1951        // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
1952        // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
1953        let flat_batches = vec![
1954            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
1955            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
1956            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
1957            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
1958        ];
1959
1960        let flat_source = new_flat_source_from_record_batches(flat_batches);
1961
1962        let write_opts = WriteOptions {
1963            row_group_size,
1964            ..Default::default()
1965        };
1966
1967        let indexer_builder = create_test_indexer_builder(
1968            &env,
1969            object_store.clone(),
1970            file_path.clone(),
1971            metadata.clone(),
1972            row_group_size,
1973        );
1974
1975        let mut info = write_flat_sst(
1976            object_store.clone(),
1977            metadata.clone(),
1978            indexer_builder,
1979            file_path.clone(),
1980            flat_source,
1981            &write_opts,
1982        )
1983        .await;
1984        assert_eq!(200, info.num_rows);
1985        assert!(info.file_size > 0);
1986        assert!(info.index_metadata.file_size > 0);
1987
1988        // Verify fulltext indexes were created
1989        assert!(info.index_metadata.fulltext_index.index_size > 0);
1990        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
1991        // text_bloom (column_id 1) and text_tantivy (column_id 2)
1992        info.index_metadata.fulltext_index.columns.sort_unstable();
1993        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
1994
1995        assert_eq!(
1996            (
1997                Timestamp::new_millisecond(0),
1998                Timestamp::new_millisecond(199)
1999            ),
2000            info.time_range
2001        );
2002
2003        let handle = create_file_handle_from_sst_info(&info, &metadata);
2004
2005        let cache = create_test_cache();
2006
2007        // Helper closures that build the `matches` and `matches_term` scalar functions for the fulltext predicates below
2008        let matches_func = || {
2009            Arc::new(
2010                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
2011                    .provide(Default::default()),
2012            )
2013        };
2014
2015        let matches_term_func = || {
2016            Arc::new(
2017                ScalarFunctionFactory::from(
2018                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
2019                )
2020                .provide(Default::default()),
2021            )
2022        };
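        // In this test, matches_term is used against the bloom-backed text_bloom column (Test 1)
        // and matches against the tantivy-backed text_tantivy column (Test 2).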
2023
2024        // Test 1: Filter the text_bloom field using matches_term (bloom backend)
2025        // Expected: RG 0 and RG 1 (rows 0-100), which contain the term "hello"
2026        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2027            args: vec![col("text_bloom"), "hello".lit()],
2028            func: matches_term_func(),
2029        })];
2030
2031        let fulltext_applier = FulltextIndexApplierBuilder::new(
2032            FILE_DIR.to_string(),
2033            PathType::Bare,
2034            object_store.clone(),
2035            env.get_puffin_manager(),
2036            &metadata,
2037        )
2038        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2039        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2040        .build(&preds)
2041        .unwrap()
2042        .map(Arc::new);
2043
2044        let builder = ParquetReaderBuilder::new(
2045            FILE_DIR.to_string(),
2046            PathType::Bare,
2047            handle.clone(),
2048            object_store.clone(),
2049        )
2050        .flat_format(true)
2051        .predicate(Some(Predicate::new(preds)))
2052        .fulltext_index_appliers([None, fulltext_applier.clone()])
2053        .cache(CacheStrategy::EnableAll(cache.clone()));
2054
2055        let mut metrics = ReaderMetrics::default();
2056        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
2057
2058        // Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
2059        assert_eq!(selection.row_group_count(), 2);
2060        assert_eq!(50, selection.get(0).unwrap().row_count());
2061        assert_eq!(50, selection.get(1).unwrap().row_count());
2062
2063        // Verify filtering metrics
2064        assert_eq!(metrics.filter_metrics.rg_total, 4);
2065        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2066        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2067        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2068
2069        // Test 2: Filter the text_tantivy field using matches (tantivy backend)
2070        // Expected: RG 2 and RG 3 (rows 100-200), whose text matches the "lazy" query
2071        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2072            args: vec![col("text_tantivy"), "lazy".lit()],
2073            func: matches_func(),
2074        })];
2075
2076        let fulltext_applier = FulltextIndexApplierBuilder::new(
2077            FILE_DIR.to_string(),
2078            PathType::Bare,
2079            object_store.clone(),
2080            env.get_puffin_manager(),
2081            &metadata,
2082        )
2083        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2084        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2085        .build(&preds)
2086        .unwrap()
2087        .map(Arc::new);
2088
2089        let builder = ParquetReaderBuilder::new(
2090            FILE_DIR.to_string(),
2091            PathType::Bare,
2092            handle.clone(),
2093            object_store.clone(),
2094        )
2095        .flat_format(true)
2096        .predicate(Some(Predicate::new(preds)))
2097        .fulltext_index_appliers([None, fulltext_applier.clone()])
2098        .cache(CacheStrategy::EnableAll(cache.clone()));
2099
2100        let mut metrics = ReaderMetrics::default();
2101        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
2102
2103        // Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
2104        assert_eq!(selection.row_group_count(), 2);
2105        assert_eq!(50, selection.get(2).unwrap().row_count());
2106        assert_eq!(50, selection.get(3).unwrap().row_count());
2107
2108        // Verify filtering metrics
2109        assert_eq!(metrics.filter_metrics.rg_total, 4);
2110        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2111        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2112        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2113    }
2114}