mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.
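//!
//! A minimal write-then-read sketch, adapted from the tests in this module.
//! The `source`, `path_provider`, `dir`, and `handle` values are test-only
//! assumptions; the fence is `ignore`d so it is not compiled as a doctest.
//!
//! ```ignore
//! // Write a flat source into one or more parquet SSTs.
//! let mut metrics = Metrics::new(WriteType::Flush);
//! let mut writer = ParquetWriter::new_with_object_store(
//!     object_store.clone(),
//!     metadata.clone(),
//!     IndexConfig::default(),
//!     NoopIndexBuilder,
//!     path_provider,
//!     &mut metrics,
//! )
//! .await;
//! let info = writer
//!     .write_all_flat_as_primary_key(source, None, &WriteOptions::default())
//!     .await
//!     .unwrap()
//!     .remove(0);
//!
//! // Read the SST back batch by batch.
//! let builder = ParquetReaderBuilder::new(dir, PathType::Bare, handle, object_store);
//! let mut reader = builder.build().await.unwrap().unwrap();
//! while let Some(batch) = reader.next_record_batch().await.unwrap() {
//!     // Consume the batch.
//! }
//! ```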

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod async_reader;
pub mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod prefilter;
pub mod read_columns;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
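///
/// The region metadata JSON is stored under this key. A sketch of attaching
/// it via `WriterProperties`, adapted from `test_read_large_binary` below
/// (the fence is `ignore`d so it is not compiled as a doctest):
///
/// ```ignore
/// let json = metadata.to_json().unwrap();
/// let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
/// let props = WriterProperties::builder()
///     .set_key_value_metadata(Some(vec![key_value_meta]))
///     .build();
/// ```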
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
///
/// This is a runtime-only scan granularity, so we align it with DataFusion's
/// default execution batch size to reduce rebatching and concatenation in the
/// query pipeline.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default row group size for parquet files.
///
/// Keep the existing persisted/on-disk default stable. It intentionally stays
/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan
/// batching without changing the row group layout of newly written SSTs.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;

/// Parquet write options.
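///
/// Typical construction overrides only the fields that differ from the
/// defaults, as the tests below do (the fence is `ignore`d, not compiled):
///
/// ```ignore
/// let write_opts = WriteOptions {
///     row_group_size: 50,
///     max_file_size: Some(1024 * 16),
///     ..Default::default()
/// };
/// ```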
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: this is not a hard limit because we can only observe the file
    /// size when the `ArrowWriter` writes to the underlying writer.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
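///
/// A sketch of inspecting the info returned by the writer, mirroring the
/// assertions in the tests below (the fence is `ignore`d, not compiled):
///
/// ```ignore
/// let info = writer
///     .write_all_flat_as_primary_key(source, None, &write_opts)
///     .await
///     .unwrap()
///     .remove(0);
/// assert_eq!(200, info.num_rows);
/// assert!(info.file_size > 0);
/// let parquet_meta = info.file_metadata.unwrap();
/// ```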
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Maximum uncompressed row group size in bytes. 0 if unknown.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// File metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
    /// Number of series.
    pub num_series: u64,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use datatypes::arrow::util::pretty::pretty_format_batches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
    use object_store::ObjectStore;
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
    use parquet::file::properties::WriterProperties;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::{ColumnSchema, RegionId};
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::test_util::assert_parquet_metadata_equal;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::FlatSource;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::flat_format::FlatWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::row_selection::RowGroupSelection;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    const FILE_DIR: &str = "/";
    const REGION_ID: RegionId = RegionId::new(0, 0);

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
                new_record_batch_by_range(&["b", "f"], 0, 40),
                new_record_batch_by_range(&["b", "h"], 100, 150),
                new_record_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap().unwrap();
            check_record_batch_reader_result(
                &mut reader,
                &[
                    new_record_batch_by_range(&["a", "d"], 0, 50),
                    new_record_batch_by_range(&["a", "d"], 50, 60),
                    new_record_batch_by_range(&["b", "f"], 0, 40),
                    new_record_batch_by_range(&["b", "h"], 100, 150),
                    new_record_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Cache 4 row groups.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // create test env
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // read the sst file metadata
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        let reader_metadata = reader.parquet_metadata();
        let cached_writer_metadata =
            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
                .unwrap()
                .parquet_metadata();

        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "z"], 0, 0),
            new_record_batch_by_range(&["a", "z"], 100, 100),
            new_record_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_record_batch_with_binary(&["a"], 0, 60);
        let arrays: Vec<_> = batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_with_binary(&["a"], 0, 50),
                new_record_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // create test env
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 1000),
            new_record_batch_by_range(&["b", "f"], 0, 1000),
            new_record_batch_by_range(&["c", "g"], 0, 1000),
            new_record_batch_by_range(&["b", "h"], 100, 200),
            new_record_batch_by_range(&["b", "h"], 200, 300),
            new_record_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_flat_source_from_record_batches(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap().unwrap();
            while let Some(batch) = reader.next_record_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let plan = inverted_index_applier
            .as_ref()
            .unwrap()
            .plan_for_sst(&metadata)
            .unwrap()
            .unwrap();
        let cached = index_result_cache
            .get(&plan.predicate_key, handle.file_id().file_id())
            .unwrap();
        // The inverted index searches all row groups.
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
        assert!(read_input.is_none());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Remove the `ts` predicate and keep the `tag_1` predicate
        // to verify that the cache works.

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - bloom filter: filter out row groups 2..=3
        //
        // Rows:
        // - bloom filter: hit row group 0, hit 50 rows
        //                 hit row group 1, hit 10 rows
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 20),
                new_record_batch_by_range(&["b", "d"], 0, 20),
                new_record_batch_by_range(&["c", "d"], 0, 10),
                new_record_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = build_test_binary_test_region_metadata();
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
        }
        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);

        let values = (0..num_rows)
            .map(|_| "some data".as_bytes())
            .collect::<Vec<_>>();
        columns.push(
            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
                values,
            )) as ArrayRef,
        );

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    async fn check_record_batch_reader_result(
        reader: &mut ParquetReader,
        expected: &[RecordBatch],
    ) {
        let mut actual = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            actual.push(batch);
        }
        assert_eq!(
            pretty_format_batches(expected).unwrap().to_string(),
            pretty_format_batches(&actual).unwrap().to_string()
        );
        assert!(reader.next_record_batch().await.unwrap().is_none());
    }

    fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        let mut field_values = Vec::with_capacity(rows.len());
        let mut timestamps = Vec::with_capacity(rows.len());

        for (tag_0, tag_1, ts) in rows {
            tag_0_builder.append_value(*tag_0);
            tag_1_builder.append_value(*tag_1);
            pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
            field_values.push(*ts as u64);
            timestamps.push(*ts);
        }

        RecordBatch::try_new(
            flat_schema,
            vec![
                Arc::new(tag_0_builder.finish()) as ArrayRef,
                Arc::new(tag_1_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from(field_values)) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
                Arc::new(pk_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
                Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
    /// Similar to `new_record_batch_by_range` but without individual primary key columns.
    fn new_record_batch_by_range_sparse(
        tags: &[&str],
        start: usize,
        end: usize,
        metadata: &Arc<RegionMetadata>,
    ) -> RecordBatch {
        assert!(end >= start);
        let flat_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
        );

        let num_rows = end - start;
        let mut columns: Vec<ArrayRef> = Vec::new();

        // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format.

        // Add field column (field_0).
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);

        // Add time index column (ts).
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);

        // Add encoded primary key column using sparse encoding.
        let table_id = 1u32; // Test table ID.
        let tsid = 100u64; // Base TSID.
        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);

        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);

        // Add sequence column.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);

        // Add op_type column.
        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Helper function to create IndexerBuilderImpl for tests.
    fn create_test_indexer_builder(
        env: &TestEnv,
        object_store: ObjectStore,
        file_path: RegionFilePathFactory,
        metadata: Arc<RegionMetadata>,
        row_group_size: usize,
    ) -> IndexerBuilderImpl {
        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
        let intermediate_manager = env.get_intermediate_manager();

        IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
    }

    /// Helper function to write flat SST and return SstInfo.
    async fn write_flat_sst(
        object_store: ObjectStore,
        metadata: Arc<RegionMetadata>,
        indexer_builder: IndexerBuilderImpl,
        file_path: RegionFilePathFactory,
        flat_source: FlatSource,
        write_opts: &WriteOptions,
    ) -> SstInfo {
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store,
            metadata,
            IndexConfig::default(),
            indexer_builder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat(flat_source, None, write_opts)
            .await
            .unwrap()
            .remove(0)
    }

    /// Helper function to create FileHandle from SstInfo.
    fn create_file_handle_from_sst_info(
        info: &SstInfo,
        metadata: &Arc<RegionMetadata>,
    ) -> FileHandle {
        FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        )
    }

    /// Helper function to create test cache with standard settings.
    fn create_test_cache() -> Arc<CacheManager> {
        Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        )
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }
1396
1397    #[tokio::test]
1398    async fn test_read_with_override_sequence() {
1399        let mut env = TestEnv::new().await;
1400        let object_store = env.init_object_store_manager();
1401        let handle = sst_file_handle(0, 1000);
1402        let file_path = FixedPathProvider {
1403            region_file_id: handle.file_id(),
1404        };
1405        let metadata = Arc::new(sst_region_metadata());
1406
1407        // Create batches with sequence 0 to trigger override functionality.
1408        let source = new_flat_source_from_record_batches(vec![
1409            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
1410            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
1411        ]);
1412
1413        let write_opts = WriteOptions {
1414            row_group_size: 50,
1415            ..Default::default()
1416        };
1417
1418        let mut metrics = Metrics::new(WriteType::Flush);
1419        let mut writer = ParquetWriter::new_with_object_store(
1420            object_store.clone(),
1421            metadata.clone(),
1422            IndexConfig::default(),
1423            NoopIndexBuilder,
1424            file_path,
1425            &mut metrics,
1426        )
1427        .await;
1428
1429        writer
1430            .write_all_flat_as_primary_key(source, None, &write_opts)
1431            .await
1432            .unwrap()
1433            .remove(0);
1434
1435        // Read without override sequence (should read sequence 0)
1436        let builder = ParquetReaderBuilder::new(
1437            FILE_DIR.to_string(),
1438            PathType::Bare,
1439            handle.clone(),
1440            object_store.clone(),
1441        );
1442        let mut reader = builder.build().await.unwrap().unwrap();
1443        let mut normal_batches = Vec::new();
1444        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1445            normal_batches.push(batch);
1446        }
1447
1448        // Read with override sequence using FileMeta.sequence
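        // A non-zero `FileMeta::sequence` tells the reader to replace the
        // sequence column of every row it returns with that value.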
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // The override read should equal the normal read with its sequence column rewritten.
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let mut columns = normal.columns().to_vec();
                let num_cols = columns.len();
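                // The sequence column is the second-to-last column in the
                // output schema (only op_type comes after it).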
                columns[num_cols - 2] =
                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
                RecordBatch::try_new(normal.schema(), columns).unwrap()
            };

            // Override batch should match expected batch
            assert_eq!(*override_batch, expected_batch);
        }
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each batch holds 50 rows, so two consecutive batches fill one row
        // group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, d]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "d"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by tag_0 = "b".
        // Expected: only the 50 rows with tag_0 = "b" (ts 50-100).
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify the selection keeps only RG 0; 50 of its 100 rows match
        // (tag_0 = "b", ts 50-100).
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // Verify filtering metrics
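        // Min-max stats prune RG 1 (its tag_0 values are all "c"); the
        // inverted index then drops the 50 non-matching rows of RG 0.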
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each batch holds 50 rows, so two consecutive batches fill one row
        // group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, e]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "e"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d".
        // Expected: both RG 0 (ts 0-100) and RG 1 (ts 100-200) survive, since
        // each contains rows with tag_1 = "d".
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 0 and RG 1
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics
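        // The ts bounds overlap both row groups, so min-max pruning removes
        // nothing; the bloom filter drops the tag_1 = "e" half of RG 0 and the
        // tag_1 = "f" half of RG 1, 50 rows each.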
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 10;

        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 3),
            new_record_batch_by_range(&["b", "d"], 3, 10),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
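        // The outer selection asks for rows 0..6 of RG 0, but only rows 0..3
        // match tag_0 = "a", so the prefilter has to discard the trailing
        // selected-but-filtered rows.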
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..6).collect())],
            row_group_size,
        );

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "d"], 0, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 8;

        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 2),
            new_record_batch_by_range(&["b", "d"], 2, 4),
            new_record_batch_by_range(&["a", "d"], 4, 6),
            new_record_batch_by_range(&["c", "d"], 6, 8),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
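        // The outer selection covers the whole row group (rows 0..8); tag_0 =
        // "a" matches two disjoint ranges (rows 0-1 and 4-5), and the trailing
        // rows 6-7 never match.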
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..8).collect())],
            row_group_size,
        );

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_from_rows(&[
                ("a", "d", 0),
                ("a", "d", 1),
                ("a", "d", 4),
                ("a", "d", 5),
            ])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index_sparse() {
        common_telemetry::init_default_ut_logging();

        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each batch holds 50 rows, so two consecutive batches fill one row
        // group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, d]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by tag_0 = "b".
        // Expected: only the 50 rows with tag_0 = "b" (ts 50-100).
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // RG 0 has 50 matching rows (tag_0="b")
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // Verify filtering metrics
        // Note: With sparse encoding, tag columns aren't stored separately,
        // so minmax filtering on tags doesn't work (only inverted index)
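        // The inverted index therefore does all the pruning: it drops RG 1
        // entirely (100 rows) plus the 50 non-matching rows of RG 0, 150 rows
        // in total.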
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter_sparse() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each batch holds 50 rows, so two consecutive batches fill one row
        // group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, e]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d".
        // Expected: both RG 0 (ts 0-100) and RG 1 (ts 100-200) survive, since
        // each contains rows with tag_1 = "d".
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 0 and RG 1
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics
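        // As in the dense-format bloom filter test: no row group is pruned at
        // the group level, and the bloom filter removes 50 rows from each.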
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

    /// Creates region metadata for testing fulltext indexes.
    /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
    fn fulltext_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(REGION_ID);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_bloom".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Bloom,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_tantivy".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Tantivy,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![0]);
        builder.build().unwrap()
    }

    /// Creates a flat format RecordBatch with string fields for fulltext testing.
    fn new_fulltext_record_batch_by_range(
        tag: &str,
        text_bloom: &str,
        text_tantivy: &str,
        start: usize,
        end: usize,
    ) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(fulltext_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
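        // Build the columns in flat schema order: the user columns (tag_0,
        // text_bloom, text_tantivy, field_0, ts) followed by the internal
        // encoded primary key, sequence, and op_type columns.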
        let mut columns = Vec::new();

        // Add primary key column (tag_0) as dictionary array
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_builder.append_value(tag);
        }
        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);

        // Add text_bloom field (fulltext with bloom backend)
        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
        columns.push(Arc::new(StringArray::from(text_bloom_values)));

        // Add text_tantivy field (fulltext with tantivy backend)
        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
        columns.push(Arc::new(StringArray::from(text_tantivy_values)));

        // Add field column (field_0)
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Add time index column (ts)
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Add encoded primary key column
        let pk = new_primary_key(&[tag]);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Add sequence column
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Add op_type column
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    #[tokio::test]
    async fn test_write_flat_read_with_fulltext_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(fulltext_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches with different text content
        // RG 0:   0-50  tag="a", bloom="hello world", tantivy="quick brown fox"
        // RG 1:  50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
        // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
        // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
        let flat_batches = vec![
            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let mut info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        // Verify fulltext indexes were created
        assert!(info.index_metadata.fulltext_index.index_size > 0);
        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
        // text_bloom (column_id 1) and text_tantivy (column_id 2)
        info.index_metadata.fulltext_index.columns.sort_unstable();
        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Helper functions to create fulltext function expressions
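        // They wrap the matches/matches_term UDF implementations so the
        // predicates below can embed them as `Expr::ScalarFunction`.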
        let matches_func = || {
            Arc::new(
                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
                    .provide(Default::default()),
            )
        };

        let matches_term_func = || {
            Arc::new(
                ScalarFunctionFactory::from(
                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
                )
                .provide(Default::default()),
            )
        };

        // Test 1: Filter by the text_bloom field using matches_term (bloom backend).
        // Expected: RG 0 and RG 1 (rows 0-100), which contain the "hello" term.
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_bloom"), "hello".lit()],
            func: matches_term_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics
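        // Four row groups of 50 rows each; the fulltext index prunes RG 2 and
        // RG 3 ("goodbye world"), 100 rows in total.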
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);

        // Test 2: Filter by the text_tantivy field using matches (tantivy backend).
        // Expected: RG 2 and RG 3 (rows 100-200), which match the "lazy" query.
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_tantivy"), "lazy".lit()],
            func: matches_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(2).unwrap().row_count());
        assert_eq!(50, selection.get(3).unwrap().row_count());

        // Verify filtering metrics
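        // Mirror of Test 1: the tantivy index prunes RG 0 and RG 1
        // ("quick brown fox"), 100 rows in total.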
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
    }
}