use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod async_reader;
pub mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod prefilter;
pub mod read_columns;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata entry stored in the parquet file's key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default number of rows in a row group.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;

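/// Options of the parquet writer.
///
/// A minimal usage sketch (the concrete values are illustrative, not project
/// defaults; only `row_group_size: 50` matches the tests in this module):
///
/// ```ignore
/// let write_opts = WriteOptions {
///     row_group_size: 50,
///     max_file_size: Some(16 * 1024 * 1024),
///     ..Default::default()
/// };
/// ```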
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Number of rows in a row group.
    pub row_group_size: usize,
    /// Max output file size. This is a soft limit: the writer can only check
    /// the size after flushing a row group, so the final file may exceed it.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

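/// Parquet SST info returned by the writer.
///
/// A hedged sketch of consuming it, mirroring `test_write_read` below
/// (`writer`, `source`, and `write_opts` are set up as in that test):
///
/// ```ignore
/// let info: SstInfo = writer
///     .write_all_flat_as_primary_key(source, None, &write_opts)
///     .await?
///     .remove(0);
/// assert!(info.num_rows > 0 && info.file_size > 0);
/// ```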
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Max uncompressed size among all row groups.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata produced by the indexer.
    pub index_metadata: IndexOutput,
    /// Number of series.
    pub num_series: u64,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use datatypes::arrow::util::pretty::pretty_format_batches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
    use object_store::ObjectStore;
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
    use parquet::file::properties::WriterProperties;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::{ColumnSchema, RegionId};
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::test_util::assert_parquet_metadata_equal;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::FlatSource;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::flat_format::FlatWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::row_selection::RowGroupSelection;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    const FILE_DIR: &str = "/";
    const REGION_ID: RegionId = RegionId::new(0, 0);

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
                new_record_batch_by_range(&["b", "f"], 0, 40),
                new_record_batch_by_range(&["b", "h"], 100, 150),
                new_record_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable all caches with a page cache large enough for the whole file.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap().unwrap();
            check_record_batch_reader_result(
                &mut reader,
                &[
                    new_record_batch_by_range(&["a", "d"], 0, 50),
                    new_record_batch_by_range(&["a", "d"], 50, 60),
                    new_record_batch_by_range(&["b", "f"], 0, 40),
                    new_record_batch_by_range(&["b", "h"], 100, 150),
                    new_record_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Pages of all 4 row groups should now be cached.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        // A nonexistent row group yields a cache miss.
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        let reader_metadata = reader.parquet_metadata();
        let cached_writer_metadata =
            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
                .unwrap()
                .parquet_metadata();

        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "z"], 0, 0),
            new_record_batch_by_range(&["a", "z"], 100, 100),
            new_record_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // The binary field should be rewritten to `LargeBinary` in the schema.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_record_batch_with_binary(&["a"], 0, 60);
        let arrays: Vec<_> = batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_with_binary(&["a"], 0, 50),
                new_record_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 1000),
            new_record_batch_by_range(&["b", "f"], 0, 1000),
            new_record_batch_by_range(&["c", "g"], 0, 1000),
            new_record_batch_by_range(&["b", "h"], 100, 200),
            new_record_batch_by_range(&["b", "h"], 200, 300),
            new_record_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_flat_source_from_record_batches(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap().unwrap();
            while let Some(batch) = reader.next_record_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Predicate: tag_0 = "b"; min-max stats prune three row groups and the
        // inverted index trims the non-matching rows of the remaining one.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let plan = inverted_index_applier
            .as_ref()
            .unwrap()
            .plan_for_sst(&metadata)
            .unwrap()
            .unwrap();
        // The inverted index result of every row group should be cached.
        let cached = index_result_cache
            .get(&plan.predicate_key, handle.file_id().file_id())
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Predicates: 50 <= ts < 200 and tag_1 = "d"; min-max pruning and the
        // bloom filter together eliminate every row group, so there is nothing
        // left to read.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
        assert!(read_input.is_none());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // Only the row groups that reached the bloom filter have cached results.
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Predicate: tag_1 = "d"; min-max stats can't prune here, so the bloom
        // filter removes two row groups and trims rows from the rest.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 20),
                new_record_batch_by_range(&["b", "d"], 0, 20),
                new_record_batch_by_range(&["c", "d"], 0, 10),
                new_record_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = build_test_binary_test_region_metadata();
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
        }
        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);

        let values = (0..num_rows)
            .map(|_| "some data".as_bytes())
            .collect::<Vec<_>>();
        columns.push(
            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
                values,
            )) as ArrayRef,
        );

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    async fn check_record_batch_reader_result(
        reader: &mut ParquetReader,
        expected: &[RecordBatch],
    ) {
        let mut actual = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            actual.push(batch);
        }
        assert_eq!(
            pretty_format_batches(expected).unwrap().to_string(),
            pretty_format_batches(&actual).unwrap().to_string()
        );
        assert!(reader.next_record_batch().await.unwrap().is_none());
    }

    fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        let mut field_values = Vec::with_capacity(rows.len());
        let mut timestamps = Vec::with_capacity(rows.len());

        for (tag_0, tag_1, ts) in rows {
            tag_0_builder.append_value(*tag_0);
            tag_1_builder.append_value(*tag_1);
            pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
            field_values.push(*ts as u64);
            timestamps.push(*ts);
        }

        RecordBatch::try_new(
            flat_schema,
            vec![
                Arc::new(tag_0_builder.finish()) as ArrayRef,
                Arc::new(tag_1_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from(field_values)) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
                Arc::new(pk_builder.finish()) as ArrayRef,
                Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
                Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Creates a flat record batch whose primary key uses sparse encoding.
    fn new_record_batch_by_range_sparse(
        tags: &[&str],
        start: usize,
        end: usize,
        metadata: &Arc<RegionMetadata>,
    ) -> RecordBatch {
        assert!(end >= start);
        let flat_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
        );

        let num_rows = end - start;
        let mut columns: Vec<ArrayRef> = Vec::new();

        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);

        // Encode the primary key with a fixed table id and tsid.
        let table_id = 1u32;
        let tsid = 100u64;
        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);

        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);

        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Builds an [IndexerBuilderImpl] for tests with a one-row inverted index segment.
    fn create_test_indexer_builder(
        env: &TestEnv,
        object_store: ObjectStore,
        file_path: RegionFilePathFactory,
        metadata: Arc<RegionMetadata>,
        row_group_size: usize,
    ) -> IndexerBuilderImpl {
        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
        let intermediate_manager = env.get_intermediate_manager();

        IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
    }

    /// Writes a flat SST and returns the info of the first output file.
    async fn write_flat_sst(
        object_store: ObjectStore,
        metadata: Arc<RegionMetadata>,
        indexer_builder: IndexerBuilderImpl,
        file_path: RegionFilePathFactory,
        flat_source: FlatSource,
        write_opts: &WriteOptions,
    ) -> SstInfo {
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store,
            metadata,
            IndexConfig::default(),
            indexer_builder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat(flat_source, None, write_opts)
            .await
            .unwrap()
            .remove(0)
    }

    /// Creates a [FileHandle] from the writer output so the SST can be read back.
    fn create_file_handle_from_sst_info(
        info: &SstInfo,
        metadata: &Arc<RegionMetadata>,
    ) -> FileHandle {
        FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        )
    }

    /// Creates a [CacheManager] with all index-related caches enabled.
    fn create_test_cache() -> Arc<CacheManager> {
        Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        )
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Write batches with sequence 0 so the override below is observable.
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
        ]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read with the original handle.
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read again with a handle whose file meta overrides the sequence.
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            override_batches.push(batch);
        }

        assert_eq!(normal_batches.len(), override_batches.len());
        // The sequence column (second from last) should hold the override value.
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let mut columns = normal.columns().to_vec();
                let num_cols = columns.len();
                columns[num_cols - 2] =
                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
                RecordBatch::try_new(normal.schema(), columns).unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Two row groups of 100 rows each: rows 0..100 hold tags "a"/"b" and
        // rows 100..200 hold tag "c".
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "d"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Predicate: tag_0 = "b"; only half of the first row group matches.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Only the 50 matching rows of the first row group remain selected.
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // The second row group is pruned by min-max stats; the inverted index
        // trims the remaining 50 non-matching rows.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // Two row groups of 100 rows; tag_1 = "d" appears in both of them.
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "e"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Predicates: 50 <= ts < 200 and tag_1 = "d".
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        // Both row groups stay selected with 50 matching rows each.
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // No row group is pruned entirely; the bloom filter trims 100 rows.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 10;

        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 3),
            new_record_batch_by_range(&["b", "d"], 3, 10),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..6).collect())],
            row_group_size,
        );

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "d"], 0, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 8;

        let flat_source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 2),
            new_record_batch_by_range(&["b", "d"], 2, 4),
            new_record_batch_by_range(&["a", "d"], 4, 6),
            new_record_batch_by_range(&["c", "d"], 6, 8),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };
        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );
        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path,
            flat_source,
            &write_opts,
        )
        .await;
        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));

        let mut metrics = ReaderMetrics::default();
        let (context, _) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let selection = RowGroupSelection::from_row_ranges(
            vec![(0, std::iter::once(0..8).collect())],
            row_group_size,
        );

        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_from_rows(&[
                ("a", "d", 0),
                ("a", "d", 1),
                ("a", "d", 4),
                ("a", "d", 5),
            ])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index_sparse() {
        common_telemetry::init_default_ut_logging();

        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // Two row groups of 100 rows each, written with sparse primary key encoding.
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Predicate: tag_0 = "b".
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        // With sparse encoding there are no per-tag column stats, so min-max
        // pruning removes nothing; the inverted index prunes one row group
        // entirely and trims rows from the other.
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
    }

1918 #[tokio::test]
1919 async fn test_write_flat_read_with_bloom_filter_sparse() {
1920 let mut env = TestEnv::new().await;
1921 let object_store = env.init_object_store_manager();
1922 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1923 let metadata = Arc::new(sst_region_metadata_with_encoding(
1924 PrimaryKeyEncoding::Sparse,
1925 ));
1926 let row_group_size = 100;
1927
1928 let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        let preds = vec![
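            // ts in [50, 200) and tag_1 = "d"; min-max pruning keeps both row
            // groups, so only the bloom filter on tag_1 can prune rows here.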
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 2);
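        // Each of the two row groups keeps the 50 rows whose tag_1 is "d".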
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
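        // No row group is pruned entirely, but the bloom filter drops 100 rows.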
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

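    /// Region metadata with one tag, two fulltext-indexed string fields
    /// (`text_bloom` on the bloom backend, `text_tantivy` on the Tantivy
    /// backend), a u64 field, and a millisecond timestamp.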
    fn fulltext_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(REGION_ID);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_bloom".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Bloom,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_tantivy".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Tantivy,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![0]);
        builder.build().unwrap()
    }

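    /// Builds a flat-format `RecordBatch` covering rows `[start, end)` with a
    /// constant tag and constant fulltext column values.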
    fn new_fulltext_record_batch_by_range(
        tag: &str,
        text_bloom: &str,
        text_tantivy: &str,
        start: usize,
        end: usize,
    ) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(fulltext_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

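        // tag_0: dictionary-encoded tag repeated for every row.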
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_builder.append_value(tag);
        }
        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);

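        // text_bloom: string values for the bloom-backed fulltext column.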
        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
        columns.push(Arc::new(StringArray::from(text_bloom_values)));

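        // text_tantivy: string values for the Tantivy-backed fulltext column.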
        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
        columns.push(Arc::new(StringArray::from(text_tantivy_values)));

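        // field_0: u64 values derived from the row range.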
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

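        // ts: one millisecond timestamp per row in [start, end).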
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

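        // Encoded primary key, dictionary-encoded and repeated for every row.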
        let pk = new_primary_key(&[tag]);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

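        // Internal sequence (constant 1000) and op type (Put) columns.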
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    #[tokio::test]
    async fn test_write_flat_read_with_fulltext_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(fulltext_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
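            // Four batches of 50 rows: the first two share "hello world" /
            // "quick brown fox", the last two share "goodbye world" / "lazy dog",
            // so each term matches exactly two row groups (row_group_size = 50).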
            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let mut info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

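        // The fulltext index covers all 200 rows and both text columns (ids 1 and 2).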
        assert!(info.index_metadata.fulltext_index.index_size > 0);
        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
        info.index_metadata.fulltext_index.columns.sort_unstable();
        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

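        // Factories for the `matches` / `matches_term` scalar functions used in
        // the fulltext predicates below.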
        let matches_func = || {
            Arc::new(
                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
                    .provide(Default::default()),
            )
        };

        let matches_term_func = || {
            Arc::new(
                ScalarFunctionFactory::from(
                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
                )
                .provide(Default::default()),
            )
        };

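        // matches_term(text_bloom, "hello") should hit the bloom-backed fulltext
        // index and select only the first two row groups.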
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_bloom"), "hello".lit()],
            func: matches_term_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 2);
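        // Row groups 0 and 1 (the "hello world" rows) are selected in full.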
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
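        // The fulltext index prunes the two "goodbye world" row groups (100 rows).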
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);

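        // matches(text_tantivy, "lazy") queries the Tantivy-backed fulltext index
        // and should select the last two row groups instead.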
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_tantivy"), "lazy".lit()],
            func: matches_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 2);
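        // Row groups 2 and 3 (the "lazy dog" rows) are selected.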
        assert_eq!(50, selection.get(2).unwrap().row_count());
        assert_eq!(50, selection.get(3).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
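        // The first two row groups are pruned by the fulltext index (100 rows).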
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
    }
}