use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata stored in the parquet key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default number of rows in a row group.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet SST write options.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Number of rows per row group.
    pub row_group_size: usize,
    /// Max size of a single output file in bytes. When set, the writer splits the
    /// output into multiple files (see `test_write_multiple_files`).
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}
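
// A minimal usage sketch of `WriteOptions`: override the fields you care about and
// take the rest from `Default`, the same pattern the tests in this module use:
//
//     let opts = WriteOptions {
//         row_group_size: 50,
//         max_file_size: Some(16 * 1024),
//         ..Default::default()
//     };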

/// Parquet SST info returned by the writer.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Max uncompressed size among the row groups, in bytes.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file, if available.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata produced while writing the SST.
    pub index_metadata: IndexOutput,
    /// Number of series in the SST.
    pub num_series: u64,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::OpType;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::{BatchBuilder, BatchReader, FlatSource};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{TestEnv, check_reader_result};

    const FILE_DIR: &str = "/";
141
142 #[derive(Clone)]
143 struct FixedPathProvider {
144 region_file_id: RegionFileId,
145 }
146
147 impl FilePathProvider for FixedPathProvider {
148 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
149 location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
150 }
151
152 fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
153 location::index_file_path(FILE_DIR, index_id, PathType::Bare)
154 }
155
156 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
157 location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
158 }
159 }
160
161 struct NoopIndexBuilder;
162
163 #[async_trait::async_trait]
164 impl IndexerBuilder for NoopIndexBuilder {
165 async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
166 Indexer::default()
167 }
168 }
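
    // A small sanity check of the defaults defined at the top of this module. This is a
    // minimal sketch that only relies on the `Default` impl and the constants above, and
    // it assumes `ReadableSize` derives `PartialEq` for the direct comparison.
    #[test]
    fn test_write_options_default() {
        let opts = WriteOptions::default();
        assert_eq!(DEFAULT_WRITE_BUFFER_SIZE, opts.write_buffer_size);
        // The default row group size is derived from the read batch size:
        // 100 * DEFAULT_READ_BATCH_SIZE = 102_400 rows per row group.
        assert_eq!(100 * DEFAULT_READ_BATCH_SIZE, opts.row_group_size);
        assert_eq!(DEFAULT_ROW_GROUP_SIZE, opts.row_group_size);
        // No file size cap by default, so the writer does not split the output.
        assert!(opts.max_file_size.is_none());
    }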

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // All 4 row groups read above should now be in the page cache.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        // A row group that was never read yields a cache miss.
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: tag_0 == "a". Only batches of the series whose first tag is "a" match.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: field_0 >= 150.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // The binary field is replaced by a large binary field in the new schema.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        // Cap the file size so the writer has to roll over to multiple files.
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        // Read every produced file back and make sure no rows were lost.
        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Filter by tag_0 = "b". Min/max pruning removes 3 of the 4 row groups and the
        // inverted index trims the remaining rows down to the ["b", "d"] series.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        // The inverted index result for every row group is now cached.
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Filter by a time range plus tag_1 = "d": min/max pruning and the bloom filter
        // together drop every row group, so the reader returns nothing.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        // Only the row groups that survived min/max pruning have cached bloom filter results.
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Filter by tag_1 = "d" only: the bloom filter prunes the rows of the "f" and "h"
        // series and every series with tag_1 = "d" is read back.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    /// Builds a [RecordBatch] in the flat SST format for the given tags and the
    /// time range `[start, end)`.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Tag columns are dictionary encoded.
        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();

        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
            tag_1_builder.append_value(tags[1]);
        }

        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);

        // Field column.
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Timestamp column.
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Encoded primary key column.
        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Sequence column.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Op type column.
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Wraps the record batches into a [FlatSource] for `write_all_flat`.
    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Write batches with sequence 0.
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read the file with a normal handle first.
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read again with a handle whose file meta carries an overriding sequence.
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Every batch should be identical except for the sequence column.
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }
}