use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata in the parquet SST's key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet SST write options.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Number of rows in each row group.
    pub row_group_size: usize,
    /// Maximum size of a single output file, if set.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata of the file.
    pub index_metadata: IndexOutput,
    /// Number of series in the file.
    pub num_series: u64,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::OpType;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::{BatchBuilder, BatchReader, FlatSource};
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::format::PrimaryKeyWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
        sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
    };
    use crate::test_util::{TestEnv, check_reader_result};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

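    // A minimal sanity check of the `WriteOptions` defaults defined at the top of
    // this module: the row group size falls back to `DEFAULT_ROW_GROUP_SIZE`
    // (100 * `DEFAULT_READ_BATCH_SIZE` rows) and no per-file size limit applies
    // unless `max_file_size` is set.
    #[test]
    fn test_write_options_default() {
        let opts = WriteOptions::default();
        assert_eq!(DEFAULT_ROW_GROUP_SIZE, opts.row_group_size);
        assert_eq!(100 * DEFAULT_READ_BATCH_SIZE, opts.row_group_size);
        assert!(opts.max_file_size.is_none());
    }
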
    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_file_id: None,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Predicate on the inverted-indexed tag column: `tag_0 = "b"`.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Time range [50, 200) plus the bloom-filter-indexed tag column `tag_1 = "d"`;
        // no rows in the file satisfy both.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Only the predicate on the bloom-filter-indexed tag column: `tag_1 = "d"`.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }

    /// Builds a flat-format record batch for rows in the half-open range `start..end`
    /// with the given tags.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(sst_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        // Tag columns as dictionary arrays.
        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();

        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
            tag_1_builder.append_value(tags[1]);
        }

        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);

        // Field column.
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        // Timestamp column.
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        // Encoded primary key column.
        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        // Sequence column.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        // Op type column.
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    /// Wraps record batches into a [`FlatSource`].
    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Write batches whose sequence is 0 so the override is observable.
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Read with the original file handle (no sequence override).
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Read again through a handle whose file meta overrides the sequence.
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // The batches must match except for the overridden sequence column.
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }
}