Skip to main content

mito2/sst/
parquet.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST in parquet format.
16
17use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
27pub mod file_range;
28pub mod flat_format;
29pub mod format;
30pub(crate) mod helper;
31pub(crate) mod metadata;
32pub mod prefilter;
33pub(crate) mod push_decoder;
34pub mod read_columns;
35pub mod reader;
36pub mod row_group;
37pub mod row_selection;
38pub(crate) mod stats;
39pub mod writer;
40
/// Key of the parquet key-value metadata entry that holds the serialized
/// region metadata of an SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default number of rows per batch when reading parquet files.
///
/// This only controls runtime scan granularity. It matches DataFusion's default
/// execution batch size (8192 rows) so the query pipeline has to rebatch and
/// concatenate less.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8_192;

/// Default number of rows per row group in newly written parquet files.
///
/// This value shapes the persisted on-disk layout and is deliberately kept
/// stable. It is independent of [`DEFAULT_READ_BATCH_SIZE`], so scan batching
/// can be tuned without changing the row group layout of new SSTs.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 102_400;
56
57/// Parquet write options.
58#[derive(Debug, Clone)]
59pub struct WriteOptions {
60    /// Buffer size for async writer.
61    pub write_buffer_size: ReadableSize,
62    /// Row group size.
63    pub row_group_size: usize,
64    /// Max single output file size.
65    /// Note: This is not a hard limit as we can only observe the file size when
66    /// ArrowWrite writes to underlying writers.
67    pub max_file_size: Option<usize>,
68}
69
70impl Default for WriteOptions {
71    fn default() -> Self {
72        WriteOptions {
73            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
74            row_group_size: DEFAULT_ROW_GROUP_SIZE,
75            max_file_size: None,
76        }
77    }
78}
79
80/// Parquet SST info returned by the writer.
81#[derive(Debug, Default)]
82pub struct SstInfo {
83    /// SST file id.
84    pub file_id: FileId,
85    /// Time range of the SST. The timestamps have the same time unit as the
86    /// data in the SST.
87    pub time_range: FileTimeRange,
88    /// File size in bytes.
89    pub file_size: u64,
90    /// Maximum uncompressed row group size in bytes. 0 if unknown.
91    pub max_row_group_uncompressed_size: u64,
92    /// Number of rows.
93    pub num_rows: usize,
94    /// Number of row groups
95    pub num_row_groups: u64,
96    /// File Meta Data
97    pub file_metadata: Option<Arc<ParquetMetaData>>,
98    /// Index Meta Data
99    pub index_metadata: IndexOutput,
100    /// Number of series
101    pub num_series: u64,
102}
103
104#[cfg(test)]
105mod tests {
106    use std::collections::HashSet;
107    use std::sync::Arc;
108
109    use api::v1::{OpType, SemanticType};
110    use common_function::function::FunctionRef;
111    use common_function::function_factory::ScalarFunctionFactory;
112    use common_function::scalars::matches::MatchesFunction;
113    use common_function::scalars::matches_term::MatchesTermFunction;
114    use common_time::Timestamp;
115    use datafusion_common::{Column, ScalarValue};
116    use datafusion_expr::expr::ScalarFunction;
117    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
118    use datatypes::arrow;
119    use datatypes::arrow::array::{
120        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
121        TimestampMillisecondArray, UInt8Array, UInt64Array,
122    };
123    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
124    use datatypes::arrow::util::pretty::pretty_format_batches;
125    use datatypes::prelude::ConcreteDataType;
126    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
127    use object_store::ObjectStore;
128    use parquet::arrow::AsyncArrowWriter;
129    use parquet::basic::{Compression, Encoding, ZstdLevel};
130    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
131    use parquet::file::properties::WriterProperties;
132    use store_api::codec::PrimaryKeyEncoding;
133    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
134    use store_api::region_request::PathType;
135    use store_api::storage::{ColumnSchema, RegionId};
136    use table::predicate::Predicate;
137    use tokio_util::compat::FuturesAsyncWriteCompatExt;
138
139    use super::*;
140    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
141    use crate::cache::index::result_cache::PredicateKey;
142    use crate::cache::test_util::assert_parquet_metadata_equal;
143    use crate::cache::{CacheManager, CacheStrategy, PageKey};
144    use crate::config::IndexConfig;
145    use crate::read::FlatSource;
146    use crate::region::options::{IndexOptions, InvertedIndexOptions};
147    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
148    use crate::sst::file_purger::NoopFilePurger;
149    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
150    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
151    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
152    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
153    use crate::sst::parquet::flat_format::FlatWriteFormat;
154    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
155    use crate::sst::parquet::row_selection::RowGroupSelection;
156    use crate::sst::parquet::writer::ParquetWriter;
157    use crate::sst::{
158        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
159    };
160    use crate::test_util::TestEnv;
161    use crate::test_util::sst_util::{
162        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
163        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
164        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
165        sst_region_metadata_with_encoding,
166    };
167
168    const FILE_DIR: &str = "/";
169    const REGION_ID: RegionId = RegionId::new(0, 0);
170
171    #[derive(Clone)]
172    struct FixedPathProvider {
173        region_file_id: RegionFileId,
174    }
175
176    impl FilePathProvider for FixedPathProvider {
177        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
178            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
179        }
180
181        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
182            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
183        }
184
185        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
186            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
187        }
188    }
189
190    struct NoopIndexBuilder;
191
192    #[async_trait::async_trait]
193    impl IndexerBuilder for NoopIndexBuilder {
194        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
195            Indexer::default()
196        }
197    }
198
199    #[tokio::test]
200    async fn test_write_read() {
201        let mut env = TestEnv::new().await;
202        let object_store = env.init_object_store_manager();
203        let handle = sst_file_handle(0, 1000);
204        let file_path = FixedPathProvider {
205            region_file_id: handle.file_id(),
206        };
207        let metadata = Arc::new(sst_region_metadata());
208        let source = new_flat_source_from_record_batches(vec![
209            new_record_batch_by_range(&["a", "d"], 0, 60),
210            new_record_batch_by_range(&["b", "f"], 0, 40),
211            new_record_batch_by_range(&["b", "h"], 100, 200),
212        ]);
213        // Use a small row group size for test.
214        let write_opts = WriteOptions {
215            row_group_size: 50,
216            ..Default::default()
217        };
218
219        let mut metrics = Metrics::new(WriteType::Flush);
220        let mut writer = ParquetWriter::new_with_object_store(
221            object_store.clone(),
222            metadata.clone(),
223            IndexConfig::default(),
224            NoopIndexBuilder,
225            file_path,
226            &mut metrics,
227        )
228        .await;
229
230        let info = writer
231            .write_all_flat_as_primary_key(source, None, &write_opts)
232            .await
233            .unwrap()
234            .remove(0);
235        assert_eq!(200, info.num_rows);
236        assert!(info.file_size > 0);
237        assert_eq!(
238            (
239                Timestamp::new_millisecond(0),
240                Timestamp::new_millisecond(199)
241            ),
242            info.time_range
243        );
244
245        let builder = ParquetReaderBuilder::new(
246            FILE_DIR.to_string(),
247            PathType::Bare,
248            handle.clone(),
249            object_store,
250        );
251        let mut reader = builder.build().await.unwrap().unwrap();
252        check_record_batch_reader_result(
253            &mut reader,
254            &[
255                new_record_batch_by_range(&["a", "d"], 0, 50),
256                new_record_batch_by_range(&["a", "d"], 50, 60),
257                new_record_batch_by_range(&["b", "f"], 0, 40),
258                new_record_batch_by_range(&["b", "h"], 100, 150),
259                new_record_batch_by_range(&["b", "h"], 150, 200),
260            ],
261        )
262        .await;
263    }
264
265    #[tokio::test]
266    async fn test_read_with_cache() {
267        let mut env = TestEnv::new().await;
268        let object_store = env.init_object_store_manager();
269        let handle = sst_file_handle(0, 1000);
270        let metadata = Arc::new(sst_region_metadata());
271        let source = new_flat_source_from_record_batches(vec![
272            new_record_batch_by_range(&["a", "d"], 0, 60),
273            new_record_batch_by_range(&["b", "f"], 0, 40),
274            new_record_batch_by_range(&["b", "h"], 100, 200),
275        ]);
276        // Use a small row group size for test.
277        let write_opts = WriteOptions {
278            row_group_size: 50,
279            ..Default::default()
280        };
281        // Prepare data.
282        let mut metrics = Metrics::new(WriteType::Flush);
283        let mut writer = ParquetWriter::new_with_object_store(
284            object_store.clone(),
285            metadata.clone(),
286            IndexConfig::default(),
287            NoopIndexBuilder,
288            FixedPathProvider {
289                region_file_id: handle.file_id(),
290            },
291            &mut metrics,
292        )
293        .await;
294
295        let sst_info = writer
296            .write_all_flat_as_primary_key(source, None, &write_opts)
297            .await
298            .unwrap()
299            .remove(0);
300
301        // Enable page cache.
302        let cache = CacheStrategy::EnableAll(Arc::new(
303            CacheManager::builder()
304                .page_cache_size(64 * 1024 * 1024)
305                .build(),
306        ));
307        let builder = ParquetReaderBuilder::new(
308            FILE_DIR.to_string(),
309            PathType::Bare,
310            handle.clone(),
311            object_store,
312        )
313        .cache(cache.clone());
314        for _ in 0..3 {
315            let mut reader = builder.build().await.unwrap().unwrap();
316            check_record_batch_reader_result(
317                &mut reader,
318                &[
319                    new_record_batch_by_range(&["a", "d"], 0, 50),
320                    new_record_batch_by_range(&["a", "d"], 50, 60),
321                    new_record_batch_by_range(&["b", "f"], 0, 40),
322                    new_record_batch_by_range(&["b", "h"], 100, 150),
323                    new_record_batch_by_range(&["b", "h"], 150, 200),
324                ],
325            )
326            .await;
327        }
328
329        let parquet_meta = sst_info.file_metadata.unwrap();
330        let get_ranges = |row_group_idx: usize| {
331            let row_group = parquet_meta.row_group(row_group_idx);
332            let mut ranges = Vec::with_capacity(row_group.num_columns());
333            for i in 0..row_group.num_columns() {
334                let (start, length) = row_group.column(i).byte_range();
335                ranges.push(start..start + length);
336            }
337
338            ranges
339        };
340
341        // Cache 4 row groups.
342        for i in 0..4 {
343            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
344            assert!(cache.get_pages(&page_key).is_some());
345        }
346        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
347        assert!(cache.get_pages(&page_key).is_none());
348    }
349
350    #[tokio::test]
351    async fn test_parquet_metadata_eq() {
352        // create test env
353        let mut env = crate::test_util::TestEnv::new().await;
354        let object_store = env.init_object_store_manager();
355        let handle = sst_file_handle(0, 1000);
356        let metadata = Arc::new(sst_region_metadata());
357        let source = new_flat_source_from_record_batches(vec![
358            new_record_batch_by_range(&["a", "d"], 0, 60),
359            new_record_batch_by_range(&["b", "f"], 0, 40),
360            new_record_batch_by_range(&["b", "h"], 100, 200),
361        ]);
362        let write_opts = WriteOptions {
363            row_group_size: 50,
364            ..Default::default()
365        };
366
367        // write the sst file and get sst info
368        // sst info contains the parquet metadata, which is converted from FileMetaData
369        let mut metrics = Metrics::new(WriteType::Flush);
370        let mut writer = ParquetWriter::new_with_object_store(
371            object_store.clone(),
372            metadata.clone(),
373            IndexConfig::default(),
374            NoopIndexBuilder,
375            FixedPathProvider {
376                region_file_id: handle.file_id(),
377            },
378            &mut metrics,
379        )
380        .await;
381
382        let sst_info = writer
383            .write_all_flat_as_primary_key(source, None, &write_opts)
384            .await
385            .unwrap()
386            .remove(0);
387        let writer_metadata = sst_info.file_metadata.unwrap();
388
389        // read the sst file metadata
390        let builder = ParquetReaderBuilder::new(
391            FILE_DIR.to_string(),
392            PathType::Bare,
393            handle.clone(),
394            object_store,
395        )
396        .page_index_policy(PageIndexPolicy::Optional);
397        let reader = builder.build().await.unwrap().unwrap();
398        let reader_metadata = reader.parquet_metadata();
399        let cached_writer_metadata =
400            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
401                .unwrap()
402                .parquet_metadata();
403
404        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
405    }
406
407    #[tokio::test]
408    async fn test_read_with_tag_filter() {
409        let mut env = TestEnv::new().await;
410        let object_store = env.init_object_store_manager();
411        let handle = sst_file_handle(0, 1000);
412        let metadata = Arc::new(sst_region_metadata());
413        let source = new_flat_source_from_record_batches(vec![
414            new_record_batch_by_range(&["a", "d"], 0, 60),
415            new_record_batch_by_range(&["b", "f"], 0, 40),
416            new_record_batch_by_range(&["b", "h"], 100, 200),
417        ]);
418        // Use a small row group size for test.
419        let write_opts = WriteOptions {
420            row_group_size: 50,
421            ..Default::default()
422        };
423        // Prepare data.
424        let mut metrics = Metrics::new(WriteType::Flush);
425        let mut writer = ParquetWriter::new_with_object_store(
426            object_store.clone(),
427            metadata.clone(),
428            IndexConfig::default(),
429            NoopIndexBuilder,
430            FixedPathProvider {
431                region_file_id: handle.file_id(),
432            },
433            &mut metrics,
434        )
435        .await;
436        writer
437            .write_all_flat_as_primary_key(source, None, &write_opts)
438            .await
439            .unwrap()
440            .remove(0);
441
442        // Predicate
443        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
444            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
445            op: Operator::Eq,
446            right: Box::new("a".lit()),
447        })]));
448
449        let builder = ParquetReaderBuilder::new(
450            FILE_DIR.to_string(),
451            PathType::Bare,
452            handle.clone(),
453            object_store,
454        )
455        .predicate(predicate);
456        let mut reader = builder.build().await.unwrap().unwrap();
457        check_record_batch_reader_result(
458            &mut reader,
459            &[
460                new_record_batch_by_range(&["a", "d"], 0, 50),
461                new_record_batch_by_range(&["a", "d"], 50, 60),
462            ],
463        )
464        .await;
465    }
466
467    #[tokio::test]
468    async fn test_read_empty_batch() {
469        let mut env = TestEnv::new().await;
470        let object_store = env.init_object_store_manager();
471        let handle = sst_file_handle(0, 1000);
472        let metadata = Arc::new(sst_region_metadata());
473        let source = new_flat_source_from_record_batches(vec![
474            new_record_batch_by_range(&["a", "z"], 0, 0),
475            new_record_batch_by_range(&["a", "z"], 100, 100),
476            new_record_batch_by_range(&["a", "z"], 200, 230),
477        ]);
478        // Use a small row group size for test.
479        let write_opts = WriteOptions {
480            row_group_size: 50,
481            ..Default::default()
482        };
483        // Prepare data.
484        let mut metrics = Metrics::new(WriteType::Flush);
485        let mut writer = ParquetWriter::new_with_object_store(
486            object_store.clone(),
487            metadata.clone(),
488            IndexConfig::default(),
489            NoopIndexBuilder,
490            FixedPathProvider {
491                region_file_id: handle.file_id(),
492            },
493            &mut metrics,
494        )
495        .await;
496        writer
497            .write_all_flat_as_primary_key(source, None, &write_opts)
498            .await
499            .unwrap()
500            .remove(0);
501
502        let builder = ParquetReaderBuilder::new(
503            FILE_DIR.to_string(),
504            PathType::Bare,
505            handle.clone(),
506            object_store,
507        );
508        let mut reader = builder.build().await.unwrap().unwrap();
509        check_record_batch_reader_result(
510            &mut reader,
511            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
512        )
513        .await;
514    }
515
516    #[tokio::test]
517    async fn test_read_with_field_filter() {
518        let mut env = TestEnv::new().await;
519        let object_store = env.init_object_store_manager();
520        let handle = sst_file_handle(0, 1000);
521        let metadata = Arc::new(sst_region_metadata());
522        let source = new_flat_source_from_record_batches(vec![
523            new_record_batch_by_range(&["a", "d"], 0, 60),
524            new_record_batch_by_range(&["b", "f"], 0, 40),
525            new_record_batch_by_range(&["b", "h"], 100, 200),
526        ]);
527        // Use a small row group size for test.
528        let write_opts = WriteOptions {
529            row_group_size: 50,
530            ..Default::default()
531        };
532        // Prepare data.
533        let mut metrics = Metrics::new(WriteType::Flush);
534        let mut writer = ParquetWriter::new_with_object_store(
535            object_store.clone(),
536            metadata.clone(),
537            IndexConfig::default(),
538            NoopIndexBuilder,
539            FixedPathProvider {
540                region_file_id: handle.file_id(),
541            },
542            &mut metrics,
543        )
544        .await;
545
546        writer
547            .write_all_flat_as_primary_key(source, None, &write_opts)
548            .await
549            .unwrap()
550            .remove(0);
551
552        // Predicate
553        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
554            left: Box::new(Expr::Column(Column::from_name("field_0"))),
555            op: Operator::GtEq,
556            right: Box::new(150u64.lit()),
557        })]));
558
559        let builder = ParquetReaderBuilder::new(
560            FILE_DIR.to_string(),
561            PathType::Bare,
562            handle.clone(),
563            object_store,
564        )
565        .predicate(predicate);
566        let mut reader = builder.build().await.unwrap().unwrap();
567        check_record_batch_reader_result(
568            &mut reader,
569            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
570        )
571        .await;
572    }
573
574    #[tokio::test]
575    async fn test_read_large_binary() {
576        let mut env = TestEnv::new().await;
577        let object_store = env.init_object_store_manager();
578        let handle = sst_file_handle(0, 1000);
579        let file_path = handle.file_path(FILE_DIR, PathType::Bare);
580
581        let write_opts = WriteOptions {
582            row_group_size: 50,
583            ..Default::default()
584        };
585
586        let metadata = build_test_binary_test_region_metadata();
587        let json = metadata.to_json().unwrap();
588        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
589
590        let props_builder = WriterProperties::builder()
591            .set_key_value_metadata(Some(vec![key_value_meta]))
592            .set_compression(Compression::ZSTD(ZstdLevel::default()))
593            .set_encoding(Encoding::PLAIN)
594            .set_max_row_group_row_count(Some(write_opts.row_group_size));
595
596        let writer_props = props_builder.build();
597
598        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
599        let fields: Vec<_> = write_format
600            .arrow_schema()
601            .fields()
602            .into_iter()
603            .map(|field| {
604                let data_type = field.data_type().clone();
605                if data_type == DataType::Binary {
606                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
607                } else {
608                    Field::new(field.name(), data_type, field.is_nullable())
609                }
610            })
611            .collect();
612
613        let arrow_schema = Arc::new(Schema::new(fields));
614
615        // Ensures field_0 has LargeBinary type.
616        assert_eq!(
617            &DataType::LargeBinary,
618            arrow_schema.field_with_name("field_0").unwrap().data_type()
619        );
620        let mut writer = AsyncArrowWriter::try_new(
621            object_store
622                .writer_with(&file_path)
623                .concurrent(DEFAULT_WRITE_CONCURRENCY)
624                .await
625                .map(|w| w.into_futures_async_write().compat_write())
626                .unwrap(),
627            arrow_schema.clone(),
628            Some(writer_props),
629        )
630        .unwrap();
631
632        let batch = new_record_batch_with_binary(&["a"], 0, 60);
633        let arrays: Vec<_> = batch
634            .columns()
635            .iter()
636            .map(|array| {
637                let data_type = array.data_type().clone();
638                if data_type == DataType::Binary {
639                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
640                } else {
641                    array.clone()
642                }
643            })
644            .collect();
645        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
646
647        writer.write(&result).await.unwrap();
648        writer.close().await.unwrap();
649
650        let builder = ParquetReaderBuilder::new(
651            FILE_DIR.to_string(),
652            PathType::Bare,
653            handle.clone(),
654            object_store,
655        );
656        let mut reader = builder.build().await.unwrap().unwrap();
657        check_record_batch_reader_result(
658            &mut reader,
659            &[
660                new_record_batch_with_binary(&["a"], 0, 50),
661                new_record_batch_with_binary(&["a"], 50, 60),
662            ],
663        )
664        .await;
665    }
666
667    #[tokio::test]
668    async fn test_write_multiple_files() {
669        common_telemetry::init_default_ut_logging();
670        // create test env
671        let mut env = TestEnv::new().await;
672        let object_store = env.init_object_store_manager();
673        let metadata = Arc::new(sst_region_metadata());
674        let batches = vec![
675            new_record_batch_by_range(&["a", "d"], 0, 1000),
676            new_record_batch_by_range(&["b", "f"], 0, 1000),
677            new_record_batch_by_range(&["c", "g"], 0, 1000),
678            new_record_batch_by_range(&["b", "h"], 100, 200),
679            new_record_batch_by_range(&["b", "h"], 200, 300),
680            new_record_batch_by_range(&["b", "h"], 300, 1000),
681        ];
682        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
683
684        let source = new_flat_source_from_record_batches(batches);
685        let write_opts = WriteOptions {
686            row_group_size: 50,
687            max_file_size: Some(1024 * 16),
688            ..Default::default()
689        };
690
691        let path_provider = RegionFilePathFactory {
692            table_dir: "test".to_string(),
693            path_type: PathType::Bare,
694        };
695        let mut metrics = Metrics::new(WriteType::Flush);
696        let mut writer = ParquetWriter::new_with_object_store(
697            object_store.clone(),
698            metadata.clone(),
699            IndexConfig::default(),
700            NoopIndexBuilder,
701            path_provider,
702            &mut metrics,
703        )
704        .await;
705
706        let files = writer
707            .write_all_flat_as_primary_key(source, None, &write_opts)
708            .await
709            .unwrap();
710        assert_eq!(2, files.len());
711
712        let mut rows_read = 0;
713        for f in &files {
714            let file_handle = sst_file_handle_with_file_id(
715                f.file_id,
716                f.time_range.0.value(),
717                f.time_range.1.value(),
718            );
719            let builder = ParquetReaderBuilder::new(
720                "test".to_string(),
721                PathType::Bare,
722                file_handle,
723                object_store.clone(),
724            );
725            let mut reader = builder.build().await.unwrap().unwrap();
726            while let Some(batch) = reader.next_record_batch().await.unwrap() {
727                rows_read += batch.num_rows();
728            }
729        }
730        assert_eq!(total_rows, rows_read);
731    }
732
733    #[tokio::test]
734    async fn test_write_read_with_index() {
735        let mut env = TestEnv::new().await;
736        let object_store = env.init_object_store_manager();
737        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
738        let metadata = Arc::new(sst_region_metadata());
739        let row_group_size = 50;
740
741        let source = new_flat_source_from_record_batches(vec![
742            new_record_batch_by_range(&["a", "d"], 0, 20),
743            new_record_batch_by_range(&["b", "d"], 0, 20),
744            new_record_batch_by_range(&["c", "d"], 0, 20),
745            new_record_batch_by_range(&["c", "f"], 0, 40),
746            new_record_batch_by_range(&["c", "h"], 100, 200),
747        ]);
748        // Use a small row group size for test.
749        let write_opts = WriteOptions {
750            row_group_size,
751            ..Default::default()
752        };
753
754        let puffin_manager = env
755            .get_puffin_manager()
756            .build(object_store.clone(), file_path.clone());
757        let intermediate_manager = env.get_intermediate_manager();
758
759        let indexer_builder = IndexerBuilderImpl {
760            build_type: IndexBuildType::Flush,
761            metadata: metadata.clone(),
762            row_group_size,
763            puffin_manager,
764            write_cache_enabled: false,
765            intermediate_manager,
766            index_options: IndexOptions {
767                inverted_index: InvertedIndexOptions {
768                    segment_row_count: 1,
769                    ..Default::default()
770                },
771            },
772            inverted_index_config: Default::default(),
773            fulltext_index_config: Default::default(),
774            bloom_filter_index_config: Default::default(),
775            #[cfg(feature = "vector_index")]
776            vector_index_config: Default::default(),
777        };
778
779        let mut metrics = Metrics::new(WriteType::Flush);
780        let mut writer = ParquetWriter::new_with_object_store(
781            object_store.clone(),
782            metadata.clone(),
783            IndexConfig::default(),
784            indexer_builder,
785            file_path.clone(),
786            &mut metrics,
787        )
788        .await;
789
790        let info = writer
791            .write_all_flat_as_primary_key(source, None, &write_opts)
792            .await
793            .unwrap()
794            .remove(0);
795        assert_eq!(200, info.num_rows);
796        assert!(info.file_size > 0);
797        assert!(info.index_metadata.file_size > 0);
798
799        assert!(info.index_metadata.inverted_index.index_size > 0);
800        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
801        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
802
803        assert!(info.index_metadata.bloom_filter.index_size > 0);
804        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
805        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
806
807        assert_eq!(
808            (
809                Timestamp::new_millisecond(0),
810                Timestamp::new_millisecond(199)
811            ),
812            info.time_range
813        );
814
815        let handle = FileHandle::new(
816            FileMeta {
817                region_id: metadata.region_id,
818                file_id: info.file_id,
819                time_range: info.time_range,
820                level: 0,
821                file_size: info.file_size,
822                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
823                available_indexes: info.index_metadata.build_available_indexes(),
824                indexes: info.index_metadata.build_indexes(),
825                index_file_size: info.index_metadata.file_size,
826                index_version: 0,
827                num_row_groups: info.num_row_groups,
828                num_rows: info.num_rows as u64,
829                sequence: None,
830                partition_expr: match &metadata.partition_expr {
831                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
832                        .expect("partition expression should be valid JSON"),
833                    None => None,
834                },
835                num_series: 0,
836                ..Default::default()
837            },
838            Arc::new(NoopFilePurger),
839        );
840
841        let cache = Arc::new(
842            CacheManager::builder()
843                .index_result_cache_size(1024 * 1024)
844                .index_metadata_size(1024 * 1024)
845                .index_content_page_size(1024 * 1024)
846                .index_content_size(1024 * 1024)
847                .puffin_metadata_size(1024 * 1024)
848                .build(),
849        );
850        let index_result_cache = cache.index_result_cache().unwrap();
851
852        let build_inverted_index_applier = |exprs: &[Expr]| {
853            InvertedIndexApplierBuilder::new(
854                FILE_DIR.to_string(),
855                PathType::Bare,
856                object_store.clone(),
857                &metadata,
858                HashSet::from_iter([0]),
859                env.get_puffin_manager(),
860            )
861            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
862            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
863            .build(exprs)
864            .unwrap()
865            .map(Arc::new)
866        };
867
868        let build_bloom_filter_applier = |exprs: &[Expr]| {
869            BloomFilterIndexApplierBuilder::new(
870                FILE_DIR.to_string(),
871                PathType::Bare,
872                object_store.clone(),
873                &metadata,
874                env.get_puffin_manager(),
875            )
876            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
877            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
878            .build(exprs)
879            .unwrap()
880            .map(Arc::new)
881        };
882
883        // Data: ts tag_0 tag_1
884        // Data: 0-20 [a, d]
885        //       0-20 [b, d]
886        //       0-20 [c, d]
887        //       0-40 [c, f]
888        //    100-200 [c, h]
889        //
890        // Pred: tag_0 = "b"
891        //
892        // Row groups & rows pruning:
893        //
894        // Row Groups:
895        // - min-max: filter out row groups 1..=3
896        //
897        // Rows:
898        // - inverted index: hit row group 0, hit 20 rows
899        let preds = vec![col("tag_0").eq(lit("b"))];
900        let inverted_index_applier = build_inverted_index_applier(&preds);
901        let bloom_filter_applier = build_bloom_filter_applier(&preds);
902
903        let builder = ParquetReaderBuilder::new(
904            FILE_DIR.to_string(),
905            PathType::Bare,
906            handle.clone(),
907            object_store.clone(),
908        )
909        .predicate(Some(Predicate::new(preds)))
910        .inverted_index_appliers([inverted_index_applier.clone(), None])
911        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
912        .cache(CacheStrategy::EnableAll(cache.clone()));
913
914        let mut metrics = ReaderMetrics::default();
915        let (context, selection) = builder
916            .build_reader_input(&mut metrics)
917            .await
918            .unwrap()
919            .unwrap();
920        let mut reader = ParquetReader::new(Arc::new(context), selection)
921            .await
922            .unwrap();
923        check_record_batch_reader_result(
924            &mut reader,
925            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
926        )
927        .await;
928
929        assert_eq!(metrics.filter_metrics.rg_total, 4);
930        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
931        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
932        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
933        let plan = inverted_index_applier
934            .as_ref()
935            .unwrap()
936            .plan_for_sst(&metadata)
937            .unwrap()
938            .unwrap();
939        let cached = index_result_cache
940            .get(&plan.predicate_key, handle.file_id().file_id())
941            .unwrap();
942        // inverted index will search all row groups
943        assert!(cached.contains_row_group(0));
944        assert!(cached.contains_row_group(1));
945        assert!(cached.contains_row_group(2));
946        assert!(cached.contains_row_group(3));
947
948        // Data: ts tag_0 tag_1
949        // Data: 0-20 [a, d]
950        //       0-20 [b, d]
951        //       0-20 [c, d]
952        //       0-40 [c, f]
953        //    100-200 [c, h]
954        //
955        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
956        //
957        // Row groups & rows pruning:
958        //
959        // Row Groups:
960        // - min-max: filter out row groups 0..=1
961        // - bloom filter: filter out row groups 2..=3
962        let preds = vec![
963            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
964            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
965            col("tag_1").eq(lit("d")),
966        ];
967        let inverted_index_applier = build_inverted_index_applier(&preds);
968        let bloom_filter_applier = build_bloom_filter_applier(&preds);
969
970        let builder = ParquetReaderBuilder::new(
971            FILE_DIR.to_string(),
972            PathType::Bare,
973            handle.clone(),
974            object_store.clone(),
975        )
976        .predicate(Some(Predicate::new(preds)))
977        .inverted_index_appliers([inverted_index_applier.clone(), None])
978        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
979        .cache(CacheStrategy::EnableAll(cache.clone()));
980
981        let mut metrics = ReaderMetrics::default();
982        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
983        assert!(read_input.is_none());
984
985        assert_eq!(metrics.filter_metrics.rg_total, 4);
986        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
987        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
988        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
989        let bloom_predicates = bloom_filter_applier
990            .as_ref()
991            .unwrap()
992            .compatible_predicate_for_sst(&metadata)
993            .unwrap();
994        let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
995        let cached = index_result_cache
996            .get(&bloom_predicate_key, handle.file_id().file_id())
997            .unwrap();
998        assert!(cached.contains_row_group(2));
999        assert!(cached.contains_row_group(3));
1000        assert!(!cached.contains_row_group(0));
1001        assert!(!cached.contains_row_group(1));
1002
1003        // Remove the pred of `ts`, continue to use the pred of `tag_1`
1004        // to test if cache works.
1005
1006        // Data: ts tag_0 tag_1
1007        // Data: 0-20 [a, d]
1008        //       0-20 [b, d]
1009        //       0-20 [c, d]
1010        //       0-40 [c, f]
1011        //    100-200 [c, h]
1012        //
1013        // Pred: tag_1 = "d"
1014        //
1015        // Row groups & rows pruning:
1016        //
1017        // Row Groups:
1018        // - bloom filter: filter out row groups 2..=3
1019        //
1020        // Rows:
1021        // - bloom filter: hit row group 0, hit 50 rows
1022        //                 hit row group 1, hit 10 rows
1023        let preds = vec![col("tag_1").eq(lit("d"))];
1024        let inverted_index_applier = build_inverted_index_applier(&preds);
1025        let bloom_filter_applier = build_bloom_filter_applier(&preds);
1026
1027        let builder = ParquetReaderBuilder::new(
1028            FILE_DIR.to_string(),
1029            PathType::Bare,
1030            handle.clone(),
1031            object_store.clone(),
1032        )
1033        .predicate(Some(Predicate::new(preds)))
1034        .inverted_index_appliers([inverted_index_applier.clone(), None])
1035        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
1036        .cache(CacheStrategy::EnableAll(cache.clone()));
1037
1038        let mut metrics = ReaderMetrics::default();
1039        let (context, selection) = builder
1040            .build_reader_input(&mut metrics)
1041            .await
1042            .unwrap()
1043            .unwrap();
1044        let mut reader = ParquetReader::new(Arc::new(context), selection)
1045            .await
1046            .unwrap();
1047        check_record_batch_reader_result(
1048            &mut reader,
1049            &[
1050                new_record_batch_by_range(&["a", "d"], 0, 20),
1051                new_record_batch_by_range(&["b", "d"], 0, 20),
1052                new_record_batch_by_range(&["c", "d"], 0, 10),
1053                new_record_batch_by_range(&["c", "d"], 10, 20),
1054            ],
1055        )
1056        .await;
1057
1058        assert_eq!(metrics.filter_metrics.rg_total, 4);
1059        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1060        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1061        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1062        let bloom_predicates = bloom_filter_applier
1063            .as_ref()
1064            .unwrap()
1065            .compatible_predicate_for_sst(&metadata)
1066            .unwrap();
1067        let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
1068        let cached = index_result_cache
1069            .get(&bloom_predicate_key, handle.file_id().file_id())
1070            .unwrap();
1071        assert!(cached.contains_row_group(0));
1072        assert!(cached.contains_row_group(1));
1073        assert!(cached.contains_row_group(2));
1074        assert!(cached.contains_row_group(3));
1075    }
1076
1077    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1078        assert!(end >= start);
1079        let metadata = build_test_binary_test_region_metadata();
1080        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1081
1082        let num_rows = end - start;
1083        let mut columns = Vec::new();
1084
1085        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1086        for _ in 0..num_rows {
1087            tag_0_builder.append_value(tags[0]);
1088        }
1089        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1090
1091        let values = (0..num_rows)
1092            .map(|_| "some data".as_bytes())
1093            .collect::<Vec<_>>();
1094        columns.push(
1095            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
1096                values,
1097            )) as ArrayRef,
1098        );
1099
1100        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1101        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1102
1103        let pk = new_primary_key(tags);
1104        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1105        for _ in 0..num_rows {
1106            pk_builder.append(&pk).unwrap();
1107        }
1108        columns.push(Arc::new(pk_builder.finish()));
1109
1110        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1111        columns.push(Arc::new(UInt8Array::from_value(
1112            OpType::Put as u8,
1113            num_rows,
1114        )));
1115
1116        RecordBatch::try_new(flat_schema, columns).unwrap()
1117    }
1118
1119    async fn check_record_batch_reader_result(
1120        reader: &mut ParquetReader,
1121        expected: &[RecordBatch],
1122    ) {
1123        let mut actual = Vec::new();
1124        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1125            actual.push(batch);
1126        }
1127        assert_eq!(
1128            pretty_format_batches(expected).unwrap().to_string(),
1129            pretty_format_batches(&actual).unwrap().to_string()
1130        );
1131        assert!(reader.next_record_batch().await.unwrap().is_none());
1132    }
1133
1134    fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
1135        let metadata = Arc::new(sst_region_metadata());
1136        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1137
1138        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1139        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1140        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1141        let mut field_values = Vec::with_capacity(rows.len());
1142        let mut timestamps = Vec::with_capacity(rows.len());
1143
1144        for (tag_0, tag_1, ts) in rows {
1145            tag_0_builder.append_value(*tag_0);
1146            tag_1_builder.append_value(*tag_1);
1147            pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
1148            field_values.push(*ts as u64);
1149            timestamps.push(*ts);
1150        }
1151
1152        RecordBatch::try_new(
1153            flat_schema,
1154            vec![
1155                Arc::new(tag_0_builder.finish()) as ArrayRef,
1156                Arc::new(tag_1_builder.finish()) as ArrayRef,
1157                Arc::new(UInt64Array::from(field_values)) as ArrayRef,
1158                Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
1159                Arc::new(pk_builder.finish()) as ArrayRef,
1160                Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
1161                Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
1162            ],
1163        )
1164        .unwrap()
1165    }
1166
1167    /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
1168    /// Similar to `new_record_batch_by_range` but without individual primary key columns.
1169    fn new_record_batch_by_range_sparse(
1170        tags: &[&str],
1171        start: usize,
1172        end: usize,
1173        metadata: &Arc<RegionMetadata>,
1174    ) -> RecordBatch {
1175        assert!(end >= start);
1176        let flat_schema = to_flat_sst_arrow_schema(
1177            metadata,
1178            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1179        );
1180
1181        let num_rows = end - start;
1182        let mut columns: Vec<ArrayRef> = Vec::new();
1183
1184        // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format
1185
1186        // Add field column (field_0)
1187        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1188        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1189
1190        // Add time index column (ts)
1191        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1192        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1193
1194        // Add encoded primary key column using sparse encoding
1195        let table_id = 1u32; // Test table ID
1196        let tsid = 100u64; // Base TSID
1197        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1198
1199        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1200        for _ in 0..num_rows {
1201            pk_builder.append(&pk).unwrap();
1202        }
1203        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1204
1205        // Add sequence column
1206        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1207
1208        // Add op_type column
1209        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1210
1211        RecordBatch::try_new(flat_schema, columns).unwrap()
1212    }
1213
1214    /// Helper function to create IndexerBuilderImpl for tests.
1215    fn create_test_indexer_builder(
1216        env: &TestEnv,
1217        object_store: ObjectStore,
1218        file_path: RegionFilePathFactory,
1219        metadata: Arc<RegionMetadata>,
1220        row_group_size: usize,
1221    ) -> IndexerBuilderImpl {
1222        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1223        let intermediate_manager = env.get_intermediate_manager();
1224
1225        IndexerBuilderImpl {
1226            build_type: IndexBuildType::Flush,
1227            metadata,
1228            row_group_size,
1229            puffin_manager,
1230            write_cache_enabled: false,
1231            intermediate_manager,
1232            index_options: IndexOptions {
1233                inverted_index: InvertedIndexOptions {
1234                    segment_row_count: 1,
1235                    ..Default::default()
1236                },
1237            },
1238            inverted_index_config: Default::default(),
1239            fulltext_index_config: Default::default(),
1240            bloom_filter_index_config: Default::default(),
1241            #[cfg(feature = "vector_index")]
1242            vector_index_config: Default::default(),
1243        }
1244    }
1245
1246    /// Helper function to write flat SST and return SstInfo.
1247    async fn write_flat_sst(
1248        object_store: ObjectStore,
1249        metadata: Arc<RegionMetadata>,
1250        indexer_builder: IndexerBuilderImpl,
1251        file_path: RegionFilePathFactory,
1252        flat_source: FlatSource,
1253        write_opts: &WriteOptions,
1254    ) -> SstInfo {
1255        let mut metrics = Metrics::new(WriteType::Flush);
1256        let mut writer = ParquetWriter::new_with_object_store(
1257            object_store,
1258            metadata,
1259            IndexConfig::default(),
1260            indexer_builder,
1261            file_path,
1262            &mut metrics,
1263        )
1264        .await;
1265
1266        writer
1267            .write_all_flat(flat_source, None, write_opts)
1268            .await
1269            .unwrap()
1270            .remove(0)
1271    }
1272
1273    /// Helper function to create FileHandle from SstInfo.
1274    fn create_file_handle_from_sst_info(
1275        info: &SstInfo,
1276        metadata: &Arc<RegionMetadata>,
1277    ) -> FileHandle {
1278        FileHandle::new(
1279            FileMeta {
1280                region_id: metadata.region_id,
1281                file_id: info.file_id,
1282                time_range: info.time_range,
1283                level: 0,
1284                file_size: info.file_size,
1285                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1286                available_indexes: info.index_metadata.build_available_indexes(),
1287                indexes: info.index_metadata.build_indexes(),
1288                index_file_size: info.index_metadata.file_size,
1289                index_version: 0,
1290                num_row_groups: info.num_row_groups,
1291                num_rows: info.num_rows as u64,
1292                sequence: None,
1293                partition_expr: match &metadata.partition_expr {
1294                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1295                        .expect("partition expression should be valid JSON"),
1296                    None => None,
1297                },
1298                num_series: 0,
1299                ..Default::default()
1300            },
1301            Arc::new(NoopFilePurger),
1302        )
1303    }
1304
1305    /// Helper function to create test cache with standard settings.
1306    fn create_test_cache() -> Arc<CacheManager> {
1307        Arc::new(
1308            CacheManager::builder()
1309                .index_result_cache_size(1024 * 1024)
1310                .index_metadata_size(1024 * 1024)
1311                .index_content_page_size(1024 * 1024)
1312                .index_content_size(1024 * 1024)
1313                .puffin_metadata_size(1024 * 1024)
1314                .build(),
1315        )
1316    }
1317
1318    #[tokio::test]
1319    async fn test_write_flat_with_index() {
1320        let mut env = TestEnv::new().await;
1321        let object_store = env.init_object_store_manager();
1322        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1323        let metadata = Arc::new(sst_region_metadata());
1324        let row_group_size = 50;
1325
1326        // Create flat format RecordBatches
1327        let flat_batches = vec![
1328            new_record_batch_by_range(&["a", "d"], 0, 20),
1329            new_record_batch_by_range(&["b", "d"], 0, 20),
1330            new_record_batch_by_range(&["c", "d"], 0, 20),
1331            new_record_batch_by_range(&["c", "f"], 0, 40),
1332            new_record_batch_by_range(&["c", "h"], 100, 200),
1333        ];
1334
1335        let flat_source = new_flat_source_from_record_batches(flat_batches);
1336
1337        let write_opts = WriteOptions {
1338            row_group_size,
1339            ..Default::default()
1340        };
1341
1342        let puffin_manager = env
1343            .get_puffin_manager()
1344            .build(object_store.clone(), file_path.clone());
1345        let intermediate_manager = env.get_intermediate_manager();
1346
1347        let indexer_builder = IndexerBuilderImpl {
1348            build_type: IndexBuildType::Flush,
1349            metadata: metadata.clone(),
1350            row_group_size,
1351            puffin_manager,
1352            write_cache_enabled: false,
1353            intermediate_manager,
1354            index_options: IndexOptions {
1355                inverted_index: InvertedIndexOptions {
1356                    segment_row_count: 1,
1357                    ..Default::default()
1358                },
1359            },
1360            inverted_index_config: Default::default(),
1361            fulltext_index_config: Default::default(),
1362            bloom_filter_index_config: Default::default(),
1363            #[cfg(feature = "vector_index")]
1364            vector_index_config: Default::default(),
1365        };
1366
1367        let mut metrics = Metrics::new(WriteType::Flush);
1368        let mut writer = ParquetWriter::new_with_object_store(
1369            object_store.clone(),
1370            metadata.clone(),
1371            IndexConfig::default(),
1372            indexer_builder,
1373            file_path.clone(),
1374            &mut metrics,
1375        )
1376        .await;
1377
1378        let info = writer
1379            .write_all_flat(flat_source, None, &write_opts)
1380            .await
1381            .unwrap()
1382            .remove(0);
1383        assert_eq!(200, info.num_rows);
1384        assert!(info.file_size > 0);
1385        assert!(info.index_metadata.file_size > 0);
1386
1387        assert!(info.index_metadata.inverted_index.index_size > 0);
1388        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1389        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1390
1391        assert!(info.index_metadata.bloom_filter.index_size > 0);
1392        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1393        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1394
1395        assert_eq!(
1396            (
1397                Timestamp::new_millisecond(0),
1398                Timestamp::new_millisecond(199)
1399            ),
1400            info.time_range
1401        );
1402    }
1403
1404    #[tokio::test]
1405    async fn test_read_with_override_sequence() {
1406        let mut env = TestEnv::new().await;
1407        let object_store = env.init_object_store_manager();
1408        let handle = sst_file_handle(0, 1000);
1409        let file_path = FixedPathProvider {
1410            region_file_id: handle.file_id(),
1411        };
1412        let metadata = Arc::new(sst_region_metadata());
1413
1414        // Create batches with sequence 0 to trigger override functionality.
1415        let source = new_flat_source_from_record_batches(vec![
1416            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
1417            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
1418        ]);
1419
1420        let write_opts = WriteOptions {
1421            row_group_size: 50,
1422            ..Default::default()
1423        };
1424
1425        let mut metrics = Metrics::new(WriteType::Flush);
1426        let mut writer = ParquetWriter::new_with_object_store(
1427            object_store.clone(),
1428            metadata.clone(),
1429            IndexConfig::default(),
1430            NoopIndexBuilder,
1431            file_path,
1432            &mut metrics,
1433        )
1434        .await;
1435
1436        writer
1437            .write_all_flat_as_primary_key(source, None, &write_opts)
1438            .await
1439            .unwrap()
1440            .remove(0);
1441
1442        // Read without override sequence (should read sequence 0)
1443        let builder = ParquetReaderBuilder::new(
1444            FILE_DIR.to_string(),
1445            PathType::Bare,
1446            handle.clone(),
1447            object_store.clone(),
1448        );
1449        let mut reader = builder.build().await.unwrap().unwrap();
1450        let mut normal_batches = Vec::new();
1451        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1452            normal_batches.push(batch);
1453        }
1454
1455        // Read with override sequence using FileMeta.sequence
1456        let custom_sequence = 12345u64;
1457        let file_meta = handle.meta_ref();
1458        let mut override_file_meta = file_meta.clone();
1459        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1460        let override_handle = FileHandle::new(
1461            override_file_meta,
1462            Arc::new(crate::sst::file_purger::NoopFilePurger),
1463        );
1464
1465        let builder = ParquetReaderBuilder::new(
1466            FILE_DIR.to_string(),
1467            PathType::Bare,
1468            override_handle,
1469            object_store.clone(),
1470        );
1471        let mut reader = builder.build().await.unwrap().unwrap();
1472        let mut override_batches = Vec::new();
1473        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1474            override_batches.push(batch);
1475        }
1476
1477        // Compare the results
1478        assert_eq!(normal_batches.len(), override_batches.len());
1479        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1480            let expected_batch = {
1481                let mut columns = normal.columns().to_vec();
1482                let num_cols = columns.len();
1483                columns[num_cols - 2] =
1484                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
1485                RecordBatch::try_new(normal.schema(), columns).unwrap()
1486            };
1487
1488            // Override batch should match expected batch
1489            assert_eq!(*override_batch, expected_batch);
1490        }
1491    }
1492
1493    #[tokio::test]
1494    async fn test_write_flat_read_with_inverted_index() {
1495        let mut env = TestEnv::new().await;
1496        let object_store = env.init_object_store_manager();
1497        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1498        let metadata = Arc::new(sst_region_metadata());
1499        let row_group_size = 100;
1500
1501        // Create flat format RecordBatches with non-overlapping timestamp ranges
1502        // Each batch becomes one row group (row_group_size = 100)
1503        // Data: ts tag_0 tag_1
1504        // RG 0:   0-50  [a, d]
1505        // RG 0:  50-100 [b, d]
1506        // RG 1: 100-150 [c, d]
1507        // RG 1: 150-200 [c, f]
1508        let flat_batches = vec![
1509            new_record_batch_by_range(&["a", "d"], 0, 50),
1510            new_record_batch_by_range(&["b", "d"], 50, 100),
1511            new_record_batch_by_range(&["c", "d"], 100, 150),
1512            new_record_batch_by_range(&["c", "f"], 150, 200),
1513        ];
1514
1515        let flat_source = new_flat_source_from_record_batches(flat_batches);
1516
1517        let write_opts = WriteOptions {
1518            row_group_size,
1519            ..Default::default()
1520        };
1521
1522        let indexer_builder = create_test_indexer_builder(
1523            &env,
1524            object_store.clone(),
1525            file_path.clone(),
1526            metadata.clone(),
1527            row_group_size,
1528        );
1529
1530        let info = write_flat_sst(
1531            object_store.clone(),
1532            metadata.clone(),
1533            indexer_builder,
1534            file_path.clone(),
1535            flat_source,
1536            &write_opts,
1537        )
1538        .await;
1539        assert_eq!(200, info.num_rows);
1540        assert!(info.file_size > 0);
1541        assert!(info.index_metadata.file_size > 0);
1542
1543        let handle = create_file_handle_from_sst_info(&info, &metadata);
1544
1545        let cache = create_test_cache();
1546
1547        // Test 1: Filter by tag_0 = "b"
1548        // Expected: Only rows with tag_0="b"
1549        let preds = vec![col("tag_0").eq(lit("b"))];
1550        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1551            FILE_DIR.to_string(),
1552            PathType::Bare,
1553            object_store.clone(),
1554            &metadata,
1555            HashSet::from_iter([0]),
1556            env.get_puffin_manager(),
1557        )
1558        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1559        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1560        .build(&preds)
1561        .unwrap()
1562        .map(Arc::new);
1563
1564        let builder = ParquetReaderBuilder::new(
1565            FILE_DIR.to_string(),
1566            PathType::Bare,
1567            handle.clone(),
1568            object_store.clone(),
1569        )
1570        .predicate(Some(Predicate::new(preds)))
1571        .inverted_index_appliers([inverted_index_applier.clone(), None])
1572        .cache(CacheStrategy::EnableAll(cache.clone()));
1573
1574        let mut metrics = ReaderMetrics::default();
1575        let (_context, selection) = builder
1576            .build_reader_input(&mut metrics)
1577            .await
1578            .unwrap()
1579            .unwrap();
1580
1581        // Verify selection contains only RG 0 (tag_0="b", ts 0-100)
1582        assert_eq!(selection.row_group_count(), 1);
1583        assert_eq!(50, selection.get(0).unwrap().row_count());
1584
1585        // Verify filtering metrics
1586        assert_eq!(metrics.filter_metrics.rg_total, 2);
1587        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
1588        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
1589        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
1590    }
1591
1592    #[tokio::test]
1593    async fn test_write_flat_read_with_bloom_filter() {
1594        let mut env = TestEnv::new().await;
1595        let object_store = env.init_object_store_manager();
1596        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1597        let metadata = Arc::new(sst_region_metadata());
1598        let row_group_size = 100;
1599
1600        // Create flat format RecordBatches with non-overlapping timestamp ranges
1601        // Each batch becomes one row group (row_group_size = 100)
1602        // Data: ts tag_0 tag_1
1603        // RG 0:   0-50  [a, d]
1604        // RG 0:  50-100 [b, e]
1605        // RG 1: 100-150 [c, d]
1606        // RG 1: 150-200 [c, f]
1607        let flat_batches = vec![
1608            new_record_batch_by_range(&["a", "d"], 0, 50),
1609            new_record_batch_by_range(&["b", "e"], 50, 100),
1610            new_record_batch_by_range(&["c", "d"], 100, 150),
1611            new_record_batch_by_range(&["c", "f"], 150, 200),
1612        ];
1613
1614        let flat_source = new_flat_source_from_record_batches(flat_batches);
1615
1616        let write_opts = WriteOptions {
1617            row_group_size,
1618            ..Default::default()
1619        };
1620
1621        let indexer_builder = create_test_indexer_builder(
1622            &env,
1623            object_store.clone(),
1624            file_path.clone(),
1625            metadata.clone(),
1626            row_group_size,
1627        );
1628
1629        let info = write_flat_sst(
1630            object_store.clone(),
1631            metadata.clone(),
1632            indexer_builder,
1633            file_path.clone(),
1634            flat_source,
1635            &write_opts,
1636        )
1637        .await;
1638        assert_eq!(200, info.num_rows);
1639        assert!(info.file_size > 0);
1640        assert!(info.index_metadata.file_size > 0);
1641
1642        let handle = create_file_handle_from_sst_info(&info, &metadata);
1643
1644        let cache = create_test_cache();
1645
1646        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1647        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
1648        let preds = vec![
1649            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1650            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1651            col("tag_1").eq(lit("d")),
1652        ];
1653        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1654            FILE_DIR.to_string(),
1655            PathType::Bare,
1656            object_store.clone(),
1657            &metadata,
1658            env.get_puffin_manager(),
1659        )
1660        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1661        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1662        .build(&preds)
1663        .unwrap()
1664        .map(Arc::new);
1665
1666        let builder = ParquetReaderBuilder::new(
1667            FILE_DIR.to_string(),
1668            PathType::Bare,
1669            handle.clone(),
1670            object_store.clone(),
1671        )
1672        .predicate(Some(Predicate::new(preds)))
1673        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1674        .cache(CacheStrategy::EnableAll(cache.clone()));
1675
1676        let mut metrics = ReaderMetrics::default();
1677        let (_context, selection) = builder
1678            .build_reader_input(&mut metrics)
1679            .await
1680            .unwrap()
1681            .unwrap();
1682
1683        // Verify selection contains RG 0 and RG 1
1684        assert_eq!(selection.row_group_count(), 2);
1685        assert_eq!(50, selection.get(0).unwrap().row_count());
1686        assert_eq!(50, selection.get(1).unwrap().row_count());
1687
1688        // Verify filtering metrics
1689        assert_eq!(metrics.filter_metrics.rg_total, 2);
1690        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1691        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1692        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1693    }
1694
1695    #[tokio::test]
1696    async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
1697        let mut env = TestEnv::new().await;
1698        let object_store = env.init_object_store_manager();
1699        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1700        let metadata = Arc::new(sst_region_metadata());
1701        let row_group_size = 10;
1702
1703        let flat_source = new_flat_source_from_record_batches(vec![
1704            new_record_batch_by_range(&["a", "d"], 0, 3),
1705            new_record_batch_by_range(&["b", "d"], 3, 10),
1706        ]);
1707        let write_opts = WriteOptions {
1708            row_group_size,
1709            ..Default::default()
1710        };
1711        let indexer_builder = create_test_indexer_builder(
1712            &env,
1713            object_store.clone(),
1714            file_path.clone(),
1715            metadata.clone(),
1716            row_group_size,
1717        );
1718        let info = write_flat_sst(
1719            object_store.clone(),
1720            metadata.clone(),
1721            indexer_builder,
1722            file_path,
1723            flat_source,
1724            &write_opts,
1725        )
1726        .await;
1727        let handle = create_file_handle_from_sst_info(&info, &metadata);
1728
1729        let builder =
1730            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1731                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1732
1733        let mut metrics = ReaderMetrics::default();
1734        let (context, _) = builder
1735            .build_reader_input(&mut metrics)
1736            .await
1737            .unwrap()
1738            .unwrap();
1739        let selection = RowGroupSelection::from_row_ranges(
1740            vec![(0, std::iter::once(0..6).collect())],
1741            row_group_size,
1742        );
1743
1744        let mut reader = ParquetReader::new(Arc::new(context), selection)
1745            .await
1746            .unwrap();
1747        check_record_batch_reader_result(
1748            &mut reader,
1749            &[new_record_batch_by_range(&["a", "d"], 0, 3)],
1750        )
1751        .await;
1752    }
1753
1754    #[tokio::test]
1755    async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
1756        let mut env = TestEnv::new().await;
1757        let object_store = env.init_object_store_manager();
1758        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1759        let metadata = Arc::new(sst_region_metadata());
1760        let row_group_size = 8;
1761
1762        let flat_source = new_flat_source_from_record_batches(vec![
1763            new_record_batch_by_range(&["a", "d"], 0, 2),
1764            new_record_batch_by_range(&["b", "d"], 2, 4),
1765            new_record_batch_by_range(&["a", "d"], 4, 6),
1766            new_record_batch_by_range(&["c", "d"], 6, 8),
1767        ]);
1768        let write_opts = WriteOptions {
1769            row_group_size,
1770            ..Default::default()
1771        };
1772        let indexer_builder = create_test_indexer_builder(
1773            &env,
1774            object_store.clone(),
1775            file_path.clone(),
1776            metadata.clone(),
1777            row_group_size,
1778        );
1779        let info = write_flat_sst(
1780            object_store.clone(),
1781            metadata.clone(),
1782            indexer_builder,
1783            file_path,
1784            flat_source,
1785            &write_opts,
1786        )
1787        .await;
1788        let handle = create_file_handle_from_sst_info(&info, &metadata);
1789
1790        let builder =
1791            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1792                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1793
1794        let mut metrics = ReaderMetrics::default();
1795        let (context, _) = builder
1796            .build_reader_input(&mut metrics)
1797            .await
1798            .unwrap()
1799            .unwrap();
1800        let selection = RowGroupSelection::from_row_ranges(
1801            vec![(0, std::iter::once(0..8).collect())],
1802            row_group_size,
1803        );
1804
1805        let mut reader = ParquetReader::new(Arc::new(context), selection)
1806            .await
1807            .unwrap();
1808        check_record_batch_reader_result(
1809            &mut reader,
1810            &[new_record_batch_from_rows(&[
1811                ("a", "d", 0),
1812                ("a", "d", 1),
1813                ("a", "d", 4),
1814                ("a", "d", 5),
1815            ])],
1816        )
1817        .await;
1818    }
1819
1820    #[tokio::test]
1821    async fn test_write_flat_read_with_inverted_index_sparse() {
1822        common_telemetry::init_default_ut_logging();
1823
1824        let mut env = TestEnv::new().await;
1825        let object_store = env.init_object_store_manager();
1826        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1827        let metadata = Arc::new(sst_region_metadata_with_encoding(
1828            PrimaryKeyEncoding::Sparse,
1829        ));
1830        let row_group_size = 100;
1831
1832        // Create flat format RecordBatches with non-overlapping timestamp ranges
1833        // Each batch becomes one row group (row_group_size = 100)
1834        // Data: ts tag_0 tag_1
1835        // RG 0:   0-50  [a, d]
1836        // RG 0:  50-100 [b, d]
1837        // RG 1: 100-150 [c, d]
1838        // RG 1: 150-200 [c, f]
1839        let flat_batches = vec![
1840            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1841            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1842            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1843            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1844        ];
1845
1846        let flat_source = new_flat_source_from_record_batches(flat_batches);
1847
1848        let write_opts = WriteOptions {
1849            row_group_size,
1850            ..Default::default()
1851        };
1852
1853        let indexer_builder = create_test_indexer_builder(
1854            &env,
1855            object_store.clone(),
1856            file_path.clone(),
1857            metadata.clone(),
1858            row_group_size,
1859        );
1860
1861        let info = write_flat_sst(
1862            object_store.clone(),
1863            metadata.clone(),
1864            indexer_builder,
1865            file_path.clone(),
1866            flat_source,
1867            &write_opts,
1868        )
1869        .await;
1870        assert_eq!(200, info.num_rows);
1871        assert!(info.file_size > 0);
1872        assert!(info.index_metadata.file_size > 0);
1873
1874        let handle = create_file_handle_from_sst_info(&info, &metadata);
1875
1876        let cache = create_test_cache();
1877
1878        // Test 1: Filter by tag_0 = "b"
1879        // Expected: Only rows with tag_0="b"
1880        let preds = vec![col("tag_0").eq(lit("b"))];
1881        let inverted_index_applier = InvertedIndexApplierBuilder::new(
1882            FILE_DIR.to_string(),
1883            PathType::Bare,
1884            object_store.clone(),
1885            &metadata,
1886            HashSet::from_iter([0]),
1887            env.get_puffin_manager(),
1888        )
1889        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1890        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1891        .build(&preds)
1892        .unwrap()
1893        .map(Arc::new);
1894
1895        let builder = ParquetReaderBuilder::new(
1896            FILE_DIR.to_string(),
1897            PathType::Bare,
1898            handle.clone(),
1899            object_store.clone(),
1900        )
1901        .predicate(Some(Predicate::new(preds)))
1902        .inverted_index_appliers([inverted_index_applier.clone(), None])
1903        .cache(CacheStrategy::EnableAll(cache.clone()));
1904
1905        let mut metrics = ReaderMetrics::default();
1906        let (_context, selection) = builder
1907            .build_reader_input(&mut metrics)
1908            .await
1909            .unwrap()
1910            .unwrap();
1911
1912        // RG 0 has 50 matching rows (tag_0="b")
1913        assert_eq!(selection.row_group_count(), 1);
1914        assert_eq!(50, selection.get(0).unwrap().row_count());
1915
1916        // Verify filtering metrics
1917        // Note: With sparse encoding, tag columns aren't stored separately,
1918        // so minmax filtering on tags doesn't work (only inverted index)
1919        assert_eq!(metrics.filter_metrics.rg_total, 2);
1920        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
1921        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1922        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1923    }
1924
1925    #[tokio::test]
1926    async fn test_write_flat_read_with_bloom_filter_sparse() {
1927        let mut env = TestEnv::new().await;
1928        let object_store = env.init_object_store_manager();
1929        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1930        let metadata = Arc::new(sst_region_metadata_with_encoding(
1931            PrimaryKeyEncoding::Sparse,
1932        ));
1933        let row_group_size = 100;
1934
1935        // Create flat format RecordBatches with non-overlapping timestamp ranges
1936        // Each batch becomes one row group (row_group_size = 100)
1937        // Data: ts tag_0 tag_1
1938        // RG 0:   0-50  [a, d]
1939        // RG 0:  50-100 [b, e]
1940        // RG 1: 100-150 [c, d]
1941        // RG 1: 150-200 [c, f]
1942        let flat_batches = vec![
1943            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1944            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
1945            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1946            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1947        ];
1948
1949        let flat_source = new_flat_source_from_record_batches(flat_batches);
1950
1951        let write_opts = WriteOptions {
1952            row_group_size,
1953            ..Default::default()
1954        };
1955
1956        let indexer_builder = create_test_indexer_builder(
1957            &env,
1958            object_store.clone(),
1959            file_path.clone(),
1960            metadata.clone(),
1961            row_group_size,
1962        );
1963
1964        let info = write_flat_sst(
1965            object_store.clone(),
1966            metadata.clone(),
1967            indexer_builder,
1968            file_path.clone(),
1969            flat_source,
1970            &write_opts,
1971        )
1972        .await;
1973        assert_eq!(200, info.num_rows);
1974        assert!(info.file_size > 0);
1975        assert!(info.index_metadata.file_size > 0);
1976
1977        let handle = create_file_handle_from_sst_info(&info, &metadata);
1978
1979        let cache = create_test_cache();
1980
1981        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
1982        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
1983        let preds = vec![
1984            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1985            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1986            col("tag_1").eq(lit("d")),
1987        ];
1988        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1989            FILE_DIR.to_string(),
1990            PathType::Bare,
1991            object_store.clone(),
1992            &metadata,
1993            env.get_puffin_manager(),
1994        )
1995        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1996        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1997        .build(&preds)
1998        .unwrap()
1999        .map(Arc::new);
2000
2001        let builder = ParquetReaderBuilder::new(
2002            FILE_DIR.to_string(),
2003            PathType::Bare,
2004            handle.clone(),
2005            object_store.clone(),
2006        )
2007        .predicate(Some(Predicate::new(preds)))
2008        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
2009        .cache(CacheStrategy::EnableAll(cache.clone()));
2010
2011        let mut metrics = ReaderMetrics::default();
2012        let (_context, selection) = builder
2013            .build_reader_input(&mut metrics)
2014            .await
2015            .unwrap()
2016            .unwrap();
2017
2018        // Verify selection contains RG 0 and RG 1
2019        assert_eq!(selection.row_group_count(), 2);
2020        assert_eq!(50, selection.get(0).unwrap().row_count());
2021        assert_eq!(50, selection.get(1).unwrap().row_count());
2022
2023        // Verify filtering metrics
2024        assert_eq!(metrics.filter_metrics.rg_total, 2);
2025        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2026        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
2027        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
2028    }
2029
2030    /// Creates region metadata for testing fulltext indexes.
2031    /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
2032    fn fulltext_region_metadata() -> RegionMetadata {
2033        let mut builder = RegionMetadataBuilder::new(REGION_ID);
2034        builder
2035            .push_column_metadata(ColumnMetadata {
2036                column_schema: ColumnSchema::new(
2037                    "tag_0".to_string(),
2038                    ConcreteDataType::string_datatype(),
2039                    true,
2040                ),
2041                semantic_type: SemanticType::Tag,
2042                column_id: 0,
2043            })
2044            .push_column_metadata(ColumnMetadata {
2045                column_schema: ColumnSchema::new(
2046                    "text_bloom".to_string(),
2047                    ConcreteDataType::string_datatype(),
2048                    true,
2049                )
2050                .with_fulltext_options(FulltextOptions {
2051                    enable: true,
2052                    analyzer: FulltextAnalyzer::English,
2053                    case_sensitive: false,
2054                    backend: FulltextBackend::Bloom,
2055                    granularity: 1,
2056                    false_positive_rate_in_10000: 50,
2057                })
2058                .unwrap(),
2059                semantic_type: SemanticType::Field,
2060                column_id: 1,
2061            })
2062            .push_column_metadata(ColumnMetadata {
2063                column_schema: ColumnSchema::new(
2064                    "text_tantivy".to_string(),
2065                    ConcreteDataType::string_datatype(),
2066                    true,
2067                )
2068                .with_fulltext_options(FulltextOptions {
2069                    enable: true,
2070                    analyzer: FulltextAnalyzer::English,
2071                    case_sensitive: false,
2072                    backend: FulltextBackend::Tantivy,
2073                    granularity: 1,
2074                    false_positive_rate_in_10000: 50,
2075                })
2076                .unwrap(),
2077                semantic_type: SemanticType::Field,
2078                column_id: 2,
2079            })
2080            .push_column_metadata(ColumnMetadata {
2081                column_schema: ColumnSchema::new(
2082                    "field_0".to_string(),
2083                    ConcreteDataType::uint64_datatype(),
2084                    true,
2085                ),
2086                semantic_type: SemanticType::Field,
2087                column_id: 3,
2088            })
2089            .push_column_metadata(ColumnMetadata {
2090                column_schema: ColumnSchema::new(
2091                    "ts".to_string(),
2092                    ConcreteDataType::timestamp_millisecond_datatype(),
2093                    false,
2094                ),
2095                semantic_type: SemanticType::Timestamp,
2096                column_id: 4,
2097            })
2098            .primary_key(vec![0]);
2099        builder.build().unwrap()
2100    }
2101
2102    /// Creates a flat format RecordBatch with string fields for fulltext testing.
2103    fn new_fulltext_record_batch_by_range(
2104        tag: &str,
2105        text_bloom: &str,
2106        text_tantivy: &str,
2107        start: usize,
2108        end: usize,
2109    ) -> RecordBatch {
2110        assert!(end >= start);
2111        let metadata = Arc::new(fulltext_region_metadata());
2112        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2113
2114        let num_rows = end - start;
2115        let mut columns = Vec::new();
2116
2117        // Add primary key column (tag_0) as dictionary array
2118        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
2119        for _ in 0..num_rows {
2120            tag_builder.append_value(tag);
2121        }
2122        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
2123
2124        // Add text_bloom field (fulltext with bloom backend)
2125        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
2126        columns.push(Arc::new(StringArray::from(text_bloom_values)));
2127
2128        // Add text_tantivy field (fulltext with tantivy backend)
2129        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
2130        columns.push(Arc::new(StringArray::from(text_tantivy_values)));
2131
2132        // Add field column (field_0)
2133        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
2134        columns.push(Arc::new(UInt64Array::from(field_values)));
2135
2136        // Add time index column (ts)
2137        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
2138        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
2139
2140        // Add encoded primary key column
2141        let pk = new_primary_key(&[tag]);
2142        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
2143        for _ in 0..num_rows {
2144            pk_builder.append(&pk).unwrap();
2145        }
2146        columns.push(Arc::new(pk_builder.finish()));
2147
2148        // Add sequence column
2149        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
2150
2151        // Add op_type column
2152        columns.push(Arc::new(UInt8Array::from_value(
2153            OpType::Put as u8,
2154            num_rows,
2155        )));
2156
2157        RecordBatch::try_new(flat_schema, columns).unwrap()
2158    }
2159
2160    #[tokio::test]
2161    async fn test_write_flat_read_with_fulltext_index() {
2162        let mut env = TestEnv::new().await;
2163        let object_store = env.init_object_store_manager();
2164        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
2165        let metadata = Arc::new(fulltext_region_metadata());
2166        let row_group_size = 50;
2167
2168        // Create flat format RecordBatches with different text content
2169        // RG 0:   0-50  tag="a", bloom="hello world", tantivy="quick brown fox"
2170        // RG 1:  50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
2171        // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
2172        // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
2173        let flat_batches = vec![
2174            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
2175            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
2176            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
2177            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
2178        ];
2179
2180        let flat_source = new_flat_source_from_record_batches(flat_batches);
2181
2182        let write_opts = WriteOptions {
2183            row_group_size,
2184            ..Default::default()
2185        };
2186
2187        let indexer_builder = create_test_indexer_builder(
2188            &env,
2189            object_store.clone(),
2190            file_path.clone(),
2191            metadata.clone(),
2192            row_group_size,
2193        );
2194
2195        let mut info = write_flat_sst(
2196            object_store.clone(),
2197            metadata.clone(),
2198            indexer_builder,
2199            file_path.clone(),
2200            flat_source,
2201            &write_opts,
2202        )
2203        .await;
2204        assert_eq!(200, info.num_rows);
2205        assert!(info.file_size > 0);
2206        assert!(info.index_metadata.file_size > 0);
2207
2208        // Verify fulltext indexes were created
2209        assert!(info.index_metadata.fulltext_index.index_size > 0);
2210        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
2211        // text_bloom (column_id 1) and text_tantivy (column_id 2)
2212        info.index_metadata.fulltext_index.columns.sort_unstable();
2213        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
2214
2215        assert_eq!(
2216            (
2217                Timestamp::new_millisecond(0),
2218                Timestamp::new_millisecond(199)
2219            ),
2220            info.time_range
2221        );
2222
2223        let handle = create_file_handle_from_sst_info(&info, &metadata);
2224
2225        let cache = create_test_cache();
2226
2227        // Helper functions to create fulltext function expressions
2228        let matches_func = || {
2229            Arc::new(
2230                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
2231                    .provide(Default::default()),
2232            )
2233        };
2234
2235        let matches_term_func = || {
2236            Arc::new(
2237                ScalarFunctionFactory::from(
2238                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
2239                )
2240                .provide(Default::default()),
2241            )
2242        };
2243
2244        // Test 1: Filter by text_bloom field using matches_term (bloom backend)
2245        // Expected: RG 0 and RG 1 (rows 0-100) which have "hello" term
2246        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2247            args: vec![col("text_bloom"), "hello".lit()],
2248            func: matches_term_func(),
2249        })];
2250
2251        let fulltext_applier = FulltextIndexApplierBuilder::new(
2252            FILE_DIR.to_string(),
2253            PathType::Bare,
2254            object_store.clone(),
2255            env.get_puffin_manager(),
2256            &metadata,
2257        )
2258        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2259        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2260        .build(&preds)
2261        .unwrap()
2262        .map(Arc::new);
2263
2264        let builder = ParquetReaderBuilder::new(
2265            FILE_DIR.to_string(),
2266            PathType::Bare,
2267            handle.clone(),
2268            object_store.clone(),
2269        )
2270        .predicate(Some(Predicate::new(preds)))
2271        .fulltext_index_appliers([None, fulltext_applier.clone()])
2272        .cache(CacheStrategy::EnableAll(cache.clone()));
2273
2274        let mut metrics = ReaderMetrics::default();
2275        let (_context, selection) = builder
2276            .build_reader_input(&mut metrics)
2277            .await
2278            .unwrap()
2279            .unwrap();
2280
2281        // Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
2282        assert_eq!(selection.row_group_count(), 2);
2283        assert_eq!(50, selection.get(0).unwrap().row_count());
2284        assert_eq!(50, selection.get(1).unwrap().row_count());
2285
2286        // Verify filtering metrics
2287        assert_eq!(metrics.filter_metrics.rg_total, 4);
2288        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2289        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2290        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2291
2292        // Test 2: Filter by text_tantivy field using matches (tantivy backend)
2293        // Expected: RG 2 and RG 3 (rows 100-200) which have "lazy" in query
2294        let preds = vec![Expr::ScalarFunction(ScalarFunction {
2295            args: vec![col("text_tantivy"), "lazy".lit()],
2296            func: matches_func(),
2297        })];
2298
2299        let fulltext_applier = FulltextIndexApplierBuilder::new(
2300            FILE_DIR.to_string(),
2301            PathType::Bare,
2302            object_store.clone(),
2303            env.get_puffin_manager(),
2304            &metadata,
2305        )
2306        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2307        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2308        .build(&preds)
2309        .unwrap()
2310        .map(Arc::new);
2311
2312        let builder = ParquetReaderBuilder::new(
2313            FILE_DIR.to_string(),
2314            PathType::Bare,
2315            handle.clone(),
2316            object_store.clone(),
2317        )
2318        .predicate(Some(Predicate::new(preds)))
2319        .fulltext_index_appliers([None, fulltext_applier.clone()])
2320        .cache(CacheStrategy::EnableAll(cache.clone()));
2321
2322        let mut metrics = ReaderMetrics::default();
2323        let (_context, selection) = builder
2324            .build_reader_input(&mut metrics)
2325            .await
2326            .unwrap()
2327            .unwrap();
2328
2329        // Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
2330        assert_eq!(selection.row_group_count(), 2);
2331        assert_eq!(50, selection.get(2).unwrap().row_count());
2332        assert_eq!(50, selection.get(3).unwrap().row_count());
2333
2334        // Verify filtering metrics
2335        assert_eq!(metrics.filter_metrics.rg_total, 4);
2336        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2337        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2338        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2339    }
2340}