
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
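///
/// The region metadata is serialized to JSON and stored under this key in the
/// parquet key-value metadata. A minimal sketch, mirroring the writer
/// properties set up in `test_read_large_binary` below:
///
/// ```ignore
/// let json = metadata.to_json().unwrap();
/// let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
/// let writer_props = WriterProperties::builder()
///     .set_key_value_metadata(Some(vec![key_value_meta]))
///     .build();
/// ```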
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
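///
/// A sketch of typical construction; the small `row_group_size` and
/// `max_file_size` values below are test-style assumptions, not
/// recommendations:
///
/// ```ignore
/// let write_opts = WriteOptions {
///     row_group_size: 50,
///     max_file_size: Some(1024 * 16),
///     ..Default::default()
/// };
/// ```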
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: This is not a hard limit as we can only observe the file size when
    /// the ArrowWriter writes to the underlying writer.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
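///
/// A sketch of obtaining this info from a `ParquetWriter`, mirroring the
/// tests below (`writer`, `flat_source`, and `write_opts` are set up as in
/// those tests):
///
/// ```ignore
/// let info = writer
///     .write_all_flat(flat_source, None, &write_opts)
///     .await
///     .unwrap()
///     .remove(0);
/// assert!(info.num_rows > 0 && info.file_size > 0);
/// ```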
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Maximum uncompressed row group size in bytes. 0 if unknown.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// File metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
    /// Number of series.
    pub num_series: u64,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use datatypes::arrow::util::pretty::pretty_format_batches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
    use object_store::ObjectStore;
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
    use parquet::file::properties::WriterProperties;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::{ColumnSchema, RegionId};
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::test_util::assert_parquet_metadata_equal;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::FlatSource;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::flat_format::FlatWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    const FILE_DIR: &str = "/";
    const REGION_ID: RegionId = RegionId::new(0, 0);

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
                new_record_batch_by_range(&["b", "f"], 0, 40),
                new_record_batch_by_range(&["b", "h"], 100, 150),
                new_record_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap().unwrap();
            check_record_batch_reader_result(
                &mut reader,
                &[
                    new_record_batch_by_range(&["a", "d"], 0, 50),
                    new_record_batch_by_range(&["a", "d"], 50, 60),
                    new_record_batch_by_range(&["b", "f"], 0, 40),
                    new_record_batch_by_range(&["b", "h"], 100, 150),
                    new_record_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Cache 4 row groups.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // create test env
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // read the sst file metadata
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        let reader_metadata = reader.parquet_metadata();
        let cached_writer_metadata =
            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
                .unwrap()
                .parquet_metadata();

        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "z"], 0, 0),
            new_record_batch_by_range(&["a", "z"], 100, 100),
            new_record_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_record_batch_with_binary(&["a"], 0, 60);
        let arrays: Vec<_> = batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_with_binary(&["a"], 0, 50),
                new_record_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // create test env
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 1000),
            new_record_batch_by_range(&["b", "f"], 0, 1000),
            new_record_batch_by_range(&["c", "g"], 0, 1000),
            new_record_batch_by_range(&["b", "h"], 100, 200),
            new_record_batch_by_range(&["b", "h"], 200, 300),
            new_record_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_flat_source_from_record_batches(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap().unwrap();
            while let Some(batch) = reader.next_record_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: tag_0 = "b"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 1..=3
        //
        // Rows:
        // - inverted index: hit row group 0, hit 20 rows
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // The inverted index searches all row groups.
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Data: ts tag_0 tag_1
        // Data: 0-20 [a, d]
        //       0-20 [b, d]
        //       0-20 [c, d]
        //       0-40 [c, f]
        //    100-200 [c, h]
        //
        // Pred: 50 <= ts && ts < 200 && tag_1 = "d"
        //
        // Row groups & rows pruning:
        //
        // Row Groups:
        // - min-max: filter out row groups 0..=1
        // - bloom filter: filter out row groups 2..=3
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
        assert!(read_input.is_none());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Remove the `ts` predicates and keep the `tag_1` predicate
        // to verify that the cache works.
987
988        // Data: ts tag_0 tag_1
989        // Data: 0-20 [a, d]
990        //       0-20 [b, d]
991        //       0-20 [c, d]
992        //       0-40 [c, f]
993        //    100-200 [c, h]
994        //
995        // Pred: tag_1 = "d"
996        //
997        // Row groups & rows pruning:
998        //
999        // Row Groups:
1000        // - bloom filter: filter out row groups 2..=3
1001        //
1002        // Rows:
1003        // - bloom filter: hit row group 0, hit 50 rows
1004        //                 hit row group 1, hit 10 rows
1005        let preds = vec![col("tag_1").eq(lit("d"))];
1006        let inverted_index_applier = build_inverted_index_applier(&preds);
1007        let bloom_filter_applier = build_bloom_filter_applier(&preds);
1008
1009        let builder = ParquetReaderBuilder::new(
1010            FILE_DIR.to_string(),
1011            PathType::Bare,
1012            handle.clone(),
1013            object_store.clone(),
1014        )
1015        .flat_format(true)
1016        .predicate(Some(Predicate::new(preds)))
1017        .inverted_index_appliers([inverted_index_applier.clone(), None])
1018        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
1019        .cache(CacheStrategy::EnableAll(cache.clone()));
1020
1021        let mut metrics = ReaderMetrics::default();
1022        let (context, selection) = builder
1023            .build_reader_input(&mut metrics)
1024            .await
1025            .unwrap()
1026            .unwrap();
1027        let mut reader = ParquetReader::new(Arc::new(context), selection)
1028            .await
1029            .unwrap();
1030        check_record_batch_reader_result(
1031            &mut reader,
1032            &[
1033                new_record_batch_by_range(&["a", "d"], 0, 20),
1034                new_record_batch_by_range(&["b", "d"], 0, 20),
1035                new_record_batch_by_range(&["c", "d"], 0, 10),
1036                new_record_batch_by_range(&["c", "d"], 10, 20),
1037            ],
1038        )
1039        .await;
1040
1041        assert_eq!(metrics.filter_metrics.rg_total, 4);
1042        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1043        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1044        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1045        let cached = index_result_cache
1046            .get(
1047                bloom_filter_applier.unwrap().predicate_key(),
1048                handle.file_id().file_id(),
1049            )
1050            .unwrap();
1051        assert!(cached.contains_row_group(0));
1052        assert!(cached.contains_row_group(1));
1053        assert!(cached.contains_row_group(2));
1054        assert!(cached.contains_row_group(3));
1055    }
1056
1057    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1058        assert!(end >= start);
1059        let metadata = build_test_binary_test_region_metadata();
1060        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1061
1062        let num_rows = end - start;
1063        let mut columns = Vec::new();
1064
1065        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1066        for _ in 0..num_rows {
1067            tag_0_builder.append_value(tags[0]);
1068        }
1069        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1070
1071        let values = (0..num_rows)
1072            .map(|_| "some data".as_bytes())
1073            .collect::<Vec<_>>();
1074        columns.push(
1075            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
1076                values,
1077            )) as ArrayRef,
1078        );
1079
1080        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1081        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1082
1083        let pk = new_primary_key(tags);
1084        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1085        for _ in 0..num_rows {
1086            pk_builder.append(&pk).unwrap();
1087        }
1088        columns.push(Arc::new(pk_builder.finish()));
1089
1090        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1091        columns.push(Arc::new(UInt8Array::from_value(
1092            OpType::Put as u8,
1093            num_rows,
1094        )));
1095
1096        RecordBatch::try_new(flat_schema, columns).unwrap()
1097    }
1098
1099    async fn check_record_batch_reader_result(
1100        reader: &mut ParquetReader,
1101        expected: &[RecordBatch],
1102    ) {
1103        let mut actual = Vec::new();
1104        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1105            actual.push(batch);
1106        }
1107        assert_eq!(
1108            pretty_format_batches(expected).unwrap().to_string(),
1109            pretty_format_batches(&actual).unwrap().to_string()
1110        );
1111        assert!(reader.next_record_batch().await.unwrap().is_none());
1112    }
1113
1114    /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
1115    /// Similar to `new_record_batch_by_range` but without individual primary key columns.
1116    fn new_record_batch_by_range_sparse(
1117        tags: &[&str],
1118        start: usize,
1119        end: usize,
1120        metadata: &Arc<RegionMetadata>,
1121    ) -> RecordBatch {
1122        assert!(end >= start);
1123        let flat_schema = to_flat_sst_arrow_schema(
1124            metadata,
1125            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1126        );
1127
1128        let num_rows = end - start;
1129        let mut columns: Vec<ArrayRef> = Vec::new();
1130
1131        // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format
1132
1133        // Add field column (field_0)
1134        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1135        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1136
1137        // Add time index column (ts)
1138        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1139        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1140
1141        // Add encoded primary key column using sparse encoding
1142        let table_id = 1u32; // Test table ID
1143        let tsid = 100u64; // Base TSID
1144        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1145
1146        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1147        for _ in 0..num_rows {
1148            pk_builder.append(&pk).unwrap();
1149        }
1150        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1151
1152        // Add sequence column
1153        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1154
1155        // Add op_type column
1156        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1157
1158        RecordBatch::try_new(flat_schema, columns).unwrap()
1159    }
1160
1161    /// Helper function to create IndexerBuilderImpl for tests.
1162    fn create_test_indexer_builder(
1163        env: &TestEnv,
1164        object_store: ObjectStore,
1165        file_path: RegionFilePathFactory,
1166        metadata: Arc<RegionMetadata>,
1167        row_group_size: usize,
1168    ) -> IndexerBuilderImpl {
1169        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1170        let intermediate_manager = env.get_intermediate_manager();
1171
1172        IndexerBuilderImpl {
1173            build_type: IndexBuildType::Flush,
1174            metadata,
1175            row_group_size,
1176            puffin_manager,
1177            write_cache_enabled: false,
1178            intermediate_manager,
1179            index_options: IndexOptions {
1180                inverted_index: InvertedIndexOptions {
1181                    segment_row_count: 1,
1182                    ..Default::default()
1183                },
1184            },
1185            inverted_index_config: Default::default(),
1186            fulltext_index_config: Default::default(),
1187            bloom_filter_index_config: Default::default(),
1188            #[cfg(feature = "vector_index")]
1189            vector_index_config: Default::default(),
1190        }
1191    }
1192
1193    /// Helper function to write flat SST and return SstInfo.
1194    async fn write_flat_sst(
1195        object_store: ObjectStore,
1196        metadata: Arc<RegionMetadata>,
1197        indexer_builder: IndexerBuilderImpl,
1198        file_path: RegionFilePathFactory,
1199        flat_source: FlatSource,
1200        write_opts: &WriteOptions,
1201    ) -> SstInfo {
1202        let mut metrics = Metrics::new(WriteType::Flush);
1203        let mut writer = ParquetWriter::new_with_object_store(
1204            object_store,
1205            metadata,
1206            IndexConfig::default(),
1207            indexer_builder,
1208            file_path,
1209            &mut metrics,
1210        )
1211        .await;
1212
1213        writer
1214            .write_all_flat(flat_source, None, write_opts)
1215            .await
1216            .unwrap()
1217            .remove(0)
1218    }
1219
1220    /// Helper function to create FileHandle from SstInfo.
1221    fn create_file_handle_from_sst_info(
1222        info: &SstInfo,
1223        metadata: &Arc<RegionMetadata>,
1224    ) -> FileHandle {
1225        FileHandle::new(
1226            FileMeta {
1227                region_id: metadata.region_id,
1228                file_id: info.file_id,
1229                time_range: info.time_range,
1230                level: 0,
1231                file_size: info.file_size,
1232                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1233                available_indexes: info.index_metadata.build_available_indexes(),
1234                indexes: info.index_metadata.build_indexes(),
1235                index_file_size: info.index_metadata.file_size,
1236                index_version: 0,
1237                num_row_groups: info.num_row_groups,
1238                num_rows: info.num_rows as u64,
1239                sequence: None,
1240                partition_expr: match &metadata.partition_expr {
1241                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1242                        .expect("partition expression should be valid JSON"),
1243                    None => None,
1244                },
1245                num_series: 0,
1246            },
1247            Arc::new(NoopFilePurger),
1248        )
1249    }
1250
1251    /// Helper function to create test cache with standard settings.
1252    fn create_test_cache() -> Arc<CacheManager> {
1253        Arc::new(
1254            CacheManager::builder()
1255                .index_result_cache_size(1024 * 1024)
1256                .index_metadata_size(1024 * 1024)
1257                .index_content_page_size(1024 * 1024)
1258                .index_content_size(1024 * 1024)
1259                .puffin_metadata_size(1024 * 1024)
1260                .build(),
1261        )
1262    }
1263
1264    #[tokio::test]
1265    async fn test_write_flat_with_index() {
1266        let mut env = TestEnv::new().await;
1267        let object_store = env.init_object_store_manager();
1268        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1269        let metadata = Arc::new(sst_region_metadata());
1270        let row_group_size = 50;
1271
1272        // Create flat format RecordBatches
1273        let flat_batches = vec![
1274            new_record_batch_by_range(&["a", "d"], 0, 20),
1275            new_record_batch_by_range(&["b", "d"], 0, 20),
1276            new_record_batch_by_range(&["c", "d"], 0, 20),
1277            new_record_batch_by_range(&["c", "f"], 0, 40),
1278            new_record_batch_by_range(&["c", "h"], 100, 200),
1279        ];
1280
1281        let flat_source = new_flat_source_from_record_batches(flat_batches);
1282
1283        let write_opts = WriteOptions {
1284            row_group_size,
1285            ..Default::default()
1286        };
1287
1288        let puffin_manager = env
1289            .get_puffin_manager()
1290            .build(object_store.clone(), file_path.clone());
1291        let intermediate_manager = env.get_intermediate_manager();
1292
1293        let indexer_builder = IndexerBuilderImpl {
1294            build_type: IndexBuildType::Flush,
1295            metadata: metadata.clone(),
1296            row_group_size,
1297            puffin_manager,
1298            write_cache_enabled: false,
1299            intermediate_manager,
1300            index_options: IndexOptions {
1301                inverted_index: InvertedIndexOptions {
1302                    segment_row_count: 1,
1303                    ..Default::default()
1304                },
1305            },
1306            inverted_index_config: Default::default(),
1307            fulltext_index_config: Default::default(),
1308            bloom_filter_index_config: Default::default(),
1309            #[cfg(feature = "vector_index")]
1310            vector_index_config: Default::default(),
1311        };
1312
1313        let mut metrics = Metrics::new(WriteType::Flush);
1314        let mut writer = ParquetWriter::new_with_object_store(
1315            object_store.clone(),
1316            metadata.clone(),
1317            IndexConfig::default(),
1318            indexer_builder,
1319            file_path.clone(),
1320            &mut metrics,
1321        )
1322        .await;
1323
1324        let info = writer
1325            .write_all_flat(flat_source, None, &write_opts)
1326            .await
1327            .unwrap()
1328            .remove(0);
1329        assert_eq!(200, info.num_rows);
1330        assert!(info.file_size > 0);
1331        assert!(info.index_metadata.file_size > 0);
1332
1333        assert!(info.index_metadata.inverted_index.index_size > 0);
1334        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1335        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1336
1337        assert!(info.index_metadata.bloom_filter.index_size > 0);
1338        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1339        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1340
1341        assert_eq!(
1342            (
1343                Timestamp::new_millisecond(0),
1344                Timestamp::new_millisecond(199)
1345            ),
1346            info.time_range
1347        );
1348    }
1349
1350    #[tokio::test]
1351    async fn test_read_with_override_sequence() {
1352        let mut env = TestEnv::new().await;
1353        let object_store = env.init_object_store_manager();
1354        let handle = sst_file_handle(0, 1000);
1355        let file_path = FixedPathProvider {
1356            region_file_id: handle.file_id(),
1357        };
1358        let metadata = Arc::new(sst_region_metadata());
1359
1360        // Create batches with sequence 0 to trigger override functionality.
1361        let source = new_flat_source_from_record_batches(vec![
1362            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
1363            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
1364        ]);
1365
1366        let write_opts = WriteOptions {
1367            row_group_size: 50,
1368            ..Default::default()
1369        };
1370
1371        let mut metrics = Metrics::new(WriteType::Flush);
1372        let mut writer = ParquetWriter::new_with_object_store(
1373            object_store.clone(),
1374            metadata.clone(),
1375            IndexConfig::default(),
1376            NoopIndexBuilder,
1377            file_path,
1378            &mut metrics,
1379        )
1380        .await;
1381
1382        writer
1383            .write_all_flat_as_primary_key(source, None, &write_opts)
1384            .await
1385            .unwrap()
1386            .remove(0);
1387
1388        // Read without override sequence (should read sequence 0)
1389        let builder = ParquetReaderBuilder::new(
1390            FILE_DIR.to_string(),
1391            PathType::Bare,
1392            handle.clone(),
1393            object_store.clone(),
1394        );
1395        let mut reader = builder.build().await.unwrap().unwrap();
1396        let mut normal_batches = Vec::new();
1397        while let Some(batch) = reader.next_record_batch().await.unwrap() {
1398            normal_batches.push(batch);
1399        }
1400
1401        // Read with override sequence using FileMeta.sequence
1402        let custom_sequence = 12345u64;
1403        let file_meta = handle.meta_ref();
1404        let mut override_file_meta = file_meta.clone();
1405        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Compare the results
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let mut columns = normal.columns().to_vec();
                let num_cols = columns.len();
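                // In the flat format the sequence column is second-to-last;
                // only op_type comes after it.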
                columns[num_cols - 2] =
                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
                RecordBatch::try_new(normal.schema(), columns).unwrap()
            };

            // Override batch should match expected batch
            assert_eq!(*override_batch, expected_batch);
        }
    }

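    /// Writes four 50-row batches into two row groups, then reads back with an
    /// inverted index applier on `tag_0`: min-max stats prune one row group
    /// and the inverted index narrows the survivor to the matching rows.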
    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each pair of 50-row batches forms one row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, d]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "d"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by tag_0 = "b".
        // Expected: only the rows with tag_0 = "b".
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify the selection contains only RG 0, narrowed to the 50 rows
        // with tag_0 = "b" (ts 50-100).
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // Verify filtering metrics: min-max stats prune RG 1 entirely, and the
        // inverted index filters 50 of the 100 rows in RG 0.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
    }

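    /// Writes the same two-row-group layout, but with a distinct `tag_1` per
    /// batch, then reads back with a bloom filter index applier on `tag_1`:
    /// min-max stats keep both row groups while the bloom filter prunes the
    /// non-matching rows inside each of them.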
    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each pair of 50-row batches forms one row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, e]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "e"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d".
        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200) both survive, since
        // each contains rows with tag_1 = "d".
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
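        // Note: only the `tag_1` equality predicate is usable by the bloom
        // filter index; the `ts` range predicates can prune whole row groups
        // only via min-max statistics (which keep both row groups here).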
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify the selection keeps both row groups, each narrowed to 50 rows.
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics: the bloom filter prunes the 50 rows with
        // tag_1 = "e" in RG 0 and the 50 rows with tag_1 = "f" in RG 1.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

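    /// Same scenario as `test_write_flat_read_with_inverted_index`, but with
    /// the sparse primary key encoding. Tag values then exist only inside the
    /// encoded primary key column, so min-max pruning on tag columns is
    /// unavailable and all pruning must come from the inverted index.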
    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index_sparse() {
        common_telemetry::init_default_ut_logging();

        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each pair of 50-row batches forms one row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, d]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by tag_0 = "b".
        // Expected: only the rows with tag_0 = "b".
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // RG 0 has 50 matching rows (tag_0 = "b").
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // Verify filtering metrics.
        // Note: with sparse encoding, tag columns aren't stored as separate
        // columns, so min-max filtering on tags can't prune anything; all
        // pruning comes from the inverted index, which drops RG 1 entirely
        // (100 rows) plus 50 rows in RG 0.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No min-max stats for tags in sparse format
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
    }

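    /// Same scenario as `test_write_flat_read_with_bloom_filter`, but with the
    /// sparse primary key encoding; the expected selection and filter metrics
    /// are identical to the dense-encoding test.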
    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter_sparse() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Create flat format RecordBatches with non-overlapping timestamp ranges.
        // Each pair of 50-row batches forms one row group (row_group_size = 100).
        // Data: ts tag_0 tag_1
        // RG 0:   0-50  [a, d]
        // RG 0:  50-100 [b, e]
        // RG 1: 100-150 [c, d]
        // RG 1: 150-200 [c, f]
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d".
        // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200) both survive, since
        // each contains rows with tag_1 = "d".
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify the selection keeps both row groups, each narrowed to 50 rows.
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics: the bloom filter prunes the 50 rows with
        // tag_1 = "e" in RG 0 and the 50 rows with tag_1 = "f" in RG 1.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

    /// Creates region metadata for testing fulltext indexes.
    /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
    fn fulltext_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(REGION_ID);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_bloom".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Bloom,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_tantivy".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Tantivy,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![0]);
        builder.build().unwrap()
    }

    /// Creates a flat format RecordBatch with string fields for fulltext testing.
    fn new_fulltext_record_batch_by_range(
        tag: &str,
        text_bloom: &str,
        text_tantivy: &str,
        start: usize,
        end: usize,
    ) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(fulltext_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Add primary key column (tag_0) as dictionary array
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_builder.append_value(tag);
        }
        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);

        // Add text_bloom field (fulltext with bloom backend)
        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
        columns.push(Arc::new(StringArray::from(text_bloom_values)));

        // Add text_tantivy field (fulltext with tantivy backend)
        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
        columns.push(Arc::new(StringArray::from(text_tantivy_values)));

        // Add field column (field_0)
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Add time index column (ts)
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Add encoded primary key column
        let pk = new_primary_key(&[tag]);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Add sequence column
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Add op_type column
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

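    /// Exercises both fulltext index backends over four single-batch row
    /// groups (row_group_size = 50): the bloom backend is queried with
    /// `matches_term` on `text_bloom`, the tantivy backend with `matches` on
    /// `text_tantivy`; each query should keep exactly two row groups.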
    #[tokio::test]
    async fn test_write_flat_read_with_fulltext_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(fulltext_region_metadata());
        let row_group_size = 50;

        // Create flat format RecordBatches with different text content
        // RG 0:   0-50  tag="a", bloom="hello world", tantivy="quick brown fox"
        // RG 1:  50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
        // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
        // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
        let flat_batches = vec![
            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let mut info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        // Verify fulltext indexes were created
        assert!(info.index_metadata.fulltext_index.index_size > 0);
        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
        // text_bloom (column_id 1) and text_tantivy (column_id 2)
        info.index_metadata.fulltext_index.columns.sort_unstable();
        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Helper functions to create fulltext function expressions
        let matches_func = || {
            Arc::new(
                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
                    .provide(Default::default()),
            )
        };

        let matches_term_func = || {
            Arc::new(
                ScalarFunctionFactory::from(
                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
                )
                .provide(Default::default()),
            )
        };

        // Test 1: filter the text_bloom field using matches_term (bloom backend).
        // Expected: RG 0 and RG 1 (rows 0-100), whose text_bloom values contain
        // the term "hello".
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_bloom"), "hello".lit()],
            func: matches_term_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify the selection contains RG 0 and RG 1 (text_bloom = "hello world").
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // Verify filtering metrics: the fulltext index prunes RG 2 and RG 3
        // (100 rows).
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);

        // Test 2: filter the text_tantivy field using matches (tantivy backend).
        // Expected: RG 2 and RG 3 (rows 100-200), whose text_tantivy values
        // match the query "lazy".
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_tantivy"), "lazy".lit()],
            func: matches_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Verify the selection contains RG 2 and RG 3 (text_tantivy = "lazy dog").
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(2).unwrap().row_count());
        assert_eq!(50, selection.get(3).unwrap().row_count());

        // Verify filtering metrics: the fulltext index prunes RG 0 and RG 1
        // (100 rows).
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
    }
}