use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata stored in the parquet file's key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet SST write options.
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max size of a single output file, if set.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Information about an SST file produced by the writer.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Output of the index build.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::RecordBatch;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, OperationType, RegionFilePathFactory};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::BatchReader;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        file_id: FileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            location::index_file_path(FILE_DIR, self.file_id)
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            location::sst_file_path(FILE_DIR, self.file_id)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .cache(cache.clone());
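        // Read the file several times with the page cache enabled; the reads populate the cache,
        // which is checked below.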
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

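        // Only uncompressed pages of each row group end up in the cache; the compressed
        // page entry should be absent.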
        let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        for i in 0..4 {
            let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
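        // The first two ranges are empty, so only rows in [200, 230) are written to the SST.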
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
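        // Write the converted batch with AsyncArrowWriter directly so the SST stores a
        // LargeBinary column, then verify the reader can still decode it.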
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            region_dir: "test".to_string(),
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
        )
        .await;

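        // max_file_size is small enough that the writer rolls over to a second file.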
        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder =
                ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string());
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

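        // tag_0 = "b" only matches rows in the first row group; min-max stats prune the other
        // three row groups and the inverted index filters the remaining non-matching rows.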
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
                .predicate(Some(Predicate::new(preds)))
                .inverted_index_applier(inverted_index_applier.clone())
                .bloom_filter_index_applier(bloom_filter_applier.clone())
                .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

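        // ts in [50, 200) with tag_1 = "d": min-max stats prune the first two row groups by time
        // range, and the bloom filter index prunes the remaining two, so nothing is read.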
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
                .predicate(Some(Predicate::new(preds)))
                .inverted_index_applier(inverted_index_applier.clone())
                .bloom_filter_index_applier(bloom_filter_applier.clone())
                .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

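        // tag_1 = "d" alone: the bloom filter index prunes the last two row groups entirely and
        // filters the non-matching rows within the second row group.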
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder =
            ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
                .predicate(Some(Predicate::new(preds)))
                .inverted_index_applier(inverted_index_applier.clone())
                .bloom_filter_index_applier(bloom_filter_applier.clone())
                .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }
}