// mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod async_reader;
pub mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod prefilter;
pub mod read_columns;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
///
/// This is a runtime-only scan granularity, so we align it with DataFusion's
/// default execution batch size to reduce rebatching and concatenation in the
/// query pipeline.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default row group size for parquet files.
///
/// Keep the existing persisted/on-disk default stable. It intentionally stays
/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan
/// batching without changing the row group layout of newly written SSTs.
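///
/// As a worked example, with this default an SST holding about one million
/// rows is laid out as roughly ten row groups of 100 * 1024 rows each (the
/// last group may be smaller).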
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;

/// Parquet write options.
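///
/// A minimal sketch of overriding a single field while keeping the rest of
/// the defaults (the chosen size is illustrative, not a recommendation):
///
/// ```ignore
/// let opts = WriteOptions {
///     row_group_size: 64 * 1024,
///     ..Default::default()
/// };
/// ```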
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: This is not a hard limit as we can only observe the file size
    /// when the `ArrowWriter` writes to the underlying writer.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
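///
/// A hedged sketch of inspecting the first info entry returned by a write
/// (the `writer`, `source`, and `opts` values are assumed to be set up as in
/// the tests below):
///
/// ```ignore
/// let info = writer
///     .write_all_flat(source, None, &opts)
///     .await?
///     .remove(0);
/// assert!(info.num_rows > 0 && info.file_size > 0);
/// let (min_ts, max_ts) = info.time_range;
/// ```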
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Maximum uncompressed row group size in bytes. 0 if unknown.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// File metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
    /// Number of series.
    pub num_series: u64,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use datatypes::arrow::util::pretty::pretty_format_batches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
    use object_store::ObjectStore;
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
    use parquet::file::properties::WriterProperties;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::{ColumnSchema, RegionId};
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::test_util::assert_parquet_metadata_equal;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::FlatSource;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::flat_format::FlatWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::row_selection::RowGroupSelection;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    const FILE_DIR: &str = "/";
    const REGION_ID: RegionId = RegionId::new(0, 0);

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for the test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
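        // The three input batches hold 60 + 40 + 100 = 200 rows in total.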
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
                new_record_batch_by_range(&["b", "f"], 0, 40),
                new_record_batch_by_range(&["b", "h"], 100, 150),
                new_record_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for the test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap().unwrap();
            check_record_batch_reader_result(
                &mut reader,
                &[
                    new_record_batch_by_range(&["a", "d"], 0, 50),
                    new_record_batch_by_range(&["a", "d"], 50, 60),
                    new_record_batch_by_range(&["b", "f"], 0, 40),
                    new_record_batch_by_range(&["b", "h"], 100, 150),
                    new_record_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

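        // 200 rows with row_group_size = 50 yield 4 row groups (indices 0..=3).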
        // Cache 4 row groups.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // Create the test env.
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // Write the SST file and get the SST info, which contains the parquet
        // metadata converted from `FileMetaData`.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // Read the SST file metadata.
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        let reader_metadata = reader.parquet_metadata();
        let cached_writer_metadata =
            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
                .unwrap()
                .parquet_metadata();

        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for the test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: tag_0 = "a".
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "z"], 0, 0),
            new_record_batch_by_range(&["a", "z"], 100, 100),
            new_record_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for the test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for the test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: field_0 >= 150.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_record_batch_with_binary(&["a"], 0, 60);
        let arrays: Vec<_> = batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_with_binary(&["a"], 0, 50),
                new_record_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // Create the test env.
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 1000),
            new_record_batch_by_range(&["b", "f"], 0, 1000),
            new_record_batch_by_range(&["c", "g"], 0, 1000),
            new_record_batch_by_range(&["b", "h"], 100, 200),
            new_record_batch_by_range(&["b", "h"], 200, 300),
            new_record_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_flat_source_from_record_batches(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };
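        // With max_file_size = 16 KiB the writer rolls over to a new file once
        // it observes the size limit being exceeded, so the rows span two SSTs.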

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap().unwrap();
            while let Some(batch) = reader.next_record_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for the test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
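        // Row group 0 holds 50 rows; the inverted index keeps the 20 rows of
        // ["b", "d"] and filters out the remaining 30.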
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // The inverted index searches all row groups.
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
        assert!(read_input.is_none());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Remove the `ts` predicate and keep the `tag_1` predicate to check
        // that the cache works.

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - bloom filter: filter out row groups 2..=3
        //
        // Rows:
        // - bloom filter: hit row group 0, hit 50 rows
        //                 hit row group 1, hit 10 rows
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 20),
                new_record_batch_by_range(&["b", "d"], 0, 20),
                new_record_batch_by_range(&["c", "d"], 0, 10),
                new_record_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
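        // Row groups 2..=3 (100 rows) are pruned entirely; within row group 1
        // the bloom filter keeps 10 rows and drops the other 40, totalling 140.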
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = build_test_binary_test_region_metadata();
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();
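        // Columns are pushed in flat SST schema order: tag, binary field,
        // timestamp, encoded primary key, sequence, and op type.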

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
        }
        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);

        let values = (0..num_rows)
            .map(|_| "some data".as_bytes())
            .collect::<Vec<_>>();
        columns.push(
            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
                values,
            )) as ArrayRef,
        );

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    async fn check_record_batch_reader_result(
        reader: &mut ParquetReader,
        expected: &[RecordBatch],
    ) {
        let mut actual = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            actual.push(batch);
        }
        assert_eq!(
            pretty_format_batches(expected).unwrap().to_string(),
            pretty_format_batches(&actual).unwrap().to_string()
        );
        assert!(reader.next_record_batch().await.unwrap().is_none());
    }

    fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        let mut field_values = Vec::with_capacity(rows.len());
        let mut timestamps = Vec::with_capacity(rows.len());

        for (tag_0, tag_1, ts) in rows {
            tag_0_builder.append_value(*tag_0);
            tag_1_builder.append_value(*tag_1);
            pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
            field_values.push(*ts as u64);
            timestamps.push(*ts);
        }

        RecordBatch::try_new(
            flat_schema,
            vec![
                Arc::new(tag_0_builder.finish()) as ArrayRef,
                Arc::new(tag_1_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from(field_values)) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
                Arc::new(pk_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
                Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
    /// Similar to `new_record_batch_by_range` but without individual primary key columns.
    fn new_record_batch_by_range_sparse(
        tags: &[&str],
        start: usize,
        end: usize,
        metadata: &Arc<RegionMetadata>,
    ) -> RecordBatch {
        assert!(end >= start);
        let flat_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
        );

        let num_rows = end - start;
        let mut columns: Vec<ArrayRef> = Vec::new();

        // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included
        // in the sparse format.

        // Add the field column (field_0).
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);

        // Add the time index column (ts).
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);

        // Add the encoded primary key column using sparse encoding.
        let table_id = 1u32; // Test table ID.
        let tsid = 100u64; // Base TSID.
        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);

        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);

        // Add the sequence column.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);

        // Add the op_type column.
        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Helper function to create an `IndexerBuilderImpl` for tests.
    fn create_test_indexer_builder(
        env: &TestEnv,
        object_store: ObjectStore,
        file_path: RegionFilePathFactory,
        metadata: Arc<RegionMetadata>,
        row_group_size: usize,
    ) -> IndexerBuilderImpl {
        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
        let intermediate_manager = env.get_intermediate_manager();

        IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
    }

    /// Helper function to write a flat SST and return its `SstInfo`.
    async fn write_flat_sst(
        object_store: ObjectStore,
        metadata: Arc<RegionMetadata>,
        indexer_builder: IndexerBuilderImpl,
        file_path: RegionFilePathFactory,
        flat_source: FlatSource,
        write_opts: &WriteOptions,
    ) -> SstInfo {
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store,
            metadata,
            IndexConfig::default(),
            indexer_builder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat(flat_source, None, write_opts)
            .await
            .unwrap()
            .remove(0)
    }

    /// Helper function to create a `FileHandle` from `SstInfo`.
    fn create_file_handle_from_sst_info(
        info: &SstInfo,
        metadata: &Arc<RegionMetadata>,
    ) -> FileHandle {
        FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        )
    }

    /// Helper function to create a test cache with standard settings.
    fn create_test_cache() -> Arc<CacheManager> {
        Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        )
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches.
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }
1393
1394    #[tokio::test]
1395    async fn test_read_with_override_sequence() {
1396        let mut env = TestEnv::new().await;
1397        let object_store = env.init_object_store_manager();
1398        let handle = sst_file_handle(0, 1000);
1399        let file_path = FixedPathProvider {
1400            region_file_id: handle.file_id(),
1401        };
1402        let metadata = Arc::new(sst_region_metadata());
1403
1404        // Create batches with sequence 0 to trigger override functionality.
1405        let source = new_flat_source_from_record_batches(vec![
1406            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
1407            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
1408        ]);
1409
1410        let write_opts = WriteOptions {
1411            row_group_size: 50,
1412            ..Default::default()
1413        };
1414
1415        let mut metrics = Metrics::new(WriteType::Flush);
1416        let mut writer = ParquetWriter::new_with_object_store(
1417            object_store.clone(),
1418            metadata.clone(),
1419            IndexConfig::default(),
1420            NoopIndexBuilder,
1421            file_path,
1422            &mut metrics,
1423        )
1424        .await;
1425
1426        writer
1427            .write_all_flat_as_primary_key(source, None, &write_opts)
1428            .await
1429            .unwrap()
1430            .remove(0);
1431
1432        // Read without override sequence (should read sequence 0)
1433        let builder = ParquetReaderBuilder::new(
1434            FILE_DIR.to_string(),
1435            PathType::Bare,
1436            handle.clone(),
1437            object_store.clone(),
1438        );
1439        let mut reader = builder.build().await.unwrap().unwrap();
1440        let mut normal_batches = Vec::new();
1441        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1442            normal_batches.push(batch);
1443        }
1444
1445        // Read with override sequence using FileMeta.sequence
1446        let custom_sequence = 12345u64;
1447        let file_meta = handle.meta_ref();
1448        let mut override_file_meta = file_meta.clone();
1449        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1450        let override_handle = FileHandle::new(
1451            override_file_meta,
1452            Arc::new(crate::sst::file_purger::NoopFilePurger),
1453        );
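        // FileMeta::sequence is an Option<NonZero<u64>>: a missing value (as in
        // the first read above) applies no override, and zero itself can never
        // be an override sequence.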

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Compare the results
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let mut columns = normal.columns().to_vec();
                let num_cols = columns.len();
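                // In the flat SST layout the sequence column is second from
                // last, just before op_type (see the column order in
                // new_fulltext_record_batch_by_range below), hence num_cols - 2.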
                columns[num_cols - 2] =
                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
                RecordBatch::try_new(normal.schema(), columns).unwrap()
            };

            // Override batch should match expected batch
            assert_eq!(*override_batch, expected_batch);
        }
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Two 50-row batches fill each row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, d]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "d"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Test 1: Filter by tag_0 = "b"
        // Expected: Only rows with tag_0="b"
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains only RG 0 (tag_0="b", ts 0-100)
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // Verify filtering metrics
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
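        // How the numbers add up: RG 1 (ts 100-200) only contains tag_0 = "c",
        // so min-max stats prune it as a whole. Within the surviving RG 0, the
        // inverted index drops the 50 rows tagged "a" without discarding the
        // row group itself.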
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Two 50-row batches fill each row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, e]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "e"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 0 and RG 1
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
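        // tag_1 = "d" matches exactly half of each row group (the [a, d] half
        // of RG 0 and the [c, d] half of RG 1), so the bloom filter trims 100
        // rows in total while both row groups remain selected.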
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 10;

        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 3),
            new_record_batch_by_range(&["b", "d"], 3, 10),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..6).collect())],
            row_group_size,
        );
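        // The outer selection (rows 0..6 of RG 0) covers the three "a" rows plus
        // the first three "b" rows; the prefilter must drop those trailing
        // non-matching rows, which is the case this test exercises.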

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "d"], 0, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 8;

        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 2),
            new_record_batch_by_range(&["b", "d"], 2, 4),
            new_record_batch_by_range(&["a", "d"], 4, 6),
            new_record_batch_by_range(&["c", "d"], 6, 8),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..8).collect())],
            row_group_size,
        );
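        // Rows 0..8 contain two disjoint "a" runs (rows 0-1 and 4-5) separated
        // by "b" rows and followed by a trailing gap of "c" rows; the prefilter
        // has to stitch the disjoint matches together and stop cleanly at the
        // gap.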

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_from_rows(&[
                ("a", "d", 0),
                ("a", "d", 1),
                ("a", "d", 4),
                ("a", "d", 5),
            ])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index_sparse() {
        common_telemetry::init_default_ut_logging();

        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Two 50-row batches fill each row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, d]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Test 1: Filter by tag_0 = "b"
        // Expected: Only rows with tag_0="b"
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // RG 0 has 50 matching rows (tag_0="b")
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // Verify filtering metrics.
        // Note: with sparse encoding, tags are not stored as separate columns,
        // so min-max pruning on tags is unavailable and only the inverted index
        // filters here.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
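        // Without per-tag statistics both row groups survive min-max pruning,
        // so the inverted index does all the work: it discards RG 1 entirely
        // (100 rows) plus the 50 rows tagged "a" in RG 0, i.e. 150 rows total.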
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter_sparse() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Two 50-row batches fill each row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, e]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 0 and RG 1
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

    /// Creates region metadata for testing fulltext indexes.
    /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
    fn fulltext_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(REGION_ID);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_bloom".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Bloom,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_tantivy".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Tantivy,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![0]);
        builder.build().unwrap()
    }

    /// Creates a flat format RecordBatch with string fields for fulltext testing.
    fn new_fulltext_record_batch_by_range(
        tag: &str,
        text_bloom: &str,
        text_tantivy: &str,
        start: usize,
        end: usize,
    ) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(fulltext_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Add primary key column (tag_0) as dictionary array
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_builder.append_value(tag);
        }
        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);

        // Add text_bloom field (fulltext with bloom backend)
        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
        columns.push(Arc::new(StringArray::from(text_bloom_values)));

        // Add text_tantivy field (fulltext with tantivy backend)
        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
        columns.push(Arc::new(StringArray::from(text_tantivy_values)));

        // Add field column (field_0)
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Add time index column (ts)
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Add encoded primary key column
        let pk = new_primary_key(&[tag]);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Add sequence column
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Add op_type column
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    #[tokio::test]
    async fn test_write_flat_read_with_fulltext_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(fulltext_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches with different text content
        // RG 0:   0-50  tag="a", bloom="hello world", tantivy="quick brown fox"
        // RG 1:  50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
        // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
        // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
        let flat_batches = vec![
            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let mut info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        // Verify fulltext indexes were created
        assert!(info.index_metadata.fulltext_index.index_size > 0);
        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
        // text_bloom (column_id 1) and text_tantivy (column_id 2)
        info.index_metadata.fulltext_index.columns.sort_unstable();
        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Helper functions to create fulltext function expressions
        let matches_func = || {
            Arc::new(
                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
                    .provide(Default::default()),
            )
        };

        let matches_term_func = || {
            Arc::new(
                ScalarFunctionFactory::from(
                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
                )
                .provide(Default::default()),
            )
        };

        // Test 1: Filter by text_bloom field using matches_term (bloom backend)
        // Expected: RG 0 and RG 1 (rows 0-100), whose text_bloom contains the term "hello"
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_bloom"), "hello".lit()],
            func: matches_term_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
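        // With row_group_size = 50 each batch is its own row group; "hello"
        // appears only in the text_bloom column of RG 0 and RG 1, so the
        // fulltext index prunes RG 2 and RG 3 (2 row groups, 100 rows).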

        // Test 2: Filter by text_tantivy field using matches (tantivy backend)
        // Expected: RG 2 and RG 3 (rows 100-200), whose text_tantivy matches the query "lazy"
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_tantivy"), "lazy".lit()],
            func: matches_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(2).unwrap().row_count());
        assert_eq!(50, selection.get(3).unwrap().row_count());

        // Verify filtering metrics
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
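        // Symmetric to Test 1: "lazy" occurs only in RG 2 and RG 3, so the
        // Tantivy-backed index prunes RG 0 and RG 1 this time.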
    }
}