//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata stored in the parquet key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size. This is a soft limit: the writer rolls over
    /// to a new file once the written size exceeds it.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}
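
// Illustrative only: callers usually override just the fields they care about
// and fall back to the defaults for the rest, as the tests below do, e.g.
//
//     let opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(1024 * 16),
//         ..Default::default()
//     };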

/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata of the written index file.
    pub index_metadata: IndexOutput,
}
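
// Illustrative sketch: `write_all` returns one `SstInfo` per output file, and
// callers typically fold it into a `FileMeta` when registering the new file
// (see `test_write_read_with_index` below), e.g.
//
//     FileMeta {
//         file_id: info.file_id,
//         time_range: info.time_range,
//         file_size: info.file_size,
//         num_rows: info.num_rows as u64,
//         num_row_groups: info.num_row_groups,
//         // ...remaining fields filled in by the caller.
//     }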

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::{RecordBatch, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{
        FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
    };
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::{BatchBuilder, BatchReader};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_source, sst_file_handle,
        sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size so the output file has multiple row groups.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        // Compressed pages are not cached.
        let page_key =
            PageKey::new_compressed(metadata.region_id, handle.file_id().file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        // Uncompressed pages of the read row groups are cached.
        for i in 0..4 {
            let page_key =
                PageKey::new_uncompressed(metadata.region_id, handle.file_id().file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        // Row group 5 does not exist, so nothing is cached for it.
        let page_key =
            PageKey::new_uncompressed(metadata.region_id, handle.file_id().file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        // Use a small max file size so the writer rolls over to a second file.
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        // Read every output file back and make sure no rows are lost.
        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // `tag_0 = "b"`: min-max stats prune 3 of the 4 row groups and the
        // inverted index prunes rows inside the remaining one.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        // Inverted index results for all row groups are cached.
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Time range plus `tag_1 = "d"`: min-max stats prune 2 row groups and
        // the bloom filter prunes the remaining 2, so nothing is read.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        // Only the row groups that survived min-max pruning have cached bloom
        // filter results.
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // `tag_1 = "d"` alone: no min-max pruning; the bloom filter prunes 2
        // whole row groups plus rows inside the others.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }
}