use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key under which the encoded region metadata is stored in the parquet
/// key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet SST write options.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size of the async writer.
    pub write_buffer_size: ReadableSize,
    /// Number of rows in each row group.
    pub row_group_size: usize,
    /// Max size of a single output file. This is a soft limit: the writer rolls
    /// over to a new file once the current file grows beyond it.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Time range of the data in the file.
    pub time_range: FileTimeRange,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// Number of rows in the file.
    pub num_rows: usize,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file, if available.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Metadata of the index files built for this SST.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::OpType;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::{BatchBuilder, BatchReader, FlatSource};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{TestEnv, check_reader_result};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

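    // A small added sanity check (a sketch, not part of the original suite): it
    // verifies the defaults wired up in `WriteOptions::default()` above and
    // assumes `ReadableSize` exposes its byte count through the public `.0` field.
    #[test]
    fn test_write_options_default() {
        let opts = WriteOptions::default();
        assert_eq!(DEFAULT_WRITE_BUFFER_SIZE.0, opts.write_buffer_size.0);
        assert_eq!(DEFAULT_ROW_GROUP_SIZE, opts.row_group_size);
        assert!(opts.max_file_size.is_none());
    }
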
    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable the page cache and read the file several times so pages get cached.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Pages of the 4 written row groups should be cached, while a
        // non-existent row group has no cached pages.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            Metrics::new(WriteType::Flush),
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Inverted index on `tag_0`: only one row group contains `tag_0 == "b"`,
        // so min-max stats prune 3 of the 4 row groups and the inverted index
        // trims the remaining non-matching rows.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Time range plus `tag_1 == "d"`: min-max stats prune 2 row groups and
        // the bloom filter index prunes the other 2, so nothing is read.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Bloom filter on `tag_1` alone: min-max stats prune nothing, the bloom
        // filter prunes 2 row groups, and all rows with `tag_1 == "d"` are returned.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_applier(inverted_index_applier.clone())
        .bloom_filter_index_applier(bloom_filter_applier.clone())
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    /// Builds a flat [RecordBatch] with the SST arrow schema for the given tags
    /// and timestamp range `[start, end)`.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Tag columns (dictionary encoded).
        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();

        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
            tag_1_builder.append_value(tags[1]);
        }

        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);

        // Field column.
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Timestamp column.
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Encoded primary key column (dictionary encoded).
        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Sequence column.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Op type column.
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Wraps the record batches into a [FlatSource] for the flat writer.
    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            Metrics::new(WriteType::Flush),
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Write batches with a custom sequence of 0.
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            Metrics::new(WriteType::Flush),
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read with the original file handle first.
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read again with a handle whose file meta overrides the sequence.
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Each batch read through the override handle should equal the normal
        // batch with its sequences replaced by the custom sequence.
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }
}