Skip to main content

mito2/sst/
parquet.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST in parquet format.
16
17use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
27pub mod file_range;
28pub mod flat_format;
29pub mod format;
30pub(crate) mod helper;
31pub mod metadata;
32pub mod prefilter;
33pub mod push_decoder;
34pub mod read_columns;
35pub mod reader;
36pub mod row_group;
37pub mod row_selection;
38pub(crate) mod stats;
39pub mod writer;
40
/// Key-value metadata key under which a parquet SST stores its region metadata JSON.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Number of rows per batch when reading parquet files (`8 * 1024`).
///
/// Only affects runtime scanning. The value matches DataFusion's default
/// execution batch size so query operators need less rebatching and
/// concatenation.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8_192;
/// Default number of rows per row group in written parquet files (`100 * 1024`).
///
/// This governs the on-disk layout of new SSTs and is kept stable on purpose.
/// It is independent of [`DEFAULT_READ_BATCH_SIZE`], so runtime scan batching
/// can be tuned without changing how row groups are written.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 102_400;
56
57/// Parquet write options.
58#[derive(Debug, Clone)]
59pub struct WriteOptions {
60    /// Buffer size for async writer.
61    pub write_buffer_size: ReadableSize,
62    /// Row group size.
63    pub row_group_size: usize,
64    /// Max single output file size.
65    /// Note: This is not a hard limit as we can only observe the file size when
66    /// ArrowWrite writes to underlying writers.
67    pub max_file_size: Option<usize>,
68}
69
70impl Default for WriteOptions {
71    fn default() -> Self {
72        WriteOptions {
73            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
74            row_group_size: DEFAULT_ROW_GROUP_SIZE,
75            max_file_size: None,
76        }
77    }
78}
79
80/// Parquet SST info returned by the writer.
81#[derive(Debug, Default)]
82pub struct SstInfo {
83    /// SST file id.
84    pub file_id: FileId,
85    /// Time range of the SST. The timestamps have the same time unit as the
86    /// data in the SST.
87    pub time_range: FileTimeRange,
88    /// File size in bytes.
89    pub file_size: u64,
90    /// Maximum uncompressed row group size in bytes. 0 if unknown.
91    pub max_row_group_uncompressed_size: u64,
92    /// Number of rows.
93    pub num_rows: usize,
94    /// Number of row groups
95    pub num_row_groups: u64,
96    /// File Meta Data
97    pub file_metadata: Option<Arc<ParquetMetaData>>,
98    /// Index Meta Data
99    pub index_metadata: IndexOutput,
100    /// Number of series
101    pub num_series: u64,
102}
103
104#[cfg(test)]
105mod tests {
106    use std::collections::HashSet;
107    use std::sync::Arc;
108
109    use api::v1::{OpType, SemanticType};
110    use common_function::function::FunctionRef;
111    use common_function::function_factory::ScalarFunctionFactory;
112    use common_function::scalars::matches::MatchesFunction;
113    use common_function::scalars::matches_term::MatchesTermFunction;
114    use common_time::Timestamp;
115    use datafusion_common::{Column, ScalarValue};
116    use datafusion_expr::expr::ScalarFunction;
117    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
118    use datatypes::arrow;
119    use datatypes::arrow::array::{
120        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
121        TimestampMillisecondArray, UInt8Array, UInt64Array,
122    };
123    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
124    use datatypes::arrow::util::pretty::pretty_format_batches;
125    use datatypes::prelude::ConcreteDataType;
126    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
127    use object_store::ObjectStore;
128    use parquet::arrow::AsyncArrowWriter;
129    use parquet::basic::{Compression, Encoding, ZstdLevel};
130    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
131    use parquet::file::properties::WriterProperties;
132    use store_api::codec::PrimaryKeyEncoding;
133    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
134    use store_api::region_request::PathType;
135    use store_api::storage::{ColumnSchema, RegionId};
136    use table::predicate::Predicate;
137    use tokio_util::compat::FuturesAsyncWriteCompatExt;
138
139    use super::*;
140    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
141    use crate::cache::index::result_cache::PredicateKey;
142    use crate::cache::test_util::assert_parquet_metadata_equal;
143    use crate::cache::{CacheManager, CacheStrategy};
144    use crate::config::IndexConfig;
145    use crate::read::FlatSource;
146    use crate::region::options::{IndexOptions, InvertedIndexOptions};
147    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
148    use crate::sst::file_purger::NoopFilePurger;
149    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
150    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
151    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
152    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
153    use crate::sst::parquet::flat_format::FlatWriteFormat;
154    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
155    use crate::sst::parquet::row_selection::RowGroupSelection;
156    use crate::sst::parquet::writer::ParquetWriter;
157    use crate::sst::{
158        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
159    };
160    use crate::test_util::TestEnv;
161    use crate::test_util::sst_util::{
162        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
163        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
164        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
165        sst_region_metadata_with_encoding,
166    };
167
168    const FILE_DIR: &str = "/";
169    const REGION_ID: RegionId = RegionId::new(0, 0);
170
171    #[derive(Clone)]
172    struct FixedPathProvider {
173        region_file_id: RegionFileId,
174    }
175
176    impl FilePathProvider for FixedPathProvider {
177        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
178            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
179        }
180
181        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
182            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
183        }
184
185        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
186            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
187        }
188    }
189
190    struct NoopIndexBuilder;
191
192    #[async_trait::async_trait]
193    impl IndexerBuilder for NoopIndexBuilder {
194        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
195            Indexer::default()
196        }
197    }
198
199    #[tokio::test]
200    async fn test_write_read() {
201        let mut env = TestEnv::new().await;
202        let object_store = env.init_object_store_manager();
203        let handle = sst_file_handle(0, 1000);
204        let file_path = FixedPathProvider {
205            region_file_id: handle.file_id(),
206        };
207        let metadata = Arc::new(sst_region_metadata());
208        let source = new_flat_source_from_record_batches(vec![
209            new_record_batch_by_range(&["a", "d"], 0, 60),
210            new_record_batch_by_range(&["b", "f"], 0, 40),
211            new_record_batch_by_range(&["b", "h"], 100, 200),
212        ]);
213        // Use a small row group size for test.
214        let write_opts = WriteOptions {
215            row_group_size: 50,
216            ..Default::default()
217        };
218
219        let mut metrics = Metrics::new(WriteType::Flush);
220        let mut writer = ParquetWriter::new_with_object_store(
221            object_store.clone(),
222            metadata.clone(),
223            IndexConfig::default(),
224            NoopIndexBuilder,
225            file_path,
226            &mut metrics,
227        )
228        .await;
229
230        let info = writer
231            .write_all_flat_as_primary_key(source, None, &write_opts)
232            .await
233            .unwrap()
234            .remove(0);
235        assert_eq!(200, info.num_rows);
236        assert!(info.file_size > 0);
237        assert_eq!(
238            (
239                Timestamp::new_millisecond(0),
240                Timestamp::new_millisecond(199)
241            ),
242            info.time_range
243        );
244
245        let builder = ParquetReaderBuilder::new(
246            FILE_DIR.to_string(),
247            PathType::Bare,
248            handle.clone(),
249            object_store,
250        );
251        let mut reader = builder.build().await.unwrap().unwrap();
252        check_record_batch_reader_result(
253            &mut reader,
254            &[
255                new_record_batch_by_range(&["a", "d"], 0, 50),
256                new_record_batch_by_range(&["a", "d"], 50, 60),
257                new_record_batch_by_range(&["b", "f"], 0, 40),
258                new_record_batch_by_range(&["b", "h"], 100, 150),
259                new_record_batch_by_range(&["b", "h"], 150, 200),
260            ],
261        )
262        .await;
263    }
264
265    #[tokio::test]
266    async fn test_read_with_cache() {
267        let mut env = TestEnv::new().await;
268        let object_store = env.init_object_store_manager();
269        let handle = sst_file_handle(0, 1000);
270        let metadata = Arc::new(sst_region_metadata());
271        let source = new_flat_source_from_record_batches(vec![
272            new_record_batch_by_range(&["a", "d"], 0, 60),
273            new_record_batch_by_range(&["b", "f"], 0, 40),
274            new_record_batch_by_range(&["b", "h"], 100, 200),
275        ]);
276        // Use a small row group size for test.
277        let write_opts = WriteOptions {
278            row_group_size: 50,
279            ..Default::default()
280        };
281        // Prepare data.
282        let mut metrics = Metrics::new(WriteType::Flush);
283        let mut writer = ParquetWriter::new_with_object_store(
284            object_store.clone(),
285            metadata.clone(),
286            IndexConfig::default(),
287            NoopIndexBuilder,
288            FixedPathProvider {
289                region_file_id: handle.file_id(),
290            },
291            &mut metrics,
292        )
293        .await;
294
295        let sst_info = writer
296            .write_all_flat_as_primary_key(source, None, &write_opts)
297            .await
298            .unwrap()
299            .remove(0);
300
301        // Enable page cache.
302        let cache = CacheStrategy::EnableAll(Arc::new(
303            CacheManager::builder()
304                .page_cache_size(64 * 1024 * 1024)
305                .build(),
306        ));
307        let builder = ParquetReaderBuilder::new(
308            FILE_DIR.to_string(),
309            PathType::Bare,
310            handle.clone(),
311            object_store,
312        )
313        .cache(cache.clone());
314        for _ in 0..3 {
315            let mut reader = builder.build().await.unwrap().unwrap();
316            check_record_batch_reader_result(
317                &mut reader,
318                &[
319                    new_record_batch_by_range(&["a", "d"], 0, 50),
320                    new_record_batch_by_range(&["a", "d"], 50, 60),
321                    new_record_batch_by_range(&["b", "f"], 0, 40),
322                    new_record_batch_by_range(&["b", "h"], 100, 150),
323                    new_record_batch_by_range(&["b", "h"], 150, 200),
324                ],
325            )
326            .await;
327        }
328
329        let parquet_meta = sst_info.file_metadata.unwrap();
330        let get_ranges = |row_group_idx: usize| {
331            let row_group = parquet_meta.row_group(row_group_idx);
332            let mut ranges = Vec::with_capacity(row_group.num_columns());
333            for i in 0..row_group.num_columns() {
334                let (start, length) = row_group.column(i).byte_range();
335                ranges.push(start..start + length);
336            }
337
338            ranges
339        };
340
341        // Cache 4 row groups.
342        for i in 0..4 {
343            let lookup = cache
344                .get_page_ranges(handle.file_id().file_id(), i, &get_ranges(i))
345                .unwrap();
346            assert!(lookup.is_fully_cached());
347        }
348        let missing_range = 0..10;
349        let lookup = cache
350            .get_page_ranges(
351                handle.file_id().file_id(),
352                5,
353                std::slice::from_ref(&missing_range),
354            )
355            .unwrap();
356        assert_eq!(vec![0..10], lookup.missing_ranges);
357    }
358
359    #[tokio::test]
360    async fn test_parquet_metadata_eq() {
361        // create test env
362        let mut env = crate::test_util::TestEnv::new().await;
363        let object_store = env.init_object_store_manager();
364        let handle = sst_file_handle(0, 1000);
365        let metadata = Arc::new(sst_region_metadata());
366        let source = new_flat_source_from_record_batches(vec![
367            new_record_batch_by_range(&["a", "d"], 0, 60),
368            new_record_batch_by_range(&["b", "f"], 0, 40),
369            new_record_batch_by_range(&["b", "h"], 100, 200),
370        ]);
371        let write_opts = WriteOptions {
372            row_group_size: 50,
373            ..Default::default()
374        };
375
376        // write the sst file and get sst info
377        // sst info contains the parquet metadata, which is converted from FileMetaData
378        let mut metrics = Metrics::new(WriteType::Flush);
379        let mut writer = ParquetWriter::new_with_object_store(
380            object_store.clone(),
381            metadata.clone(),
382            IndexConfig::default(),
383            NoopIndexBuilder,
384            FixedPathProvider {
385                region_file_id: handle.file_id(),
386            },
387            &mut metrics,
388        )
389        .await;
390
391        let sst_info = writer
392            .write_all_flat_as_primary_key(source, None, &write_opts)
393            .await
394            .unwrap()
395            .remove(0);
396        let writer_metadata = sst_info.file_metadata.unwrap();
397
398        // read the sst file metadata
399        let builder = ParquetReaderBuilder::new(
400            FILE_DIR.to_string(),
401            PathType::Bare,
402            handle.clone(),
403            object_store,
404        )
405        .page_index_policy(PageIndexPolicy::Optional);
406        let reader = builder.build().await.unwrap().unwrap();
407        let reader_metadata = reader.parquet_metadata();
408        let cached_writer_metadata =
409            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
410                .unwrap()
411                .parquet_metadata();
412
413        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
414    }
415
416    #[tokio::test]
417    async fn test_read_with_tag_filter() {
418        let mut env = TestEnv::new().await;
419        let object_store = env.init_object_store_manager();
420        let handle = sst_file_handle(0, 1000);
421        let metadata = Arc::new(sst_region_metadata());
422        let source = new_flat_source_from_record_batches(vec![
423            new_record_batch_by_range(&["a", "d"], 0, 60),
424            new_record_batch_by_range(&["b", "f"], 0, 40),
425            new_record_batch_by_range(&["b", "h"], 100, 200),
426        ]);
427        // Use a small row group size for test.
428        let write_opts = WriteOptions {
429            row_group_size: 50,
430            ..Default::default()
431        };
432        // Prepare data.
433        let mut metrics = Metrics::new(WriteType::Flush);
434        let mut writer = ParquetWriter::new_with_object_store(
435            object_store.clone(),
436            metadata.clone(),
437            IndexConfig::default(),
438            NoopIndexBuilder,
439            FixedPathProvider {
440                region_file_id: handle.file_id(),
441            },
442            &mut metrics,
443        )
444        .await;
445        writer
446            .write_all_flat_as_primary_key(source, None, &write_opts)
447            .await
448            .unwrap()
449            .remove(0);
450
451        // Predicate
452        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
453            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
454            op: Operator::Eq,
455            right: Box::new("a".lit()),
456        })]));
457
458        let builder = ParquetReaderBuilder::new(
459            FILE_DIR.to_string(),
460            PathType::Bare,
461            handle.clone(),
462            object_store,
463        )
464        .predicate(predicate);
465        let mut reader = builder.build().await.unwrap().unwrap();
466        check_record_batch_reader_result(
467            &mut reader,
468            &[
469                new_record_batch_by_range(&["a", "d"], 0, 50),
470                new_record_batch_by_range(&["a", "d"], 50, 60),
471            ],
472        )
473        .await;
474    }
475
476    #[tokio::test]
477    async fn test_read_empty_batch() {
478        let mut env = TestEnv::new().await;
479        let object_store = env.init_object_store_manager();
480        let handle = sst_file_handle(0, 1000);
481        let metadata = Arc::new(sst_region_metadata());
482        let source = new_flat_source_from_record_batches(vec![
483            new_record_batch_by_range(&["a", "z"], 0, 0),
484            new_record_batch_by_range(&["a", "z"], 100, 100),
485            new_record_batch_by_range(&["a", "z"], 200, 230),
486        ]);
487        // Use a small row group size for test.
488        let write_opts = WriteOptions {
489            row_group_size: 50,
490            ..Default::default()
491        };
492        // Prepare data.
493        let mut metrics = Metrics::new(WriteType::Flush);
494        let mut writer = ParquetWriter::new_with_object_store(
495            object_store.clone(),
496            metadata.clone(),
497            IndexConfig::default(),
498            NoopIndexBuilder,
499            FixedPathProvider {
500                region_file_id: handle.file_id(),
501            },
502            &mut metrics,
503        )
504        .await;
505        writer
506            .write_all_flat_as_primary_key(source, None, &write_opts)
507            .await
508            .unwrap()
509            .remove(0);
510
511        let builder = ParquetReaderBuilder::new(
512            FILE_DIR.to_string(),
513            PathType::Bare,
514            handle.clone(),
515            object_store,
516        );
517        let mut reader = builder.build().await.unwrap().unwrap();
518        check_record_batch_reader_result(
519            &mut reader,
520            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
521        )
522        .await;
523    }
524
525    #[tokio::test]
526    async fn test_read_with_field_filter() {
527        let mut env = TestEnv::new().await;
528        let object_store = env.init_object_store_manager();
529        let handle = sst_file_handle(0, 1000);
530        let metadata = Arc::new(sst_region_metadata());
531        let source = new_flat_source_from_record_batches(vec![
532            new_record_batch_by_range(&["a", "d"], 0, 60),
533            new_record_batch_by_range(&["b", "f"], 0, 40),
534            new_record_batch_by_range(&["b", "h"], 100, 200),
535        ]);
536        // Use a small row group size for test.
537        let write_opts = WriteOptions {
538            row_group_size: 50,
539            ..Default::default()
540        };
541        // Prepare data.
542        let mut metrics = Metrics::new(WriteType::Flush);
543        let mut writer = ParquetWriter::new_with_object_store(
544            object_store.clone(),
545            metadata.clone(),
546            IndexConfig::default(),
547            NoopIndexBuilder,
548            FixedPathProvider {
549                region_file_id: handle.file_id(),
550            },
551            &mut metrics,
552        )
553        .await;
554
555        writer
556            .write_all_flat_as_primary_key(source, None, &write_opts)
557            .await
558            .unwrap()
559            .remove(0);
560
561        // Predicate
562        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
563            left: Box::new(Expr::Column(Column::from_name("field_0"))),
564            op: Operator::GtEq,
565            right: Box::new(150u64.lit()),
566        })]));
567
568        let builder = ParquetReaderBuilder::new(
569            FILE_DIR.to_string(),
570            PathType::Bare,
571            handle.clone(),
572            object_store,
573        )
574        .predicate(predicate);
575        let mut reader = builder.build().await.unwrap().unwrap();
576        check_record_batch_reader_result(
577            &mut reader,
578            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
579        )
580        .await;
581    }
582
583    #[tokio::test]
584    async fn test_read_large_binary() {
585        let mut env = TestEnv::new().await;
586        let object_store = env.init_object_store_manager();
587        let handle = sst_file_handle(0, 1000);
588        let file_path = handle.file_path(FILE_DIR, PathType::Bare);
589
590        let write_opts = WriteOptions {
591            row_group_size: 50,
592            ..Default::default()
593        };
594
595        let metadata = build_test_binary_test_region_metadata();
596        let json = metadata.to_json().unwrap();
597        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
598
599        let props_builder = WriterProperties::builder()
600            .set_key_value_metadata(Some(vec![key_value_meta]))
601            .set_compression(Compression::ZSTD(ZstdLevel::default()))
602            .set_encoding(Encoding::PLAIN)
603            .set_max_row_group_row_count(Some(write_opts.row_group_size));
604
605        let writer_props = props_builder.build();
606
607        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
608        let fields: Vec<_> = write_format
609            .arrow_schema()
610            .fields()
611            .into_iter()
612            .map(|field| {
613                let data_type = field.data_type().clone();
614                if data_type == DataType::Binary {
615                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
616                } else {
617                    Field::new(field.name(), data_type, field.is_nullable())
618                }
619            })
620            .collect();
621
622        let arrow_schema = Arc::new(Schema::new(fields));
623
624        // Ensures field_0 has LargeBinary type.
625        assert_eq!(
626            &DataType::LargeBinary,
627            arrow_schema.field_with_name("field_0").unwrap().data_type()
628        );
629        let mut writer = AsyncArrowWriter::try_new(
630            object_store
631                .writer_with(&file_path)
632                .concurrent(DEFAULT_WRITE_CONCURRENCY)
633                .await
634                .map(|w| w.into_futures_async_write().compat_write())
635                .unwrap(),
636            arrow_schema.clone(),
637            Some(writer_props),
638        )
639        .unwrap();
640
641        let batch = new_record_batch_with_binary(&["a"], 0, 60);
642        let arrays: Vec<_> = batch
643            .columns()
644            .iter()
645            .map(|array| {
646                let data_type = array.data_type().clone();
647                if data_type == DataType::Binary {
648                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
649                } else {
650                    array.clone()
651                }
652            })
653            .collect();
654        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
655
656        writer.write(&result).await.unwrap();
657        writer.close().await.unwrap();
658
659        let builder = ParquetReaderBuilder::new(
660            FILE_DIR.to_string(),
661            PathType::Bare,
662            handle.clone(),
663            object_store,
664        );
665        let mut reader = builder.build().await.unwrap().unwrap();
666        check_record_batch_reader_result(
667            &mut reader,
668            &[
669                new_record_batch_with_binary(&["a"], 0, 50),
670                new_record_batch_with_binary(&["a"], 50, 60),
671            ],
672        )
673        .await;
674    }
675
676    #[tokio::test]
677    async fn test_write_multiple_files() {
678        common_telemetry::init_default_ut_logging();
679        // create test env
680        let mut env = TestEnv::new().await;
681        let object_store = env.init_object_store_manager();
682        let metadata = Arc::new(sst_region_metadata());
683        let batches = vec![
684            new_record_batch_by_range(&["a", "d"], 0, 1000),
685            new_record_batch_by_range(&["b", "f"], 0, 1000),
686            new_record_batch_by_range(&["c", "g"], 0, 1000),
687            new_record_batch_by_range(&["b", "h"], 100, 200),
688            new_record_batch_by_range(&["b", "h"], 200, 300),
689            new_record_batch_by_range(&["b", "h"], 300, 1000),
690        ];
691        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
692
693        let source = new_flat_source_from_record_batches(batches);
694        let write_opts = WriteOptions {
695            row_group_size: 50,
696            max_file_size: Some(1024 * 16),
697            ..Default::default()
698        };
699
700        let path_provider = RegionFilePathFactory {
701            table_dir: "test".to_string(),
702            path_type: PathType::Bare,
703        };
704        let mut metrics = Metrics::new(WriteType::Flush);
705        let mut writer = ParquetWriter::new_with_object_store(
706            object_store.clone(),
707            metadata.clone(),
708            IndexConfig::default(),
709            NoopIndexBuilder,
710            path_provider,
711            &mut metrics,
712        )
713        .await;
714
715        let files = writer
716            .write_all_flat_as_primary_key(source, None, &write_opts)
717            .await
718            .unwrap();
719        assert_eq!(2, files.len());
720
721        let mut rows_read = 0;
722        for f in &files {
723            let file_handle = sst_file_handle_with_file_id(
724                f.file_id,
725                f.time_range.0.value(),
726                f.time_range.1.value(),
727            );
728            let builder = ParquetReaderBuilder::new(
729                "test".to_string(),
730                PathType::Bare,
731                file_handle,
732                object_store.clone(),
733            );
734            let mut reader = builder.build().await.unwrap().unwrap();
735            while let Some(batch) = reader.next_record_batch().await.unwrap() {
736                rows_read += batch.num_rows();
737            }
738        }
739        assert_eq!(total_rows, rows_read);
740    }
741
742    #[tokio::test]
743    async fn test_write_read_with_index() {
744        let mut env = TestEnv::new().await;
745        let object_store = env.init_object_store_manager();
746        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
747        let metadata = Arc::new(sst_region_metadata());
748        let row_group_size = 50;
749
750        let source = new_flat_source_from_record_batches(vec![
751            new_record_batch_by_range(&["a", "d"], 0, 20),
752            new_record_batch_by_range(&["b", "d"], 0, 20),
753            new_record_batch_by_range(&["c", "d"], 0, 20),
754            new_record_batch_by_range(&["c", "f"], 0, 40),
755            new_record_batch_by_range(&["c", "h"], 100, 200),
756        ]);
757        // Use a small row group size for test.
758        let write_opts = WriteOptions {
759            row_group_size,
760            ..Default::default()
761        };
762
763        let puffin_manager = env
764            .get_puffin_manager()
765            .build(object_store.clone(), file_path.clone());
766        let intermediate_manager = env.get_intermediate_manager();
767
768        let indexer_builder = IndexerBuilderImpl {
769            build_type: IndexBuildType::Flush,
770            metadata: metadata.clone(),
771            row_group_size,
772            puffin_manager,
773            write_cache_enabled: false,
774            intermediate_manager,
775            index_options: IndexOptions {
776                inverted_index: InvertedIndexOptions {
777                    segment_row_count: 1,
778                    ..Default::default()
779                },
780            },
781            inverted_index_config: Default::default(),
782            fulltext_index_config: Default::default(),
783            bloom_filter_index_config: Default::default(),
784            #[cfg(feature = "vector_index")]
785            vector_index_config: Default::default(),
786        };
787
788        let mut metrics = Metrics::new(WriteType::Flush);
789        let mut writer = ParquetWriter::new_with_object_store(
790            object_store.clone(),
791            metadata.clone(),
792            IndexConfig::default(),
793            indexer_builder,
794            file_path.clone(),
795            &mut metrics,
796        )
797        .await;
798
799        let info = writer
800            .write_all_flat_as_primary_key(source, None, &write_opts)
801            .await
802            .unwrap()
803            .remove(0);
804        assert_eq!(200, info.num_rows);
805        assert!(info.file_size > 0);
806        assert!(info.index_metadata.file_size > 0);
807
808        assert!(info.index_metadata.inverted_index.index_size > 0);
809        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
810        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
811
812        assert!(info.index_metadata.bloom_filter.index_size > 0);
813        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
814        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
815
816        assert_eq!(
817            (
818                Timestamp::new_millisecond(0),
819                Timestamp::new_millisecond(199)
820            ),
821            info.time_range
822        );
823
824        let handle = FileHandle::new(
825            FileMeta {
826                region_id: metadata.region_id,
827                file_id: info.file_id,
828                time_range: info.time_range,
829                level: 0,
830                file_size: info.file_size,
831                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
832                available_indexes: info.index_metadata.build_available_indexes(),
833                indexes: info.index_metadata.build_indexes(),
834                index_file_size: info.index_metadata.file_size,
835                index_version: 0,
836                num_row_groups: info.num_row_groups,
837                num_rows: info.num_rows as u64,
838                sequence: None,
839                partition_expr: match &metadata.partition_expr {
840                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
841                        .expect("partition expression should be valid JSON"),
842                    None => None,
843                },
844                num_series: 0,
845                ..Default::default()
846            },
847            Arc::new(NoopFilePurger),
848        );
849
850        let cache = Arc::new(
851            CacheManager::builder()
852                .index_result_cache_size(1024 * 1024)
853                .index_metadata_size(1024 * 1024)
854                .index_content_page_size(1024 * 1024)
855                .index_content_size(1024 * 1024)
856                .puffin_metadata_size(1024 * 1024)
857                .build(),
858        );
859        let index_result_cache = cache.index_result_cache().unwrap();
860
861        let build_inverted_index_applier = |exprs: &[Expr]| {
862            InvertedIndexApplierBuilder::new(
863                FILE_DIR.to_string(),
864                PathType::Bare,
865                object_store.clone(),
866                &metadata,
867                HashSet::from_iter([0]),
868                env.get_puffin_manager(),
869            )
870            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
871            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
872            .build(exprs)
873            .unwrap()
874            .map(Arc::new)
875        };
876
877        let build_bloom_filter_applier = |exprs: &[Expr]| {
878            BloomFilterIndexApplierBuilder::new(
879                FILE_DIR.to_string(),
880                PathType::Bare,
881                object_store.clone(),
882                &metadata,
883                env.get_puffin_manager(),
884            )
885            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
886            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
887            .build(exprs)
888            .unwrap()
889            .map(Arc::new)
890        };
891
892        // Data: ts tag_0 tag_1
893        // Data: 0-20 [a, d]
894        //       0-20 [b, d]
895        //       0-20 [c, d]
896        //       0-40 [c, f]
897        //    100-200 [c, h]
898        //
899        // Pred: tag_0 = "b"
900        //
901        // Row groups & rows pruning:
902        //
903        // Row Groups:
904        // - min-max: filter out row groups 1..=3
905        //
906        // Rows:
907        // - inverted index: hit row group 0, hit 20 rows
908        let preds = vec![col("tag_0").eq(lit("b"))];
909        let inverted_index_applier = build_inverted_index_applier(&preds);
910        let bloom_filter_applier = build_bloom_filter_applier(&preds);
911
912        let builder = ParquetReaderBuilder::new(
913            FILE_DIR.to_string(),
914            PathType::Bare,
915            handle.clone(),
916            object_store.clone(),
917        )
918        .predicate(Some(Predicate::new(preds)))
919        .inverted_index_appliers([inverted_index_applier.clone(), None])
920        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
921        .cache(CacheStrategy::EnableAll(cache.clone()));
922
923        let mut metrics = ReaderMetrics::default();
924        let (context, selection) = builder
925            .build_reader_input(&mut metrics)
926            .await
927            .unwrap()
928            .unwrap();
929        let mut reader = ParquetReader::new(Arc::new(context), selection)
930            .await
931            .unwrap();
932        check_record_batch_reader_result(
933            &mut reader,
934            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
935        )
936        .await;
937
938        assert_eq!(metrics.filter_metrics.rg_total, 4);
939        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
940        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
941        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
942        let plan = inverted_index_applier
943            .as_ref()
944            .unwrap()
945            .plan_for_sst(&metadata)
946            .unwrap()
947            .unwrap();
948        let cached = index_result_cache
949            .get(&plan.predicate_key, handle.file_id().file_id())
950            .unwrap();
951        // inverted index will search all row groups
952        assert!(cached.contains_row_group(0));
953        assert!(cached.contains_row_group(1));
954        assert!(cached.contains_row_group(2));
955        assert!(cached.contains_row_group(3));
956
957        // Data: ts tag_0 tag_1
958        // Data: 0-20 [a, d]
959        //       0-20 [b, d]
960        //       0-20 [c, d]
961        //       0-40 [c, f]
962        //    100-200 [c, h]
963        //
964        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
965        //
966        // Row groups & rows pruning:
967        //
968        // Row Groups:
969        // - min-max: filter out row groups 0..=1
970        // - bloom filter: filter out row groups 2..=3
971        let preds = vec![
972            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
973            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
974            col("tag_1").eq(lit("d")),
975        ];
976        let inverted_index_applier = build_inverted_index_applier(&preds);
977        let bloom_filter_applier = build_bloom_filter_applier(&preds);
978
979        let builder = ParquetReaderBuilder::new(
980            FILE_DIR.to_string(),
981            PathType::Bare,
982            handle.clone(),
983            object_store.clone(),
984        )
985        .predicate(Some(Predicate::new(preds)))
986        .inverted_index_appliers([inverted_index_applier.clone(), None])
987        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
988        .cache(CacheStrategy::EnableAll(cache.clone()));
989
990        let mut metrics = ReaderMetrics::default();
991        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
992        assert!(read_input.is_none());
993
994        assert_eq!(metrics.filter_metrics.rg_total, 4);
995        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
996        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
997        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
998        let bloom_predicates = bloom_filter_applier
999            .as_ref()
1000            .unwrap()
1001            .compatible_predicate_for_sst(&metadata)
1002            .unwrap();
1003        let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
1004        let cached = index_result_cache
1005            .get(&bloom_predicate_key, handle.file_id().file_id())
1006            .unwrap();
1007        assert!(cached.contains_row_group(2));
1008        assert!(cached.contains_row_group(3));
1009        assert!(!cached.contains_row_group(0));
1010        assert!(!cached.contains_row_group(1));
1011
1012        // Remove the pred of `ts`, continue to use the pred of `tag_1`
1013        // to test if cache works.
1014
1015        // Data: ts tag_0 tag_1
1016        // Data: 0-20 [a, d]
1017        //       0-20 [b, d]
1018        //       0-20 [c, d]
1019        //       0-40 [c, f]
1020        //    100-200 [c, h]
1021        //
1022        // Pred: tag_1 = "d"
1023        //
1024        // Row groups & rows pruning:
1025        //
1026        // Row Groups:
1027        // - bloom filter: filter out row groups 2..=3
1028        //
1029        // Rows:
1030        // - bloom filter: hit row group 0, hit 50 rows
1031        //                 hit row group 1, hit 10 rows
1032        let preds = vec![col("tag_1").eq(lit("d"))];
1033        let inverted_index_applier = build_inverted_index_applier(&preds);
1034        let bloom_filter_applier = build_bloom_filter_applier(&preds);
1035
1036        let builder = ParquetReaderBuilder::new(
1037            FILE_DIR.to_string(),
1038            PathType::Bare,
1039            handle.clone(),
1040            object_store.clone(),
1041        )
1042        .predicate(Some(Predicate::new(preds)))
1043        .inverted_index_appliers([inverted_index_applier.clone(), None])
1044        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
1045        .cache(CacheStrategy::EnableAll(cache.clone()));
1046
1047        let mut metrics = ReaderMetrics::default();
1048        let (context, selection) = builder
1049            .build_reader_input(&mut metrics)
1050            .await
1051            .unwrap()
1052            .unwrap();
1053        let mut reader = ParquetReader::new(Arc::new(context), selection)
1054            .await
1055            .unwrap();
1056        check_record_batch_reader_result(
1057            &mut reader,
1058            &[
1059                new_record_batch_by_range(&["a", "d"], 0, 20),
1060                new_record_batch_by_range(&["b", "d"], 0, 20),
1061                new_record_batch_by_range(&["c", "d"], 0, 10),
1062                new_record_batch_by_range(&["c", "d"], 10, 20),
1063            ],
1064        )
1065        .await;
1066
1067        assert_eq!(metrics.filter_metrics.rg_total, 4);
1068        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1069        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1070        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1071        let bloom_predicates = bloom_filter_applier
1072            .as_ref()
1073            .unwrap()
1074            .compatible_predicate_for_sst(&metadata)
1075            .unwrap();
1076        let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
1077        let cached = index_result_cache
1078            .get(&bloom_predicate_key, handle.file_id().file_id())
1079            .unwrap();
1080        assert!(cached.contains_row_group(0));
1081        assert!(cached.contains_row_group(1));
1082        assert!(cached.contains_row_group(2));
1083        assert!(cached.contains_row_group(3));
1084    }
1085
1086    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1087        assert!(end >= start);
1088        let metadata = build_test_binary_test_region_metadata();
1089        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1090
1091        let num_rows = end - start;
1092        let mut columns = Vec::new();
1093
1094        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1095        for _ in 0..num_rows {
1096            tag_0_builder.append_value(tags[0]);
1097        }
1098        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1099
1100        let values = (0..num_rows)
1101            .map(|_| "some data".as_bytes())
1102            .collect::<Vec<_>>();
1103        columns.push(
1104            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
1105                values,
1106            )) as ArrayRef,
1107        );
1108
1109        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1110        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1111
1112        let pk = new_primary_key(tags);
1113        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1114        for _ in 0..num_rows {
1115            pk_builder.append(&pk).unwrap();
1116        }
1117        columns.push(Arc::new(pk_builder.finish()));
1118
1119        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1120        columns.push(Arc::new(UInt8Array::from_value(
1121            OpType::Put as u8,
1122            num_rows,
1123        )));
1124
1125        RecordBatch::try_new(flat_schema, columns).unwrap()
1126    }
1127
1128    async fn check_record_batch_reader_result(
1129        reader: &mut ParquetReader,
1130        expected: &[RecordBatch],
1131    ) {
1132        let mut actual = Vec::new();
1133        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1134            actual.push(batch);
1135        }
1136        assert_eq!(
1137            pretty_format_batches(expected).unwrap().to_string(),
1138            pretty_format_batches(&actual).unwrap().to_string()
1139        );
1140        assert!(reader.next_record_batch().await.unwrap().is_none());
1141    }
1142
1143    fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
1144        let metadata = Arc::new(sst_region_metadata());
1145        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1146
1147        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1148        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1149        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1150        let mut field_values = Vec::with_capacity(rows.len());
1151        let mut timestamps = Vec::with_capacity(rows.len());
1152
1153        for (tag_0, tag_1, ts) in rows {
1154            tag_0_builder.append_value(*tag_0);
1155            tag_1_builder.append_value(*tag_1);
1156            pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
1157            field_values.push(*ts as u64);
1158            timestamps.push(*ts);
1159        }
1160
1161        RecordBatch::try_new(
1162            flat_schema,
1163            vec![
1164                Arc::new(tag_0_builder.finish()) as ArrayRef,
1165                Arc::new(tag_1_builder.finish()) as ArrayRef,
1166                Arc::new(UInt64Array::from(field_values)) as ArrayRef,
1167                Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
1168                Arc::new(pk_builder.finish()) as ArrayRef,
1169                Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
1170                Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
1171            ],
1172        )
1173        .unwrap()
1174    }
1175
1176    /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
1177    /// Similar to `new_record_batch_by_range` but without individual primary key columns.
1178    fn new_record_batch_by_range_sparse(
1179        tags: &[&str],
1180        start: usize,
1181        end: usize,
1182        metadata: &Arc<RegionMetadata>,
1183    ) -> RecordBatch {
1184        assert!(end >= start);
1185        let flat_schema = to_flat_sst_arrow_schema(
1186            metadata,
1187            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1188        );
1189
1190        let num_rows = end - start;
1191        let mut columns: Vec<ArrayRef> = Vec::new();
1192
1193        // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format
1194
1195        // Add field column (field_0)
1196        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1197        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1198
1199        // Add time index column (ts)
1200        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1201        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1202
1203        // Add encoded primary key column using sparse encoding
1204        let table_id = 1u32; // Test table ID
1205        let tsid = 100u64; // Base TSID
1206        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1207
1208        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1209        for _ in 0..num_rows {
1210            pk_builder.append(&pk).unwrap();
1211        }
1212        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1213
1214        // Add sequence column
1215        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1216
1217        // Add op_type column
1218        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1219
1220        RecordBatch::try_new(flat_schema, columns).unwrap()
1221    }
1222
1223    /// Helper function to create IndexerBuilderImpl for tests.
1224    fn create_test_indexer_builder(
1225        env: &TestEnv,
1226        object_store: ObjectStore,
1227        file_path: RegionFilePathFactory,
1228        metadata: Arc<RegionMetadata>,
1229        row_group_size: usize,
1230    ) -> IndexerBuilderImpl {
1231        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1232        let intermediate_manager = env.get_intermediate_manager();
1233
1234        IndexerBuilderImpl {
1235            build_type: IndexBuildType::Flush,
1236            metadata,
1237            row_group_size,
1238            puffin_manager,
1239            write_cache_enabled: false,
1240            intermediate_manager,
1241            index_options: IndexOptions {
1242                inverted_index: InvertedIndexOptions {
1243                    segment_row_count: 1,
1244                    ..Default::default()
1245                },
1246            },
1247            inverted_index_config: Default::default(),
1248            fulltext_index_config: Default::default(),
1249            bloom_filter_index_config: Default::default(),
1250            #[cfg(feature = "vector_index")]
1251            vector_index_config: Default::default(),
1252        }
1253    }
1254
1255    /// Helper function to write flat SST and return SstInfo.
1256    async fn write_flat_sst(
1257        object_store: ObjectStore,
1258        metadata: Arc<RegionMetadata>,
1259        indexer_builder: IndexerBuilderImpl,
1260        file_path: RegionFilePathFactory,
1261        flat_source: FlatSource,
1262        write_opts: &WriteOptions,
1263    ) -> SstInfo {
1264        let mut metrics = Metrics::new(WriteType::Flush);
1265        let mut writer = ParquetWriter::new_with_object_store(
1266            object_store,
1267            metadata,
1268            IndexConfig::default(),
1269            indexer_builder,
1270            file_path,
1271            &mut metrics,
1272        )
1273        .await;
1274
1275        writer
1276            .write_all_flat(flat_source, None, write_opts)
1277            .await
1278            .unwrap()
1279            .remove(0)
1280    }
1281
1282    /// Helper function to create FileHandle from SstInfo.
1283    fn create_file_handle_from_sst_info(
1284        info: &SstInfo,
1285        metadata: &Arc<RegionMetadata>,
1286    ) -> FileHandle {
1287        FileHandle::new(
1288            FileMeta {
1289                region_id: metadata.region_id,
1290                file_id: info.file_id,
1291                time_range: info.time_range,
1292                level: 0,
1293                file_size: info.file_size,
1294                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1295                available_indexes: info.index_metadata.build_available_indexes(),
1296                indexes: info.index_metadata.build_indexes(),
1297                index_file_size: info.index_metadata.file_size,
1298                index_version: 0,
1299                num_row_groups: info.num_row_groups,
1300                num_rows: info.num_rows as u64,
1301                sequence: None,
1302                partition_expr: match &metadata.partition_expr {
1303                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1304                        .expect("partition expression should be valid JSON"),
1305                    None => None,
1306                },
1307                num_series: 0,
1308                ..Default::default()
1309            },
1310            Arc::new(NoopFilePurger),
1311        )
1312    }
1313
1314    /// Helper function to create test cache with standard settings.
1315    fn create_test_cache() -> Arc<CacheManager> {
1316        Arc::new(
1317            CacheManager::builder()
1318                .index_result_cache_size(1024 * 1024)
1319                .index_metadata_size(1024 * 1024)
1320                .index_content_page_size(1024 * 1024)
1321                .index_content_size(1024 * 1024)
1322                .puffin_metadata_size(1024 * 1024)
1323                .build(),
1324        )
1325    }
1326
1327    #[tokio::test]
1328    async fn test_write_flat_with_index() {
1329        let mut env = TestEnv::new().await;
1330        let object_store = env.init_object_store_manager();
1331        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1332        let metadata = Arc::new(sst_region_metadata());
1333        let row_group_size = 50;
1334
1335        // Create flat format RecordBatches
1336        let flat_batches = vec![
1337            new_record_batch_by_range(&["a", "d"], 0, 20),
1338            new_record_batch_by_range(&["b", "d"], 0, 20),
1339            new_record_batch_by_range(&["c", "d"], 0, 20),
1340            new_record_batch_by_range(&["c", "f"], 0, 40),
1341            new_record_batch_by_range(&["c", "h"], 100, 200),
1342        ];
1343
1344        let flat_source = new_flat_source_from_record_batches(flat_batches);
1345
1346        let write_opts = WriteOptions {
1347            row_group_size,
1348            ..Default::default()
1349        };
1350
1351        let puffin_manager = env
1352            .get_puffin_manager()
1353            .build(object_store.clone(), file_path.clone());
1354        let intermediate_manager = env.get_intermediate_manager();
1355
1356        let indexer_builder = IndexerBuilderImpl {
1357            build_type: IndexBuildType::Flush,
1358            metadata: metadata.clone(),
1359            row_group_size,
1360            puffin_manager,
1361            write_cache_enabled: false,
1362            intermediate_manager,
1363            index_options: IndexOptions {
1364                inverted_index: InvertedIndexOptions {
1365                    segment_row_count: 1,
1366                    ..Default::default()
1367                },
1368            },
1369            inverted_index_config: Default::default(),
1370            fulltext_index_config: Default::default(),
1371            bloom_filter_index_config: Default::default(),
1372            #[cfg(feature = "vector_index")]
1373            vector_index_config: Default::default(),
1374        };
1375
1376        let mut metrics = Metrics::new(WriteType::Flush);
1377        let mut writer = ParquetWriter::new_with_object_store(
1378            object_store.clone(),
1379            metadata.clone(),
1380            IndexConfig::default(),
1381            indexer_builder,
1382            file_path.clone(),
1383            &mut metrics,
1384        )
1385        .await;
1386
1387        let info = writer
1388            .write_all_flat(flat_source, None, &write_opts)
1389            .await
1390            .unwrap()
1391            .remove(0);
1392        assert_eq!(200, info.num_rows);
1393        assert!(info.file_size > 0);
1394        assert!(info.index_metadata.file_size > 0);
1395
1396        assert!(info.index_metadata.inverted_index.index_size > 0);
1397        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1398        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1399
1400        assert!(info.index_metadata.bloom_filter.index_size > 0);
1401        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1402        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1403
1404        assert_eq!(
1405            (
1406                Timestamp::new_millisecond(0),
1407                Timestamp::new_millisecond(199)
1408            ),
1409            info.time_range
1410        );
1411    }
1412
1413    #[tokio::test]
1414    async fn test_read_with_override_sequence() {
1415        let mut env = TestEnv::new().await;
1416        let object_store = env.init_object_store_manager();
1417        let handle = sst_file_handle(0, 1000);
1418        let file_path = FixedPathProvider {
1419            region_file_id: handle.file_id(),
1420        };
1421        let metadata = Arc::new(sst_region_metadata());
1422
1423        // Create batches with sequence 0 to trigger override functionality.
1424        let source = new_flat_source_from_record_batches(vec![
1425            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
1426            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
1427        ]);
1428
1429        let write_opts = WriteOptions {
1430            row_group_size: 50,
1431            ..Default::default()
1432        };
1433
1434        let mut metrics = Metrics::new(WriteType::Flush);
1435        let mut writer = ParquetWriter::new_with_object_store(
1436            object_store.clone(),
1437            metadata.clone(),
1438            IndexConfig::default(),
1439            NoopIndexBuilder,
1440            file_path,
1441            &mut metrics,
1442        )
1443        .await;
1444
1445        writer
1446            .write_all_flat_as_primary_key(source, None, &write_opts)
1447            .await
1448            .unwrap()
1449            .remove(0);
1450
1451        // Read without override sequence (should read sequence 0)
1452        let builder = ParquetReaderBuilder::new(
1453            FILE_DIR.to_string(),
1454            PathType::Bare,
1455            handle.clone(),
1456            object_store.clone(),
1457        );
1458        let mut reader = builder.build().await.unwrap().unwrap();
1459        let mut normal_batches = Vec::new();
1460        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1461            normal_batches.push(batch);
1462        }
1463
1464        // Read with override sequence using FileMeta.sequence
1465        let custom_sequence = 12345u64;
1466        let file_meta = handle.meta_ref();
1467        let mut override_file_meta = file_meta.clone();
1468        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1469        let override_handle = FileHandle::new(
1470            override_file_meta,
1471            Arc::new(crate::sst::file_purger::NoopFilePurger),
1472        );
1473
1474        let builder = ParquetReaderBuilder::new(
1475            FILE_DIR.to_string(),
1476            PathType::Bare,
1477            override_handle,
1478            object_store.clone(),
1479        );
1480        let mut reader = builder.build().await.unwrap().unwrap();
1481        let mut override_batches = Vec::new();
1482        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1483            override_batches.push(batch);
1484        }
1485
1486        // Compare the results
1487        assert_eq!(normal_batches.len(), override_batches.len());
1488        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1489            let expected_batch = {
1490                let mut columns = normal.columns().to_vec();
1491                let num_cols = columns.len();
1492                columns[num_cols - 2] =
1493                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
1494                RecordBatch::try_new(normal.schema(), columns).unwrap()
1495            };
1496
1497            // Override batch should match expected batch
1498            assert_eq!(*override_batch, expected_batch);
1499        }
1500    }
1501
1502    #[tokio::test]
1503    async fn test_write_flat_read_with_inverted_index() {
1504        let mut env = TestEnv::new().await;
1505        let object_store = env.init_object_store_manager();
1506        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1507        let metadata = Arc::new(sst_region_metadata());
1508        let row_group_size = 100;
1509
1510        // Create flat format RecordBatches with non-overlapping timestamp ranges
1511        // Each batch becomes one row group (row_group_size = 100)
1512        // Data: ts tag_0 tag_1
1513        // RG 0:   0-50  [a, d]
1514        // RG 0:  50-100 [b, d]
1515        // RG 1: 100-150 [c, d]
1516        // RG 1: 150-200 [c, f]
1517        let flat_batches = vec![
1518            new_record_batch_by_range(&["a", "d"], 0, 50),
1519            new_record_batch_by_range(&["b", "d"], 50, 100),
1520            new_record_batch_by_range(&["c", "d"], 100, 150),
1521            new_record_batch_by_range(&["c", "f"], 150, 200),
1522        ];
1523
1524        let flat_source = new_flat_source_from_record_batches(flat_batches);
1525
1526        let write_opts = WriteOptions {
1527            row_group_size,
1528            ..Default::default()
1529        };
1530
1531        let indexer_builder = create_test_indexer_builder(
1532            &env,
1533            object_store.clone(),
1534            file_path.clone(),
1535            metadata.clone(),
1536            row_group_size,
1537        );
1538
1539        let info = write_flat_sst(
1540            object_store.clone(),
1541            metadata.clone(),
1542            indexer_builder,
1543            file_path.clone(),
1544            flat_source,
1545            &write_opts,
1546        )
1547        .await;
1548        assert_eq!(200, info.num_rows);
1549        assert!(info.file_size > 0);
1550        assert!(info.index_metadata.file_size > 0);
1551
1552        let handle = create_file_handle_from_sst_info(&info, &metadata);
1553
1554        let cache = create_test_cache();
1555
1556        // Test 1: Filter by tag_0 = "b"
1557        // Expected: Only rows with tag_0="b"
1558        let preds = vec![col("tag_0").eq(lit("b"))];
1559        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1560            FILE_DIR.to_string(),
1561            PathType::Bare,
1562            object_store.clone(),
1563            &metadata,
1564            HashSet::from_iter([0]),
1565            env.get_puffin_manager(),
1566        )
1567        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1568        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1569        .build(&preds)
1570        .unwrap()
1571        .map(Arc::new);
1572
1573        let builder = ParquetReaderBuilder::new(
1574            FILE_DIR.to_string(),
1575            PathType::Bare,
1576            handle.clone(),
1577            object_store.clone(),
1578        )
1579        .predicate(Some(Predicate::new(preds)))
1580        .inverted_index_appliers([inverted_index_applier.clone(), None])
1581        .cache(CacheStrategy::EnableAll(cache.clone()));
1582
1583        let mut metrics = ReaderMetrics::default();
1584        let (_context, selection) = builder
1585            .build_reader_input(&mut metrics)
1586            .await
1587            .unwrap()
1588            .unwrap();
1589
1590        // Verify selection contains only RG 0 (tag_0="b", ts 0-100)
1591        assert_eq!(selection.row_group_count(), 1);
1592        assert_eq!(50, selection.get(0).unwrap().row_count());
1593
1594        // Verify filtering metrics
1595        assert_eq!(metrics.filter_metrics.rg_total, 2);
1596        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
1597        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
1598        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
1599    }
1600
1601    #[tokio::test]
1602    async fn test_write_flat_read_with_bloom_filter() {
1603        let mut env = TestEnv::new().await;
1604        let object_store = env.init_object_store_manager();
1605        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1606        let metadata = Arc::new(sst_region_metadata());
1607        let row_group_size = 100;
1608
1609        // Create flat format RecordBatches with non-overlapping timestamp ranges
1610        // Each batch becomes one row group (row_group_size = 100)
1611        // Data: ts tag_0 tag_1
1612        // RG 0:   0-50  [a, d]
1613        // RG 0:  50-100 [b, e]
1614        // RG 1: 100-150 [c, d]
1615        // RG 1: 150-200 [c, f]
1616        let flat_batches = vec![
1617            new_record_batch_by_range(&["a", "d"], 0, 50),
1618            new_record_batch_by_range(&["b", "e"], 50, 100),
1619            new_record_batch_by_range(&["c", "d"], 100, 150),
1620            new_record_batch_by_range(&["c", "f"], 150, 200),
1621        ];
1622
1623        let flat_source = new_flat_source_from_record_batches(flat_batches);
1624
1625        let write_opts = WriteOptions {
1626            row_group_size,
1627            ..Default::default()
1628        };
1629
1630        let indexer_builder = create_test_indexer_builder(
1631            &env,
1632            object_store.clone(),
1633            file_path.clone(),
1634            metadata.clone(),
1635            row_group_size,
1636        );
1637
1638        let info = write_flat_sst(
1639            object_store.clone(),
1640            metadata.clone(),
1641            indexer_builder,
1642            file_path.clone(),
1643            flat_source,
1644            &write_opts,
1645        )
1646        .await;
1647        assert_eq!(200, info.num_rows);
1648        assert!(info.file_size > 0);
1649        assert!(info.index_metadata.file_size > 0);
1650
1651        let handle = create_file_handle_from_sst_info(&info, &metadata);
1652
1653        let cache = create_test_cache();
1654
1655        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1656        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
1657        let preds = vec![
1658            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1659            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1660            col("tag_1").eq(lit("d")),
1661        ];
1662        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1663            FILE_DIR.to_string(),
1664            PathType::Bare,
1665            object_store.clone(),
1666            &metadata,
1667            env.get_puffin_manager(),
1668        )
1669        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1670        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1671        .build(&preds)
1672        .unwrap()
1673        .map(Arc::new);
1674
1675        let builder = ParquetReaderBuilder::new(
1676            FILE_DIR.to_string(),
1677            PathType::Bare,
1678            handle.clone(),
1679            object_store.clone(),
1680        )
1681        .predicate(Some(Predicate::new(preds)))
1682        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1683        .cache(CacheStrategy::EnableAll(cache.clone()));
1684
1685        let mut metrics = ReaderMetrics::default();
1686        let (_context, selection) = builder
1687            .build_reader_input(&mut metrics)
1688            .await
1689            .unwrap()
1690            .unwrap();
1691
1692        // Verify selection contains RG 0 and RG 1
1693        assert_eq!(selection.row_group_count(), 2);
1694        assert_eq!(50, selection.get(0).unwrap().row_count());
1695        assert_eq!(50, selection.get(1).unwrap().row_count());
1696
1697        // Verify filtering metrics
1698        assert_eq!(metrics.filter_metrics.rg_total, 2);
1699        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1700        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1701        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1702    }
1703
1704    #[tokio::test]
1705    async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
1706        let mut env = TestEnv::new().await;
1707        let object_store = env.init_object_store_manager();
1708        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1709        let metadata = Arc::new(sst_region_metadata());
1710        let row_group_size = 10;
1711
1712        let flat_source = new_flat_source_from_record_batches(vec![
1713            new_record_batch_by_range(&["a", "d"], 0, 3),
1714            new_record_batch_by_range(&["b", "d"], 3, 10),
1715        ]);
1716        let write_opts = WriteOptions {
1717            row_group_size,
1718            ..Default::default()
1719        };
1720        let indexer_builder = create_test_indexer_builder(
1721            &env,
1722            object_store.clone(),
1723            file_path.clone(),
1724            metadata.clone(),
1725            row_group_size,
1726        );
1727        let info = write_flat_sst(
1728            object_store.clone(),
1729            metadata.clone(),
1730            indexer_builder,
1731            file_path,
1732            flat_source,
1733            &write_opts,
1734        )
1735        .await;
1736        let handle = create_file_handle_from_sst_info(&info, &metadata);
1737
1738        let builder =
1739            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1740                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1741
1742        let mut metrics = ReaderMetrics::default();
1743        let (context, _) = builder
1744            .build_reader_input(&mut metrics)
1745            .await
1746            .unwrap()
1747            .unwrap();
1748        let selection = RowGroupSelection::from_row_ranges(
1749            vec![(0, std::iter::once(0..6).collect())],
1750            row_group_size,
1751        );
1752
1753        let mut reader = ParquetReader::new(Arc::new(context), selection)
1754            .await
1755            .unwrap();
1756        check_record_batch_reader_result(
1757            &mut reader,
1758            &[new_record_batch_by_range(&["a", "d"], 0, 3)],
1759        )
1760        .await;
1761    }
1762
1763    #[tokio::test]
1764    async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
1765        let mut env = TestEnv::new().await;
1766        let object_store = env.init_object_store_manager();
1767        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1768        let metadata = Arc::new(sst_region_metadata());
1769        let row_group_size = 8;
1770
1771        let flat_source = new_flat_source_from_record_batches(vec![
1772            new_record_batch_by_range(&["a", "d"], 0, 2),
1773            new_record_batch_by_range(&["b", "d"], 2, 4),
1774            new_record_batch_by_range(&["a", "d"], 4, 6),
1775            new_record_batch_by_range(&["c", "d"], 6, 8),
1776        ]);
1777        let write_opts = WriteOptions {
1778            row_group_size,
1779            ..Default::default()
1780        };
1781        let indexer_builder = create_test_indexer_builder(
1782            &env,
1783            object_store.clone(),
1784            file_path.clone(),
1785            metadata.clone(),
1786            row_group_size,
1787        );
1788        let info = write_flat_sst(
1789            object_store.clone(),
1790            metadata.clone(),
1791            indexer_builder,
1792            file_path,
1793            flat_source,
1794            &write_opts,
1795        )
1796        .await;
1797        let handle = create_file_handle_from_sst_info(&info, &metadata);
1798
1799        let builder =
1800            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1801                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1802
1803        let mut metrics = ReaderMetrics::default();
1804        let (context, _) = builder
1805            .build_reader_input(&mut metrics)
1806            .await
1807            .unwrap()
1808            .unwrap();
1809        let selection = RowGroupSelection::from_row_ranges(
1810            vec![(0, std::iter::once(0..8).collect())],
1811            row_group_size,
1812        );
1813
1814        let mut reader = ParquetReader::new(Arc::new(context), selection)
1815            .await
1816            .unwrap();
1817        check_record_batch_reader_result(
1818            &mut reader,
1819            &[new_record_batch_from_rows(&[
1820                ("a", "d", 0),
1821                ("a", "d", 1),
1822                ("a", "d", 4),
1823                ("a", "d", 5),
1824            ])],
1825        )
1826        .await;
1827    }
1828
1829    #[tokio::test]
1830    async fn test_write_flat_read_with_inverted_index_sparse() {
1831        common_telemetry::init_default_ut_logging();
1832
1833        let mut env = TestEnv::new().await;
1834        let object_store = env.init_object_store_manager();
1835        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1836        let metadata = Arc::new(sst_region_metadata_with_encoding(
1837            PrimaryKeyEncoding::Sparse,
1838        ));
1839        let row_group_size = 100;
1840
1841        // Create flat format RecordBatches with non-overlapping timestamp ranges
1842        // Each batch becomes one row group (row_group_size = 100)
1843        // Data: ts tag_0 tag_1
1844        // RG 0:   0-50  [a, d]
1845        // RG 0:  50-100 [b, d]
1846        // RG 1: 100-150 [c, d]
1847        // RG 1: 150-200 [c, f]
1848        let flat_batches = vec![
1849            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1850            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1851            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1852            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1853        ];
1854
1855        let flat_source = new_flat_source_from_record_batches(flat_batches);
1856
1857        let write_opts = WriteOptions {
1858            row_group_size,
1859            ..Default::default()
1860        };
1861
1862        let indexer_builder = create_test_indexer_builder(
1863            &env,
1864            object_store.clone(),
1865            file_path.clone(),
1866            metadata.clone(),
1867            row_group_size,
1868        );
1869
1870        let info = write_flat_sst(
1871            object_store.clone(),
1872            metadata.clone(),
1873            indexer_builder,
1874            file_path.clone(),
1875            flat_source,
1876            &write_opts,
1877        )
1878        .await;
1879        assert_eq!(200, info.num_rows);
1880        assert!(info.file_size > 0);
1881        assert!(info.index_metadata.file_size > 0);
1882
1883        let handle = create_file_handle_from_sst_info(&info, &metadata);
1884
1885        let cache = create_test_cache();
1886
1887        // Test 1: Filter by tag_0 = "b"
1888        // Expected: Only rows with tag_0="b"
1889        let preds = vec![col("tag_0").eq(lit("b"))];
1890        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1891            FILE_DIR.to_string(),
1892            PathType::Bare,
1893            object_store.clone(),
1894            &metadata,
1895            HashSet::from_iter([0]),
1896            env.get_puffin_manager(),
1897        )
1898        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1899        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1900        .build(&preds)
1901        .unwrap()
1902        .map(Arc::new);
1903
1904        let builder = ParquetReaderBuilder::new(
1905            FILE_DIR.to_string(),
1906            PathType::Bare,
1907            handle.clone(),
1908            object_store.clone(),
1909        )
1910        .predicate(Some(Predicate::new(preds)))
1911        .inverted_index_appliers([inverted_index_applier.clone(), None])
1912        .cache(CacheStrategy::EnableAll(cache.clone()));
1913
1914        let mut metrics = ReaderMetrics::default();
1915        let (_context, selection) = builder
1916            .build_reader_input(&mut metrics)
1917            .await
1918            .unwrap()
1919            .unwrap();
1920
1921        // RG 0 has 50 matching rows (tag_0="b")
1922        assert_eq!(selection.row_group_count(), 1);
1923        assert_eq!(50, selection.get(0).unwrap().row_count());
1924
1925        // Verify filtering metrics
1926        // Note: With sparse encoding, tag columns aren't stored separately,
1927        // so minmax filtering on tags doesn't work (only inverted index)
1928        assert_eq!(metrics.filter_metrics.rg_total, 2);
1929        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
1930        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1931        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1932    }
1933
1934    #[tokio::test]
1935    async fn test_write_flat_read_with_bloom_filter_sparse() {
1936        let mut env = TestEnv::new().await;
1937        let object_store = env.init_object_store_manager();
1938        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1939        let metadata = Arc::new(sst_region_metadata_with_encoding(
1940            PrimaryKeyEncoding::Sparse,
1941        ));
1942        let row_group_size = 100;
1943
1944        // Create flat format RecordBatches with non-overlapping timestamp ranges
1945        // Each batch becomes one row group (row_group_size = 100)
1946        // Data: ts tag_0 tag_1
1947        // RG 0:   0-50  [a, d]
1948        // RG 0:  50-100 [b, e]
1949        // RG 1: 100-150 [c, d]
1950        // RG 1: 150-200 [c, f]
1951        let flat_batches = vec![
1952            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1953            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
1954            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1955            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1956        ];
1957
1958        let flat_source = new_flat_source_from_record_batches(flat_batches);
1959
1960        let write_opts = WriteOptions {
1961            row_group_size,
1962            ..Default::default()
1963        };
1964
1965        let indexer_builder = create_test_indexer_builder(
1966            &env,
1967            object_store.clone(),
1968            file_path.clone(),
1969            metadata.clone(),
1970            row_group_size,
1971        );
1972
1973        let info = write_flat_sst(
1974            object_store.clone(),
1975            metadata.clone(),
1976            indexer_builder,
1977            file_path.clone(),
1978            flat_source,
1979            &write_opts,
1980        )
1981        .await;
1982        assert_eq!(200, info.num_rows);
1983        assert!(info.file_size > 0);
1984        assert!(info.index_metadata.file_size > 0);
1985
1986        let handle = create_file_handle_from_sst_info(&info, &metadata);
1987
1988        let cache = create_test_cache();
1989
1990        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1991        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
1992        let preds = vec![
1993            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1994            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1995            col("tag_1").eq(lit("d")),
1996        ];
1997        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1998            FILE_DIR.to_string(),
1999            PathType::Bare,
2000            object_store.clone(),
2001            &metadata,
2002            env.get_puffin_manager(),
2003        )
2004        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2005        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
2006        .build(&preds)
2007        .unwrap()
2008        .map(Arc::new);
2009
2010        let builder = ParquetReaderBuilder::new(
2011            FILE_DIR.to_string(),
2012            PathType::Bare,
2013            handle.clone(),
2014            object_store.clone(),
2015        )
2016        .predicate(Some(Predicate::new(preds)))
2017        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
2018        .cache(CacheStrategy::EnableAll(cache.clone()));
2019
2020        let mut metrics = ReaderMetrics::default();
2021        let (_context, selection) = builder
2022            .build_reader_input(&mut metrics)
2023            .await
2024            .unwrap()
2025            .unwrap();
2026
2027        // Verify selection contains RG 0 and RG 1
2028        assert_eq!(selection.row_group_count(), 2);
2029        assert_eq!(50, selection.get(0).unwrap().row_count());
2030        assert_eq!(50, selection.get(1).unwrap().row_count());
2031
2032        // Verify filtering metrics
2033        assert_eq!(metrics.filter_metrics.rg_total, 2);
2034        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2035        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
2036        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
2037    }
2038
2039    /// Creates region metadata for testing fulltext indexes.
2040    /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
2041    fn fulltext_region_metadata() -> RegionMetadata {
2042        let mut builder = RegionMetadataBuilder::new(REGION_ID);
2043        builder
2044            .push_column_metadata(ColumnMetadata {
2045                column_schema: ColumnSchema::new(
2046                    "tag_0".to_string(),
2047                    ConcreteDataType::string_datatype(),
2048                    true,
2049                ),
2050                semantic_type: SemanticType::Tag,
2051                column_id: 0,
2052            })
2053            .push_column_metadata(ColumnMetadata {
2054                column_schema: ColumnSchema::new(
2055                    "text_bloom".to_string(),
2056                    ConcreteDataType::string_datatype(),
2057                    true,
2058                )
2059                .with_fulltext_options(FulltextOptions {
2060                    enable: true,
2061                    analyzer: FulltextAnalyzer::English,
2062                    case_sensitive: false,
2063                    backend: FulltextBackend::Bloom,
2064                    granularity: 1,
2065                    false_positive_rate_in_10000: 50,
2066                })
2067                .unwrap(),
2068                semantic_type: SemanticType::Field,
2069                column_id: 1,
2070            })
2071            .push_column_metadata(ColumnMetadata {
2072                column_schema: ColumnSchema::new(
2073                    "text_tantivy".to_string(),
2074                    ConcreteDataType::string_datatype(),
2075                    true,
2076                )
2077                .with_fulltext_options(FulltextOptions {
2078                    enable: true,
2079                    analyzer: FulltextAnalyzer::English,
2080                    case_sensitive: false,
2081                    backend: FulltextBackend::Tantivy,
2082                    granularity: 1,
2083                    false_positive_rate_in_10000: 50,
2084                })
2085                .unwrap(),
2086                semantic_type: SemanticType::Field,
2087                column_id: 2,
2088            })
2089            .push_column_metadata(ColumnMetadata {
2090                column_schema: ColumnSchema::new(
2091                    "field_0".to_string(),
2092                    ConcreteDataType::uint64_datatype(),
2093                    true,
2094                ),
2095                semantic_type: SemanticType::Field,
2096                column_id: 3,
2097            })
2098            .push_column_metadata(ColumnMetadata {
2099                column_schema: ColumnSchema::new(
2100                    "ts".to_string(),
2101                    ConcreteDataType::timestamp_millisecond_datatype(),
2102                    false,
2103                ),
2104                semantic_type: SemanticType::Timestamp,
2105                column_id: 4,
2106            })
2107            .primary_key(vec![0]);
2108        builder.build().unwrap()
2109    }
2110
2111    /// Creates a flat format RecordBatch with string fields for fulltext testing.
2112    fn new_fulltext_record_batch_by_range(
2113        tag: &str,
2114        text_bloom: &str,
2115        text_tantivy: &str,
2116        start: usize,
2117        end: usize,
2118    ) -> RecordBatch {
2119        assert!(end >= start);
2120        let metadata = Arc::new(fulltext_region_metadata());
2121        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2122
2123        let num_rows = end - start;
2124        let mut columns = Vec::new();
2125
2126        // Add primary key column (tag_0) as dictionary array
2127        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
2128        for _ in 0..num_rows {
2129            tag_builder.append_value(tag);
2130        }
2131        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
2132
2133        // Add text_bloom field (fulltext with bloom backend)
2134        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
2135        columns.push(Arc::new(StringArray::from(text_bloom_values)));
2136
2137        // Add text_tantivy field (fulltext with tantivy backend)
2138        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
2139        columns.push(Arc::new(StringArray::from(text_tantivy_values)));
2140
2141        // Add field column (field_0)
2142        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
2143        columns.push(Arc::new(UInt64Array::from(field_values)));
2144
2145        // Add time index column (ts)
2146        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
2147        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
2148
2149        // Add encoded primary key column
2150        let pk = new_primary_key(&[tag]);
2151        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
2152        for _ in 0..num_rows {
2153            pk_builder.append(&pk).unwrap();
2154        }
2155        columns.push(Arc::new(pk_builder.finish()));
2156
2157        // Add sequence column
2158        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
2159
2160        // Add op_type column
2161        columns.push(Arc::new(UInt8Array::from_value(
2162            OpType::Put as u8,
2163            num_rows,
2164        )));
2165
2166        RecordBatch::try_new(flat_schema, columns).unwrap()
2167    }
2168
2169    #[tokio::test]
2170    async fn test_write_flat_read_with_fulltext_index() {
2171        let mut env = TestEnv::new().await;
2172        let object_store = env.init_object_store_manager();
2173        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
2174        let metadata = Arc::new(fulltext_region_metadata());
2175        let row_group_size = 50;
2176
2177        // Create flat format RecordBatches with different text content
2178        // RG 0:   0-50  tag="a", bloom="hello world", tantivy="quick brown fox"
2179        // RG 1:  50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
2180        // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
2181        // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
2182        let flat_batches = vec![
2183            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
2184            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
2185            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
2186            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
2187        ];
2188
2189        let flat_source = new_flat_source_from_record_batches(flat_batches);
2190
2191        let write_opts = WriteOptions {
2192            row_group_size,
2193            ..Default::default()
2194        };
2195
2196        let indexer_builder = create_test_indexer_builder(
2197            &env,
2198            object_store.clone(),
2199            file_path.clone(),
2200            metadata.clone(),
2201            row_group_size,
2202        );
2203
2204        let mut info = write_flat_sst(
2205            object_store.clone(),
2206            metadata.clone(),
2207            indexer_builder,
2208            file_path.clone(),
2209            flat_source,
2210            &write_opts,
2211        )
2212        .await;
2213        assert_eq!(200, info.num_rows);
2214        assert!(info.file_size > 0);
2215        assert!(info.index_metadata.file_size > 0);
2216
2217        // Verify fulltext indexes were created
2218        assert!(info.index_metadata.fulltext_index.index_size > 0);
2219        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
2220        // text_bloom (column_id 1) and text_tantivy (column_id 2)
2221        info.index_metadata.fulltext_index.columns.sort_unstable();
2222        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
2223
2224        assert_eq!(
2225            (
2226                Timestamp::new_millisecond(0),
2227                Timestamp::new_millisecond(199)
2228            ),
2229            info.time_range
2230        );
2231
2232        let handle = create_file_handle_from_sst_info(&info, &metadata);
2233
2234        let cache = create_test_cache();
2235
2236        // Helper functions to create fulltext function expressions
2237        let matches_func = || {
2238            Arc::new(
2239                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
2240                    .provide(Default::default()),
2241            )
2242        };
2243
2244        let matches_term_func = || {
2245            Arc::new(
2246                ScalarFunctionFactory::from(
2247                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
2248                )
2249                .provide(Default::default()),
2250            )
2251        };
2252
2253        // Test 1: Filter by text_bloom field using matches_term (bloom backend)
2254        // Expected: RG 0 and RG 1 (rows 0-100) which have "hello" term
2255        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2256            args: vec![col("text_bloom"), "hello".lit()],
2257            func: matches_term_func(),
2258        })];
2259
2260        let fulltext_applier = FulltextIndexApplierBuilder::new(
2261            FILE_DIR.to_string(),
2262            PathType::Bare,
2263            object_store.clone(),
2264            env.get_puffin_manager(),
2265            &metadata,
2266        )
2267        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2268        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2269        .build(&preds)
2270        .unwrap()
2271        .map(Arc::new);
2272
2273        let builder = ParquetReaderBuilder::new(
2274            FILE_DIR.to_string(),
2275            PathType::Bare,
2276            handle.clone(),
2277            object_store.clone(),
2278        )
2279        .predicate(Some(Predicate::new(preds)))
2280        .fulltext_index_appliers([None, fulltext_applier.clone()])
2281        .cache(CacheStrategy::EnableAll(cache.clone()));
2282
2283        let mut metrics = ReaderMetrics::default();
2284        let (_context, selection) = builder
2285            .build_reader_input(&mut metrics)
2286            .await
2287            .unwrap()
2288            .unwrap();
2289
2290        // Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
2291        assert_eq!(selection.row_group_count(), 2);
2292        assert_eq!(50, selection.get(0).unwrap().row_count());
2293        assert_eq!(50, selection.get(1).unwrap().row_count());
2294
2295        // Verify filtering metrics
2296        assert_eq!(metrics.filter_metrics.rg_total, 4);
2297        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2298        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2299        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2300
2301        // Test 2: Filter by text_tantivy field using matches (tantivy backend)
2302        // Expected: RG 2 and RG 3 (rows 100-200) which have "lazy" in query
2303        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2304            args: vec![col("text_tantivy"), "lazy".lit()],
2305            func: matches_func(),
2306        })];
2307
2308        let fulltext_applier = FulltextIndexApplierBuilder::new(
2309            FILE_DIR.to_string(),
2310            PathType::Bare,
2311            object_store.clone(),
2312            env.get_puffin_manager(),
2313            &metadata,
2314        )
2315        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2316        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2317        .build(&preds)
2318        .unwrap()
2319        .map(Arc::new);
2320
2321        let builder = ParquetReaderBuilder::new(
2322            FILE_DIR.to_string(),
2323            PathType::Bare,
2324            handle.clone(),
2325            object_store.clone(),
2326        )
2327        .predicate(Some(Predicate::new(preds)))
2328        .fulltext_index_appliers([None, fulltext_applier.clone()])
2329        .cache(CacheStrategy::EnableAll(cache.clone()));
2330
2331        let mut metrics = ReaderMetrics::default();
2332        let (_context, selection) = builder
2333            .build_reader_input(&mut metrics)
2334            .await
2335            .unwrap()
2336            .unwrap();
2337
2338        // Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
2339        assert_eq!(selection.row_group_count(), 2);
2340        assert_eq!(50, selection.get(2).unwrap().row_count());
2341        assert_eq!(50, selection.get(3).unwrap().row_count());
2342
2343        // Verify filtering metrics
2344        assert_eq!(metrics.filter_metrics.rg_total, 4);
2345        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2346        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2347        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2348    }
2349}