use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod async_reader;
pub mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod prefilter;
pub mod read_columns;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata stored in the parquet key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;

/// Parquet write options.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max output file size, if any. When set, the writer rolls over to a new
    /// file once the current one exceeds this size.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}
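
// Illustrative usage (mirrors the tests below): callers override only the
// fields they care about and take the defaults for the rest, e.g.
//
//     let opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(16 * 1024),
//         ..Default::default()
//     };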

/// Parquet SST info returned by the writer.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Max uncompressed size among all row groups in the file.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata of the written file.
    pub index_metadata: IndexOutput,
    /// Number of series in the SST.
    pub num_series: u64,
}
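
// Note: these fields are copied into a `FileMeta` when the SST is registered;
// `create_file_handle_from_sst_info` in the tests below shows the mapping.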

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use datatypes::arrow::util::pretty::pretty_format_batches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
    use object_store::ObjectStore;
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
    use parquet::file::properties::WriterProperties;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::{ColumnSchema, RegionId};
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::test_util::assert_parquet_metadata_equal;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::FlatSource;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::flat_format::FlatWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::row_selection::RowGroupSelection;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    const FILE_DIR: &str = "/";
    const REGION_ID: RegionId = RegionId::new(0, 0);

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
            Indexer::default()
        }
    }
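
    // `NoopIndexBuilder` keeps the write path index-free so plain read/write
    // tests stay focused on parquet IO; tests that exercise index pruning
    // build an `IndexerBuilderImpl` instead (see `test_write_read_with_index`).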

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
                new_record_batch_by_range(&["b", "f"], 0, 40),
                new_record_batch_by_range(&["b", "h"], 100, 150),
                new_record_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap().unwrap();
            check_record_batch_reader_result(
                &mut reader,
                &[
                    new_record_batch_by_range(&["a", "d"], 0, 50),
                    new_record_batch_by_range(&["a", "d"], 50, 60),
                    new_record_batch_by_range(&["b", "f"], 0, 40),
                    new_record_batch_by_range(&["b", "h"], 100, 150),
                    new_record_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // 200 rows with a row group size of 50 yields 4 row groups; all of
        // them should have their pages cached after the reads above.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        // A row group that doesn't exist has no cached pages.
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        let reader_metadata = reader.parquet_metadata();
        let cached_writer_metadata =
            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
                .unwrap()
                .parquet_metadata();

        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: tag_0 == "a", so only the ["a", "d"] rows match.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "z"], 0, 0),
            new_record_batch_by_range(&["a", "z"], 100, 100),
            new_record_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: field_0 >= 150, which only rows 150..200 satisfy.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_record_batch_with_binary(&["a"], 0, 60);
        let arrays: Vec<_> = batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_with_binary(&["a"], 0, 50),
                new_record_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // Write enough rows that the writer exceeds `max_file_size` and rolls
        // over to a second file.
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 1000),
            new_record_batch_by_range(&["b", "f"], 0, 1000),
            new_record_batch_by_range(&["c", "g"], 0, 1000),
            new_record_batch_by_range(&["b", "h"], 100, 200),
            new_record_batch_by_range(&["b", "h"], 200, 300),
            new_record_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_flat_source_from_record_batches(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap().unwrap();
            while let Some(batch) = reader.next_record_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Case 1: tag_0 = "b". Row group 0 holds (a,d)x20, (b,d)x20, (c,d)x10,
        // so only it can contain "b": min-max pruning drops the other 3 row
        // groups and the inverted index trims the remaining 30 rows.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        // The inverted index result for every row group is now cached.
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Case 2: ts in [50, 200) and tag_1 = "d". Min-max pruning on ts drops
        // the first two row groups and the bloom filter on tag_1 rejects the
        // rest, so there is nothing left to read.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
        assert!(read_input.is_none());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        // Only the row groups that survived min-max pruning have cached bloom
        // filter results.
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Case 3: tag_1 = "d" alone. Min-max cannot prune on tag_1, so the
        // bloom filter does all the work: it drops the two (c,h) row groups
        // and trims the (c,f) rows from the survivors.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 20),
                new_record_batch_by_range(&["b", "d"], 0, 20),
                new_record_batch_by_range(&["c", "d"], 0, 10),
                new_record_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = build_test_binary_test_region_metadata();
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
        }
        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);

        let values = (0..num_rows)
            .map(|_| "some data".as_bytes())
            .collect::<Vec<_>>();
        columns.push(
            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
                values,
            )) as ArrayRef,
        );

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    async fn check_record_batch_reader_result(
        reader: &mut ParquetReader,
        expected: &[RecordBatch],
    ) {
        let mut actual = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            actual.push(batch);
        }
        assert_eq!(
            pretty_format_batches(expected).unwrap().to_string(),
            pretty_format_batches(&actual).unwrap().to_string()
        );
        assert!(reader.next_record_batch().await.unwrap().is_none());
    }

    fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        let mut field_values = Vec::with_capacity(rows.len());
        let mut timestamps = Vec::with_capacity(rows.len());

        for (tag_0, tag_1, ts) in rows {
            tag_0_builder.append_value(*tag_0);
            tag_1_builder.append_value(*tag_1);
            pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
            field_values.push(*ts as u64);
            timestamps.push(*ts);
        }

        RecordBatch::try_new(
            flat_schema,
            vec![
                Arc::new(tag_0_builder.finish()) as ArrayRef,
                Arc::new(tag_1_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from(field_values)) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
                Arc::new(pk_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
                Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Builds a flat record batch with rows in `[start, end)` using the sparse
    /// primary key encoding.
    fn new_record_batch_by_range_sparse(
        tags: &[&str],
        start: usize,
        end: usize,
        metadata: &Arc<RegionMetadata>,
    ) -> RecordBatch {
        assert!(end >= start);
        let flat_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
        );

        let num_rows = end - start;
        let mut columns: Vec<ArrayRef> = Vec::new();

        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);

        let table_id = 1u32;
        let tsid = 100u64;
        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);

        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);

        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Creates an `IndexerBuilderImpl` with default index configs and a
    /// one-row inverted index segment.
    fn create_test_indexer_builder(
        env: &TestEnv,
        object_store: ObjectStore,
        file_path: RegionFilePathFactory,
        metadata: Arc<RegionMetadata>,
        row_group_size: usize,
    ) -> IndexerBuilderImpl {
        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
        let intermediate_manager = env.get_intermediate_manager();

        IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
    }

    /// Writes a flat SST via `write_all_flat` and returns its `SstInfo`.
    async fn write_flat_sst(
        object_store: ObjectStore,
        metadata: Arc<RegionMetadata>,
        indexer_builder: IndexerBuilderImpl,
        file_path: RegionFilePathFactory,
        flat_source: FlatSource,
        write_opts: &WriteOptions,
    ) -> SstInfo {
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store,
            metadata,
            IndexConfig::default(),
            indexer_builder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat(flat_source, None, write_opts)
            .await
            .unwrap()
            .remove(0)
    }

    /// Builds a `FileHandle` whose `FileMeta` mirrors the given `SstInfo`.
    fn create_file_handle_from_sst_info(
        info: &SstInfo,
        metadata: &Arc<RegionMetadata>,
    ) -> FileHandle {
        FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        )
    }

    /// Creates a `CacheManager` with all index-related caches enabled.
    fn create_test_cache() -> Arc<CacheManager> {
        Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        )
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Write batches with sequence 0 so the override below is observable.
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
        ]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read again through a handle whose `FileMeta` overrides the sequence.
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Every batch should match, except that the sequence column carries
        // the overridden value.
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let mut columns = normal.columns().to_vec();
                let num_cols = columns.len();
                columns[num_cols - 2] =
                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
                RecordBatch::try_new(normal.schema(), columns).unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Two row groups of 100 rows: tag_0 == "b" only occurs in rows 50..100
        // of the first one.
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "d"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter tag_0 = "b" through the inverted index.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Only 50 rows of the first row group remain selected.
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Two row groups of 100 rows; tag_1 == "d" occurs in both of them.
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "e"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter ts in [50, 200) and tag_1 = "d" through the bloom filter.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Both row groups survive, each trimmed to its 50 rows with tag_1 "d".
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 10;

        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 3),
            new_record_batch_by_range(&["b", "d"], 3, 10),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        // The outer selection covers rows 0..6, but only rows 0..3 match the
        // tag filter, so the prefilter must drop the trailing rows 3..6.
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..6).collect())],
            row_group_size,
        );

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "d"], 0, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 8;

        // Rows matching tag_0 == "a" sit in two disjoint ranges (0..2 and
        // 4..6), with non-matching rows at the end of the row group.
        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 2),
            new_record_batch_by_range(&["b", "d"], 2, 4),
            new_record_batch_by_range(&["a", "d"], 4, 6),
            new_record_batch_by_range(&["c", "d"], 6, 8),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..8).collect())],
            row_group_size,
        );

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_from_rows(&[
                ("a", "d", 0),
                ("a", "d", 1),
                ("a", "d", 4),
                ("a", "d", 5),
            ])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index_sparse() {
        common_telemetry::init_default_ut_logging();

        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Same layout as the dense test: two row groups of 100 rows with
        // tag_0 == "b" only in the first one.
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Filter tag_0 = "b" through the inverted index.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        // With sparse encoding the tags are not materialized as columns, so
        // min-max pruning cannot drop the second row group; the inverted
        // index filters it instead.
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
    }
1914
1915 #[tokio::test]
1916 async fn test_write_flat_read_with_bloom_filter_sparse() {
1917 let mut env = TestEnv::new().await;
1918 let object_store = env.init_object_store_manager();
1919 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1920 let metadata = Arc::new(sst_region_metadata_with_encoding(
1921 PrimaryKeyEncoding::Sparse,
1922 ));
1923 let row_group_size = 100;
1924
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

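        // Combine a time-range scan with an equality predicate on tag_1,
        // which is served by the bloom filter index.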
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

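        // Both row groups keep only the 50 rows where tag_1 == "d".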
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

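        // The bloom filter prunes rows inside row groups rather than whole
        // row groups: no group is dropped, but 100 rows are filtered.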
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

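    /// Builds region metadata with a tag column, two fulltext-indexed text
    /// columns (bloom and tantivy backends), a field column, and a timestamp.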
    fn fulltext_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(REGION_ID);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_bloom".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Bloom,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_tantivy".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Tantivy,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![0]);
        builder.build().unwrap()
    }

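    /// Creates a flat-format record batch for `fulltext_region_metadata()`
    /// covering rows `[start, end)` with the given tag and text values.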
    fn new_fulltext_record_batch_by_range(
        tag: &str,
        text_bloom: &str,
        text_tantivy: &str,
        start: usize,
        end: usize,
    ) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(fulltext_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // tag_0: dictionary-encoded tag column.
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_builder.append_value(tag);
        }
        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);

        // text_bloom: fulltext column with the bloom backend.
        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
        columns.push(Arc::new(StringArray::from(text_bloom_values)));

        // text_tantivy: fulltext column with the tantivy backend.
        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
        columns.push(Arc::new(StringArray::from(text_tantivy_values)));

        // field_0: the row index as a u64 field.
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // ts: one millisecond timestamp per row.
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Encoded primary key, repeated for every row.
        let pk = new_primary_key(&[tag]);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Sequence column: a constant sequence number.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Op type column: all rows are puts.
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    #[tokio::test]
    async fn test_write_flat_read_with_fulltext_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(fulltext_region_metadata());
        let row_group_size = 50;

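        // Two distinct values per text column across four batches of 50 rows:
        //   rows 0..100   -> "hello world" / "quick brown fox"
        //   rows 100..200 -> "goodbye world" / "lazy dog"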
2158 let flat_batches = vec![
2164 new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
2165 new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
2166 new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
2167 new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
2168 ];
2169
2170 let flat_source = new_flat_source_from_record_batches(flat_batches);
2171
2172 let write_opts = WriteOptions {
2173 row_group_size,
2174 ..Default::default()
2175 };
2176
2177 let indexer_builder = create_test_indexer_builder(
2178 &env,
2179 object_store.clone(),
2180 file_path.clone(),
2181 metadata.clone(),
2182 row_group_size,
2183 );
2184
2185 let mut info = write_flat_sst(
2186 object_store.clone(),
2187 metadata.clone(),
2188 indexer_builder,
2189 file_path.clone(),
2190 flat_source,
2191 &write_opts,
2192 )
2193 .await;
2194 assert_eq!(200, info.num_rows);
2195 assert!(info.file_size > 0);
2196 assert!(info.index_metadata.file_size > 0);
2197
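        // Both fulltext columns (ids 1 and 2) should be covered by the index.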
        assert!(info.index_metadata.fulltext_index.index_size > 0);
        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
        info.index_metadata.fulltext_index.columns.sort_unstable();
        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

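        // Helpers that build the `matches` and `matches_term` scalar
        // functions used in the fulltext predicates below.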
        let matches_func = || {
            Arc::new(
                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
                    .provide(Default::default()),
            )
        };

        let matches_term_func = || {
            Arc::new(
                ScalarFunctionFactory::from(
                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
                )
                .provide(Default::default()),
            )
        };

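        // matches_term(text_bloom, "hello") should match only rows 0..100,
        // served by the bloom-backed fulltext index.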
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_bloom"), "hello".lit()],
            func: matches_term_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

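        // "hello" appears only in the first two row groups (rows 0..100).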
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

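        // Two of the four row groups (100 rows) are pruned by the
        // fulltext index.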
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);

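        // matches(text_tantivy, "lazy") should match only rows 100..200,
        // served by the tantivy-backed fulltext index.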
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_tantivy"), "lazy".lit()],
            func: matches_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

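        // "lazy" appears only in the last two row groups (rows 100..200).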
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(2).unwrap().row_count());
        assert_eq!(50, selection.get(3).unwrap().row_count());

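        // Again, two of the four row groups (100 rows) are pruned by the
        // fulltext index.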
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
    }
}