mito2/sst/parquet.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST in parquet format.
16
17use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
27pub(crate) mod file_range;
28pub mod flat_format;
29pub mod format;
30pub(crate) mod helper;
31pub(crate) mod metadata;
32pub mod reader;
33pub mod row_group;
34pub mod row_selection;
35pub(crate) mod stats;
36pub mod writer;
37
38/// Key of metadata in parquet SST.
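/// The region metadata is serialized to JSON and stored under this key in the
/// parquet file's key-value metadata; `test_read_large_binary` below builds such
/// a `KeyValue` entry by hand.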
39pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
40
41/// Default batch size to read parquet files.
42pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
43/// Default row group size for parquet files.
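/// With the default read batch size above, this is 100 * 1024 = 102,400 rows.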
44pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
45
46/// Parquet write options.
47#[derive(Debug, Clone)]
48pub struct WriteOptions {
49    /// Buffer size for async writer.
50    pub write_buffer_size: ReadableSize,
51    /// Row group size.
52    pub row_group_size: usize,
53    /// Max single output file size.
54    /// Note: This is not a hard limit as we can only observe the file size when
55    /// the ArrowWriter writes to the underlying writer.
56    pub max_file_size: Option<usize>,
57}
58
59impl Default for WriteOptions {
60    fn default() -> Self {
61        WriteOptions {
62            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
63            row_group_size: DEFAULT_ROW_GROUP_SIZE,
64            max_file_size: None,
65        }
66    }
67}
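// A minimal usage sketch (mirroring the tests below, not recommended production
// values): override only the fields you care about and take the defaults for
// the rest.
//
//     let opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(16 * 1024),
//         ..Default::default()
//     };
//
// Leaving `max_file_size` as `None` keeps all rows in one output file; setting
// it lets the writer split the output into multiple files, as exercised by
// `test_write_multiple_files`.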
68
69/// Parquet SST info returned by the writer.
70#[derive(Debug, Default)]
71pub struct SstInfo {
72    /// SST file id.
73    pub file_id: FileId,
74    /// Time range of the SST. The timestamps have the same time unit as the
75    /// data in the SST.
76    pub time_range: FileTimeRange,
77    /// File size in bytes.
78    pub file_size: u64,
79    /// Maximum uncompressed row group size in bytes. 0 if unknown.
80    pub max_row_group_uncompressed_size: u64,
81    /// Number of rows.
82    pub num_rows: usize,
83    /// Number of row groups.
84    pub num_row_groups: u64,
85    /// Parquet file metadata.
86    pub file_metadata: Option<Arc<ParquetMetaData>>,
87    /// Index metadata.
88    pub index_metadata: IndexOutput,
89    /// Number of series.
90    pub num_series: u64,
91}
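// Callers fold an `SstInfo` into a `FileMeta` (file id, time range, sizes, index
// metadata and row counts); see `create_file_handle_from_sst_info` in the tests
// below for the field-by-field mapping.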
92
93#[cfg(test)]
94mod tests {
95    use std::collections::HashSet;
96    use std::sync::Arc;
97
98    use api::v1::{OpType, SemanticType};
99    use common_function::function::FunctionRef;
100    use common_function::function_factory::ScalarFunctionFactory;
101    use common_function::scalars::matches::MatchesFunction;
102    use common_function::scalars::matches_term::MatchesTermFunction;
103    use common_time::Timestamp;
104    use datafusion_common::{Column, ScalarValue};
105    use datafusion_expr::expr::ScalarFunction;
106    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
107    use datatypes::arrow;
108    use datatypes::arrow::array::{
109        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
110        TimestampMillisecondArray, UInt8Array, UInt64Array,
111    };
112    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
113    use datatypes::prelude::ConcreteDataType;
114    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
115    use object_store::ObjectStore;
116    use parquet::arrow::AsyncArrowWriter;
117    use parquet::basic::{Compression, Encoding, ZstdLevel};
118    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
119    use parquet::file::properties::WriterProperties;
120    use store_api::codec::PrimaryKeyEncoding;
121    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
122    use store_api::region_request::PathType;
123    use store_api::storage::{ColumnSchema, RegionId};
124    use table::predicate::Predicate;
125    use tokio_util::compat::FuturesAsyncWriteCompatExt;
126
127    use super::*;
128    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
129    use crate::cache::test_util::assert_parquet_metadata_equal;
130    use crate::cache::{CacheManager, CacheStrategy, PageKey};
131    use crate::config::IndexConfig;
132    use crate::read::{BatchBuilder, BatchReader, FlatSource};
133    use crate::region::options::{IndexOptions, InvertedIndexOptions};
134    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
135    use crate::sst::file_purger::NoopFilePurger;
136    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
137    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
138    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
139    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
140    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
141    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
142    use crate::sst::parquet::writer::ParquetWriter;
143    use crate::sst::{
144        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
145    };
146    use crate::test_util::sst_util::{
147        build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary,
148        new_batch_with_custom_sequence, new_primary_key, new_source, new_sparse_primary_key,
149        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
150        sst_region_metadata_with_encoding,
151    };
152    use crate::test_util::{TestEnv, check_reader_result};
153
154    const FILE_DIR: &str = "/";
155    const REGION_ID: RegionId = RegionId::new(0, 0);
156
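    // Path provider that resolves every lookup to a single fixed region file id,
    // so SSTs written in these tests can be read back from a known location.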
157    #[derive(Clone)]
158    struct FixedPathProvider {
159        region_file_id: RegionFileId,
160    }
161
162    impl FilePathProvider for FixedPathProvider {
163        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
164            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
165        }
166
167        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
168            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
169        }
170
171        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
172            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
173        }
174    }
175
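    // Indexer builder that always returns a no-op `Indexer`, for tests that do
    // not exercise index creation.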
176    struct NoopIndexBuilder;
177
178    #[async_trait::async_trait]
179    impl IndexerBuilder for NoopIndexBuilder {
180        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
181            Indexer::default()
182        }
183    }
184
185    #[tokio::test]
186    async fn test_write_read() {
187        let mut env = TestEnv::new().await;
188        let object_store = env.init_object_store_manager();
189        let handle = sst_file_handle(0, 1000);
190        let file_path = FixedPathProvider {
191            region_file_id: handle.file_id(),
192        };
193        let metadata = Arc::new(sst_region_metadata());
194        let source = new_source(&[
195            new_batch_by_range(&["a", "d"], 0, 60),
196            new_batch_by_range(&["b", "f"], 0, 40),
197            new_batch_by_range(&["b", "h"], 100, 200),
198        ]);
199        // Use a small row group size for test.
200        let write_opts = WriteOptions {
201            row_group_size: 50,
202            ..Default::default()
203        };
204
205        let mut metrics = Metrics::new(WriteType::Flush);
206        let mut writer = ParquetWriter::new_with_object_store(
207            object_store.clone(),
208            metadata.clone(),
209            IndexConfig::default(),
210            NoopIndexBuilder,
211            file_path,
212            &mut metrics,
213        )
214        .await;
215
216        let info = writer
217            .write_all(source, None, &write_opts)
218            .await
219            .unwrap()
220            .remove(0);
221        assert_eq!(200, info.num_rows);
222        assert!(info.file_size > 0);
223        assert_eq!(
224            (
225                Timestamp::new_millisecond(0),
226                Timestamp::new_millisecond(199)
227            ),
228            info.time_range
229        );
230
231        let builder = ParquetReaderBuilder::new(
232            FILE_DIR.to_string(),
233            PathType::Bare,
234            handle.clone(),
235            object_store,
236        );
237        let mut reader = builder.build().await.unwrap();
238        check_reader_result(
239            &mut reader,
240            &[
241                new_batch_by_range(&["a", "d"], 0, 50),
242                new_batch_by_range(&["a", "d"], 50, 60),
243                new_batch_by_range(&["b", "f"], 0, 40),
244                new_batch_by_range(&["b", "h"], 100, 150),
245                new_batch_by_range(&["b", "h"], 150, 200),
246            ],
247        )
248        .await;
249    }
250
251    #[tokio::test]
252    async fn test_read_with_cache() {
253        let mut env = TestEnv::new().await;
254        let object_store = env.init_object_store_manager();
255        let handle = sst_file_handle(0, 1000);
256        let metadata = Arc::new(sst_region_metadata());
257        let source = new_source(&[
258            new_batch_by_range(&["a", "d"], 0, 60),
259            new_batch_by_range(&["b", "f"], 0, 40),
260            new_batch_by_range(&["b", "h"], 100, 200),
261        ]);
262        // Use a small row group size for test.
263        let write_opts = WriteOptions {
264            row_group_size: 50,
265            ..Default::default()
266        };
267        // Prepare data.
268        let mut metrics = Metrics::new(WriteType::Flush);
269        let mut writer = ParquetWriter::new_with_object_store(
270            object_store.clone(),
271            metadata.clone(),
272            IndexConfig::default(),
273            NoopIndexBuilder,
274            FixedPathProvider {
275                region_file_id: handle.file_id(),
276            },
277            &mut metrics,
278        )
279        .await;
280
281        let sst_info = writer
282            .write_all(source, None, &write_opts)
283            .await
284            .unwrap()
285            .remove(0);
286
287        // Enable page cache.
288        let cache = CacheStrategy::EnableAll(Arc::new(
289            CacheManager::builder()
290                .page_cache_size(64 * 1024 * 1024)
291                .build(),
292        ));
293        let builder = ParquetReaderBuilder::new(
294            FILE_DIR.to_string(),
295            PathType::Bare,
296            handle.clone(),
297            object_store,
298        )
299        .cache(cache.clone());
300        for _ in 0..3 {
301            let mut reader = builder.build().await.unwrap();
302            check_reader_result(
303                &mut reader,
304                &[
305                    new_batch_by_range(&["a", "d"], 0, 50),
306                    new_batch_by_range(&["a", "d"], 50, 60),
307                    new_batch_by_range(&["b", "f"], 0, 40),
308                    new_batch_by_range(&["b", "h"], 100, 150),
309                    new_batch_by_range(&["b", "h"], 150, 200),
310                ],
311            )
312            .await;
313        }
314
315        let parquet_meta = sst_info.file_metadata.unwrap();
316        let get_ranges = |row_group_idx: usize| {
317            let row_group = parquet_meta.row_group(row_group_idx);
318            let mut ranges = Vec::with_capacity(row_group.num_columns());
319            for i in 0..row_group.num_columns() {
320                let (start, length) = row_group.column(i).byte_range();
321                ranges.push(start..start + length);
322            }
323
324            ranges
325        };
326
327        // The file has 200 rows with row_group_size 50, so all 4 row groups should be cached.
328        for i in 0..4 {
329            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
330            assert!(cache.get_pages(&page_key).is_some());
331        }
332        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
333        assert!(cache.get_pages(&page_key).is_none());
334    }
335
336    #[tokio::test]
337    async fn test_parquet_metadata_eq() {
338        // create test env
339        let mut env = crate::test_util::TestEnv::new().await;
340        let object_store = env.init_object_store_manager();
341        let handle = sst_file_handle(0, 1000);
342        let metadata = Arc::new(sst_region_metadata());
343        let source = new_source(&[
344            new_batch_by_range(&["a", "d"], 0, 60),
345            new_batch_by_range(&["b", "f"], 0, 40),
346            new_batch_by_range(&["b", "h"], 100, 200),
347        ]);
348        let write_opts = WriteOptions {
349            row_group_size: 50,
350            ..Default::default()
351        };
352
353        // Write the SST file and get the SST info.
354        // The SST info contains the parquet metadata, which is converted from the writer's FileMetaData.
355        let mut metrics = Metrics::new(WriteType::Flush);
356        let mut writer = ParquetWriter::new_with_object_store(
357            object_store.clone(),
358            metadata.clone(),
359            IndexConfig::default(),
360            NoopIndexBuilder,
361            FixedPathProvider {
362                region_file_id: handle.file_id(),
363            },
364            &mut metrics,
365        )
366        .await;
367
368        let sst_info = writer
369            .write_all(source, None, &write_opts)
370            .await
371            .unwrap()
372            .remove(0);
373        let writer_metadata = sst_info.file_metadata.unwrap();
374
375        // read the sst file metadata
376        let builder = ParquetReaderBuilder::new(
377            FILE_DIR.to_string(),
378            PathType::Bare,
379            handle.clone(),
380            object_store,
381        )
382        .page_index_policy(PageIndexPolicy::Optional);
383        let reader = builder.build().await.unwrap();
384        let reader_metadata = reader.parquet_metadata();
385
386        assert_parquet_metadata_equal(writer_metadata, reader_metadata);
387    }
388
389    #[tokio::test]
390    async fn test_read_with_tag_filter() {
391        let mut env = TestEnv::new().await;
392        let object_store = env.init_object_store_manager();
393        let handle = sst_file_handle(0, 1000);
394        let metadata = Arc::new(sst_region_metadata());
395        let source = new_source(&[
396            new_batch_by_range(&["a", "d"], 0, 60),
397            new_batch_by_range(&["b", "f"], 0, 40),
398            new_batch_by_range(&["b", "h"], 100, 200),
399        ]);
400        // Use a small row group size for test.
401        let write_opts = WriteOptions {
402            row_group_size: 50,
403            ..Default::default()
404        };
405        // Prepare data.
406        let mut metrics = Metrics::new(WriteType::Flush);
407        let mut writer = ParquetWriter::new_with_object_store(
408            object_store.clone(),
409            metadata.clone(),
410            IndexConfig::default(),
411            NoopIndexBuilder,
412            FixedPathProvider {
413                region_file_id: handle.file_id(),
414            },
415            &mut metrics,
416        )
417        .await;
418        writer
419            .write_all(source, None, &write_opts)
420            .await
421            .unwrap()
422            .remove(0);
423
424        // Predicate
425        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
426            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
427            op: Operator::Eq,
428            right: Box::new("a".lit()),
429        })]));
430
431        let builder = ParquetReaderBuilder::new(
432            FILE_DIR.to_string(),
433            PathType::Bare,
434            handle.clone(),
435            object_store,
436        )
437        .predicate(predicate);
438        let mut reader = builder.build().await.unwrap();
439        check_reader_result(
440            &mut reader,
441            &[
442                new_batch_by_range(&["a", "d"], 0, 50),
443                new_batch_by_range(&["a", "d"], 50, 60),
444            ],
445        )
446        .await;
447    }
448
449    #[tokio::test]
450    async fn test_read_empty_batch() {
451        let mut env = TestEnv::new().await;
452        let object_store = env.init_object_store_manager();
453        let handle = sst_file_handle(0, 1000);
454        let metadata = Arc::new(sst_region_metadata());
455        let source = new_source(&[
456            new_batch_by_range(&["a", "z"], 0, 0),
457            new_batch_by_range(&["a", "z"], 100, 100),
458            new_batch_by_range(&["a", "z"], 200, 230),
459        ]);
460        // Use a small row group size for test.
461        let write_opts = WriteOptions {
462            row_group_size: 50,
463            ..Default::default()
464        };
465        // Prepare data.
466        let mut metrics = Metrics::new(WriteType::Flush);
467        let mut writer = ParquetWriter::new_with_object_store(
468            object_store.clone(),
469            metadata.clone(),
470            IndexConfig::default(),
471            NoopIndexBuilder,
472            FixedPathProvider {
473                region_file_id: handle.file_id(),
474            },
475            &mut metrics,
476        )
477        .await;
478        writer
479            .write_all(source, None, &write_opts)
480            .await
481            .unwrap()
482            .remove(0);
483
484        let builder = ParquetReaderBuilder::new(
485            FILE_DIR.to_string(),
486            PathType::Bare,
487            handle.clone(),
488            object_store,
489        );
490        let mut reader = builder.build().await.unwrap();
491        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
492    }
493
494    #[tokio::test]
495    async fn test_read_with_field_filter() {
496        let mut env = TestEnv::new().await;
497        let object_store = env.init_object_store_manager();
498        let handle = sst_file_handle(0, 1000);
499        let metadata = Arc::new(sst_region_metadata());
500        let source = new_source(&[
501            new_batch_by_range(&["a", "d"], 0, 60),
502            new_batch_by_range(&["b", "f"], 0, 40),
503            new_batch_by_range(&["b", "h"], 100, 200),
504        ]);
505        // Use a small row group size for test.
506        let write_opts = WriteOptions {
507            row_group_size: 50,
508            ..Default::default()
509        };
510        // Prepare data.
511        let mut metrics = Metrics::new(WriteType::Flush);
512        let mut writer = ParquetWriter::new_with_object_store(
513            object_store.clone(),
514            metadata.clone(),
515            IndexConfig::default(),
516            NoopIndexBuilder,
517            FixedPathProvider {
518                region_file_id: handle.file_id(),
519            },
520            &mut metrics,
521        )
522        .await;
523
524        writer
525            .write_all(source, None, &write_opts)
526            .await
527            .unwrap()
528            .remove(0);
529
530        // Predicate
531        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
532            left: Box::new(Expr::Column(Column::from_name("field_0"))),
533            op: Operator::GtEq,
534            right: Box::new(150u64.lit()),
535        })]));
536
537        let builder = ParquetReaderBuilder::new(
538            FILE_DIR.to_string(),
539            PathType::Bare,
540            handle.clone(),
541            object_store,
542        )
543        .predicate(predicate);
544        let mut reader = builder.build().await.unwrap();
545        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
546    }
547
548    #[tokio::test]
549    async fn test_read_large_binary() {
550        let mut env = TestEnv::new().await;
551        let object_store = env.init_object_store_manager();
552        let handle = sst_file_handle(0, 1000);
553        let file_path = handle.file_path(FILE_DIR, PathType::Bare);
554
555        let write_opts = WriteOptions {
556            row_group_size: 50,
557            ..Default::default()
558        };
559
560        let metadata = build_test_binary_test_region_metadata();
561        let json = metadata.to_json().unwrap();
562        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
563
564        let props_builder = WriterProperties::builder()
565            .set_key_value_metadata(Some(vec![key_value_meta]))
566            .set_compression(Compression::ZSTD(ZstdLevel::default()))
567            .set_encoding(Encoding::PLAIN)
568            .set_max_row_group_size(write_opts.row_group_size);
569
570        let writer_props = props_builder.build();
571
572        let write_format = PrimaryKeyWriteFormat::new(metadata);
573        let fields: Vec<_> = write_format
574            .arrow_schema()
575            .fields()
576            .into_iter()
577            .map(|field| {
578                let data_type = field.data_type().clone();
579                if data_type == DataType::Binary {
580                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
581                } else {
582                    Field::new(field.name(), data_type, field.is_nullable())
583                }
584            })
585            .collect();
586
587        let arrow_schema = Arc::new(Schema::new(fields));
588
589        // Ensures field_0 has LargeBinary type.
590        assert_eq!(
591            &DataType::LargeBinary,
592            arrow_schema.field_with_name("field_0").unwrap().data_type()
593        );
594        let mut writer = AsyncArrowWriter::try_new(
595            object_store
596                .writer_with(&file_path)
597                .concurrent(DEFAULT_WRITE_CONCURRENCY)
598                .await
599                .map(|w| w.into_futures_async_write().compat_write())
600                .unwrap(),
601            arrow_schema.clone(),
602            Some(writer_props),
603        )
604        .unwrap();
605
606        let batch = new_batch_with_binary(&["a"], 0, 60);
607        let arrow_batch = write_format.convert_batch(&batch).unwrap();
608        let arrays: Vec<_> = arrow_batch
609            .columns()
610            .iter()
611            .map(|array| {
612                let data_type = array.data_type().clone();
613                if data_type == DataType::Binary {
614                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
615                } else {
616                    array.clone()
617                }
618            })
619            .collect();
620        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
621
622        writer.write(&result).await.unwrap();
623        writer.close().await.unwrap();
624
625        let builder = ParquetReaderBuilder::new(
626            FILE_DIR.to_string(),
627            PathType::Bare,
628            handle.clone(),
629            object_store,
630        );
631        let mut reader = builder.build().await.unwrap();
632        check_reader_result(
633            &mut reader,
634            &[
635                new_batch_with_binary(&["a"], 0, 50),
636                new_batch_with_binary(&["a"], 50, 60),
637            ],
638        )
639        .await;
640    }
641
642    #[tokio::test]
643    async fn test_write_multiple_files() {
644        common_telemetry::init_default_ut_logging();
645        // create test env
646        let mut env = TestEnv::new().await;
647        let object_store = env.init_object_store_manager();
648        let metadata = Arc::new(sst_region_metadata());
649        let batches = &[
650            new_batch_by_range(&["a", "d"], 0, 1000),
651            new_batch_by_range(&["b", "f"], 0, 1000),
652            new_batch_by_range(&["c", "g"], 0, 1000),
653            new_batch_by_range(&["b", "h"], 100, 200),
654            new_batch_by_range(&["b", "h"], 200, 300),
655            new_batch_by_range(&["b", "h"], 300, 1000),
656        ];
657        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
658
659        let source = new_source(batches);
660        let write_opts = WriteOptions {
661            row_group_size: 50,
662            max_file_size: Some(1024 * 16),
663            ..Default::default()
664        };
665
666        let path_provider = RegionFilePathFactory {
667            table_dir: "test".to_string(),
668            path_type: PathType::Bare,
669        };
670        let mut metrics = Metrics::new(WriteType::Flush);
671        let mut writer = ParquetWriter::new_with_object_store(
672            object_store.clone(),
673            metadata.clone(),
674            IndexConfig::default(),
675            NoopIndexBuilder,
676            path_provider,
677            &mut metrics,
678        )
679        .await;
680
681        let files = writer.write_all(source, None, &write_opts).await.unwrap();
682        assert_eq!(2, files.len());
683
684        let mut rows_read = 0;
685        for f in &files {
686            let file_handle = sst_file_handle_with_file_id(
687                f.file_id,
688                f.time_range.0.value(),
689                f.time_range.1.value(),
690            );
691            let builder = ParquetReaderBuilder::new(
692                "test".to_string(),
693                PathType::Bare,
694                file_handle,
695                object_store.clone(),
696            );
697            let mut reader = builder.build().await.unwrap();
698            while let Some(batch) = reader.next_batch().await.unwrap() {
699                rows_read += batch.num_rows();
700            }
701        }
702        assert_eq!(total_rows, rows_read);
703    }
704
705    #[tokio::test]
706    async fn test_write_read_with_index() {
707        let mut env = TestEnv::new().await;
708        let object_store = env.init_object_store_manager();
709        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
710        let metadata = Arc::new(sst_region_metadata());
711        let row_group_size = 50;
712
713        let source = new_source(&[
714            new_batch_by_range(&["a", "d"], 0, 20),
715            new_batch_by_range(&["b", "d"], 0, 20),
716            new_batch_by_range(&["c", "d"], 0, 20),
717            new_batch_by_range(&["c", "f"], 0, 40),
718            new_batch_by_range(&["c", "h"], 100, 200),
719        ]);
720        // Use a small row group size for test.
721        let write_opts = WriteOptions {
722            row_group_size,
723            ..Default::default()
724        };
725
726        let puffin_manager = env
727            .get_puffin_manager()
728            .build(object_store.clone(), file_path.clone());
729        let intermediate_manager = env.get_intermediate_manager();
730
731        let indexer_builder = IndexerBuilderImpl {
732            build_type: IndexBuildType::Flush,
733            metadata: metadata.clone(),
734            row_group_size,
735            puffin_manager,
736            write_cache_enabled: false,
737            intermediate_manager,
738            index_options: IndexOptions {
739                inverted_index: InvertedIndexOptions {
740                    segment_row_count: 1,
741                    ..Default::default()
742                },
743            },
744            inverted_index_config: Default::default(),
745            fulltext_index_config: Default::default(),
746            bloom_filter_index_config: Default::default(),
747            #[cfg(feature = "vector_index")]
748            vector_index_config: Default::default(),
749        };
750
751        let mut metrics = Metrics::new(WriteType::Flush);
752        let mut writer = ParquetWriter::new_with_object_store(
753            object_store.clone(),
754            metadata.clone(),
755            IndexConfig::default(),
756            indexer_builder,
757            file_path.clone(),
758            &mut metrics,
759        )
760        .await;
761
762        let info = writer
763            .write_all(source, None, &write_opts)
764            .await
765            .unwrap()
766            .remove(0);
767        assert_eq!(200, info.num_rows);
768        assert!(info.file_size > 0);
769        assert!(info.index_metadata.file_size > 0);
770
771        assert!(info.index_metadata.inverted_index.index_size > 0);
772        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
773        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
774
775        assert!(info.index_metadata.bloom_filter.index_size > 0);
776        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
777        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
778
779        assert_eq!(
780            (
781                Timestamp::new_millisecond(0),
782                Timestamp::new_millisecond(199)
783            ),
784            info.time_range
785        );
786
787        let handle = FileHandle::new(
788            FileMeta {
789                region_id: metadata.region_id,
790                file_id: info.file_id,
791                time_range: info.time_range,
792                level: 0,
793                file_size: info.file_size,
794                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
795                available_indexes: info.index_metadata.build_available_indexes(),
796                indexes: info.index_metadata.build_indexes(),
797                index_file_size: info.index_metadata.file_size,
798                index_version: 0,
799                num_row_groups: info.num_row_groups,
800                num_rows: info.num_rows as u64,
801                sequence: None,
802                partition_expr: match &metadata.partition_expr {
803                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
804                        .expect("partition expression should be valid JSON"),
805                    None => None,
806                },
807                num_series: 0,
808            },
809            Arc::new(NoopFilePurger),
810        );
811
812        let cache = Arc::new(
813            CacheManager::builder()
814                .index_result_cache_size(1024 * 1024)
815                .index_metadata_size(1024 * 1024)
816                .index_content_page_size(1024 * 1024)
817                .index_content_size(1024 * 1024)
818                .puffin_metadata_size(1024 * 1024)
819                .build(),
820        );
821        let index_result_cache = cache.index_result_cache().unwrap();
822
823        let build_inverted_index_applier = |exprs: &[Expr]| {
824            InvertedIndexApplierBuilder::new(
825                FILE_DIR.to_string(),
826                PathType::Bare,
827                object_store.clone(),
828                &metadata,
829                HashSet::from_iter([0]),
830                env.get_puffin_manager(),
831            )
832            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
833            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
834            .build(exprs)
835            .unwrap()
836            .map(Arc::new)
837        };
838
839        let build_bloom_filter_applier = |exprs: &[Expr]| {
840            BloomFilterIndexApplierBuilder::new(
841                FILE_DIR.to_string(),
842                PathType::Bare,
843                object_store.clone(),
844                &metadata,
845                env.get_puffin_manager(),
846            )
847            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
848            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
849            .build(exprs)
850            .unwrap()
851            .map(Arc::new)
852        };
853
854        // Data: ts tag_0 tag_1
855        // Data: 0-20 [a, d]
856        //       0-20 [b, d]
857        //       0-20 [c, d]
858        //       0-40 [c, f]
859        //    100-200 [c, h]
860        //
861        // Pred: tag_0 = "b"
862        //
863        // Row groups & rows pruning:
864        //
865        // Row Groups:
866        // - min-max: filter out row groups 1..=3
867        //
868        // Rows:
869        // - inverted index: hit row group 0, hit 20 rows
870        let preds = vec![col("tag_0").eq(lit("b"))];
871        let inverted_index_applier = build_inverted_index_applier(&preds);
872        let bloom_filter_applier = build_bloom_filter_applier(&preds);
873
874        let builder = ParquetReaderBuilder::new(
875            FILE_DIR.to_string(),
876            PathType::Bare,
877            handle.clone(),
878            object_store.clone(),
879        )
880        .predicate(Some(Predicate::new(preds)))
881        .inverted_index_appliers([inverted_index_applier.clone(), None])
882        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
883        .cache(CacheStrategy::EnableAll(cache.clone()));
884
885        let mut metrics = ReaderMetrics::default();
886        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
887        let mut reader = ParquetReader::new(Arc::new(context), selection)
888            .await
889            .unwrap();
890        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;
891
892        assert_eq!(metrics.filter_metrics.rg_total, 4);
893        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
894        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
895        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
896        let cached = index_result_cache
897            .get(
898                inverted_index_applier.unwrap().predicate_key(),
899                handle.file_id().file_id(),
900            )
901            .unwrap();
902        // The inverted index searches all row groups, so results for every row group are cached.
903        assert!(cached.contains_row_group(0));
904        assert!(cached.contains_row_group(1));
905        assert!(cached.contains_row_group(2));
906        assert!(cached.contains_row_group(3));
907
908        // Data: ts tag_0 tag_1
909        // Data: 0-20 [a, d]
910        //       0-20 [b, d]
911        //       0-20 [c, d]
912        //       0-40 [c, f]
913        //    100-200 [c, h]
914        //
915        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
916        //
917        // Row groups & rows pruning:
918        //
919        // Row Groups:
920        // - min-max: filter out row groups 0..=1
921        // - bloom filter: filter out row groups 2..=3
922        let preds = vec![
923            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
924            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
925            col("tag_1").eq(lit("d")),
926        ];
927        let inverted_index_applier = build_inverted_index_applier(&preds);
928        let bloom_filter_applier = build_bloom_filter_applier(&preds);
929
930        let builder = ParquetReaderBuilder::new(
931            FILE_DIR.to_string(),
932            PathType::Bare,
933            handle.clone(),
934            object_store.clone(),
935        )
936        .predicate(Some(Predicate::new(preds)))
937        .inverted_index_appliers([inverted_index_applier.clone(), None])
938        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
939        .cache(CacheStrategy::EnableAll(cache.clone()));
940
941        let mut metrics = ReaderMetrics::default();
942        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
943        let mut reader = ParquetReader::new(Arc::new(context), selection)
944            .await
945            .unwrap();
946        check_reader_result(&mut reader, &[]).await;
947
948        assert_eq!(metrics.filter_metrics.rg_total, 4);
949        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
950        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
951        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
952        let cached = index_result_cache
953            .get(
954                bloom_filter_applier.unwrap().predicate_key(),
955                handle.file_id().file_id(),
956            )
957            .unwrap();
958        assert!(cached.contains_row_group(2));
959        assert!(cached.contains_row_group(3));
960        assert!(!cached.contains_row_group(0));
961        assert!(!cached.contains_row_group(1));
962
963        // Remove the `ts` predicate but keep the `tag_1` predicate
964        // to verify that the index result cache works.
965
966        // Data: ts tag_0 tag_1
967        // Data: 0-20 [a, d]
968        //       0-20 [b, d]
969        //       0-20 [c, d]
970        //       0-40 [c, f]
971        //    100-200 [c, h]
972        //
973        // Pred: tag_1 = "d"
974        //
975        // Row groups & rows pruning:
976        //
977        // Row Groups:
978        // - bloom filter: filter out row groups 2..=3
979        //
980        // Rows:
981        // - bloom filter: hit row group 0, hit 50 rows
982        //                 hit row group 1, hit 10 rows
983        let preds = vec![col("tag_1").eq(lit("d"))];
984        let inverted_index_applier = build_inverted_index_applier(&preds);
985        let bloom_filter_applier = build_bloom_filter_applier(&preds);
986
987        let builder = ParquetReaderBuilder::new(
988            FILE_DIR.to_string(),
989            PathType::Bare,
990            handle.clone(),
991            object_store.clone(),
992        )
993        .predicate(Some(Predicate::new(preds)))
994        .inverted_index_appliers([inverted_index_applier.clone(), None])
995        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
996        .cache(CacheStrategy::EnableAll(cache.clone()));
997
998        let mut metrics = ReaderMetrics::default();
999        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1000        let mut reader = ParquetReader::new(Arc::new(context), selection)
1001            .await
1002            .unwrap();
1003        check_reader_result(
1004            &mut reader,
1005            &[
1006                new_batch_by_range(&["a", "d"], 0, 20),
1007                new_batch_by_range(&["b", "d"], 0, 20),
1008                new_batch_by_range(&["c", "d"], 0, 10),
1009                new_batch_by_range(&["c", "d"], 10, 20),
1010            ],
1011        )
1012        .await;
1013
1014        assert_eq!(metrics.filter_metrics.rg_total, 4);
1015        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1016        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1017        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1018        let cached = index_result_cache
1019            .get(
1020                bloom_filter_applier.unwrap().predicate_key(),
1021                handle.file_id().file_id(),
1022            )
1023            .unwrap();
1024        assert!(cached.contains_row_group(0));
1025        assert!(cached.contains_row_group(1));
1026        assert!(cached.contains_row_group(2));
1027        assert!(cached.contains_row_group(3));
1028    }
1029
1030    /// Creates a flat format RecordBatch for testing.
1031    /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
1032    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1033        assert!(end >= start);
1034        let metadata = Arc::new(sst_region_metadata());
1035        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1036
1037        let num_rows = end - start;
1038        let mut columns = Vec::new();
1039
1040        // Add primary key columns (tag_0, tag_1) as dictionary arrays
1041        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1042        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1043
1044        for _ in 0..num_rows {
1045            tag_0_builder.append_value(tags[0]);
1046            tag_1_builder.append_value(tags[1]);
1047        }
1048
1049        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1050        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
1051
1052        // Add field column (field_0)
1053        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1054        columns.push(Arc::new(UInt64Array::from(field_values)));
1055
1056        // Add time index column (ts)
1057        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1058        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1059
1060        // Add encoded primary key column
1061        let pk = new_primary_key(tags);
1062        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1063        for _ in 0..num_rows {
1064            pk_builder.append(&pk).unwrap();
1065        }
1066        columns.push(Arc::new(pk_builder.finish()));
1067
1068        // Add sequence column
1069        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1070
1071        // Add op_type column
1072        columns.push(Arc::new(UInt8Array::from_value(
1073            OpType::Put as u8,
1074            num_rows,
1075        )));
1076
1077        RecordBatch::try_new(flat_schema, columns).unwrap()
1078    }
1079
1080    /// Creates a FlatSource from flat format RecordBatches.
1081    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
1082        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
1083    }
1084
1085    /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
1086    /// Similar to `new_record_batch_by_range` but without individual primary key columns.
1087    fn new_record_batch_by_range_sparse(
1088        tags: &[&str],
1089        start: usize,
1090        end: usize,
1091        metadata: &Arc<RegionMetadata>,
1092    ) -> RecordBatch {
1093        assert!(end >= start);
1094        let flat_schema = to_flat_sst_arrow_schema(
1095            metadata,
1096            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1097        );
1098
1099        let num_rows = end - start;
1100        let mut columns: Vec<ArrayRef> = Vec::new();
1101
1102        // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format
1103
1104        // Add field column (field_0)
1105        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1106        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1107
1108        // Add time index column (ts)
1109        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1110        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1111
1112        // Add encoded primary key column using sparse encoding
1113        let table_id = 1u32; // Test table ID
1114        let tsid = 100u64; // Base TSID
1115        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1116
1117        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1118        for _ in 0..num_rows {
1119            pk_builder.append(&pk).unwrap();
1120        }
1121        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1122
1123        // Add sequence column
1124        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1125
1126        // Add op_type column
1127        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1128
1129        RecordBatch::try_new(flat_schema, columns).unwrap()
1130    }
1131
1132    /// Helper function to create IndexerBuilderImpl for tests.
1133    fn create_test_indexer_builder(
1134        env: &TestEnv,
1135        object_store: ObjectStore,
1136        file_path: RegionFilePathFactory,
1137        metadata: Arc<RegionMetadata>,
1138        row_group_size: usize,
1139    ) -> IndexerBuilderImpl {
1140        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1141        let intermediate_manager = env.get_intermediate_manager();
1142
1143        IndexerBuilderImpl {
1144            build_type: IndexBuildType::Flush,
1145            metadata,
1146            row_group_size,
1147            puffin_manager,
1148            write_cache_enabled: false,
1149            intermediate_manager,
1150            index_options: IndexOptions {
1151                inverted_index: InvertedIndexOptions {
1152                    segment_row_count: 1,
1153                    ..Default::default()
1154                },
1155            },
1156            inverted_index_config: Default::default(),
1157            fulltext_index_config: Default::default(),
1158            bloom_filter_index_config: Default::default(),
1159            #[cfg(feature = "vector_index")]
1160            vector_index_config: Default::default(),
1161        }
1162    }
1163
1164    /// Helper function to write flat SST and return SstInfo.
1165    async fn write_flat_sst(
1166        object_store: ObjectStore,
1167        metadata: Arc<RegionMetadata>,
1168        indexer_builder: IndexerBuilderImpl,
1169        file_path: RegionFilePathFactory,
1170        flat_source: FlatSource,
1171        write_opts: &WriteOptions,
1172    ) -> SstInfo {
1173        let mut metrics = Metrics::new(WriteType::Flush);
1174        let mut writer = ParquetWriter::new_with_object_store(
1175            object_store,
1176            metadata,
1177            IndexConfig::default(),
1178            indexer_builder,
1179            file_path,
1180            &mut metrics,
1181        )
1182        .await;
1183
1184        writer
1185            .write_all_flat(flat_source, write_opts)
1186            .await
1187            .unwrap()
1188            .remove(0)
1189    }
1190
1191    /// Helper function to create FileHandle from SstInfo.
1192    fn create_file_handle_from_sst_info(
1193        info: &SstInfo,
1194        metadata: &Arc<RegionMetadata>,
1195    ) -> FileHandle {
1196        FileHandle::new(
1197            FileMeta {
1198                region_id: metadata.region_id,
1199                file_id: info.file_id,
1200                time_range: info.time_range,
1201                level: 0,
1202                file_size: info.file_size,
1203                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1204                available_indexes: info.index_metadata.build_available_indexes(),
1205                indexes: info.index_metadata.build_indexes(),
1206                index_file_size: info.index_metadata.file_size,
1207                index_version: 0,
1208                num_row_groups: info.num_row_groups,
1209                num_rows: info.num_rows as u64,
1210                sequence: None,
1211                partition_expr: match &metadata.partition_expr {
1212                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1213                        .expect("partition expression should be valid JSON"),
1214                    None => None,
1215                },
1216                num_series: 0,
1217            },
1218            Arc::new(NoopFilePurger),
1219        )
1220    }
1221
1222    /// Helper function to create test cache with standard settings.
1223    fn create_test_cache() -> Arc<CacheManager> {
1224        Arc::new(
1225            CacheManager::builder()
1226                .index_result_cache_size(1024 * 1024)
1227                .index_metadata_size(1024 * 1024)
1228                .index_content_page_size(1024 * 1024)
1229                .index_content_size(1024 * 1024)
1230                .puffin_metadata_size(1024 * 1024)
1231                .build(),
1232        )
1233    }
1234
1235    #[tokio::test]
1236    async fn test_write_flat_with_index() {
1237        let mut env = TestEnv::new().await;
1238        let object_store = env.init_object_store_manager();
1239        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1240        let metadata = Arc::new(sst_region_metadata());
1241        let row_group_size = 50;
1242
1243        // Create flat format RecordBatches
1244        let flat_batches = vec![
1245            new_record_batch_by_range(&["a", "d"], 0, 20),
1246            new_record_batch_by_range(&["b", "d"], 0, 20),
1247            new_record_batch_by_range(&["c", "d"], 0, 20),
1248            new_record_batch_by_range(&["c", "f"], 0, 40),
1249            new_record_batch_by_range(&["c", "h"], 100, 200),
1250        ];
1251
1252        let flat_source = new_flat_source_from_record_batches(flat_batches);
1253
1254        let write_opts = WriteOptions {
1255            row_group_size,
1256            ..Default::default()
1257        };
1258
1259        let puffin_manager = env
1260            .get_puffin_manager()
1261            .build(object_store.clone(), file_path.clone());
1262        let intermediate_manager = env.get_intermediate_manager();
1263
1264        let indexer_builder = IndexerBuilderImpl {
1265            build_type: IndexBuildType::Flush,
1266            metadata: metadata.clone(),
1267            row_group_size,
1268            puffin_manager,
1269            write_cache_enabled: false,
1270            intermediate_manager,
1271            index_options: IndexOptions {
1272                inverted_index: InvertedIndexOptions {
1273                    segment_row_count: 1,
1274                    ..Default::default()
1275                },
1276            },
1277            inverted_index_config: Default::default(),
1278            fulltext_index_config: Default::default(),
1279            bloom_filter_index_config: Default::default(),
1280            #[cfg(feature = "vector_index")]
1281            vector_index_config: Default::default(),
1282        };
1283
1284        let mut metrics = Metrics::new(WriteType::Flush);
1285        let mut writer = ParquetWriter::new_with_object_store(
1286            object_store.clone(),
1287            metadata.clone(),
1288            IndexConfig::default(),
1289            indexer_builder,
1290            file_path.clone(),
1291            &mut metrics,
1292        )
1293        .await;
1294
1295        let info = writer
1296            .write_all_flat(flat_source, &write_opts)
1297            .await
1298            .unwrap()
1299            .remove(0);
1300        assert_eq!(200, info.num_rows);
1301        assert!(info.file_size > 0);
1302        assert!(info.index_metadata.file_size > 0);
1303
1304        assert!(info.index_metadata.inverted_index.index_size > 0);
1305        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1306        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1307
1308        assert!(info.index_metadata.bloom_filter.index_size > 0);
1309        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1310        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1311
1312        assert_eq!(
1313            (
1314                Timestamp::new_millisecond(0),
1315                Timestamp::new_millisecond(199)
1316            ),
1317            info.time_range
1318        );
1319    }
1320
1321    #[tokio::test]
1322    async fn test_read_with_override_sequence() {
1323        let mut env = TestEnv::new().await;
1324        let object_store = env.init_object_store_manager();
1325        let handle = sst_file_handle(0, 1000);
1326        let file_path = FixedPathProvider {
1327            region_file_id: handle.file_id(),
1328        };
1329        let metadata = Arc::new(sst_region_metadata());
1330
1331        // Create batches with sequence 0 to trigger override functionality
1332        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
1333        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
1334        let source = new_source(&[batch1, batch2]);
1335
1336        let write_opts = WriteOptions {
1337            row_group_size: 50,
1338            ..Default::default()
1339        };
1340
1341        let mut metrics = Metrics::new(WriteType::Flush);
1342        let mut writer = ParquetWriter::new_with_object_store(
1343            object_store.clone(),
1344            metadata.clone(),
1345            IndexConfig::default(),
1346            NoopIndexBuilder,
1347            file_path,
1348            &mut metrics,
1349        )
1350        .await;
1351
1352        writer
1353            .write_all(source, None, &write_opts)
1354            .await
1355            .unwrap()
1356            .remove(0);
1357
1358        // Read without override sequence (should read sequence 0)
1359        let builder = ParquetReaderBuilder::new(
1360            FILE_DIR.to_string(),
1361            PathType::Bare,
1362            handle.clone(),
1363            object_store.clone(),
1364        );
1365        let mut reader = builder.build().await.unwrap();
1366        let mut normal_batches = Vec::new();
1367        while let Some(batch) = reader.next_batch().await.unwrap() {
1368            normal_batches.push(batch);
1369        }
1370
1371        // Read with override sequence using FileMeta.sequence
1372        let custom_sequence = 12345u64;
1373        let file_meta = handle.meta_ref();
1374        let mut override_file_meta = file_meta.clone();
1375        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1376        let override_handle = FileHandle::new(
1377            override_file_meta,
1378            Arc::new(crate::sst::file_purger::NoopFilePurger),
1379        );
1380
1381        let builder = ParquetReaderBuilder::new(
1382            FILE_DIR.to_string(),
1383            PathType::Bare,
1384            override_handle,
1385            object_store.clone(),
1386        );
1387        let mut reader = builder.build().await.unwrap();
1388        let mut override_batches = Vec::new();
1389        while let Some(batch) = reader.next_batch().await.unwrap() {
1390            override_batches.push(batch);
1391        }
1392
1393        // Compare the results
1394        assert_eq!(normal_batches.len(), override_batches.len());
1395        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1396            // Create expected batch with override sequence
1397            let expected_batch = {
1398                let num_rows = normal.num_rows();
1399                let mut builder = BatchBuilder::from(normal);
1400                builder
1401                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
1402                    .unwrap();
1403
1404                builder.build().unwrap()
1405            };
1406
1407            // Override batch should match expected batch
1408            assert_eq!(*override_batch, expected_batch);
1409        }
1410    }
1411
1412    #[tokio::test]
1413    async fn test_write_flat_read_with_inverted_index() {
1414        let mut env = TestEnv::new().await;
1415        let object_store = env.init_object_store_manager();
1416        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1417        let metadata = Arc::new(sst_region_metadata());
1418        let row_group_size = 100;
1419
1420        // Create flat format RecordBatches with non-overlapping timestamp ranges
1421        // With row_group_size = 100, every two 50-row batches fall into one row group
1422        // Data: ts tag_0 tag_1
1423        // RG 0:   0-50  [a, d]
1424        // RG 0:  50-100 [b, d]
1425        // RG 1: 100-150 [c, d]
1426        // RG 1: 150-200 [c, f]
1427        let flat_batches = vec![
1428            new_record_batch_by_range(&["a", "d"], 0, 50),
1429            new_record_batch_by_range(&["b", "d"], 50, 100),
1430            new_record_batch_by_range(&["c", "d"], 100, 150),
1431            new_record_batch_by_range(&["c", "f"], 150, 200),
1432        ];
1433
1434        let flat_source = new_flat_source_from_record_batches(flat_batches);
1435
1436        let write_opts = WriteOptions {
1437            row_group_size,
1438            ..Default::default()
1439        };
1440
1441        let indexer_builder = create_test_indexer_builder(
1442            &env,
1443            object_store.clone(),
1444            file_path.clone(),
1445            metadata.clone(),
1446            row_group_size,
1447        );
1448
1449        let info = write_flat_sst(
1450            object_store.clone(),
1451            metadata.clone(),
1452            indexer_builder,
1453            file_path.clone(),
1454            flat_source,
1455            &write_opts,
1456        )
1457        .await;
1458        assert_eq!(200, info.num_rows);
1459        assert!(info.file_size > 0);
1460        assert!(info.index_metadata.file_size > 0);
1461
1462        let handle = create_file_handle_from_sst_info(&info, &metadata);
1463
1464        let cache = create_test_cache();
1465
1466        // Filter by tag_0 = "b"
1467        // Expected: only the 50 rows with tag_0 = "b" (ts 50-100)
1468        let preds = vec![col("tag_0").eq(lit("b"))];
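        // Column id 0, passed to the builder below, is tag_0, the column the predicate filters on.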
1469        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1470            FILE_DIR.to_string(),
1471            PathType::Bare,
1472            object_store.clone(),
1473            &metadata,
1474            HashSet::from_iter([0]),
1475            env.get_puffin_manager(),
1476        )
1477        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1478        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1479        .build(&preds)
1480        .unwrap()
1481        .map(Arc::new);
1482
1483        let builder = ParquetReaderBuilder::new(
1484            FILE_DIR.to_string(),
1485            PathType::Bare,
1486            handle.clone(),
1487            object_store.clone(),
1488        )
1489        .flat_format(true)
1490        .predicate(Some(Predicate::new(preds)))
1491        .inverted_index_appliers([inverted_index_applier.clone(), None])
1492        .cache(CacheStrategy::EnableAll(cache.clone()));
1493
1494        let mut metrics = ReaderMetrics::default();
1495        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1496
1497        // Verify the selection keeps only RG 0 (ts 0-100), narrowed to the 50 rows where tag_0 = "b"
1498        assert_eq!(selection.row_group_count(), 1);
1499        assert_eq!(50, selection.get(0).unwrap().row_count());
1500
1501        // Verify filtering metrics
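        // Min-max stats prune RG 1 (tag_0 is always "c" there); the inverted index then
        // narrows RG 0 from 100 rows to the 50 rows where tag_0 = "b".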
1502        assert_eq!(metrics.filter_metrics.rg_total, 2);
1503        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
1504        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
1505        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
1506    }
1507
1508    #[tokio::test]
1509    async fn test_write_flat_read_with_bloom_filter() {
1510        let mut env = TestEnv::new().await;
1511        let object_store = env.init_object_store_manager();
1512        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1513        let metadata = Arc::new(sst_region_metadata());
1514        let row_group_size = 100;
1515
1516        // Create flat format RecordBatches with non-overlapping timestamp ranges
1517        // Two consecutive 50-row batches fill one row group (row_group_size = 100)
1518        // Data: ts tag_0 tag_1
1519        // RG 0:   0-50  [a, d]
1520        // RG 0:  50-100 [b, e]
1521        // RG 1: 100-150 [c, d]
1522        // RG 1: 150-200 [c, f]
1523        let flat_batches = vec![
1524            new_record_batch_by_range(&["a", "d"], 0, 50),
1525            new_record_batch_by_range(&["b", "e"], 50, 100),
1526            new_record_batch_by_range(&["c", "d"], 100, 150),
1527            new_record_batch_by_range(&["c", "f"], 150, 200),
1528        ];
1529
1530        let flat_source = new_flat_source_from_record_batches(flat_batches);
1531
1532        let write_opts = WriteOptions {
1533            row_group_size,
1534            ..Default::default()
1535        };
1536
1537        let indexer_builder = create_test_indexer_builder(
1538            &env,
1539            object_store.clone(),
1540            file_path.clone(),
1541            metadata.clone(),
1542            row_group_size,
1543        );
1544
1545        let info = write_flat_sst(
1546            object_store.clone(),
1547            metadata.clone(),
1548            indexer_builder,
1549            file_path.clone(),
1550            flat_source,
1551            &write_opts,
1552        )
1553        .await;
1554        assert_eq!(200, info.num_rows);
1555        assert!(info.file_size > 0);
1556        assert!(info.index_metadata.file_size > 0);
1557
1558        let handle = create_file_handle_from_sst_info(&info, &metadata);
1559
1560        let cache = create_test_cache();
1561
1562        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1563        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200); each contains rows with tag_1 = "d"
1564        let preds = vec![
1565            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1566            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1567            col("tag_1").eq(lit("d")),
1568        ];
1569        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1570            FILE_DIR.to_string(),
1571            PathType::Bare,
1572            object_store.clone(),
1573            &metadata,
1574            env.get_puffin_manager(),
1575        )
1576        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1577        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1578        .build(&preds)
1579        .unwrap()
1580        .map(Arc::new);
1581
1582        let builder = ParquetReaderBuilder::new(
1583            FILE_DIR.to_string(),
1584            PathType::Bare,
1585            handle.clone(),
1586            object_store.clone(),
1587        )
1588        .flat_format(true)
1589        .predicate(Some(Predicate::new(preds)))
1590        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1591        .cache(CacheStrategy::EnableAll(cache.clone()));
1592
1593        let mut metrics = ReaderMetrics::default();
1594        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1595
1596        // Verify selection contains RG 0 and RG 1
1597        assert_eq!(selection.row_group_count(), 2);
1598        assert_eq!(50, selection.get(0).unwrap().row_count());
1599        assert_eq!(50, selection.get(1).unwrap().row_count());
1600
1601        // Verify filtering metrics
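        // Min-max stats cannot prune either row group (each overlaps the ts range and
        // contains tag_1 = "d"); the bloom filter drops the 50-row ranges where tag_1 is
        // "e" (RG 0) or "f" (RG 1), i.e. 100 rows in total.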
1602        assert_eq!(metrics.filter_metrics.rg_total, 2);
1603        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1604        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1605        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1606    }
1607
1608    #[tokio::test]
1609    async fn test_write_flat_read_with_inverted_index_sparse() {
1610        common_telemetry::init_default_ut_logging();
1611
1612        let mut env = TestEnv::new().await;
1613        let object_store = env.init_object_store_manager();
1614        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1615        let metadata = Arc::new(sst_region_metadata_with_encoding(
1616            PrimaryKeyEncoding::Sparse,
1617        ));
1618        let row_group_size = 100;
1619
1620        // Create flat format RecordBatches with non-overlapping timestamp ranges
1621        // Two consecutive 50-row batches fill one row group (row_group_size = 100)
1622        // Data: ts tag_0 tag_1
1623        // RG 0:   0-50  [a, d]
1624        // RG 0:  50-100 [b, d]
1625        // RG 1: 100-150 [c, d]
1626        // RG 1: 150-200 [c, f]
1627        let flat_batches = vec![
1628            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1629            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1630            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1631            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1632        ];
1633
1634        let flat_source = new_flat_source_from_record_batches(flat_batches);
1635
1636        let write_opts = WriteOptions {
1637            row_group_size,
1638            ..Default::default()
1639        };
1640
1641        let indexer_builder = create_test_indexer_builder(
1642            &env,
1643            object_store.clone(),
1644            file_path.clone(),
1645            metadata.clone(),
1646            row_group_size,
1647        );
1648
1649        let info = write_flat_sst(
1650            object_store.clone(),
1651            metadata.clone(),
1652            indexer_builder,
1653            file_path.clone(),
1654            flat_source,
1655            &write_opts,
1656        )
1657        .await;
1658        assert_eq!(200, info.num_rows);
1659        assert!(info.file_size > 0);
1660        assert!(info.index_metadata.file_size > 0);
1661
1662        let handle = create_file_handle_from_sst_info(&info, &metadata);
1663
1664        let cache = create_test_cache();
1665
1666        // Filter by tag_0 = "b"
1667        // Expected: only the 50 rows with tag_0 = "b" (ts 50-100)
1668        let preds = vec![col("tag_0").eq(lit("b"))];
1669        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1670            FILE_DIR.to_string(),
1671            PathType::Bare,
1672            object_store.clone(),
1673            &metadata,
1674            HashSet::from_iter([0]),
1675            env.get_puffin_manager(),
1676        )
1677        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1678        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1679        .build(&preds)
1680        .unwrap()
1681        .map(Arc::new);
1682
1683        let builder = ParquetReaderBuilder::new(
1684            FILE_DIR.to_string(),
1685            PathType::Bare,
1686            handle.clone(),
1687            object_store.clone(),
1688        )
1689        .flat_format(true)
1690        .predicate(Some(Predicate::new(preds)))
1691        .inverted_index_appliers([inverted_index_applier.clone(), None])
1692        .cache(CacheStrategy::EnableAll(cache.clone()));
1693
1694        let mut metrics = ReaderMetrics::default();
1695        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1696
1697        // RG 0 has 50 matching rows (tag_0="b")
1698        assert_eq!(selection.row_group_count(), 1);
1699        assert_eq!(50, selection.get(0).unwrap().row_count());
1700
1701        // Verify filtering metrics
1702        // Note: With sparse encoding, tag columns aren't stored separately,
1703        // so min-max filtering on tags cannot prune row groups; only the inverted index applies
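        // The inverted index prunes all of RG 1 (tag_0 is always "c" there) and removes
        // the 50 rows of RG 0 where tag_0 = "a", i.e. 150 filtered rows in total.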
1704        assert_eq!(metrics.filter_metrics.rg_total, 2);
1705        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
1706        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1707        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1708    }
1709
1710    #[tokio::test]
1711    async fn test_write_flat_read_with_bloom_filter_sparse() {
1712        let mut env = TestEnv::new().await;
1713        let object_store = env.init_object_store_manager();
1714        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1715        let metadata = Arc::new(sst_region_metadata_with_encoding(
1716            PrimaryKeyEncoding::Sparse,
1717        ));
1718        let row_group_size = 100;
1719
1720        // Create flat format RecordBatches with non-overlapping timestamp ranges
1721        // Two consecutive 50-row batches fill one row group (row_group_size = 100)
1722        // Data: ts tag_0 tag_1
1723        // RG 0:   0-50  [a, d]
1724        // RG 0:  50-100 [b, e]
1725        // RG 1: 100-150 [c, d]
1726        // RG 1: 150-200 [c, f]
1727        let flat_batches = vec![
1728            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1729            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
1730            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1731            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1732        ];
1733
1734        let flat_source = new_flat_source_from_record_batches(flat_batches);
1735
1736        let write_opts = WriteOptions {
1737            row_group_size,
1738            ..Default::default()
1739        };
1740
1741        let indexer_builder = create_test_indexer_builder(
1742            &env,
1743            object_store.clone(),
1744            file_path.clone(),
1745            metadata.clone(),
1746            row_group_size,
1747        );
1748
1749        let info = write_flat_sst(
1750            object_store.clone(),
1751            metadata.clone(),
1752            indexer_builder,
1753            file_path.clone(),
1754            flat_source,
1755            &write_opts,
1756        )
1757        .await;
1758        assert_eq!(200, info.num_rows);
1759        assert!(info.file_size > 0);
1760        assert!(info.index_metadata.file_size > 0);
1761
1762        let handle = create_file_handle_from_sst_info(&info, &metadata);
1763
1764        let cache = create_test_cache();
1765
1766        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1767        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200); each contains rows with tag_1 = "d"
1768        let preds = vec![
1769            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1770            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1771            col("tag_1").eq(lit("d")),
1772        ];
1773        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1774            FILE_DIR.to_string(),
1775            PathType::Bare,
1776            object_store.clone(),
1777            &metadata,
1778            env.get_puffin_manager(),
1779        )
1780        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1781        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1782        .build(&preds)
1783        .unwrap()
1784        .map(Arc::new);
1785
1786        let builder = ParquetReaderBuilder::new(
1787            FILE_DIR.to_string(),
1788            PathType::Bare,
1789            handle.clone(),
1790            object_store.clone(),
1791        )
1792        .flat_format(true)
1793        .predicate(Some(Predicate::new(preds)))
1794        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1795        .cache(CacheStrategy::EnableAll(cache.clone()));
1796
1797        let mut metrics = ReaderMetrics::default();
1798        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1799
1800        // Verify selection contains RG 0 and RG 1
1801        assert_eq!(selection.row_group_count(), 2);
1802        assert_eq!(50, selection.get(0).unwrap().row_count());
1803        assert_eq!(50, selection.get(1).unwrap().row_count());
1804
1805        // Verify filtering metrics
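        // Same outcome as the dense-format bloom filter test above: no whole row group
        // is pruned, and the bloom filter removes 50 rows from each row group (100 total).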
1806        assert_eq!(metrics.filter_metrics.rg_total, 2);
1807        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1808        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1809        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1810    }
1811
1812    /// Creates region metadata for testing fulltext indexes.
1813    /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
1814    fn fulltext_region_metadata() -> RegionMetadata {
1815        let mut builder = RegionMetadataBuilder::new(REGION_ID);
1816        builder
1817            .push_column_metadata(ColumnMetadata {
1818                column_schema: ColumnSchema::new(
1819                    "tag_0".to_string(),
1820                    ConcreteDataType::string_datatype(),
1821                    true,
1822                ),
1823                semantic_type: SemanticType::Tag,
1824                column_id: 0,
1825            })
1826            .push_column_metadata(ColumnMetadata {
1827                column_schema: ColumnSchema::new(
1828                    "text_bloom".to_string(),
1829                    ConcreteDataType::string_datatype(),
1830                    true,
1831                )
1832                .with_fulltext_options(FulltextOptions {
1833                    enable: true,
1834                    analyzer: FulltextAnalyzer::English,
1835                    case_sensitive: false,
1836                    backend: FulltextBackend::Bloom,
1837                    granularity: 1,
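                    // 50 out of 10000, i.e. a 0.5% target false positive rate.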
1838                    false_positive_rate_in_10000: 50,
1839                })
1840                .unwrap(),
1841                semantic_type: SemanticType::Field,
1842                column_id: 1,
1843            })
1844            .push_column_metadata(ColumnMetadata {
1845                column_schema: ColumnSchema::new(
1846                    "text_tantivy".to_string(),
1847                    ConcreteDataType::string_datatype(),
1848                    true,
1849                )
1850                .with_fulltext_options(FulltextOptions {
1851                    enable: true,
1852                    analyzer: FulltextAnalyzer::English,
1853                    case_sensitive: false,
1854                    backend: FulltextBackend::Tantivy,
1855                    granularity: 1,
1856                    false_positive_rate_in_10000: 50,
1857                })
1858                .unwrap(),
1859                semantic_type: SemanticType::Field,
1860                column_id: 2,
1861            })
1862            .push_column_metadata(ColumnMetadata {
1863                column_schema: ColumnSchema::new(
1864                    "field_0".to_string(),
1865                    ConcreteDataType::uint64_datatype(),
1866                    true,
1867                ),
1868                semantic_type: SemanticType::Field,
1869                column_id: 3,
1870            })
1871            .push_column_metadata(ColumnMetadata {
1872                column_schema: ColumnSchema::new(
1873                    "ts".to_string(),
1874                    ConcreteDataType::timestamp_millisecond_datatype(),
1875                    false,
1876                ),
1877                semantic_type: SemanticType::Timestamp,
1878                column_id: 4,
1879            })
1880            .primary_key(vec![0]);
1881        builder.build().unwrap()
1882    }
1883
1884    /// Creates a flat format RecordBatch with string fields for fulltext testing.
1885    fn new_fulltext_record_batch_by_range(
1886        tag: &str,
1887        text_bloom: &str,
1888        text_tantivy: &str,
1889        start: usize,
1890        end: usize,
1891    ) -> RecordBatch {
1892        assert!(end >= start);
1893        let metadata = Arc::new(fulltext_region_metadata());
1894        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
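        // Columns below follow the flat SST schema order: tag columns, field columns,
        // the time index, then the encoded primary key, sequence and op type columns.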
1895
1896        let num_rows = end - start;
1897        let mut columns = Vec::new();
1898
1899        // Add primary key column (tag_0) as dictionary array
1900        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1901        for _ in 0..num_rows {
1902            tag_builder.append_value(tag);
1903        }
1904        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
1905
1906        // Add text_bloom field (fulltext with bloom backend)
1907        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
1908        columns.push(Arc::new(StringArray::from(text_bloom_values)));
1909
1910        // Add text_tantivy field (fulltext with tantivy backend)
1911        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
1912        columns.push(Arc::new(StringArray::from(text_tantivy_values)));
1913
1914        // Add field column (field_0)
1915        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1916        columns.push(Arc::new(UInt64Array::from(field_values)));
1917
1918        // Add time index column (ts)
1919        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1920        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1921
1922        // Add encoded primary key column
1923        let pk = new_primary_key(&[tag]);
1924        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1925        for _ in 0..num_rows {
1926            pk_builder.append(&pk).unwrap();
1927        }
1928        columns.push(Arc::new(pk_builder.finish()));
1929
1930        // Add sequence column
1931        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1932
1933        // Add op_type column
1934        columns.push(Arc::new(UInt8Array::from_value(
1935            OpType::Put as u8,
1936            num_rows,
1937        )));
1938
1939        RecordBatch::try_new(flat_schema, columns).unwrap()
1940    }
1941
1942    #[tokio::test]
1943    async fn test_write_flat_read_with_fulltext_index() {
1944        let mut env = TestEnv::new().await;
1945        let object_store = env.init_object_store_manager();
1946        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1947        let metadata = Arc::new(fulltext_region_metadata());
1948        let row_group_size = 50;
1949
1950        // Create flat format RecordBatches with different text content
1951        // RG 0:   0-50  tag="a", bloom="hello world", tantivy="quick brown fox"
1952        // RG 1:  50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
1953        // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
1954        // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
1955        let flat_batches = vec![
1956            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
1957            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
1958            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
1959            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
1960        ];
1961
1962        let flat_source = new_flat_source_from_record_batches(flat_batches);
1963
1964        let write_opts = WriteOptions {
1965            row_group_size,
1966            ..Default::default()
1967        };
1968
1969        let indexer_builder = create_test_indexer_builder(
1970            &env,
1971            object_store.clone(),
1972            file_path.clone(),
1973            metadata.clone(),
1974            row_group_size,
1975        );
1976
1977        let mut info = write_flat_sst(
1978            object_store.clone(),
1979            metadata.clone(),
1980            indexer_builder,
1981            file_path.clone(),
1982            flat_source,
1983            &write_opts,
1984        )
1985        .await;
1986        assert_eq!(200, info.num_rows);
1987        assert!(info.file_size > 0);
1988        assert!(info.index_metadata.file_size > 0);
1989
1990        // Verify fulltext indexes were created
1991        assert!(info.index_metadata.fulltext_index.index_size > 0);
1992        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
1993        // text_bloom (column_id 1) and text_tantivy (column_id 2)
1994        info.index_metadata.fulltext_index.columns.sort_unstable();
1995        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
1996
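        // Timestamps span 0..200 ms, so the inclusive file time range is [0 ms, 199 ms].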
1997        assert_eq!(
1998            (
1999                Timestamp::new_millisecond(0),
2000                Timestamp::new_millisecond(199)
2001            ),
2002            info.time_range
2003        );
2004
2005        let handle = create_file_handle_from_sst_info(&info, &metadata);
2006
2007        let cache = create_test_cache();
2008
2009        // Helper functions to create fulltext function expressions
2010        let matches_func = || {
2011            Arc::new(
2012                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
2013                    .provide(Default::default()),
2014            )
2015        };
2016
2017        let matches_term_func = || {
2018            Arc::new(
2019                ScalarFunctionFactory::from(
2020                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
2021                )
2022                .provide(Default::default()),
2023            )
2024        };
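        // In this test, `matches_term` queries the bloom-backed column (text_bloom) and
        // `matches` queries the tantivy-backed column (text_tantivy).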
2025
2026        // Test 1: Filter by text_bloom field using matches_term (bloom backend)
2027        // Expected: RG 0 and RG 1 (rows 0-100), whose text_bloom contains the term "hello"
2028        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2029            args: vec![col("text_bloom"), "hello".lit()],
2030            func: matches_term_func(),
2031        })];
2032
2033        let fulltext_applier = FulltextIndexApplierBuilder::new(
2034            FILE_DIR.to_string(),
2035            PathType::Bare,
2036            object_store.clone(),
2037            env.get_puffin_manager(),
2038            &metadata,
2039        )
2040        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2041        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2042        .build(&preds)
2043        .unwrap()
2044        .map(Arc::new);
2045
2046        let builder = ParquetReaderBuilder::new(
2047            FILE_DIR.to_string(),
2048            PathType::Bare,
2049            handle.clone(),
2050            object_store.clone(),
2051        )
2052        .flat_format(true)
2053        .predicate(Some(Predicate::new(preds)))
2054        .fulltext_index_appliers([None, fulltext_applier.clone()])
2055        .cache(CacheStrategy::EnableAll(cache.clone()));
2056
2057        let mut metrics = ReaderMetrics::default();
2058        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
2059
2060        // Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
2061        assert_eq!(selection.row_group_count(), 2);
2062        assert_eq!(50, selection.get(0).unwrap().row_count());
2063        assert_eq!(50, selection.get(1).unwrap().row_count());
2064
2065        // Verify filtering metrics
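        // The fulltext index prunes RG 2 and RG 3 ("goodbye world"): 2 row groups, 100 rows.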
2066        assert_eq!(metrics.filter_metrics.rg_total, 4);
2067        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2068        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2069        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2070
2071        // Test 2: Filter by text_tantivy field using matches (tantivy backend)
2072        // Expected: RG 2 and RG 3 (rows 100-200), whose text_tantivy matches the query "lazy"
2073        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2074            args: vec![col("text_tantivy"), "lazy".lit()],
2075            func: matches_func(),
2076        })];
2077
2078        let fulltext_applier = FulltextIndexApplierBuilder::new(
2079            FILE_DIR.to_string(),
2080            PathType::Bare,
2081            object_store.clone(),
2082            env.get_puffin_manager(),
2083            &metadata,
2084        )
2085        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2086        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2087        .build(&preds)
2088        .unwrap()
2089        .map(Arc::new);
2090
2091        let builder = ParquetReaderBuilder::new(
2092            FILE_DIR.to_string(),
2093            PathType::Bare,
2094            handle.clone(),
2095            object_store.clone(),
2096        )
2097        .flat_format(true)
2098        .predicate(Some(Predicate::new(preds)))
2099        .fulltext_index_appliers([None, fulltext_applier.clone()])
2100        .cache(CacheStrategy::EnableAll(cache.clone()));
2101
2102        let mut metrics = ReaderMetrics::default();
2103        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
2104
2105        // Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
2106        assert_eq!(selection.row_group_count(), 2);
2107        assert_eq!(50, selection.get(2).unwrap().row_count());
2108        assert_eq!(50, selection.get(3).unwrap().row_count());
2109
2110        // Verify filtering metrics
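        // This time the tantivy-backed index prunes RG 0 and RG 1 ("quick brown fox").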
2111        assert_eq!(metrics.filter_metrics.rg_total, 4);
2112        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2113        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2114        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2115    }
2116}