use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the serialized region metadata stored in the parquet key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size when reading parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;

/// Default number of rows in a row group.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Options of the parquet writer.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size of the underlying writer.
    pub write_buffer_size: ReadableSize,
    /// Number of rows in a row group.
    pub row_group_size: usize,
    /// Maximum size of a single output file. When exceeded, the writer rolls over
    /// to a new file; `None` means the output is never split.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}
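
// Illustrative sketch (not from the original source): the tests below construct
// write options by overriding only the row group size and keeping the defaults
// for the buffer size and file size limit, e.g.
//
//     let write_opts = WriteOptions {
//         row_group_size: 50,
//         ..Default::default()
//     };
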
/// Parquet SST info returned by the writer.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the data in the file.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Maximum uncompressed size among all row groups in the file.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows written.
    pub num_rows: usize,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Parquet metadata of the file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Metadata of the index built for this SST.
    pub index_metadata: IndexOutput,
    /// Number of time series (distinct primary keys) in the SST.
    pub num_series: u64,
}
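
// Illustrative sketch (mirrors the tests below, not part of the original source):
// `ParquetWriter::write_all` returns one `SstInfo` per emitted file, so callers
// that expect a single file take the first entry, e.g.
//
//     let info = writer
//         .write_all(source, None, &write_opts)
//         .await
//         .unwrap()
//         .remove(0);
//     assert!(info.file_size > 0);
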

#[cfg(test)]
mod tests {
95 use std::collections::HashSet;
96 use std::sync::Arc;
97
98 use api::v1::{OpType, SemanticType};
99 use common_function::function::FunctionRef;
100 use common_function::function_factory::ScalarFunctionFactory;
101 use common_function::scalars::matches::MatchesFunction;
102 use common_function::scalars::matches_term::MatchesTermFunction;
103 use common_time::Timestamp;
104 use datafusion_common::{Column, ScalarValue};
105 use datafusion_expr::expr::ScalarFunction;
106 use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
107 use datatypes::arrow;
108 use datatypes::arrow::array::{
109 ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
110 TimestampMillisecondArray, UInt8Array, UInt64Array,
111 };
112 use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
113 use datatypes::prelude::ConcreteDataType;
114 use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
115 use object_store::ObjectStore;
116 use parquet::arrow::AsyncArrowWriter;
117 use parquet::basic::{Compression, Encoding, ZstdLevel};
118 use parquet::file::metadata::KeyValue;
119 use parquet::file::properties::WriterProperties;
120 use store_api::codec::PrimaryKeyEncoding;
121 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
122 use store_api::region_request::PathType;
123 use store_api::storage::{ColumnSchema, RegionId};
124 use table::predicate::Predicate;
125 use tokio_util::compat::FuturesAsyncWriteCompatExt;
126
127 use super::*;
128 use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
129 use crate::cache::{CacheManager, CacheStrategy, PageKey};
130 use crate::config::IndexConfig;
131 use crate::read::{BatchBuilder, BatchReader, FlatSource};
132 use crate::region::options::{IndexOptions, InvertedIndexOptions};
133 use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
134 use crate::sst::file_purger::NoopFilePurger;
135 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
136 use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
137 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
138 use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
139 use crate::sst::parquet::format::PrimaryKeyWriteFormat;
140 use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
141 use crate::sst::parquet::writer::ParquetWriter;
142 use crate::sst::{
143 DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
144 };
145 use crate::test_util::sst_util::{
146 assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
147 new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
148 new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
149 sst_region_metadata_with_encoding,
150 };
151 use crate::test_util::{TestEnv, check_reader_result};
152
153 const FILE_DIR: &str = "/";
154 const REGION_ID: RegionId = RegionId::new(0, 0);
155
156 #[derive(Clone)]
157 struct FixedPathProvider {
158 region_file_id: RegionFileId,
159 }
160
161 impl FilePathProvider for FixedPathProvider {
162 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
163 location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
164 }
165
166 fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
167 location::index_file_path(FILE_DIR, index_id, PathType::Bare)
168 }
169
170 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
171 location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
172 }
173 }
174
175 struct NoopIndexBuilder;
176
177 #[async_trait::async_trait]
178 impl IndexerBuilder for NoopIndexBuilder {
179 async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
180 Indexer::default()
181 }
182 }
183
184 #[tokio::test]
185 async fn test_write_read() {
186 let mut env = TestEnv::new().await;
187 let object_store = env.init_object_store_manager();
188 let handle = sst_file_handle(0, 1000);
189 let file_path = FixedPathProvider {
190 region_file_id: handle.file_id(),
191 };
192 let metadata = Arc::new(sst_region_metadata());
193 let source = new_source(&[
194 new_batch_by_range(&["a", "d"], 0, 60),
195 new_batch_by_range(&["b", "f"], 0, 40),
196 new_batch_by_range(&["b", "h"], 100, 200),
197 ]);
198 let write_opts = WriteOptions {
200 row_group_size: 50,
201 ..Default::default()
202 };
203
204 let mut metrics = Metrics::new(WriteType::Flush);
205 let mut writer = ParquetWriter::new_with_object_store(
206 object_store.clone(),
207 metadata.clone(),
208 IndexConfig::default(),
209 NoopIndexBuilder,
210 file_path,
211 &mut metrics,
212 )
213 .await;
214
215 let info = writer
216 .write_all(source, None, &write_opts)
217 .await
218 .unwrap()
219 .remove(0);
220 assert_eq!(200, info.num_rows);
221 assert!(info.file_size > 0);
222 assert_eq!(
223 (
224 Timestamp::new_millisecond(0),
225 Timestamp::new_millisecond(199)
226 ),
227 info.time_range
228 );
229
230 let builder = ParquetReaderBuilder::new(
231 FILE_DIR.to_string(),
232 PathType::Bare,
233 handle.clone(),
234 object_store,
235 );
236 let mut reader = builder.build().await.unwrap();
237 check_reader_result(
238 &mut reader,
239 &[
240 new_batch_by_range(&["a", "d"], 0, 50),
241 new_batch_by_range(&["a", "d"], 50, 60),
242 new_batch_by_range(&["b", "f"], 0, 40),
243 new_batch_by_range(&["b", "h"], 100, 150),
244 new_batch_by_range(&["b", "h"], 150, 200),
245 ],
246 )
247 .await;
248 }
249
250 #[tokio::test]
251 async fn test_read_with_cache() {
252 let mut env = TestEnv::new().await;
253 let object_store = env.init_object_store_manager();
254 let handle = sst_file_handle(0, 1000);
255 let metadata = Arc::new(sst_region_metadata());
256 let source = new_source(&[
257 new_batch_by_range(&["a", "d"], 0, 60),
258 new_batch_by_range(&["b", "f"], 0, 40),
259 new_batch_by_range(&["b", "h"], 100, 200),
260 ]);
261 let write_opts = WriteOptions {
263 row_group_size: 50,
264 ..Default::default()
265 };
266 let mut metrics = Metrics::new(WriteType::Flush);
268 let mut writer = ParquetWriter::new_with_object_store(
269 object_store.clone(),
270 metadata.clone(),
271 IndexConfig::default(),
272 NoopIndexBuilder,
273 FixedPathProvider {
274 region_file_id: handle.file_id(),
275 },
276 &mut metrics,
277 )
278 .await;
279
280 let sst_info = writer
281 .write_all(source, None, &write_opts)
282 .await
283 .unwrap()
284 .remove(0);
285
286 let cache = CacheStrategy::EnableAll(Arc::new(
288 CacheManager::builder()
289 .page_cache_size(64 * 1024 * 1024)
290 .build(),
291 ));
292 let builder = ParquetReaderBuilder::new(
293 FILE_DIR.to_string(),
294 PathType::Bare,
295 handle.clone(),
296 object_store,
297 )
298 .cache(cache.clone());
299 for _ in 0..3 {
300 let mut reader = builder.build().await.unwrap();
301 check_reader_result(
302 &mut reader,
303 &[
304 new_batch_by_range(&["a", "d"], 0, 50),
305 new_batch_by_range(&["a", "d"], 50, 60),
306 new_batch_by_range(&["b", "f"], 0, 40),
307 new_batch_by_range(&["b", "h"], 100, 150),
308 new_batch_by_range(&["b", "h"], 150, 200),
309 ],
310 )
311 .await;
312 }
313
314 let parquet_meta = sst_info.file_metadata.unwrap();
315 let get_ranges = |row_group_idx: usize| {
316 let row_group = parquet_meta.row_group(row_group_idx);
317 let mut ranges = Vec::with_capacity(row_group.num_columns());
318 for i in 0..row_group.num_columns() {
319 let (start, length) = row_group.column(i).byte_range();
320 ranges.push(start..start + length);
321 }
322
323 ranges
324 };
325
326 for i in 0..4 {
328 let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
329 assert!(cache.get_pages(&page_key).is_some());
330 }
331 let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
332 assert!(cache.get_pages(&page_key).is_none());
333 }
334
335 #[tokio::test]
336 async fn test_parquet_metadata_eq() {
337 let mut env = crate::test_util::TestEnv::new().await;
339 let object_store = env.init_object_store_manager();
340 let handle = sst_file_handle(0, 1000);
341 let metadata = Arc::new(sst_region_metadata());
342 let source = new_source(&[
343 new_batch_by_range(&["a", "d"], 0, 60),
344 new_batch_by_range(&["b", "f"], 0, 40),
345 new_batch_by_range(&["b", "h"], 100, 200),
346 ]);
347 let write_opts = WriteOptions {
348 row_group_size: 50,
349 ..Default::default()
350 };
351
352 let mut metrics = Metrics::new(WriteType::Flush);
355 let mut writer = ParquetWriter::new_with_object_store(
356 object_store.clone(),
357 metadata.clone(),
358 IndexConfig::default(),
359 NoopIndexBuilder,
360 FixedPathProvider {
361 region_file_id: handle.file_id(),
362 },
363 &mut metrics,
364 )
365 .await;
366
367 let sst_info = writer
368 .write_all(source, None, &write_opts)
369 .await
370 .unwrap()
371 .remove(0);
372 let writer_metadata = sst_info.file_metadata.unwrap();
373
374 let builder = ParquetReaderBuilder::new(
376 FILE_DIR.to_string(),
377 PathType::Bare,
378 handle.clone(),
379 object_store,
380 );
381 let reader = builder.build().await.unwrap();
382 let reader_metadata = reader.parquet_metadata();
383
384 assert_parquet_metadata_eq(writer_metadata, reader_metadata)
385 }
386
387 #[tokio::test]
388 async fn test_read_with_tag_filter() {
389 let mut env = TestEnv::new().await;
390 let object_store = env.init_object_store_manager();
391 let handle = sst_file_handle(0, 1000);
392 let metadata = Arc::new(sst_region_metadata());
393 let source = new_source(&[
394 new_batch_by_range(&["a", "d"], 0, 60),
395 new_batch_by_range(&["b", "f"], 0, 40),
396 new_batch_by_range(&["b", "h"], 100, 200),
397 ]);
398 let write_opts = WriteOptions {
400 row_group_size: 50,
401 ..Default::default()
402 };
403 let mut metrics = Metrics::new(WriteType::Flush);
405 let mut writer = ParquetWriter::new_with_object_store(
406 object_store.clone(),
407 metadata.clone(),
408 IndexConfig::default(),
409 NoopIndexBuilder,
410 FixedPathProvider {
411 region_file_id: handle.file_id(),
412 },
413 &mut metrics,
414 )
415 .await;
416 writer
417 .write_all(source, None, &write_opts)
418 .await
419 .unwrap()
420 .remove(0);
421
422 let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
424 left: Box::new(Expr::Column(Column::from_name("tag_0"))),
425 op: Operator::Eq,
426 right: Box::new("a".lit()),
427 })]));
428
429 let builder = ParquetReaderBuilder::new(
430 FILE_DIR.to_string(),
431 PathType::Bare,
432 handle.clone(),
433 object_store,
434 )
435 .predicate(predicate);
436 let mut reader = builder.build().await.unwrap();
437 check_reader_result(
438 &mut reader,
439 &[
440 new_batch_by_range(&["a", "d"], 0, 50),
441 new_batch_by_range(&["a", "d"], 50, 60),
442 ],
443 )
444 .await;
445 }
446
447 #[tokio::test]
448 async fn test_read_empty_batch() {
449 let mut env = TestEnv::new().await;
450 let object_store = env.init_object_store_manager();
451 let handle = sst_file_handle(0, 1000);
452 let metadata = Arc::new(sst_region_metadata());
453 let source = new_source(&[
454 new_batch_by_range(&["a", "z"], 0, 0),
455 new_batch_by_range(&["a", "z"], 100, 100),
456 new_batch_by_range(&["a", "z"], 200, 230),
457 ]);
458 let write_opts = WriteOptions {
460 row_group_size: 50,
461 ..Default::default()
462 };
463 let mut metrics = Metrics::new(WriteType::Flush);
465 let mut writer = ParquetWriter::new_with_object_store(
466 object_store.clone(),
467 metadata.clone(),
468 IndexConfig::default(),
469 NoopIndexBuilder,
470 FixedPathProvider {
471 region_file_id: handle.file_id(),
472 },
473 &mut metrics,
474 )
475 .await;
476 writer
477 .write_all(source, None, &write_opts)
478 .await
479 .unwrap()
480 .remove(0);
481
482 let builder = ParquetReaderBuilder::new(
483 FILE_DIR.to_string(),
484 PathType::Bare,
485 handle.clone(),
486 object_store,
487 );
488 let mut reader = builder.build().await.unwrap();
489 check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
490 }
491
492 #[tokio::test]
493 async fn test_read_with_field_filter() {
494 let mut env = TestEnv::new().await;
495 let object_store = env.init_object_store_manager();
496 let handle = sst_file_handle(0, 1000);
497 let metadata = Arc::new(sst_region_metadata());
498 let source = new_source(&[
499 new_batch_by_range(&["a", "d"], 0, 60),
500 new_batch_by_range(&["b", "f"], 0, 40),
501 new_batch_by_range(&["b", "h"], 100, 200),
502 ]);
503 let write_opts = WriteOptions {
505 row_group_size: 50,
506 ..Default::default()
507 };
508 let mut metrics = Metrics::new(WriteType::Flush);
510 let mut writer = ParquetWriter::new_with_object_store(
511 object_store.clone(),
512 metadata.clone(),
513 IndexConfig::default(),
514 NoopIndexBuilder,
515 FixedPathProvider {
516 region_file_id: handle.file_id(),
517 },
518 &mut metrics,
519 )
520 .await;
521
522 writer
523 .write_all(source, None, &write_opts)
524 .await
525 .unwrap()
526 .remove(0);
527
528 let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
530 left: Box::new(Expr::Column(Column::from_name("field_0"))),
531 op: Operator::GtEq,
532 right: Box::new(150u64.lit()),
533 })]));
534
535 let builder = ParquetReaderBuilder::new(
536 FILE_DIR.to_string(),
537 PathType::Bare,
538 handle.clone(),
539 object_store,
540 )
541 .predicate(predicate);
542 let mut reader = builder.build().await.unwrap();
543 check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
544 }
545
546 #[tokio::test]
547 async fn test_read_large_binary() {
548 let mut env = TestEnv::new().await;
549 let object_store = env.init_object_store_manager();
550 let handle = sst_file_handle(0, 1000);
551 let file_path = handle.file_path(FILE_DIR, PathType::Bare);
552
553 let write_opts = WriteOptions {
554 row_group_size: 50,
555 ..Default::default()
556 };
557
558 let metadata = build_test_binary_test_region_metadata();
559 let json = metadata.to_json().unwrap();
560 let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
561
562 let props_builder = WriterProperties::builder()
563 .set_key_value_metadata(Some(vec![key_value_meta]))
564 .set_compression(Compression::ZSTD(ZstdLevel::default()))
565 .set_encoding(Encoding::PLAIN)
566 .set_max_row_group_size(write_opts.row_group_size);
567
568 let writer_props = props_builder.build();
569
570 let write_format = PrimaryKeyWriteFormat::new(metadata);
571 let fields: Vec<_> = write_format
572 .arrow_schema()
573 .fields()
574 .into_iter()
575 .map(|field| {
576 let data_type = field.data_type().clone();
577 if data_type == DataType::Binary {
578 Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
579 } else {
580 Field::new(field.name(), data_type, field.is_nullable())
581 }
582 })
583 .collect();
584
585 let arrow_schema = Arc::new(Schema::new(fields));
586
587 assert_eq!(
589 &DataType::LargeBinary,
590 arrow_schema.field_with_name("field_0").unwrap().data_type()
591 );
592 let mut writer = AsyncArrowWriter::try_new(
593 object_store
594 .writer_with(&file_path)
595 .concurrent(DEFAULT_WRITE_CONCURRENCY)
596 .await
597 .map(|w| w.into_futures_async_write().compat_write())
598 .unwrap(),
599 arrow_schema.clone(),
600 Some(writer_props),
601 )
602 .unwrap();
603
604 let batch = new_batch_with_binary(&["a"], 0, 60);
605 let arrow_batch = write_format.convert_batch(&batch).unwrap();
606 let arrays: Vec<_> = arrow_batch
607 .columns()
608 .iter()
609 .map(|array| {
610 let data_type = array.data_type().clone();
611 if data_type == DataType::Binary {
612 arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
613 } else {
614 array.clone()
615 }
616 })
617 .collect();
618 let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
619
620 writer.write(&result).await.unwrap();
621 writer.close().await.unwrap();
622
623 let builder = ParquetReaderBuilder::new(
624 FILE_DIR.to_string(),
625 PathType::Bare,
626 handle.clone(),
627 object_store,
628 );
629 let mut reader = builder.build().await.unwrap();
630 check_reader_result(
631 &mut reader,
632 &[
633 new_batch_with_binary(&["a"], 0, 50),
634 new_batch_with_binary(&["a"], 50, 60),
635 ],
636 )
637 .await;
638 }
639
640 #[tokio::test]
641 async fn test_write_multiple_files() {
642 common_telemetry::init_default_ut_logging();
643 let mut env = TestEnv::new().await;
645 let object_store = env.init_object_store_manager();
646 let metadata = Arc::new(sst_region_metadata());
647 let batches = &[
648 new_batch_by_range(&["a", "d"], 0, 1000),
649 new_batch_by_range(&["b", "f"], 0, 1000),
650 new_batch_by_range(&["c", "g"], 0, 1000),
651 new_batch_by_range(&["b", "h"], 100, 200),
652 new_batch_by_range(&["b", "h"], 200, 300),
653 new_batch_by_range(&["b", "h"], 300, 1000),
654 ];
655 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
656
657 let source = new_source(batches);
658 let write_opts = WriteOptions {
659 row_group_size: 50,
660 max_file_size: Some(1024 * 16),
661 ..Default::default()
662 };
663
664 let path_provider = RegionFilePathFactory {
665 table_dir: "test".to_string(),
666 path_type: PathType::Bare,
667 };
668 let mut metrics = Metrics::new(WriteType::Flush);
669 let mut writer = ParquetWriter::new_with_object_store(
670 object_store.clone(),
671 metadata.clone(),
672 IndexConfig::default(),
673 NoopIndexBuilder,
674 path_provider,
675 &mut metrics,
676 )
677 .await;
678
679 let files = writer.write_all(source, None, &write_opts).await.unwrap();
680 assert_eq!(2, files.len());
681
682 let mut rows_read = 0;
683 for f in &files {
684 let file_handle = sst_file_handle_with_file_id(
685 f.file_id,
686 f.time_range.0.value(),
687 f.time_range.1.value(),
688 );
689 let builder = ParquetReaderBuilder::new(
690 "test".to_string(),
691 PathType::Bare,
692 file_handle,
693 object_store.clone(),
694 );
695 let mut reader = builder.build().await.unwrap();
696 while let Some(batch) = reader.next_batch().await.unwrap() {
697 rows_read += batch.num_rows();
698 }
699 }
700 assert_eq!(total_rows, rows_read);
701 }
702
703 #[tokio::test]
704 async fn test_write_read_with_index() {
705 let mut env = TestEnv::new().await;
706 let object_store = env.init_object_store_manager();
707 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
708 let metadata = Arc::new(sst_region_metadata());
709 let row_group_size = 50;
710
711 let source = new_source(&[
712 new_batch_by_range(&["a", "d"], 0, 20),
713 new_batch_by_range(&["b", "d"], 0, 20),
714 new_batch_by_range(&["c", "d"], 0, 20),
715 new_batch_by_range(&["c", "f"], 0, 40),
716 new_batch_by_range(&["c", "h"], 100, 200),
717 ]);
718 let write_opts = WriteOptions {
720 row_group_size,
721 ..Default::default()
722 };
723
724 let puffin_manager = env
725 .get_puffin_manager()
726 .build(object_store.clone(), file_path.clone());
727 let intermediate_manager = env.get_intermediate_manager();
728
729 let indexer_builder = IndexerBuilderImpl {
730 build_type: IndexBuildType::Flush,
731 metadata: metadata.clone(),
732 row_group_size,
733 puffin_manager,
734 write_cache_enabled: false,
735 intermediate_manager,
736 index_options: IndexOptions {
737 inverted_index: InvertedIndexOptions {
738 segment_row_count: 1,
739 ..Default::default()
740 },
741 },
742 inverted_index_config: Default::default(),
743 fulltext_index_config: Default::default(),
744 bloom_filter_index_config: Default::default(),
745 #[cfg(feature = "vector_index")]
746 vector_index_config: Default::default(),
747 };
748
749 let mut metrics = Metrics::new(WriteType::Flush);
750 let mut writer = ParquetWriter::new_with_object_store(
751 object_store.clone(),
752 metadata.clone(),
753 IndexConfig::default(),
754 indexer_builder,
755 file_path.clone(),
756 &mut metrics,
757 )
758 .await;
759
760 let info = writer
761 .write_all(source, None, &write_opts)
762 .await
763 .unwrap()
764 .remove(0);
765 assert_eq!(200, info.num_rows);
766 assert!(info.file_size > 0);
767 assert!(info.index_metadata.file_size > 0);
768
769 assert!(info.index_metadata.inverted_index.index_size > 0);
770 assert_eq!(info.index_metadata.inverted_index.row_count, 200);
771 assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
772
773 assert!(info.index_metadata.bloom_filter.index_size > 0);
774 assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
775 assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
776
777 assert_eq!(
778 (
779 Timestamp::new_millisecond(0),
780 Timestamp::new_millisecond(199)
781 ),
782 info.time_range
783 );
784
785 let handle = FileHandle::new(
786 FileMeta {
787 region_id: metadata.region_id,
788 file_id: info.file_id,
789 time_range: info.time_range,
790 level: 0,
791 file_size: info.file_size,
792 max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
793 available_indexes: info.index_metadata.build_available_indexes(),
794 indexes: info.index_metadata.build_indexes(),
795 index_file_size: info.index_metadata.file_size,
796 index_version: 0,
797 num_row_groups: info.num_row_groups,
798 num_rows: info.num_rows as u64,
799 sequence: None,
800 partition_expr: match &metadata.partition_expr {
801 Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
802 .expect("partition expression should be valid JSON"),
803 None => None,
804 },
805 num_series: 0,
806 },
807 Arc::new(NoopFilePurger),
808 );
809
810 let cache = Arc::new(
811 CacheManager::builder()
812 .index_result_cache_size(1024 * 1024)
813 .index_metadata_size(1024 * 1024)
814 .index_content_page_size(1024 * 1024)
815 .index_content_size(1024 * 1024)
816 .puffin_metadata_size(1024 * 1024)
817 .build(),
818 );
819 let index_result_cache = cache.index_result_cache().unwrap();
820
821 let build_inverted_index_applier = |exprs: &[Expr]| {
822 InvertedIndexApplierBuilder::new(
823 FILE_DIR.to_string(),
824 PathType::Bare,
825 object_store.clone(),
826 &metadata,
827 HashSet::from_iter([0]),
828 env.get_puffin_manager(),
829 )
830 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
831 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
832 .build(exprs)
833 .unwrap()
834 .map(Arc::new)
835 };
836
837 let build_bloom_filter_applier = |exprs: &[Expr]| {
838 BloomFilterIndexApplierBuilder::new(
839 FILE_DIR.to_string(),
840 PathType::Bare,
841 object_store.clone(),
842 &metadata,
843 env.get_puffin_manager(),
844 )
845 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
846 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
847 .build(exprs)
848 .unwrap()
849 .map(Arc::new)
850 };
851
852 let preds = vec![col("tag_0").eq(lit("b"))];
869 let inverted_index_applier = build_inverted_index_applier(&preds);
870 let bloom_filter_applier = build_bloom_filter_applier(&preds);
871
872 let builder = ParquetReaderBuilder::new(
873 FILE_DIR.to_string(),
874 PathType::Bare,
875 handle.clone(),
876 object_store.clone(),
877 )
878 .predicate(Some(Predicate::new(preds)))
879 .inverted_index_appliers([inverted_index_applier.clone(), None])
880 .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
881 .cache(CacheStrategy::EnableAll(cache.clone()));
882
883 let mut metrics = ReaderMetrics::default();
884 let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
885 let mut reader = ParquetReader::new(Arc::new(context), selection)
886 .await
887 .unwrap();
888 check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;
889
890 assert_eq!(metrics.filter_metrics.rg_total, 4);
891 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
892 assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
893 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
894 let cached = index_result_cache
895 .get(
896 inverted_index_applier.unwrap().predicate_key(),
897 handle.file_id().file_id(),
898 )
899 .unwrap();
900 assert!(cached.contains_row_group(0));
902 assert!(cached.contains_row_group(1));
903 assert!(cached.contains_row_group(2));
904 assert!(cached.contains_row_group(3));
905
906 let preds = vec![
921 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
922 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
923 col("tag_1").eq(lit("d")),
924 ];
925 let inverted_index_applier = build_inverted_index_applier(&preds);
926 let bloom_filter_applier = build_bloom_filter_applier(&preds);
927
928 let builder = ParquetReaderBuilder::new(
929 FILE_DIR.to_string(),
930 PathType::Bare,
931 handle.clone(),
932 object_store.clone(),
933 )
934 .predicate(Some(Predicate::new(preds)))
935 .inverted_index_appliers([inverted_index_applier.clone(), None])
936 .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
937 .cache(CacheStrategy::EnableAll(cache.clone()));
938
939 let mut metrics = ReaderMetrics::default();
940 let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
941 let mut reader = ParquetReader::new(Arc::new(context), selection)
942 .await
943 .unwrap();
944 check_reader_result(&mut reader, &[]).await;
945
946 assert_eq!(metrics.filter_metrics.rg_total, 4);
947 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
948 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
949 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
950 let cached = index_result_cache
951 .get(
952 bloom_filter_applier.unwrap().predicate_key(),
953 handle.file_id().file_id(),
954 )
955 .unwrap();
956 assert!(cached.contains_row_group(2));
957 assert!(cached.contains_row_group(3));
958 assert!(!cached.contains_row_group(0));
959 assert!(!cached.contains_row_group(1));
960
961 let preds = vec![col("tag_1").eq(lit("d"))];
982 let inverted_index_applier = build_inverted_index_applier(&preds);
983 let bloom_filter_applier = build_bloom_filter_applier(&preds);
984
985 let builder = ParquetReaderBuilder::new(
986 FILE_DIR.to_string(),
987 PathType::Bare,
988 handle.clone(),
989 object_store.clone(),
990 )
991 .predicate(Some(Predicate::new(preds)))
992 .inverted_index_appliers([inverted_index_applier.clone(), None])
993 .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
994 .cache(CacheStrategy::EnableAll(cache.clone()));
995
996 let mut metrics = ReaderMetrics::default();
997 let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
998 let mut reader = ParquetReader::new(Arc::new(context), selection)
999 .await
1000 .unwrap();
1001 check_reader_result(
1002 &mut reader,
1003 &[
1004 new_batch_by_range(&["a", "d"], 0, 20),
1005 new_batch_by_range(&["b", "d"], 0, 20),
1006 new_batch_by_range(&["c", "d"], 0, 10),
1007 new_batch_by_range(&["c", "d"], 10, 20),
1008 ],
1009 )
1010 .await;
1011
1012 assert_eq!(metrics.filter_metrics.rg_total, 4);
1013 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1014 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1015 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1016 let cached = index_result_cache
1017 .get(
1018 bloom_filter_applier.unwrap().predicate_key(),
1019 handle.file_id().file_id(),
1020 )
1021 .unwrap();
1022 assert!(cached.contains_row_group(0));
1023 assert!(cached.contains_row_group(1));
1024 assert!(cached.contains_row_group(2));
1025 assert!(cached.contains_row_group(3));
1026 }
1027
    /// Builds a flat [RecordBatch] covering timestamps in `[start, end)` for the
    /// given tag values, following the flat SST layout: tag columns, field,
    /// timestamp, then the encoded primary key, sequence, and op type columns.
    fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1031 assert!(end >= start);
1032 let metadata = Arc::new(sst_region_metadata());
1033 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1034
1035 let num_rows = end - start;
1036 let mut columns = Vec::new();
1037
1038 let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1040 let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1041
1042 for _ in 0..num_rows {
1043 tag_0_builder.append_value(tags[0]);
1044 tag_1_builder.append_value(tags[1]);
1045 }
1046
1047 columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1048 columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
1049
1050 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1052 columns.push(Arc::new(UInt64Array::from(field_values)));
1053
1054 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1056 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1057
1058 let pk = new_primary_key(tags);
1060 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1061 for _ in 0..num_rows {
1062 pk_builder.append(&pk).unwrap();
1063 }
1064 columns.push(Arc::new(pk_builder.finish()));
1065
1066 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1068
1069 columns.push(Arc::new(UInt8Array::from_value(
1071 OpType::Put as u8,
1072 num_rows,
1073 )));
1074
1075 RecordBatch::try_new(flat_schema, columns).unwrap()
1076 }
1077
    /// Wraps the record batches into a [FlatSource] that yields them in order.
    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
1080 FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
1081 }
1082
    /// Like [new_record_batch_by_range], but encodes the primary key with the
    /// sparse encoding of the given region metadata.
    fn new_record_batch_by_range_sparse(
1086 tags: &[&str],
1087 start: usize,
1088 end: usize,
1089 metadata: &Arc<RegionMetadata>,
1090 ) -> RecordBatch {
1091 assert!(end >= start);
1092 let flat_schema = to_flat_sst_arrow_schema(
1093 metadata,
1094 &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1095 );
1096
1097 let num_rows = end - start;
1098 let mut columns: Vec<ArrayRef> = Vec::new();
1099
1100 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1104 columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1105
1106 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1108 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1109
        // Encode a sparse primary key with a fixed table id and tsid.
        let table_id = 1u32;
        let tsid = 100u64;
        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1114
1115 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1116 for _ in 0..num_rows {
1117 pk_builder.append(&pk).unwrap();
1118 }
1119 columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1120
1121 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1123
1124 columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1126
1127 RecordBatch::try_new(flat_schema, columns).unwrap()
1128 }
1129
    /// Creates an [IndexerBuilderImpl] for tests with default index configs and a
    /// segment row count of one for the inverted index.
    fn create_test_indexer_builder(
1132 env: &TestEnv,
1133 object_store: ObjectStore,
1134 file_path: RegionFilePathFactory,
1135 metadata: Arc<RegionMetadata>,
1136 row_group_size: usize,
1137 ) -> IndexerBuilderImpl {
1138 let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1139 let intermediate_manager = env.get_intermediate_manager();
1140
1141 IndexerBuilderImpl {
1142 build_type: IndexBuildType::Flush,
1143 metadata,
1144 row_group_size,
1145 puffin_manager,
1146 write_cache_enabled: false,
1147 intermediate_manager,
1148 index_options: IndexOptions {
1149 inverted_index: InvertedIndexOptions {
1150 segment_row_count: 1,
1151 ..Default::default()
1152 },
1153 },
1154 inverted_index_config: Default::default(),
1155 fulltext_index_config: Default::default(),
1156 bloom_filter_index_config: Default::default(),
1157 #[cfg(feature = "vector_index")]
1158 vector_index_config: Default::default(),
1159 }
1160 }
1161
    /// Writes the flat source into an SST and returns the resulting [SstInfo].
    async fn write_flat_sst(
1164 object_store: ObjectStore,
1165 metadata: Arc<RegionMetadata>,
1166 indexer_builder: IndexerBuilderImpl,
1167 file_path: RegionFilePathFactory,
1168 flat_source: FlatSource,
1169 write_opts: &WriteOptions,
1170 ) -> SstInfo {
1171 let mut metrics = Metrics::new(WriteType::Flush);
1172 let mut writer = ParquetWriter::new_with_object_store(
1173 object_store,
1174 metadata,
1175 IndexConfig::default(),
1176 indexer_builder,
1177 file_path,
1178 &mut metrics,
1179 )
1180 .await;
1181
1182 writer
1183 .write_all_flat(flat_source, write_opts)
1184 .await
1185 .unwrap()
1186 .remove(0)
1187 }
1188
    /// Builds a [FileHandle] from the written [SstInfo] and region metadata.
    fn create_file_handle_from_sst_info(
1191 info: &SstInfo,
1192 metadata: &Arc<RegionMetadata>,
1193 ) -> FileHandle {
1194 FileHandle::new(
1195 FileMeta {
1196 region_id: metadata.region_id,
1197 file_id: info.file_id,
1198 time_range: info.time_range,
1199 level: 0,
1200 file_size: info.file_size,
1201 max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1202 available_indexes: info.index_metadata.build_available_indexes(),
1203 indexes: info.index_metadata.build_indexes(),
1204 index_file_size: info.index_metadata.file_size,
1205 index_version: 0,
1206 num_row_groups: info.num_row_groups,
1207 num_rows: info.num_rows as u64,
1208 sequence: None,
1209 partition_expr: match &metadata.partition_expr {
1210 Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1211 .expect("partition expression should be valid JSON"),
1212 None => None,
1213 },
1214 num_series: 0,
1215 },
1216 Arc::new(NoopFilePurger),
1217 )
1218 }
1219
    /// Creates a [CacheManager] with all index related caches enabled.
    fn create_test_cache() -> Arc<CacheManager> {
1222 Arc::new(
1223 CacheManager::builder()
1224 .index_result_cache_size(1024 * 1024)
1225 .index_metadata_size(1024 * 1024)
1226 .index_content_page_size(1024 * 1024)
1227 .index_content_size(1024 * 1024)
1228 .puffin_metadata_size(1024 * 1024)
1229 .build(),
1230 )
1231 }
1232
1233 #[tokio::test]
1234 async fn test_write_flat_with_index() {
1235 let mut env = TestEnv::new().await;
1236 let object_store = env.init_object_store_manager();
1237 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1238 let metadata = Arc::new(sst_region_metadata());
1239 let row_group_size = 50;
1240
1241 let flat_batches = vec![
1243 new_record_batch_by_range(&["a", "d"], 0, 20),
1244 new_record_batch_by_range(&["b", "d"], 0, 20),
1245 new_record_batch_by_range(&["c", "d"], 0, 20),
1246 new_record_batch_by_range(&["c", "f"], 0, 40),
1247 new_record_batch_by_range(&["c", "h"], 100, 200),
1248 ];
1249
1250 let flat_source = new_flat_source_from_record_batches(flat_batches);
1251
1252 let write_opts = WriteOptions {
1253 row_group_size,
1254 ..Default::default()
1255 };
1256
1257 let puffin_manager = env
1258 .get_puffin_manager()
1259 .build(object_store.clone(), file_path.clone());
1260 let intermediate_manager = env.get_intermediate_manager();
1261
1262 let indexer_builder = IndexerBuilderImpl {
1263 build_type: IndexBuildType::Flush,
1264 metadata: metadata.clone(),
1265 row_group_size,
1266 puffin_manager,
1267 write_cache_enabled: false,
1268 intermediate_manager,
1269 index_options: IndexOptions {
1270 inverted_index: InvertedIndexOptions {
1271 segment_row_count: 1,
1272 ..Default::default()
1273 },
1274 },
1275 inverted_index_config: Default::default(),
1276 fulltext_index_config: Default::default(),
1277 bloom_filter_index_config: Default::default(),
1278 #[cfg(feature = "vector_index")]
1279 vector_index_config: Default::default(),
1280 };
1281
1282 let mut metrics = Metrics::new(WriteType::Flush);
1283 let mut writer = ParquetWriter::new_with_object_store(
1284 object_store.clone(),
1285 metadata.clone(),
1286 IndexConfig::default(),
1287 indexer_builder,
1288 file_path.clone(),
1289 &mut metrics,
1290 )
1291 .await;
1292
1293 let info = writer
1294 .write_all_flat(flat_source, &write_opts)
1295 .await
1296 .unwrap()
1297 .remove(0);
1298 assert_eq!(200, info.num_rows);
1299 assert!(info.file_size > 0);
1300 assert!(info.index_metadata.file_size > 0);
1301
1302 assert!(info.index_metadata.inverted_index.index_size > 0);
1303 assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1304 assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1305
1306 assert!(info.index_metadata.bloom_filter.index_size > 0);
1307 assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1308 assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1309
1310 assert_eq!(
1311 (
1312 Timestamp::new_millisecond(0),
1313 Timestamp::new_millisecond(199)
1314 ),
1315 info.time_range
1316 );
1317 }
1318
1319 #[tokio::test]
1320 async fn test_read_with_override_sequence() {
1321 let mut env = TestEnv::new().await;
1322 let object_store = env.init_object_store_manager();
1323 let handle = sst_file_handle(0, 1000);
1324 let file_path = FixedPathProvider {
1325 region_file_id: handle.file_id(),
1326 };
1327 let metadata = Arc::new(sst_region_metadata());
1328
1329 let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
1331 let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
1332 let source = new_source(&[batch1, batch2]);
1333
1334 let write_opts = WriteOptions {
1335 row_group_size: 50,
1336 ..Default::default()
1337 };
1338
1339 let mut metrics = Metrics::new(WriteType::Flush);
1340 let mut writer = ParquetWriter::new_with_object_store(
1341 object_store.clone(),
1342 metadata.clone(),
1343 IndexConfig::default(),
1344 NoopIndexBuilder,
1345 file_path,
1346 &mut metrics,
1347 )
1348 .await;
1349
1350 writer
1351 .write_all(source, None, &write_opts)
1352 .await
1353 .unwrap()
1354 .remove(0);
1355
1356 let builder = ParquetReaderBuilder::new(
1358 FILE_DIR.to_string(),
1359 PathType::Bare,
1360 handle.clone(),
1361 object_store.clone(),
1362 );
1363 let mut reader = builder.build().await.unwrap();
1364 let mut normal_batches = Vec::new();
1365 while let Some(batch) = reader.next_batch().await.unwrap() {
1366 normal_batches.push(batch);
1367 }
1368
1369 let custom_sequence = 12345u64;
1371 let file_meta = handle.meta_ref();
1372 let mut override_file_meta = file_meta.clone();
1373 override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1374 let override_handle = FileHandle::new(
1375 override_file_meta,
1376 Arc::new(crate::sst::file_purger::NoopFilePurger),
1377 );
1378
1379 let builder = ParquetReaderBuilder::new(
1380 FILE_DIR.to_string(),
1381 PathType::Bare,
1382 override_handle,
1383 object_store.clone(),
1384 );
1385 let mut reader = builder.build().await.unwrap();
1386 let mut override_batches = Vec::new();
1387 while let Some(batch) = reader.next_batch().await.unwrap() {
1388 override_batches.push(batch);
1389 }
1390
1391 assert_eq!(normal_batches.len(), override_batches.len());
1393 for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1394 let expected_batch = {
1396 let num_rows = normal.num_rows();
1397 let mut builder = BatchBuilder::from(normal);
1398 builder
1399 .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
1400 .unwrap();
1401
1402 builder.build().unwrap()
1403 };
1404
1405 assert_eq!(*override_batch, expected_batch);
1407 }
1408 }
1409
1410 #[tokio::test]
1411 async fn test_write_flat_read_with_inverted_index() {
1412 let mut env = TestEnv::new().await;
1413 let object_store = env.init_object_store_manager();
1414 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1415 let metadata = Arc::new(sst_region_metadata());
1416 let row_group_size = 100;
1417
1418 let flat_batches = vec![
1426 new_record_batch_by_range(&["a", "d"], 0, 50),
1427 new_record_batch_by_range(&["b", "d"], 50, 100),
1428 new_record_batch_by_range(&["c", "d"], 100, 150),
1429 new_record_batch_by_range(&["c", "f"], 150, 200),
1430 ];
1431
1432 let flat_source = new_flat_source_from_record_batches(flat_batches);
1433
1434 let write_opts = WriteOptions {
1435 row_group_size,
1436 ..Default::default()
1437 };
1438
1439 let indexer_builder = create_test_indexer_builder(
1440 &env,
1441 object_store.clone(),
1442 file_path.clone(),
1443 metadata.clone(),
1444 row_group_size,
1445 );
1446
1447 let info = write_flat_sst(
1448 object_store.clone(),
1449 metadata.clone(),
1450 indexer_builder,
1451 file_path.clone(),
1452 flat_source,
1453 &write_opts,
1454 )
1455 .await;
1456 assert_eq!(200, info.num_rows);
1457 assert!(info.file_size > 0);
1458 assert!(info.index_metadata.file_size > 0);
1459
1460 let handle = create_file_handle_from_sst_info(&info, &metadata);
1461
1462 let cache = create_test_cache();
1463
1464 let preds = vec![col("tag_0").eq(lit("b"))];
1467 let inverted_index_applier = InvertedIndexApplierBuilder::new(
1468 FILE_DIR.to_string(),
1469 PathType::Bare,
1470 object_store.clone(),
1471 &metadata,
1472 HashSet::from_iter([0]),
1473 env.get_puffin_manager(),
1474 )
1475 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1476 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1477 .build(&preds)
1478 .unwrap()
1479 .map(Arc::new);
1480
1481 let builder = ParquetReaderBuilder::new(
1482 FILE_DIR.to_string(),
1483 PathType::Bare,
1484 handle.clone(),
1485 object_store.clone(),
1486 )
1487 .flat_format(true)
1488 .predicate(Some(Predicate::new(preds)))
1489 .inverted_index_appliers([inverted_index_applier.clone(), None])
1490 .cache(CacheStrategy::EnableAll(cache.clone()));
1491
1492 let mut metrics = ReaderMetrics::default();
1493 let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1494
1495 assert_eq!(selection.row_group_count(), 1);
1497 assert_eq!(50, selection.get(0).unwrap().row_count());
1498
1499 assert_eq!(metrics.filter_metrics.rg_total, 2);
1501 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
1502 assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
1503 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
1504 }
1505
1506 #[tokio::test]
1507 async fn test_write_flat_read_with_bloom_filter() {
1508 let mut env = TestEnv::new().await;
1509 let object_store = env.init_object_store_manager();
1510 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1511 let metadata = Arc::new(sst_region_metadata());
1512 let row_group_size = 100;
1513
1514 let flat_batches = vec![
1522 new_record_batch_by_range(&["a", "d"], 0, 50),
1523 new_record_batch_by_range(&["b", "e"], 50, 100),
1524 new_record_batch_by_range(&["c", "d"], 100, 150),
1525 new_record_batch_by_range(&["c", "f"], 150, 200),
1526 ];
1527
1528 let flat_source = new_flat_source_from_record_batches(flat_batches);
1529
1530 let write_opts = WriteOptions {
1531 row_group_size,
1532 ..Default::default()
1533 };
1534
1535 let indexer_builder = create_test_indexer_builder(
1536 &env,
1537 object_store.clone(),
1538 file_path.clone(),
1539 metadata.clone(),
1540 row_group_size,
1541 );
1542
1543 let info = write_flat_sst(
1544 object_store.clone(),
1545 metadata.clone(),
1546 indexer_builder,
1547 file_path.clone(),
1548 flat_source,
1549 &write_opts,
1550 )
1551 .await;
1552 assert_eq!(200, info.num_rows);
1553 assert!(info.file_size > 0);
1554 assert!(info.index_metadata.file_size > 0);
1555
1556 let handle = create_file_handle_from_sst_info(&info, &metadata);
1557
1558 let cache = create_test_cache();
1559
1560 let preds = vec![
1563 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1564 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1565 col("tag_1").eq(lit("d")),
1566 ];
1567 let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1568 FILE_DIR.to_string(),
1569 PathType::Bare,
1570 object_store.clone(),
1571 &metadata,
1572 env.get_puffin_manager(),
1573 )
1574 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1575 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1576 .build(&preds)
1577 .unwrap()
1578 .map(Arc::new);
1579
1580 let builder = ParquetReaderBuilder::new(
1581 FILE_DIR.to_string(),
1582 PathType::Bare,
1583 handle.clone(),
1584 object_store.clone(),
1585 )
1586 .flat_format(true)
1587 .predicate(Some(Predicate::new(preds)))
1588 .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1589 .cache(CacheStrategy::EnableAll(cache.clone()));
1590
1591 let mut metrics = ReaderMetrics::default();
1592 let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1593
1594 assert_eq!(selection.row_group_count(), 2);
1596 assert_eq!(50, selection.get(0).unwrap().row_count());
1597 assert_eq!(50, selection.get(1).unwrap().row_count());
1598
1599 assert_eq!(metrics.filter_metrics.rg_total, 2);
1601 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1602 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1603 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1604 }
1605
1606 #[tokio::test]
1607 async fn test_write_flat_read_with_inverted_index_sparse() {
1608 common_telemetry::init_default_ut_logging();
1609
1610 let mut env = TestEnv::new().await;
1611 let object_store = env.init_object_store_manager();
1612 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1613 let metadata = Arc::new(sst_region_metadata_with_encoding(
1614 PrimaryKeyEncoding::Sparse,
1615 ));
1616 let row_group_size = 100;
1617
1618 let flat_batches = vec![
1626 new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1627 new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1628 new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1629 new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1630 ];
1631
1632 let flat_source = new_flat_source_from_record_batches(flat_batches);
1633
1634 let write_opts = WriteOptions {
1635 row_group_size,
1636 ..Default::default()
1637 };
1638
1639 let indexer_builder = create_test_indexer_builder(
1640 &env,
1641 object_store.clone(),
1642 file_path.clone(),
1643 metadata.clone(),
1644 row_group_size,
1645 );
1646
1647 let info = write_flat_sst(
1648 object_store.clone(),
1649 metadata.clone(),
1650 indexer_builder,
1651 file_path.clone(),
1652 flat_source,
1653 &write_opts,
1654 )
1655 .await;
1656 assert_eq!(200, info.num_rows);
1657 assert!(info.file_size > 0);
1658 assert!(info.index_metadata.file_size > 0);
1659
1660 let handle = create_file_handle_from_sst_info(&info, &metadata);
1661
1662 let cache = create_test_cache();
1663
1664 let preds = vec![col("tag_0").eq(lit("b"))];
1667 let inverted_index_applier = InvertedIndexApplierBuilder::new(
1668 FILE_DIR.to_string(),
1669 PathType::Bare,
1670 object_store.clone(),
1671 &metadata,
1672 HashSet::from_iter([0]),
1673 env.get_puffin_manager(),
1674 )
1675 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1676 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1677 .build(&preds)
1678 .unwrap()
1679 .map(Arc::new);
1680
1681 let builder = ParquetReaderBuilder::new(
1682 FILE_DIR.to_string(),
1683 PathType::Bare,
1684 handle.clone(),
1685 object_store.clone(),
1686 )
1687 .flat_format(true)
1688 .predicate(Some(Predicate::new(preds)))
1689 .inverted_index_appliers([inverted_index_applier.clone(), None])
1690 .cache(CacheStrategy::EnableAll(cache.clone()));
1691
1692 let mut metrics = ReaderMetrics::default();
1693 let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1694
1695 assert_eq!(selection.row_group_count(), 1);
1697 assert_eq!(50, selection.get(0).unwrap().row_count());
1698
1699 assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1705 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1706 }
1707
1708 #[tokio::test]
1709 async fn test_write_flat_read_with_bloom_filter_sparse() {
1710 let mut env = TestEnv::new().await;
1711 let object_store = env.init_object_store_manager();
1712 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1713 let metadata = Arc::new(sst_region_metadata_with_encoding(
1714 PrimaryKeyEncoding::Sparse,
1715 ));
1716 let row_group_size = 100;
1717
1718 let flat_batches = vec![
1726 new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1727 new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
1728 new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1729 new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1730 ];
1731
1732 let flat_source = new_flat_source_from_record_batches(flat_batches);
1733
1734 let write_opts = WriteOptions {
1735 row_group_size,
1736 ..Default::default()
1737 };
1738
1739 let indexer_builder = create_test_indexer_builder(
1740 &env,
1741 object_store.clone(),
1742 file_path.clone(),
1743 metadata.clone(),
1744 row_group_size,
1745 );
1746
1747 let info = write_flat_sst(
1748 object_store.clone(),
1749 metadata.clone(),
1750 indexer_builder,
1751 file_path.clone(),
1752 flat_source,
1753 &write_opts,
1754 )
1755 .await;
1756 assert_eq!(200, info.num_rows);
1757 assert!(info.file_size > 0);
1758 assert!(info.index_metadata.file_size > 0);
1759
1760 let handle = create_file_handle_from_sst_info(&info, &metadata);
1761
1762 let cache = create_test_cache();
1763
1764 let preds = vec![
1767 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1768 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1769 col("tag_1").eq(lit("d")),
1770 ];
1771 let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1772 FILE_DIR.to_string(),
1773 PathType::Bare,
1774 object_store.clone(),
1775 &metadata,
1776 env.get_puffin_manager(),
1777 )
1778 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1779 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1780 .build(&preds)
1781 .unwrap()
1782 .map(Arc::new);
1783
1784 let builder = ParquetReaderBuilder::new(
1785 FILE_DIR.to_string(),
1786 PathType::Bare,
1787 handle.clone(),
1788 object_store.clone(),
1789 )
1790 .flat_format(true)
1791 .predicate(Some(Predicate::new(preds)))
1792 .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1793 .cache(CacheStrategy::EnableAll(cache.clone()));
1794
1795 let mut metrics = ReaderMetrics::default();
1796 let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1797
1798 assert_eq!(selection.row_group_count(), 2);
1800 assert_eq!(50, selection.get(0).unwrap().row_count());
1801 assert_eq!(50, selection.get(1).unwrap().row_count());
1802
1803 assert_eq!(metrics.filter_metrics.rg_total, 2);
1805 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1806 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1807 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1808 }
1809
    /// Builds region metadata with two fulltext indexed fields: `text_bloom`
    /// (bloom backend) and `text_tantivy` (tantivy backend).
    fn fulltext_region_metadata() -> RegionMetadata {
1813 let mut builder = RegionMetadataBuilder::new(REGION_ID);
1814 builder
1815 .push_column_metadata(ColumnMetadata {
1816 column_schema: ColumnSchema::new(
1817 "tag_0".to_string(),
1818 ConcreteDataType::string_datatype(),
1819 true,
1820 ),
1821 semantic_type: SemanticType::Tag,
1822 column_id: 0,
1823 })
1824 .push_column_metadata(ColumnMetadata {
1825 column_schema: ColumnSchema::new(
1826 "text_bloom".to_string(),
1827 ConcreteDataType::string_datatype(),
1828 true,
1829 )
1830 .with_fulltext_options(FulltextOptions {
1831 enable: true,
1832 analyzer: FulltextAnalyzer::English,
1833 case_sensitive: false,
1834 backend: FulltextBackend::Bloom,
1835 granularity: 1,
1836 false_positive_rate_in_10000: 50,
1837 })
1838 .unwrap(),
1839 semantic_type: SemanticType::Field,
1840 column_id: 1,
1841 })
1842 .push_column_metadata(ColumnMetadata {
1843 column_schema: ColumnSchema::new(
1844 "text_tantivy".to_string(),
1845 ConcreteDataType::string_datatype(),
1846 true,
1847 )
1848 .with_fulltext_options(FulltextOptions {
1849 enable: true,
1850 analyzer: FulltextAnalyzer::English,
1851 case_sensitive: false,
1852 backend: FulltextBackend::Tantivy,
1853 granularity: 1,
1854 false_positive_rate_in_10000: 50,
1855 })
1856 .unwrap(),
1857 semantic_type: SemanticType::Field,
1858 column_id: 2,
1859 })
1860 .push_column_metadata(ColumnMetadata {
1861 column_schema: ColumnSchema::new(
1862 "field_0".to_string(),
1863 ConcreteDataType::uint64_datatype(),
1864 true,
1865 ),
1866 semantic_type: SemanticType::Field,
1867 column_id: 3,
1868 })
1869 .push_column_metadata(ColumnMetadata {
1870 column_schema: ColumnSchema::new(
1871 "ts".to_string(),
1872 ConcreteDataType::timestamp_millisecond_datatype(),
1873 false,
1874 ),
1875 semantic_type: SemanticType::Timestamp,
1876 column_id: 4,
1877 })
1878 .primary_key(vec![0]);
1879 builder.build().unwrap()
1880 }
1881
    /// Builds a flat [RecordBatch] with fulltext columns for timestamps in
    /// `[start, end)`.
    fn new_fulltext_record_batch_by_range(
1884 tag: &str,
1885 text_bloom: &str,
1886 text_tantivy: &str,
1887 start: usize,
1888 end: usize,
1889 ) -> RecordBatch {
1890 assert!(end >= start);
1891 let metadata = Arc::new(fulltext_region_metadata());
1892 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1893
1894 let num_rows = end - start;
1895 let mut columns = Vec::new();
1896
1897 let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1899 for _ in 0..num_rows {
1900 tag_builder.append_value(tag);
1901 }
1902 columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
1903
1904 let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
1906 columns.push(Arc::new(StringArray::from(text_bloom_values)));
1907
1908 let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
1910 columns.push(Arc::new(StringArray::from(text_tantivy_values)));
1911
1912 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1914 columns.push(Arc::new(UInt64Array::from(field_values)));
1915
1916 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1918 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1919
1920 let pk = new_primary_key(&[tag]);
1922 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1923 for _ in 0..num_rows {
1924 pk_builder.append(&pk).unwrap();
1925 }
1926 columns.push(Arc::new(pk_builder.finish()));
1927
1928 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1930
1931 columns.push(Arc::new(UInt8Array::from_value(
1933 OpType::Put as u8,
1934 num_rows,
1935 )));
1936
1937 RecordBatch::try_new(flat_schema, columns).unwrap()
1938 }
1939
1940 #[tokio::test]
1941 async fn test_write_flat_read_with_fulltext_index() {
1942 let mut env = TestEnv::new().await;
1943 let object_store = env.init_object_store_manager();
1944 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1945 let metadata = Arc::new(fulltext_region_metadata());
1946 let row_group_size = 50;
1947
1948 let flat_batches = vec![
1954 new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
1955 new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
1956 new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
1957 new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
1958 ];
1959
1960 let flat_source = new_flat_source_from_record_batches(flat_batches);
1961
1962 let write_opts = WriteOptions {
1963 row_group_size,
1964 ..Default::default()
1965 };
1966
1967 let indexer_builder = create_test_indexer_builder(
1968 &env,
1969 object_store.clone(),
1970 file_path.clone(),
1971 metadata.clone(),
1972 row_group_size,
1973 );
1974
1975 let mut info = write_flat_sst(
1976 object_store.clone(),
1977 metadata.clone(),
1978 indexer_builder,
1979 file_path.clone(),
1980 flat_source,
1981 &write_opts,
1982 )
1983 .await;
1984 assert_eq!(200, info.num_rows);
1985 assert!(info.file_size > 0);
1986 assert!(info.index_metadata.file_size > 0);
1987
1988 assert!(info.index_metadata.fulltext_index.index_size > 0);
1990 assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
1991 info.index_metadata.fulltext_index.columns.sort_unstable();
1993 assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
1994
1995 assert_eq!(
1996 (
1997 Timestamp::new_millisecond(0),
1998 Timestamp::new_millisecond(199)
1999 ),
2000 info.time_range
2001 );
2002
2003 let handle = create_file_handle_from_sst_info(&info, &metadata);
2004
2005 let cache = create_test_cache();
2006
2007 let matches_func = || {
2009 Arc::new(
2010 ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
2011 .provide(Default::default()),
2012 )
2013 };
2014
2015 let matches_term_func = || {
2016 Arc::new(
2017 ScalarFunctionFactory::from(
2018 Arc::new(MatchesTermFunction::default()) as FunctionRef,
2019 )
2020 .provide(Default::default()),
2021 )
2022 };
2023
2024 let preds = vec![Expr::ScalarFunction(ScalarFunction {
2027 args: vec![col("text_bloom"), "hello".lit()],
2028 func: matches_term_func(),
2029 })];
2030
2031 let fulltext_applier = FulltextIndexApplierBuilder::new(
2032 FILE_DIR.to_string(),
2033 PathType::Bare,
2034 object_store.clone(),
2035 env.get_puffin_manager(),
2036 &metadata,
2037 )
2038 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2039 .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2040 .build(&preds)
2041 .unwrap()
2042 .map(Arc::new);
2043
2044 let builder = ParquetReaderBuilder::new(
2045 FILE_DIR.to_string(),
2046 PathType::Bare,
2047 handle.clone(),
2048 object_store.clone(),
2049 )
2050 .flat_format(true)
2051 .predicate(Some(Predicate::new(preds)))
2052 .fulltext_index_appliers([None, fulltext_applier.clone()])
2053 .cache(CacheStrategy::EnableAll(cache.clone()));
2054
2055 let mut metrics = ReaderMetrics::default();
2056 let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
2057
2058 assert_eq!(selection.row_group_count(), 2);
2060 assert_eq!(50, selection.get(0).unwrap().row_count());
2061 assert_eq!(50, selection.get(1).unwrap().row_count());
2062
2063 assert_eq!(metrics.filter_metrics.rg_total, 4);
2065 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2066 assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2067 assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2068
2069 let preds = vec![Expr::ScalarFunction(ScalarFunction {
2072 args: vec![col("text_tantivy"), "lazy".lit()],
2073 func: matches_func(),
2074 })];
2075
2076 let fulltext_applier = FulltextIndexApplierBuilder::new(
2077 FILE_DIR.to_string(),
2078 PathType::Bare,
2079 object_store.clone(),
2080 env.get_puffin_manager(),
2081 &metadata,
2082 )
2083 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2084 .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2085 .build(&preds)
2086 .unwrap()
2087 .map(Arc::new);
2088
2089 let builder = ParquetReaderBuilder::new(
2090 FILE_DIR.to_string(),
2091 PathType::Bare,
2092 handle.clone(),
2093 object_store.clone(),
2094 )
2095 .flat_format(true)
2096 .predicate(Some(Predicate::new(preds)))
2097 .fulltext_index_appliers([None, fulltext_applier.clone()])
2098 .cache(CacheStrategy::EnableAll(cache.clone()));
2099
2100 let mut metrics = ReaderMetrics::default();
2101 let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
2102
2103 assert_eq!(selection.row_group_count(), 2);
2105 assert_eq!(50, selection.get(2).unwrap().row_count());
2106 assert_eq!(50, selection.get(3).unwrap().row_count());
2107
2108 assert_eq!(metrics.filter_metrics.rg_total, 4);
2110 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2111 assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2112 assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2113 }
}