use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata stored in the parquet file's key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size when reading parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;

/// Default number of rows in a row group of written SST files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet SST write options.
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Number of rows in a row group.
    pub row_group_size: usize,
    /// Maximum size of a single output file. When set, the writer rolls over to a
    /// new SST file once the current one grows past this size; `None` writes a single file.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Information about a written parquet SST.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata produced while writing the SST.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::OpType;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit, BinaryExpr, Expr, Literal, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt64Array, UInt8Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{
        FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
    };
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::{BatchBuilder, BatchReader, FlatSource};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        location, to_flat_sst_arrow_schema, FlatSchemaOptions, DEFAULT_WRITE_CONCURRENCY,
    };
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

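    // Path provider that always resolves to the same region file id, so the writer
    // and the reader in a test agree on the SST and index file paths.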
    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
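        // 200 rows in total; with a row group size of 50 the file has 4 row groups.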

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

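        // All 4 row groups should have their pages cached after the reads above,
        // while a non-existent row group index yields no cache entry.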
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));
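        // Filter on the first tag column; only the ["a", "d"] batches should match.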

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
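        // Empty input batches are skipped; only the 200..230 range ends up in the file.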

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));
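        // field_0 follows the row's timestamp in these test batches, so
        // `field_0 >= 150` should keep only rows 150..200.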

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();
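        // Read the file back; the reader should handle the LargeBinary columns and
        // return the 60 rows split by the 50-row row groups.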

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };
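        // With max_file_size set, the writer rolls over to a new SST once a file grows
        // past the limit, so this write should produce multiple output files.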

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
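        // A segment_row_count of 1 gives the inverted index per-row granularity,
        // which this test relies on when checking row-level filter metrics below.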

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let preds = vec![col("tag_0").eq(lit("b"))];
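        // First scan: tag_0 = "b" appears only in the first row group, so min-max
        // pruning drops 3 of the 4 row groups and the inverted index trims the
        // non-matching rows inside the remaining one.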
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Second scan: a timestamp range combined with tag_1 = "d"; min-max pruning
        // and the bloom filter index together filter out every row group.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Third scan: tag_1 = "d" alone; min-max pruning cannot prune anything here,
        // so filtering relies on the bloom filter index.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    /// Builds a flat-format `RecordBatch` with the given tags covering rows `[start, end)`.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Tag columns, dictionary encoded.
        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();

        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
            tag_1_builder.append_value(tags[1]);
        }

        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);

        // Field column (field_0).
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Time index column (ts).
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Encoded primary key column.
        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Sequence column.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Op type column.
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];
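        // Same data layout as test_write_read_with_index, but provided to the writer
        // as flat RecordBatches instead of a Batch source.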

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

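        // Build a handle whose FileMeta carries an override sequence; reading through
        // it should replace every row's sequence with that value.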
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        assert_eq!(normal_batches.len(), override_batches.len());
        // Each batch should match the normal read except for the sequence column,
        // which is replaced by the override value.
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }
}