1use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
/// Ranges to read from a parquet file.
pub(crate) mod file_range;
/// Flat storage format for parquet SSTs.
pub mod flat_format;
/// Primary key storage format for parquet SSTs.
pub mod format;
/// Internal helpers.
pub(crate) mod helper;
/// Parquet metadata handling.
pub(crate) mod metadata;
/// Parquet SST reader.
pub mod reader;
/// Row group handling.
pub mod row_group;
/// Row selection utilities.
pub mod row_selection;
/// Statistics utilities.
pub(crate) mod stats;
/// Parquet SST writer.
pub mod writer;
37
/// Key of the region metadata stored in the parquet file's key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size (rows) when reading parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size (rows) when writing parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
45
/// Options for writing parquet SST files.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Buffer size of the underlying writer.
    pub write_buffer_size: ReadableSize,
    /// Number of rows per row group.
    pub row_group_size: usize,
    /// Max single output file size, if set. Writing rolls over to a new file once
    /// this is exceeded (see `test_write_multiple_files`); it is a soft limit
    /// since sizes can only be observed at row group granularity.
    pub max_file_size: Option<usize>,
}
58
59impl Default for WriteOptions {
60 fn default() -> Self {
61 WriteOptions {
62 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
63 row_group_size: DEFAULT_ROW_GROUP_SIZE,
64 max_file_size: None,
65 }
66 }
67}
68
/// Information about a written parquet SST file.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Time range covered by the file.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Maximum uncompressed size among the file's row groups, in bytes.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows in the file.
    pub num_rows: usize,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Parquet metadata of the file, when collected by the writer.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Output of the index build for this file.
    pub index_metadata: IndexOutput,
    /// Number of series in the file.
    pub num_series: u64,
}
92
93#[cfg(test)]
94mod tests {
95 use std::collections::HashSet;
96 use std::sync::Arc;
97
98 use api::v1::{OpType, SemanticType};
99 use common_function::function::FunctionRef;
100 use common_function::function_factory::ScalarFunctionFactory;
101 use common_function::scalars::matches::MatchesFunction;
102 use common_function::scalars::matches_term::MatchesTermFunction;
103 use common_time::Timestamp;
104 use datafusion_common::{Column, ScalarValue};
105 use datafusion_expr::expr::ScalarFunction;
106 use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
107 use datatypes::arrow;
108 use datatypes::arrow::array::{
109 ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
110 TimestampMillisecondArray, UInt8Array, UInt64Array,
111 };
112 use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
113 use datatypes::prelude::ConcreteDataType;
114 use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
115 use object_store::ObjectStore;
116 use parquet::arrow::AsyncArrowWriter;
117 use parquet::basic::{Compression, Encoding, ZstdLevel};
118 use parquet::file::metadata::{KeyValue, PageIndexPolicy};
119 use parquet::file::properties::WriterProperties;
120 use store_api::codec::PrimaryKeyEncoding;
121 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
122 use store_api::region_request::PathType;
123 use store_api::storage::{ColumnSchema, RegionId};
124 use table::predicate::Predicate;
125 use tokio_util::compat::FuturesAsyncWriteCompatExt;
126
127 use super::*;
128 use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
129 use crate::cache::test_util::assert_parquet_metadata_equal;
130 use crate::cache::{CacheManager, CacheStrategy, PageKey};
131 use crate::config::IndexConfig;
132 use crate::read::{BatchBuilder, BatchReader, FlatSource};
133 use crate::region::options::{IndexOptions, InvertedIndexOptions};
134 use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
135 use crate::sst::file_purger::NoopFilePurger;
136 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
137 use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
138 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
139 use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
140 use crate::sst::parquet::format::PrimaryKeyWriteFormat;
141 use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
142 use crate::sst::parquet::writer::ParquetWriter;
143 use crate::sst::{
144 DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
145 };
146 use crate::test_util::sst_util::{
147 build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary,
148 new_batch_with_custom_sequence, new_primary_key, new_source, new_sparse_primary_key,
149 sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
150 sst_region_metadata_with_encoding,
151 };
152 use crate::test_util::{TestEnv, check_reader_result};
153
    /// Table directory used by the tests in this module.
    const FILE_DIR: &str = "/";
    /// Region id used by the tests in this module.
    const REGION_ID: RegionId = RegionId::new(0, 0);
156
    /// Path provider whose SST/legacy-index paths always resolve to one fixed
    /// region file id, regardless of the file id requested.
    #[derive(Clone)]
    struct FixedPathProvider {
        /// The file id every generated path points at.
        region_file_id: RegionFileId,
    }
161
    impl FilePathProvider for FixedPathProvider {
        /// Legacy index path; ignores the requested file id and uses the fixed one.
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        /// Versioned index path; honors the given index id.
        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
        }

        /// SST data path; ignores the requested file id and uses the fixed one.
        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }
175
    /// Indexer builder that builds a default (no-op) [Indexer].
    struct NoopIndexBuilder;
177
    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        /// Always returns a default [Indexer]; no index is built.
        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
            Indexer::default()
        }
    }
184
    /// Writes an SST with several batches and reads it back, verifying row count,
    /// file size, time range, and batch splitting at row group boundaries.
    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Small row groups so the 200 rows span multiple row groups.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        // Batches are split at the 50-row row group boundary on read.
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }
250
    /// Writes an SST, reads it repeatedly with the page cache enabled, and checks
    /// that pages of every row group end up in the cache afterwards.
    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // 200 rows / 50 rows per group = 4 row groups.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Cache everything that is read from the file.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        // Repeated reads must return identical results with a warm cache.
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        // Byte ranges of all columns in a row group; used to build page cache keys.
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

        // Pages of all 4 row groups should be cached; a bogus row group must not be.
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }
335
    /// Verifies the parquet metadata collected by the writer equals the metadata
    /// the reader decodes from the same file.
    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // Write the SST and keep the metadata the writer collected.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // Decode the metadata through the reader (page index optional).
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_equal(writer_metadata, reader_metadata);
    }
388
    /// Reads with a `tag_0 = "a"` predicate; only the batches whose first tag is
    /// "a" should come back.
    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate on the tag column: tag_0 = "a".
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }
448
    /// Writes a source containing zero-row batches; only the non-empty batch
    /// should be returned on read.
    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        // The first two ranges are empty (start == end).
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }
493
    /// Reads with a `field_0 >= 150` predicate; only rows at or above 150 remain.
    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate on the field column: field_0 >= 150.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }
547
    /// Writes a parquet file whose binary columns use `LargeBinary` (via a raw
    /// `AsyncArrowWriter`) and checks the reader still decodes the batches.
    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // Region metadata travels in the parquet key-value metadata.
        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        // Rewrite every Binary field of the SST schema as LargeBinary.
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Sanity check: the field column is now LargeBinary.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        // Cast Binary arrays to LargeBinary so they match the adjusted schema.
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }
641
    /// With `max_file_size` set, a large write is split into multiple SST files;
    /// every row must still be readable across the produced files.
    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["c", "g"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        // Cap each output file at 16 KiB so the writer rolls over to a new file.
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        // Read every produced file back and count the rows.
        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }
704
    /// End-to-end test of writing an SST with inverted/bloom-filter indexes and
    /// reading it back with different predicates, checking filter metrics and the
    /// index result cache after each read.
    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        // 200 rows total -> 4 row groups of 50 rows.
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 20),
            new_batch_by_range(&["b", "d"], 0, 20),
            new_batch_by_range(&["c", "d"], 0, 20),
            new_batch_by_range(&["c", "f"], 0, 40),
            new_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    // One row per segment so rows are individually addressable.
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        // Inverted index is built on column 0, bloom filter on column 1.
        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        // Build a file handle from the write result so the reader can open it.
        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Case 1: tag_0 = "b" goes through the inverted index.
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        // Inverted index results for all row groups should now be cached.
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Case 2: time range plus tag_1 = "d" matches nothing.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(&mut reader, &[]).await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        // Only the row groups that survived min-max pruning hit the bloom filter.
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Case 3: tag_1 = "d" alone goes through the bloom filter on all groups.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 20),
                new_batch_by_range(&["b", "d"], 0, 20),
                new_batch_by_range(&["c", "d"], 0, 10),
                new_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }
1029
1030 fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1033 assert!(end >= start);
1034 let metadata = Arc::new(sst_region_metadata());
1035 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1036
1037 let num_rows = end - start;
1038 let mut columns = Vec::new();
1039
1040 let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1042 let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1043
1044 for _ in 0..num_rows {
1045 tag_0_builder.append_value(tags[0]);
1046 tag_1_builder.append_value(tags[1]);
1047 }
1048
1049 columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1050 columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
1051
1052 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1054 columns.push(Arc::new(UInt64Array::from(field_values)));
1055
1056 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1058 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1059
1060 let pk = new_primary_key(tags);
1062 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1063 for _ in 0..num_rows {
1064 pk_builder.append(&pk).unwrap();
1065 }
1066 columns.push(Arc::new(pk_builder.finish()));
1067
1068 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1070
1071 columns.push(Arc::new(UInt8Array::from_value(
1073 OpType::Put as u8,
1074 num_rows,
1075 )));
1076
1077 RecordBatch::try_new(flat_schema, columns).unwrap()
1078 }
1079
1080 fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
1082 FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
1083 }
1084
    /// Builds a sparse-primary-key flat `RecordBatch` covering `[start, end)`.
    fn new_record_batch_by_range_sparse(
        tags: &[&str],
        start: usize,
        end: usize,
        metadata: &Arc<RegionMetadata>,
    ) -> RecordBatch {
        assert!(end >= start);
        let flat_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
        );

        let num_rows = end - start;
        let mut columns: Vec<ArrayRef> = Vec::new();

        // Field column values follow the row index.
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);

        // Timestamp column (milliseconds) follows the row index.
        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);

        // Sparse primary key built from a fixed table id and tsid.
        let table_id = 1u32;
        let tsid = 100u64;
        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);

        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);

        // Constant sequence column.
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);

        // Op type column: every row is a `Put`.
        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }
1131
1132 fn create_test_indexer_builder(
1134 env: &TestEnv,
1135 object_store: ObjectStore,
1136 file_path: RegionFilePathFactory,
1137 metadata: Arc<RegionMetadata>,
1138 row_group_size: usize,
1139 ) -> IndexerBuilderImpl {
1140 let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1141 let intermediate_manager = env.get_intermediate_manager();
1142
1143 IndexerBuilderImpl {
1144 build_type: IndexBuildType::Flush,
1145 metadata,
1146 row_group_size,
1147 puffin_manager,
1148 write_cache_enabled: false,
1149 intermediate_manager,
1150 index_options: IndexOptions {
1151 inverted_index: InvertedIndexOptions {
1152 segment_row_count: 1,
1153 ..Default::default()
1154 },
1155 },
1156 inverted_index_config: Default::default(),
1157 fulltext_index_config: Default::default(),
1158 bloom_filter_index_config: Default::default(),
1159 #[cfg(feature = "vector_index")]
1160 vector_index_config: Default::default(),
1161 }
1162 }
1163
1164 async fn write_flat_sst(
1166 object_store: ObjectStore,
1167 metadata: Arc<RegionMetadata>,
1168 indexer_builder: IndexerBuilderImpl,
1169 file_path: RegionFilePathFactory,
1170 flat_source: FlatSource,
1171 write_opts: &WriteOptions,
1172 ) -> SstInfo {
1173 let mut metrics = Metrics::new(WriteType::Flush);
1174 let mut writer = ParquetWriter::new_with_object_store(
1175 object_store,
1176 metadata,
1177 IndexConfig::default(),
1178 indexer_builder,
1179 file_path,
1180 &mut metrics,
1181 )
1182 .await;
1183
1184 writer
1185 .write_all_flat(flat_source, write_opts)
1186 .await
1187 .unwrap()
1188 .remove(0)
1189 }
1190
1191 fn create_file_handle_from_sst_info(
1193 info: &SstInfo,
1194 metadata: &Arc<RegionMetadata>,
1195 ) -> FileHandle {
1196 FileHandle::new(
1197 FileMeta {
1198 region_id: metadata.region_id,
1199 file_id: info.file_id,
1200 time_range: info.time_range,
1201 level: 0,
1202 file_size: info.file_size,
1203 max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1204 available_indexes: info.index_metadata.build_available_indexes(),
1205 indexes: info.index_metadata.build_indexes(),
1206 index_file_size: info.index_metadata.file_size,
1207 index_version: 0,
1208 num_row_groups: info.num_row_groups,
1209 num_rows: info.num_rows as u64,
1210 sequence: None,
1211 partition_expr: match &metadata.partition_expr {
1212 Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1213 .expect("partition expression should be valid JSON"),
1214 None => None,
1215 },
1216 num_series: 0,
1217 },
1218 Arc::new(NoopFilePurger),
1219 )
1220 }
1221
1222 fn create_test_cache() -> Arc<CacheManager> {
1224 Arc::new(
1225 CacheManager::builder()
1226 .index_result_cache_size(1024 * 1024)
1227 .index_metadata_size(1024 * 1024)
1228 .index_content_page_size(1024 * 1024)
1229 .index_content_size(1024 * 1024)
1230 .puffin_metadata_size(1024 * 1024)
1231 .build(),
1232 )
1233 }
1234
1235 #[tokio::test]
1236 async fn test_write_flat_with_index() {
1237 let mut env = TestEnv::new().await;
1238 let object_store = env.init_object_store_manager();
1239 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1240 let metadata = Arc::new(sst_region_metadata());
1241 let row_group_size = 50;
1242
1243 let flat_batches = vec![
1245 new_record_batch_by_range(&["a", "d"], 0, 20),
1246 new_record_batch_by_range(&["b", "d"], 0, 20),
1247 new_record_batch_by_range(&["c", "d"], 0, 20),
1248 new_record_batch_by_range(&["c", "f"], 0, 40),
1249 new_record_batch_by_range(&["c", "h"], 100, 200),
1250 ];
1251
1252 let flat_source = new_flat_source_from_record_batches(flat_batches);
1253
1254 let write_opts = WriteOptions {
1255 row_group_size,
1256 ..Default::default()
1257 };
1258
1259 let puffin_manager = env
1260 .get_puffin_manager()
1261 .build(object_store.clone(), file_path.clone());
1262 let intermediate_manager = env.get_intermediate_manager();
1263
1264 let indexer_builder = IndexerBuilderImpl {
1265 build_type: IndexBuildType::Flush,
1266 metadata: metadata.clone(),
1267 row_group_size,
1268 puffin_manager,
1269 write_cache_enabled: false,
1270 intermediate_manager,
1271 index_options: IndexOptions {
1272 inverted_index: InvertedIndexOptions {
1273 segment_row_count: 1,
1274 ..Default::default()
1275 },
1276 },
1277 inverted_index_config: Default::default(),
1278 fulltext_index_config: Default::default(),
1279 bloom_filter_index_config: Default::default(),
1280 #[cfg(feature = "vector_index")]
1281 vector_index_config: Default::default(),
1282 };
1283
1284 let mut metrics = Metrics::new(WriteType::Flush);
1285 let mut writer = ParquetWriter::new_with_object_store(
1286 object_store.clone(),
1287 metadata.clone(),
1288 IndexConfig::default(),
1289 indexer_builder,
1290 file_path.clone(),
1291 &mut metrics,
1292 )
1293 .await;
1294
1295 let info = writer
1296 .write_all_flat(flat_source, &write_opts)
1297 .await
1298 .unwrap()
1299 .remove(0);
1300 assert_eq!(200, info.num_rows);
1301 assert!(info.file_size > 0);
1302 assert!(info.index_metadata.file_size > 0);
1303
1304 assert!(info.index_metadata.inverted_index.index_size > 0);
1305 assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1306 assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1307
1308 assert!(info.index_metadata.bloom_filter.index_size > 0);
1309 assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1310 assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1311
1312 assert_eq!(
1313 (
1314 Timestamp::new_millisecond(0),
1315 Timestamp::new_millisecond(199)
1316 ),
1317 info.time_range
1318 );
1319 }
1320
    /// Writes an SST whose rows carry sequence 0, then reads it twice: once with
    /// the original [FileHandle] and once with a handle whose `sequence` field is
    /// set. The override sequence must replace every row's sequence on read.
    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

        // Both batches are written with sequence number 0 (last argument).
        let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
        let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
        let source = new_source(&[batch1, batch2]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // First pass: read with the unmodified handle to collect the baseline
        // batches (sequences stay as written).
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            normal_batches.push(batch);
        }

        // Second pass: clone the file meta and install a non-zero override
        // sequence, then read through a new handle built from that meta.
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_batch().await.unwrap() {
            override_batches.push(batch);
        }

        // Every override batch must equal the corresponding baseline batch with
        // its sequence column replaced by `custom_sequence`.
        assert_eq!(normal_batches.len(), override_batches.len());
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let num_rows = normal.num_rows();
                let mut builder = BatchBuilder::from(normal);
                builder
                    .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
                    .unwrap();

                builder.build().unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }
1411
    /// Writes a flat-format SST (dense primary key encoding) and reads it back
    /// with an inverted index applier on `tag_0`, checking which row groups and
    /// rows the index filtering selects.
    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // 200 rows -> two row groups of 100. Only rows 50..100 carry tag_0 == "b".
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "d"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Inverted index applier over column id 0 (tag_0) for tag_0 == "b".
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();

        // Only the first row group remains, with its 50 matching rows selected.
        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        // Expected filtering breakdown: 2 row groups in total, 1 dropped by
        // min/max statistics, none dropped whole by the inverted index, and 50
        // rows of the surviving group filtered out by the inverted index.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
    }
1507
    /// Writes a flat-format SST (dense primary key encoding) and reads it back
    /// with a bloom filter index applier on `tag_1`, checking row-level filtering.
    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

        // 200 rows -> two row groups of 100. Rows with tag_1 == "d" are
        // 0..50 (first group) and 100..150 (second group).
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "e"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Time range predicate plus an equality on tag_1, which the bloom
        // filter index (built on tag_1) can evaluate per row segment.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();

        // Both row groups survive, but only 50 rows are selected in each.
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // No whole row group is pruned; the bloom filter removes 100 rows
        // (50 from each group) at row granularity.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }
1607
1608 #[tokio::test]
1609 async fn test_write_flat_read_with_inverted_index_sparse() {
1610 common_telemetry::init_default_ut_logging();
1611
1612 let mut env = TestEnv::new().await;
1613 let object_store = env.init_object_store_manager();
1614 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1615 let metadata = Arc::new(sst_region_metadata_with_encoding(
1616 PrimaryKeyEncoding::Sparse,
1617 ));
1618 let row_group_size = 100;
1619
1620 let flat_batches = vec![
1628 new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1629 new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1630 new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1631 new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1632 ];
1633
1634 let flat_source = new_flat_source_from_record_batches(flat_batches);
1635
1636 let write_opts = WriteOptions {
1637 row_group_size,
1638 ..Default::default()
1639 };
1640
1641 let indexer_builder = create_test_indexer_builder(
1642 &env,
1643 object_store.clone(),
1644 file_path.clone(),
1645 metadata.clone(),
1646 row_group_size,
1647 );
1648
1649 let info = write_flat_sst(
1650 object_store.clone(),
1651 metadata.clone(),
1652 indexer_builder,
1653 file_path.clone(),
1654 flat_source,
1655 &write_opts,
1656 )
1657 .await;
1658 assert_eq!(200, info.num_rows);
1659 assert!(info.file_size > 0);
1660 assert!(info.index_metadata.file_size > 0);
1661
1662 let handle = create_file_handle_from_sst_info(&info, &metadata);
1663
1664 let cache = create_test_cache();
1665
1666 let preds = vec![col("tag_0").eq(lit("b"))];
1669 let inverted_index_applier = InvertedIndexApplierBuilder::new(
1670 FILE_DIR.to_string(),
1671 PathType::Bare,
1672 object_store.clone(),
1673 &metadata,
1674 HashSet::from_iter([0]),
1675 env.get_puffin_manager(),
1676 )
1677 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1678 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1679 .build(&preds)
1680 .unwrap()
1681 .map(Arc::new);
1682
1683 let builder = ParquetReaderBuilder::new(
1684 FILE_DIR.to_string(),
1685 PathType::Bare,
1686 handle.clone(),
1687 object_store.clone(),
1688 )
1689 .flat_format(true)
1690 .predicate(Some(Predicate::new(preds)))
1691 .inverted_index_appliers([inverted_index_applier.clone(), None])
1692 .cache(CacheStrategy::EnableAll(cache.clone()));
1693
1694 let mut metrics = ReaderMetrics::default();
1695 let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
1696
1697 assert_eq!(selection.row_group_count(), 1);
1699 assert_eq!(50, selection.get(0).unwrap().row_count());
1700
1701 assert_eq!(metrics.filter_metrics.rg_total, 2);
1705 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1707 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1708 }
1709
    /// Same scenario as `test_write_flat_read_with_bloom_filter`, but the SST is
    /// written with the sparse primary key encoding.
    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter_sparse() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

        // 200 rows -> two row groups of 100. Rows with tag_1 == "d" are
        // 0..50 (first group) and 100..150 (second group).
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Time range predicate plus an equality on tag_1 for the bloom filter.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();

        // Both row groups survive, but only 50 rows are selected in each.
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // No whole row group is pruned; the bloom filter removes 100 rows
        // (50 from each group) at row granularity.
        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }
1811
1812 fn fulltext_region_metadata() -> RegionMetadata {
1815 let mut builder = RegionMetadataBuilder::new(REGION_ID);
1816 builder
1817 .push_column_metadata(ColumnMetadata {
1818 column_schema: ColumnSchema::new(
1819 "tag_0".to_string(),
1820 ConcreteDataType::string_datatype(),
1821 true,
1822 ),
1823 semantic_type: SemanticType::Tag,
1824 column_id: 0,
1825 })
1826 .push_column_metadata(ColumnMetadata {
1827 column_schema: ColumnSchema::new(
1828 "text_bloom".to_string(),
1829 ConcreteDataType::string_datatype(),
1830 true,
1831 )
1832 .with_fulltext_options(FulltextOptions {
1833 enable: true,
1834 analyzer: FulltextAnalyzer::English,
1835 case_sensitive: false,
1836 backend: FulltextBackend::Bloom,
1837 granularity: 1,
1838 false_positive_rate_in_10000: 50,
1839 })
1840 .unwrap(),
1841 semantic_type: SemanticType::Field,
1842 column_id: 1,
1843 })
1844 .push_column_metadata(ColumnMetadata {
1845 column_schema: ColumnSchema::new(
1846 "text_tantivy".to_string(),
1847 ConcreteDataType::string_datatype(),
1848 true,
1849 )
1850 .with_fulltext_options(FulltextOptions {
1851 enable: true,
1852 analyzer: FulltextAnalyzer::English,
1853 case_sensitive: false,
1854 backend: FulltextBackend::Tantivy,
1855 granularity: 1,
1856 false_positive_rate_in_10000: 50,
1857 })
1858 .unwrap(),
1859 semantic_type: SemanticType::Field,
1860 column_id: 2,
1861 })
1862 .push_column_metadata(ColumnMetadata {
1863 column_schema: ColumnSchema::new(
1864 "field_0".to_string(),
1865 ConcreteDataType::uint64_datatype(),
1866 true,
1867 ),
1868 semantic_type: SemanticType::Field,
1869 column_id: 3,
1870 })
1871 .push_column_metadata(ColumnMetadata {
1872 column_schema: ColumnSchema::new(
1873 "ts".to_string(),
1874 ConcreteDataType::timestamp_millisecond_datatype(),
1875 false,
1876 ),
1877 semantic_type: SemanticType::Timestamp,
1878 column_id: 4,
1879 })
1880 .primary_key(vec![0]);
1881 builder.build().unwrap()
1882 }
1883
1884 fn new_fulltext_record_batch_by_range(
1886 tag: &str,
1887 text_bloom: &str,
1888 text_tantivy: &str,
1889 start: usize,
1890 end: usize,
1891 ) -> RecordBatch {
1892 assert!(end >= start);
1893 let metadata = Arc::new(fulltext_region_metadata());
1894 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1895
1896 let num_rows = end - start;
1897 let mut columns = Vec::new();
1898
1899 let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1901 for _ in 0..num_rows {
1902 tag_builder.append_value(tag);
1903 }
1904 columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
1905
1906 let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
1908 columns.push(Arc::new(StringArray::from(text_bloom_values)));
1909
1910 let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
1912 columns.push(Arc::new(StringArray::from(text_tantivy_values)));
1913
1914 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1916 columns.push(Arc::new(UInt64Array::from(field_values)));
1917
1918 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1920 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1921
1922 let pk = new_primary_key(&[tag]);
1924 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1925 for _ in 0..num_rows {
1926 pk_builder.append(&pk).unwrap();
1927 }
1928 columns.push(Arc::new(pk_builder.finish()));
1929
1930 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1932
1933 columns.push(Arc::new(UInt8Array::from_value(
1935 OpType::Put as u8,
1936 num_rows,
1937 )));
1938
1939 RecordBatch::try_new(flat_schema, columns).unwrap()
1940 }
1941
    /// Writes a flat-format SST with two fulltext-indexed columns and reads it
    /// back twice: once filtering on the bloom-backed column, once on the
    /// tantivy-backed column, checking the selected row groups and filter metrics.
    #[tokio::test]
    async fn test_write_flat_read_with_fulltext_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(fulltext_region_metadata());
        let row_group_size = 50;

        // 200 rows -> four row groups of 50. "hello"/"quick brown fox" appear in
        // rows 0..100; "goodbye"/"lazy dog" in rows 100..200.
        let flat_batches = vec![
            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        // `info` is mutable because the fulltext column ids are sorted below
        // before being compared.
        let mut info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        // Fulltext index covers all rows over column ids 1 (text_bloom) and
        // 2 (text_tantivy).
        assert!(info.index_metadata.fulltext_index.index_size > 0);
        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
        info.index_metadata.fulltext_index.columns.sort_unstable();
        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        // Factories for the `matches` and `matches_term` scalar functions used
        // to build fulltext predicates.
        let matches_func = || {
            Arc::new(
                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
                    .provide(Default::default()),
            )
        };

        let matches_term_func = || {
            Arc::new(
                ScalarFunctionFactory::from(
                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
                )
                .provide(Default::default()),
            )
        };

        // First query: matches_term(text_bloom, "hello") -> rows 0..100.
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_bloom"), "hello".lit()],
            func: matches_term_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();

        // The first two row groups (rows 0..100) survive in full.
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        // The fulltext index drops the other two row groups (100 rows).
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);

        // Second query: matches(text_tantivy, "lazy") -> rows 100..200.
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_tantivy"), "lazy".lit()],
            func: matches_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();

        // The last two row groups (indexes 2 and 3, rows 100..200) survive.
        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(2).unwrap().row_count());
        assert_eq!(50, selection.get(3).unwrap().row_count());

        // The fulltext index drops the first two row groups (100 rows).
        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
    }
2116}