1use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
27pub mod file_range;
28pub mod flat_format;
29pub mod format;
30pub(crate) mod helper;
31pub(crate) mod metadata;
32pub mod prefilter;
33pub(crate) mod push_decoder;
34pub mod read_columns;
35pub mod reader;
36pub mod row_group;
37pub mod row_selection;
38pub(crate) mod stats;
39pub mod writer;
40
/// Key of the parquet key-value metadata entry under which the region metadata
/// JSON is stored.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default read batch size (8192); presumably counted in rows — confirm against the reader.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default row group size (102400); this is the `row_group_size` of `WriteOptions::default()`.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;
56
57#[derive(Debug, Clone)]
59pub struct WriteOptions {
60 pub write_buffer_size: ReadableSize,
62 pub row_group_size: usize,
64 pub max_file_size: Option<usize>,
68}
69
70impl Default for WriteOptions {
71 fn default() -> Self {
72 WriteOptions {
73 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
74 row_group_size: DEFAULT_ROW_GROUP_SIZE,
75 max_file_size: None,
76 }
77 }
78}
79
80#[derive(Debug, Default)]
82pub struct SstInfo {
83 pub file_id: FileId,
85 pub time_range: FileTimeRange,
88 pub file_size: u64,
90 pub max_row_group_uncompressed_size: u64,
92 pub num_rows: usize,
94 pub num_row_groups: u64,
96 pub file_metadata: Option<Arc<ParquetMetaData>>,
98 pub index_metadata: IndexOutput,
100 pub num_series: u64,
102}
103
104#[cfg(test)]
105mod tests {
106 use std::collections::HashSet;
107 use std::sync::Arc;
108
109 use api::v1::{OpType, SemanticType};
110 use common_function::function::FunctionRef;
111 use common_function::function_factory::ScalarFunctionFactory;
112 use common_function::scalars::matches::MatchesFunction;
113 use common_function::scalars::matches_term::MatchesTermFunction;
114 use common_time::Timestamp;
115 use datafusion_common::{Column, ScalarValue};
116 use datafusion_expr::expr::ScalarFunction;
117 use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
118 use datatypes::arrow;
119 use datatypes::arrow::array::{
120 ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
121 TimestampMillisecondArray, UInt8Array, UInt64Array,
122 };
123 use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
124 use datatypes::arrow::util::pretty::pretty_format_batches;
125 use datatypes::prelude::ConcreteDataType;
126 use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
127 use object_store::ObjectStore;
128 use parquet::arrow::AsyncArrowWriter;
129 use parquet::basic::{Compression, Encoding, ZstdLevel};
130 use parquet::file::metadata::{KeyValue, PageIndexPolicy};
131 use parquet::file::properties::WriterProperties;
132 use store_api::codec::PrimaryKeyEncoding;
133 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
134 use store_api::region_request::PathType;
135 use store_api::storage::{ColumnSchema, RegionId};
136 use table::predicate::Predicate;
137 use tokio_util::compat::FuturesAsyncWriteCompatExt;
138
139 use super::*;
140 use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
141 use crate::cache::index::result_cache::PredicateKey;
142 use crate::cache::test_util::assert_parquet_metadata_equal;
143 use crate::cache::{CacheManager, CacheStrategy, PageKey};
144 use crate::config::IndexConfig;
145 use crate::read::FlatSource;
146 use crate::region::options::{IndexOptions, InvertedIndexOptions};
147 use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
148 use crate::sst::file_purger::NoopFilePurger;
149 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
150 use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
151 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
152 use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
153 use crate::sst::parquet::flat_format::FlatWriteFormat;
154 use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
155 use crate::sst::parquet::row_selection::RowGroupSelection;
156 use crate::sst::parquet::writer::ParquetWriter;
157 use crate::sst::{
158 DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
159 };
160 use crate::test_util::TestEnv;
161 use crate::test_util::sst_util::{
162 build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
163 new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
164 new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
165 sst_region_metadata_with_encoding,
166 };
167
168 const FILE_DIR: &str = "/";
169 const REGION_ID: RegionId = RegionId::new(0, 0);
170
171 #[derive(Clone)]
172 struct FixedPathProvider {
173 region_file_id: RegionFileId,
174 }
175
176 impl FilePathProvider for FixedPathProvider {
177 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
178 location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
179 }
180
181 fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
182 location::index_file_path(FILE_DIR, index_id, PathType::Bare)
183 }
184
185 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
186 location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
187 }
188 }
189
190 struct NoopIndexBuilder;
191
192 #[async_trait::async_trait]
193 impl IndexerBuilder for NoopIndexBuilder {
194 async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
195 Indexer::default()
196 }
197 }
198
199 #[tokio::test]
200 async fn test_write_read() {
201 let mut env = TestEnv::new().await;
202 let object_store = env.init_object_store_manager();
203 let handle = sst_file_handle(0, 1000);
204 let file_path = FixedPathProvider {
205 region_file_id: handle.file_id(),
206 };
207 let metadata = Arc::new(sst_region_metadata());
208 let source = new_flat_source_from_record_batches(vec![
209 new_record_batch_by_range(&["a", "d"], 0, 60),
210 new_record_batch_by_range(&["b", "f"], 0, 40),
211 new_record_batch_by_range(&["b", "h"], 100, 200),
212 ]);
213 let write_opts = WriteOptions {
215 row_group_size: 50,
216 ..Default::default()
217 };
218
219 let mut metrics = Metrics::new(WriteType::Flush);
220 let mut writer = ParquetWriter::new_with_object_store(
221 object_store.clone(),
222 metadata.clone(),
223 IndexConfig::default(),
224 NoopIndexBuilder,
225 file_path,
226 &mut metrics,
227 )
228 .await;
229
230 let info = writer
231 .write_all_flat_as_primary_key(source, None, &write_opts)
232 .await
233 .unwrap()
234 .remove(0);
235 assert_eq!(200, info.num_rows);
236 assert!(info.file_size > 0);
237 assert_eq!(
238 (
239 Timestamp::new_millisecond(0),
240 Timestamp::new_millisecond(199)
241 ),
242 info.time_range
243 );
244
245 let builder = ParquetReaderBuilder::new(
246 FILE_DIR.to_string(),
247 PathType::Bare,
248 handle.clone(),
249 object_store,
250 );
251 let mut reader = builder.build().await.unwrap().unwrap();
252 check_record_batch_reader_result(
253 &mut reader,
254 &[
255 new_record_batch_by_range(&["a", "d"], 0, 50),
256 new_record_batch_by_range(&["a", "d"], 50, 60),
257 new_record_batch_by_range(&["b", "f"], 0, 40),
258 new_record_batch_by_range(&["b", "h"], 100, 150),
259 new_record_batch_by_range(&["b", "h"], 150, 200),
260 ],
261 )
262 .await;
263 }
264
265 #[tokio::test]
266 async fn test_read_with_cache() {
267 let mut env = TestEnv::new().await;
268 let object_store = env.init_object_store_manager();
269 let handle = sst_file_handle(0, 1000);
270 let metadata = Arc::new(sst_region_metadata());
271 let source = new_flat_source_from_record_batches(vec![
272 new_record_batch_by_range(&["a", "d"], 0, 60),
273 new_record_batch_by_range(&["b", "f"], 0, 40),
274 new_record_batch_by_range(&["b", "h"], 100, 200),
275 ]);
276 let write_opts = WriteOptions {
278 row_group_size: 50,
279 ..Default::default()
280 };
281 let mut metrics = Metrics::new(WriteType::Flush);
283 let mut writer = ParquetWriter::new_with_object_store(
284 object_store.clone(),
285 metadata.clone(),
286 IndexConfig::default(),
287 NoopIndexBuilder,
288 FixedPathProvider {
289 region_file_id: handle.file_id(),
290 },
291 &mut metrics,
292 )
293 .await;
294
295 let sst_info = writer
296 .write_all_flat_as_primary_key(source, None, &write_opts)
297 .await
298 .unwrap()
299 .remove(0);
300
301 let cache = CacheStrategy::EnableAll(Arc::new(
303 CacheManager::builder()
304 .page_cache_size(64 * 1024 * 1024)
305 .build(),
306 ));
307 let builder = ParquetReaderBuilder::new(
308 FILE_DIR.to_string(),
309 PathType::Bare,
310 handle.clone(),
311 object_store,
312 )
313 .cache(cache.clone());
314 for _ in 0..3 {
315 let mut reader = builder.build().await.unwrap().unwrap();
316 check_record_batch_reader_result(
317 &mut reader,
318 &[
319 new_record_batch_by_range(&["a", "d"], 0, 50),
320 new_record_batch_by_range(&["a", "d"], 50, 60),
321 new_record_batch_by_range(&["b", "f"], 0, 40),
322 new_record_batch_by_range(&["b", "h"], 100, 150),
323 new_record_batch_by_range(&["b", "h"], 150, 200),
324 ],
325 )
326 .await;
327 }
328
329 let parquet_meta = sst_info.file_metadata.unwrap();
330 let get_ranges = |row_group_idx: usize| {
331 let row_group = parquet_meta.row_group(row_group_idx);
332 let mut ranges = Vec::with_capacity(row_group.num_columns());
333 for i in 0..row_group.num_columns() {
334 let (start, length) = row_group.column(i).byte_range();
335 ranges.push(start..start + length);
336 }
337
338 ranges
339 };
340
341 for i in 0..4 {
343 let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
344 assert!(cache.get_pages(&page_key).is_some());
345 }
346 let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
347 assert!(cache.get_pages(&page_key).is_none());
348 }
349
350 #[tokio::test]
351 async fn test_parquet_metadata_eq() {
352 let mut env = crate::test_util::TestEnv::new().await;
354 let object_store = env.init_object_store_manager();
355 let handle = sst_file_handle(0, 1000);
356 let metadata = Arc::new(sst_region_metadata());
357 let source = new_flat_source_from_record_batches(vec![
358 new_record_batch_by_range(&["a", "d"], 0, 60),
359 new_record_batch_by_range(&["b", "f"], 0, 40),
360 new_record_batch_by_range(&["b", "h"], 100, 200),
361 ]);
362 let write_opts = WriteOptions {
363 row_group_size: 50,
364 ..Default::default()
365 };
366
367 let mut metrics = Metrics::new(WriteType::Flush);
370 let mut writer = ParquetWriter::new_with_object_store(
371 object_store.clone(),
372 metadata.clone(),
373 IndexConfig::default(),
374 NoopIndexBuilder,
375 FixedPathProvider {
376 region_file_id: handle.file_id(),
377 },
378 &mut metrics,
379 )
380 .await;
381
382 let sst_info = writer
383 .write_all_flat_as_primary_key(source, None, &write_opts)
384 .await
385 .unwrap()
386 .remove(0);
387 let writer_metadata = sst_info.file_metadata.unwrap();
388
389 let builder = ParquetReaderBuilder::new(
391 FILE_DIR.to_string(),
392 PathType::Bare,
393 handle.clone(),
394 object_store,
395 )
396 .page_index_policy(PageIndexPolicy::Optional);
397 let reader = builder.build().await.unwrap().unwrap();
398 let reader_metadata = reader.parquet_metadata();
399 let cached_writer_metadata =
400 crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
401 .unwrap()
402 .parquet_metadata();
403
404 assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
405 }
406
407 #[tokio::test]
408 async fn test_read_with_tag_filter() {
409 let mut env = TestEnv::new().await;
410 let object_store = env.init_object_store_manager();
411 let handle = sst_file_handle(0, 1000);
412 let metadata = Arc::new(sst_region_metadata());
413 let source = new_flat_source_from_record_batches(vec![
414 new_record_batch_by_range(&["a", "d"], 0, 60),
415 new_record_batch_by_range(&["b", "f"], 0, 40),
416 new_record_batch_by_range(&["b", "h"], 100, 200),
417 ]);
418 let write_opts = WriteOptions {
420 row_group_size: 50,
421 ..Default::default()
422 };
423 let mut metrics = Metrics::new(WriteType::Flush);
425 let mut writer = ParquetWriter::new_with_object_store(
426 object_store.clone(),
427 metadata.clone(),
428 IndexConfig::default(),
429 NoopIndexBuilder,
430 FixedPathProvider {
431 region_file_id: handle.file_id(),
432 },
433 &mut metrics,
434 )
435 .await;
436 writer
437 .write_all_flat_as_primary_key(source, None, &write_opts)
438 .await
439 .unwrap()
440 .remove(0);
441
442 let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
444 left: Box::new(Expr::Column(Column::from_name("tag_0"))),
445 op: Operator::Eq,
446 right: Box::new("a".lit()),
447 })]));
448
449 let builder = ParquetReaderBuilder::new(
450 FILE_DIR.to_string(),
451 PathType::Bare,
452 handle.clone(),
453 object_store,
454 )
455 .predicate(predicate);
456 let mut reader = builder.build().await.unwrap().unwrap();
457 check_record_batch_reader_result(
458 &mut reader,
459 &[
460 new_record_batch_by_range(&["a", "d"], 0, 50),
461 new_record_batch_by_range(&["a", "d"], 50, 60),
462 ],
463 )
464 .await;
465 }
466
467 #[tokio::test]
468 async fn test_read_empty_batch() {
469 let mut env = TestEnv::new().await;
470 let object_store = env.init_object_store_manager();
471 let handle = sst_file_handle(0, 1000);
472 let metadata = Arc::new(sst_region_metadata());
473 let source = new_flat_source_from_record_batches(vec![
474 new_record_batch_by_range(&["a", "z"], 0, 0),
475 new_record_batch_by_range(&["a", "z"], 100, 100),
476 new_record_batch_by_range(&["a", "z"], 200, 230),
477 ]);
478 let write_opts = WriteOptions {
480 row_group_size: 50,
481 ..Default::default()
482 };
483 let mut metrics = Metrics::new(WriteType::Flush);
485 let mut writer = ParquetWriter::new_with_object_store(
486 object_store.clone(),
487 metadata.clone(),
488 IndexConfig::default(),
489 NoopIndexBuilder,
490 FixedPathProvider {
491 region_file_id: handle.file_id(),
492 },
493 &mut metrics,
494 )
495 .await;
496 writer
497 .write_all_flat_as_primary_key(source, None, &write_opts)
498 .await
499 .unwrap()
500 .remove(0);
501
502 let builder = ParquetReaderBuilder::new(
503 FILE_DIR.to_string(),
504 PathType::Bare,
505 handle.clone(),
506 object_store,
507 );
508 let mut reader = builder.build().await.unwrap().unwrap();
509 check_record_batch_reader_result(
510 &mut reader,
511 &[new_record_batch_by_range(&["a", "z"], 200, 230)],
512 )
513 .await;
514 }
515
516 #[tokio::test]
517 async fn test_read_with_field_filter() {
518 let mut env = TestEnv::new().await;
519 let object_store = env.init_object_store_manager();
520 let handle = sst_file_handle(0, 1000);
521 let metadata = Arc::new(sst_region_metadata());
522 let source = new_flat_source_from_record_batches(vec![
523 new_record_batch_by_range(&["a", "d"], 0, 60),
524 new_record_batch_by_range(&["b", "f"], 0, 40),
525 new_record_batch_by_range(&["b", "h"], 100, 200),
526 ]);
527 let write_opts = WriteOptions {
529 row_group_size: 50,
530 ..Default::default()
531 };
532 let mut metrics = Metrics::new(WriteType::Flush);
534 let mut writer = ParquetWriter::new_with_object_store(
535 object_store.clone(),
536 metadata.clone(),
537 IndexConfig::default(),
538 NoopIndexBuilder,
539 FixedPathProvider {
540 region_file_id: handle.file_id(),
541 },
542 &mut metrics,
543 )
544 .await;
545
546 writer
547 .write_all_flat_as_primary_key(source, None, &write_opts)
548 .await
549 .unwrap()
550 .remove(0);
551
552 let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
554 left: Box::new(Expr::Column(Column::from_name("field_0"))),
555 op: Operator::GtEq,
556 right: Box::new(150u64.lit()),
557 })]));
558
559 let builder = ParquetReaderBuilder::new(
560 FILE_DIR.to_string(),
561 PathType::Bare,
562 handle.clone(),
563 object_store,
564 )
565 .predicate(predicate);
566 let mut reader = builder.build().await.unwrap().unwrap();
567 check_record_batch_reader_result(
568 &mut reader,
569 &[new_record_batch_by_range(&["b", "h"], 150, 200)],
570 )
571 .await;
572 }
573
574 #[tokio::test]
575 async fn test_read_large_binary() {
576 let mut env = TestEnv::new().await;
577 let object_store = env.init_object_store_manager();
578 let handle = sst_file_handle(0, 1000);
579 let file_path = handle.file_path(FILE_DIR, PathType::Bare);
580
581 let write_opts = WriteOptions {
582 row_group_size: 50,
583 ..Default::default()
584 };
585
586 let metadata = build_test_binary_test_region_metadata();
587 let json = metadata.to_json().unwrap();
588 let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
589
590 let props_builder = WriterProperties::builder()
591 .set_key_value_metadata(Some(vec![key_value_meta]))
592 .set_compression(Compression::ZSTD(ZstdLevel::default()))
593 .set_encoding(Encoding::PLAIN)
594 .set_max_row_group_row_count(Some(write_opts.row_group_size));
595
596 let writer_props = props_builder.build();
597
598 let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
599 let fields: Vec<_> = write_format
600 .arrow_schema()
601 .fields()
602 .into_iter()
603 .map(|field| {
604 let data_type = field.data_type().clone();
605 if data_type == DataType::Binary {
606 Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
607 } else {
608 Field::new(field.name(), data_type, field.is_nullable())
609 }
610 })
611 .collect();
612
613 let arrow_schema = Arc::new(Schema::new(fields));
614
615 assert_eq!(
617 &DataType::LargeBinary,
618 arrow_schema.field_with_name("field_0").unwrap().data_type()
619 );
620 let mut writer = AsyncArrowWriter::try_new(
621 object_store
622 .writer_with(&file_path)
623 .concurrent(DEFAULT_WRITE_CONCURRENCY)
624 .await
625 .map(|w| w.into_futures_async_write().compat_write())
626 .unwrap(),
627 arrow_schema.clone(),
628 Some(writer_props),
629 )
630 .unwrap();
631
632 let batch = new_record_batch_with_binary(&["a"], 0, 60);
633 let arrays: Vec<_> = batch
634 .columns()
635 .iter()
636 .map(|array| {
637 let data_type = array.data_type().clone();
638 if data_type == DataType::Binary {
639 arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
640 } else {
641 array.clone()
642 }
643 })
644 .collect();
645 let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
646
647 writer.write(&result).await.unwrap();
648 writer.close().await.unwrap();
649
650 let builder = ParquetReaderBuilder::new(
651 FILE_DIR.to_string(),
652 PathType::Bare,
653 handle.clone(),
654 object_store,
655 );
656 let mut reader = builder.build().await.unwrap().unwrap();
657 check_record_batch_reader_result(
658 &mut reader,
659 &[
660 new_record_batch_with_binary(&["a"], 0, 50),
661 new_record_batch_with_binary(&["a"], 50, 60),
662 ],
663 )
664 .await;
665 }
666
667 #[tokio::test]
668 async fn test_write_multiple_files() {
669 common_telemetry::init_default_ut_logging();
670 let mut env = TestEnv::new().await;
672 let object_store = env.init_object_store_manager();
673 let metadata = Arc::new(sst_region_metadata());
674 let batches = vec![
675 new_record_batch_by_range(&["a", "d"], 0, 1000),
676 new_record_batch_by_range(&["b", "f"], 0, 1000),
677 new_record_batch_by_range(&["c", "g"], 0, 1000),
678 new_record_batch_by_range(&["b", "h"], 100, 200),
679 new_record_batch_by_range(&["b", "h"], 200, 300),
680 new_record_batch_by_range(&["b", "h"], 300, 1000),
681 ];
682 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
683
684 let source = new_flat_source_from_record_batches(batches);
685 let write_opts = WriteOptions {
686 row_group_size: 50,
687 max_file_size: Some(1024 * 16),
688 ..Default::default()
689 };
690
691 let path_provider = RegionFilePathFactory {
692 table_dir: "test".to_string(),
693 path_type: PathType::Bare,
694 };
695 let mut metrics = Metrics::new(WriteType::Flush);
696 let mut writer = ParquetWriter::new_with_object_store(
697 object_store.clone(),
698 metadata.clone(),
699 IndexConfig::default(),
700 NoopIndexBuilder,
701 path_provider,
702 &mut metrics,
703 )
704 .await;
705
706 let files = writer
707 .write_all_flat_as_primary_key(source, None, &write_opts)
708 .await
709 .unwrap();
710 assert_eq!(2, files.len());
711
712 let mut rows_read = 0;
713 for f in &files {
714 let file_handle = sst_file_handle_with_file_id(
715 f.file_id,
716 f.time_range.0.value(),
717 f.time_range.1.value(),
718 );
719 let builder = ParquetReaderBuilder::new(
720 "test".to_string(),
721 PathType::Bare,
722 file_handle,
723 object_store.clone(),
724 );
725 let mut reader = builder.build().await.unwrap().unwrap();
726 while let Some(batch) = reader.next_record_batch().await.unwrap() {
727 rows_read += batch.num_rows();
728 }
729 }
730 assert_eq!(total_rows, rows_read);
731 }
732
733 #[tokio::test]
734 async fn test_write_read_with_index() {
735 let mut env = TestEnv::new().await;
736 let object_store = env.init_object_store_manager();
737 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
738 let metadata = Arc::new(sst_region_metadata());
739 let row_group_size = 50;
740
741 let source = new_flat_source_from_record_batches(vec![
742 new_record_batch_by_range(&["a", "d"], 0, 20),
743 new_record_batch_by_range(&["b", "d"], 0, 20),
744 new_record_batch_by_range(&["c", "d"], 0, 20),
745 new_record_batch_by_range(&["c", "f"], 0, 40),
746 new_record_batch_by_range(&["c", "h"], 100, 200),
747 ]);
748 let write_opts = WriteOptions {
750 row_group_size,
751 ..Default::default()
752 };
753
754 let puffin_manager = env
755 .get_puffin_manager()
756 .build(object_store.clone(), file_path.clone());
757 let intermediate_manager = env.get_intermediate_manager();
758
759 let indexer_builder = IndexerBuilderImpl {
760 build_type: IndexBuildType::Flush,
761 metadata: metadata.clone(),
762 row_group_size,
763 puffin_manager,
764 write_cache_enabled: false,
765 intermediate_manager,
766 index_options: IndexOptions {
767 inverted_index: InvertedIndexOptions {
768 segment_row_count: 1,
769 ..Default::default()
770 },
771 },
772 inverted_index_config: Default::default(),
773 fulltext_index_config: Default::default(),
774 bloom_filter_index_config: Default::default(),
775 #[cfg(feature = "vector_index")]
776 vector_index_config: Default::default(),
777 };
778
779 let mut metrics = Metrics::new(WriteType::Flush);
780 let mut writer = ParquetWriter::new_with_object_store(
781 object_store.clone(),
782 metadata.clone(),
783 IndexConfig::default(),
784 indexer_builder,
785 file_path.clone(),
786 &mut metrics,
787 )
788 .await;
789
790 let info = writer
791 .write_all_flat_as_primary_key(source, None, &write_opts)
792 .await
793 .unwrap()
794 .remove(0);
795 assert_eq!(200, info.num_rows);
796 assert!(info.file_size > 0);
797 assert!(info.index_metadata.file_size > 0);
798
799 assert!(info.index_metadata.inverted_index.index_size > 0);
800 assert_eq!(info.index_metadata.inverted_index.row_count, 200);
801 assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
802
803 assert!(info.index_metadata.bloom_filter.index_size > 0);
804 assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
805 assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
806
807 assert_eq!(
808 (
809 Timestamp::new_millisecond(0),
810 Timestamp::new_millisecond(199)
811 ),
812 info.time_range
813 );
814
815 let handle = FileHandle::new(
816 FileMeta {
817 region_id: metadata.region_id,
818 file_id: info.file_id,
819 time_range: info.time_range,
820 level: 0,
821 file_size: info.file_size,
822 max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
823 available_indexes: info.index_metadata.build_available_indexes(),
824 indexes: info.index_metadata.build_indexes(),
825 index_file_size: info.index_metadata.file_size,
826 index_version: 0,
827 num_row_groups: info.num_row_groups,
828 num_rows: info.num_rows as u64,
829 sequence: None,
830 partition_expr: match &metadata.partition_expr {
831 Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
832 .expect("partition expression should be valid JSON"),
833 None => None,
834 },
835 num_series: 0,
836 ..Default::default()
837 },
838 Arc::new(NoopFilePurger),
839 );
840
841 let cache = Arc::new(
842 CacheManager::builder()
843 .index_result_cache_size(1024 * 1024)
844 .index_metadata_size(1024 * 1024)
845 .index_content_page_size(1024 * 1024)
846 .index_content_size(1024 * 1024)
847 .puffin_metadata_size(1024 * 1024)
848 .build(),
849 );
850 let index_result_cache = cache.index_result_cache().unwrap();
851
852 let build_inverted_index_applier = |exprs: &[Expr]| {
853 InvertedIndexApplierBuilder::new(
854 FILE_DIR.to_string(),
855 PathType::Bare,
856 object_store.clone(),
857 &metadata,
858 HashSet::from_iter([0]),
859 env.get_puffin_manager(),
860 )
861 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
862 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
863 .build(exprs)
864 .unwrap()
865 .map(Arc::new)
866 };
867
868 let build_bloom_filter_applier = |exprs: &[Expr]| {
869 BloomFilterIndexApplierBuilder::new(
870 FILE_DIR.to_string(),
871 PathType::Bare,
872 object_store.clone(),
873 &metadata,
874 env.get_puffin_manager(),
875 )
876 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
877 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
878 .build(exprs)
879 .unwrap()
880 .map(Arc::new)
881 };
882
883 let preds = vec![col("tag_0").eq(lit("b"))];
900 let inverted_index_applier = build_inverted_index_applier(&preds);
901 let bloom_filter_applier = build_bloom_filter_applier(&preds);
902
903 let builder = ParquetReaderBuilder::new(
904 FILE_DIR.to_string(),
905 PathType::Bare,
906 handle.clone(),
907 object_store.clone(),
908 )
909 .predicate(Some(Predicate::new(preds)))
910 .inverted_index_appliers([inverted_index_applier.clone(), None])
911 .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
912 .cache(CacheStrategy::EnableAll(cache.clone()));
913
914 let mut metrics = ReaderMetrics::default();
915 let (context, selection) = builder
916 .build_reader_input(&mut metrics)
917 .await
918 .unwrap()
919 .unwrap();
920 let mut reader = ParquetReader::new(Arc::new(context), selection)
921 .await
922 .unwrap();
923 check_record_batch_reader_result(
924 &mut reader,
925 &[new_record_batch_by_range(&["b", "d"], 0, 20)],
926 )
927 .await;
928
929 assert_eq!(metrics.filter_metrics.rg_total, 4);
930 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
931 assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
932 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
933 let plan = inverted_index_applier
934 .as_ref()
935 .unwrap()
936 .plan_for_sst(&metadata)
937 .unwrap()
938 .unwrap();
939 let cached = index_result_cache
940 .get(&plan.predicate_key, handle.file_id().file_id())
941 .unwrap();
942 assert!(cached.contains_row_group(0));
944 assert!(cached.contains_row_group(1));
945 assert!(cached.contains_row_group(2));
946 assert!(cached.contains_row_group(3));
947
948 let preds = vec![
963 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
964 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
965 col("tag_1").eq(lit("d")),
966 ];
967 let inverted_index_applier = build_inverted_index_applier(&preds);
968 let bloom_filter_applier = build_bloom_filter_applier(&preds);
969
970 let builder = ParquetReaderBuilder::new(
971 FILE_DIR.to_string(),
972 PathType::Bare,
973 handle.clone(),
974 object_store.clone(),
975 )
976 .predicate(Some(Predicate::new(preds)))
977 .inverted_index_appliers([inverted_index_applier.clone(), None])
978 .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
979 .cache(CacheStrategy::EnableAll(cache.clone()));
980
981 let mut metrics = ReaderMetrics::default();
982 let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
983 assert!(read_input.is_none());
984
985 assert_eq!(metrics.filter_metrics.rg_total, 4);
986 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
987 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
988 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
989 let bloom_predicates = bloom_filter_applier
990 .as_ref()
991 .unwrap()
992 .compatible_predicate_for_sst(&metadata)
993 .unwrap();
994 let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
995 let cached = index_result_cache
996 .get(&bloom_predicate_key, handle.file_id().file_id())
997 .unwrap();
998 assert!(cached.contains_row_group(2));
999 assert!(cached.contains_row_group(3));
1000 assert!(!cached.contains_row_group(0));
1001 assert!(!cached.contains_row_group(1));
1002
1003 let preds = vec![col("tag_1").eq(lit("d"))];
1024 let inverted_index_applier = build_inverted_index_applier(&preds);
1025 let bloom_filter_applier = build_bloom_filter_applier(&preds);
1026
1027 let builder = ParquetReaderBuilder::new(
1028 FILE_DIR.to_string(),
1029 PathType::Bare,
1030 handle.clone(),
1031 object_store.clone(),
1032 )
1033 .predicate(Some(Predicate::new(preds)))
1034 .inverted_index_appliers([inverted_index_applier.clone(), None])
1035 .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
1036 .cache(CacheStrategy::EnableAll(cache.clone()));
1037
1038 let mut metrics = ReaderMetrics::default();
1039 let (context, selection) = builder
1040 .build_reader_input(&mut metrics)
1041 .await
1042 .unwrap()
1043 .unwrap();
1044 let mut reader = ParquetReader::new(Arc::new(context), selection)
1045 .await
1046 .unwrap();
1047 check_record_batch_reader_result(
1048 &mut reader,
1049 &[
1050 new_record_batch_by_range(&["a", "d"], 0, 20),
1051 new_record_batch_by_range(&["b", "d"], 0, 20),
1052 new_record_batch_by_range(&["c", "d"], 0, 10),
1053 new_record_batch_by_range(&["c", "d"], 10, 20),
1054 ],
1055 )
1056 .await;
1057
1058 assert_eq!(metrics.filter_metrics.rg_total, 4);
1059 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1060 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1061 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1062 let bloom_predicates = bloom_filter_applier
1063 .as_ref()
1064 .unwrap()
1065 .compatible_predicate_for_sst(&metadata)
1066 .unwrap();
1067 let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
1068 let cached = index_result_cache
1069 .get(&bloom_predicate_key, handle.file_id().file_id())
1070 .unwrap();
1071 assert!(cached.contains_row_group(0));
1072 assert!(cached.contains_row_group(1));
1073 assert!(cached.contains_row_group(2));
1074 assert!(cached.contains_row_group(3));
1075 }
1076
1077 fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1078 assert!(end >= start);
1079 let metadata = build_test_binary_test_region_metadata();
1080 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1081
1082 let num_rows = end - start;
1083 let mut columns = Vec::new();
1084
1085 let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1086 for _ in 0..num_rows {
1087 tag_0_builder.append_value(tags[0]);
1088 }
1089 columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1090
1091 let values = (0..num_rows)
1092 .map(|_| "some data".as_bytes())
1093 .collect::<Vec<_>>();
1094 columns.push(
1095 Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
1096 values,
1097 )) as ArrayRef,
1098 );
1099
1100 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1101 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1102
1103 let pk = new_primary_key(tags);
1104 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1105 for _ in 0..num_rows {
1106 pk_builder.append(&pk).unwrap();
1107 }
1108 columns.push(Arc::new(pk_builder.finish()));
1109
1110 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1111 columns.push(Arc::new(UInt8Array::from_value(
1112 OpType::Put as u8,
1113 num_rows,
1114 )));
1115
1116 RecordBatch::try_new(flat_schema, columns).unwrap()
1117 }
1118
1119 async fn check_record_batch_reader_result(
1120 reader: &mut ParquetReader,
1121 expected: &[RecordBatch],
1122 ) {
1123 let mut actual = Vec::new();
1124 while let Some(batch) = reader.next_record_batch().await.unwrap() {
1125 actual.push(batch);
1126 }
1127 assert_eq!(
1128 pretty_format_batches(expected).unwrap().to_string(),
1129 pretty_format_batches(&actual).unwrap().to_string()
1130 );
1131 assert!(reader.next_record_batch().await.unwrap().is_none());
1132 }
1133
1134 fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
1135 let metadata = Arc::new(sst_region_metadata());
1136 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1137
1138 let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1139 let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1140 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1141 let mut field_values = Vec::with_capacity(rows.len());
1142 let mut timestamps = Vec::with_capacity(rows.len());
1143
1144 for (tag_0, tag_1, ts) in rows {
1145 tag_0_builder.append_value(*tag_0);
1146 tag_1_builder.append_value(*tag_1);
1147 pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
1148 field_values.push(*ts as u64);
1149 timestamps.push(*ts);
1150 }
1151
1152 RecordBatch::try_new(
1153 flat_schema,
1154 vec![
1155 Arc::new(tag_0_builder.finish()) as ArrayRef,
1156 Arc::new(tag_1_builder.finish()) as ArrayRef,
1157 Arc::new(UInt64Array::from(field_values)) as ArrayRef,
1158 Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
1159 Arc::new(pk_builder.finish()) as ArrayRef,
1160 Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
1161 Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
1162 ],
1163 )
1164 .unwrap()
1165 }
1166
1167 fn new_record_batch_by_range_sparse(
1170 tags: &[&str],
1171 start: usize,
1172 end: usize,
1173 metadata: &Arc<RegionMetadata>,
1174 ) -> RecordBatch {
1175 assert!(end >= start);
1176 let flat_schema = to_flat_sst_arrow_schema(
1177 metadata,
1178 &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1179 );
1180
1181 let num_rows = end - start;
1182 let mut columns: Vec<ArrayRef> = Vec::new();
1183
1184 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1188 columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1189
1190 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1192 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1193
1194 let table_id = 1u32; let tsid = 100u64; let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1198
1199 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1200 for _ in 0..num_rows {
1201 pk_builder.append(&pk).unwrap();
1202 }
1203 columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1204
1205 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1207
1208 columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1210
1211 RecordBatch::try_new(flat_schema, columns).unwrap()
1212 }
1213
1214 fn create_test_indexer_builder(
1216 env: &TestEnv,
1217 object_store: ObjectStore,
1218 file_path: RegionFilePathFactory,
1219 metadata: Arc<RegionMetadata>,
1220 row_group_size: usize,
1221 ) -> IndexerBuilderImpl {
1222 let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1223 let intermediate_manager = env.get_intermediate_manager();
1224
1225 IndexerBuilderImpl {
1226 build_type: IndexBuildType::Flush,
1227 metadata,
1228 row_group_size,
1229 puffin_manager,
1230 write_cache_enabled: false,
1231 intermediate_manager,
1232 index_options: IndexOptions {
1233 inverted_index: InvertedIndexOptions {
1234 segment_row_count: 1,
1235 ..Default::default()
1236 },
1237 },
1238 inverted_index_config: Default::default(),
1239 fulltext_index_config: Default::default(),
1240 bloom_filter_index_config: Default::default(),
1241 #[cfg(feature = "vector_index")]
1242 vector_index_config: Default::default(),
1243 }
1244 }
1245
1246 async fn write_flat_sst(
1248 object_store: ObjectStore,
1249 metadata: Arc<RegionMetadata>,
1250 indexer_builder: IndexerBuilderImpl,
1251 file_path: RegionFilePathFactory,
1252 flat_source: FlatSource,
1253 write_opts: &WriteOptions,
1254 ) -> SstInfo {
1255 let mut metrics = Metrics::new(WriteType::Flush);
1256 let mut writer = ParquetWriter::new_with_object_store(
1257 object_store,
1258 metadata,
1259 IndexConfig::default(),
1260 indexer_builder,
1261 file_path,
1262 &mut metrics,
1263 )
1264 .await;
1265
1266 writer
1267 .write_all_flat(flat_source, None, write_opts)
1268 .await
1269 .unwrap()
1270 .remove(0)
1271 }
1272
1273 fn create_file_handle_from_sst_info(
1275 info: &SstInfo,
1276 metadata: &Arc<RegionMetadata>,
1277 ) -> FileHandle {
1278 FileHandle::new(
1279 FileMeta {
1280 region_id: metadata.region_id,
1281 file_id: info.file_id,
1282 time_range: info.time_range,
1283 level: 0,
1284 file_size: info.file_size,
1285 max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1286 available_indexes: info.index_metadata.build_available_indexes(),
1287 indexes: info.index_metadata.build_indexes(),
1288 index_file_size: info.index_metadata.file_size,
1289 index_version: 0,
1290 num_row_groups: info.num_row_groups,
1291 num_rows: info.num_rows as u64,
1292 sequence: None,
1293 partition_expr: match &metadata.partition_expr {
1294 Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1295 .expect("partition expression should be valid JSON"),
1296 None => None,
1297 },
1298 num_series: 0,
1299 ..Default::default()
1300 },
1301 Arc::new(NoopFilePurger),
1302 )
1303 }
1304
1305 fn create_test_cache() -> Arc<CacheManager> {
1307 Arc::new(
1308 CacheManager::builder()
1309 .index_result_cache_size(1024 * 1024)
1310 .index_metadata_size(1024 * 1024)
1311 .index_content_page_size(1024 * 1024)
1312 .index_content_size(1024 * 1024)
1313 .puffin_metadata_size(1024 * 1024)
1314 .build(),
1315 )
1316 }
1317
1318 #[tokio::test]
1319 async fn test_write_flat_with_index() {
1320 let mut env = TestEnv::new().await;
1321 let object_store = env.init_object_store_manager();
1322 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1323 let metadata = Arc::new(sst_region_metadata());
1324 let row_group_size = 50;
1325
1326 let flat_batches = vec![
1328 new_record_batch_by_range(&["a", "d"], 0, 20),
1329 new_record_batch_by_range(&["b", "d"], 0, 20),
1330 new_record_batch_by_range(&["c", "d"], 0, 20),
1331 new_record_batch_by_range(&["c", "f"], 0, 40),
1332 new_record_batch_by_range(&["c", "h"], 100, 200),
1333 ];
1334
1335 let flat_source = new_flat_source_from_record_batches(flat_batches);
1336
1337 let write_opts = WriteOptions {
1338 row_group_size,
1339 ..Default::default()
1340 };
1341
1342 let puffin_manager = env
1343 .get_puffin_manager()
1344 .build(object_store.clone(), file_path.clone());
1345 let intermediate_manager = env.get_intermediate_manager();
1346
1347 let indexer_builder = IndexerBuilderImpl {
1348 build_type: IndexBuildType::Flush,
1349 metadata: metadata.clone(),
1350 row_group_size,
1351 puffin_manager,
1352 write_cache_enabled: false,
1353 intermediate_manager,
1354 index_options: IndexOptions {
1355 inverted_index: InvertedIndexOptions {
1356 segment_row_count: 1,
1357 ..Default::default()
1358 },
1359 },
1360 inverted_index_config: Default::default(),
1361 fulltext_index_config: Default::default(),
1362 bloom_filter_index_config: Default::default(),
1363 #[cfg(feature = "vector_index")]
1364 vector_index_config: Default::default(),
1365 };
1366
1367 let mut metrics = Metrics::new(WriteType::Flush);
1368 let mut writer = ParquetWriter::new_with_object_store(
1369 object_store.clone(),
1370 metadata.clone(),
1371 IndexConfig::default(),
1372 indexer_builder,
1373 file_path.clone(),
1374 &mut metrics,
1375 )
1376 .await;
1377
1378 let info = writer
1379 .write_all_flat(flat_source, None, &write_opts)
1380 .await
1381 .unwrap()
1382 .remove(0);
1383 assert_eq!(200, info.num_rows);
1384 assert!(info.file_size > 0);
1385 assert!(info.index_metadata.file_size > 0);
1386
1387 assert!(info.index_metadata.inverted_index.index_size > 0);
1388 assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1389 assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1390
1391 assert!(info.index_metadata.bloom_filter.index_size > 0);
1392 assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1393 assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1394
1395 assert_eq!(
1396 (
1397 Timestamp::new_millisecond(0),
1398 Timestamp::new_millisecond(199)
1399 ),
1400 info.time_range
1401 );
1402 }
1403
1404 #[tokio::test]
1405 async fn test_read_with_override_sequence() {
1406 let mut env = TestEnv::new().await;
1407 let object_store = env.init_object_store_manager();
1408 let handle = sst_file_handle(0, 1000);
1409 let file_path = FixedPathProvider {
1410 region_file_id: handle.file_id(),
1411 };
1412 let metadata = Arc::new(sst_region_metadata());
1413
1414 let source = new_flat_source_from_record_batches(vec![
1416 new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
1417 new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
1418 ]);
1419
1420 let write_opts = WriteOptions {
1421 row_group_size: 50,
1422 ..Default::default()
1423 };
1424
1425 let mut metrics = Metrics::new(WriteType::Flush);
1426 let mut writer = ParquetWriter::new_with_object_store(
1427 object_store.clone(),
1428 metadata.clone(),
1429 IndexConfig::default(),
1430 NoopIndexBuilder,
1431 file_path,
1432 &mut metrics,
1433 )
1434 .await;
1435
1436 writer
1437 .write_all_flat_as_primary_key(source, None, &write_opts)
1438 .await
1439 .unwrap()
1440 .remove(0);
1441
1442 let builder = ParquetReaderBuilder::new(
1444 FILE_DIR.to_string(),
1445 PathType::Bare,
1446 handle.clone(),
1447 object_store.clone(),
1448 );
1449 let mut reader = builder.build().await.unwrap().unwrap();
1450 let mut normal_batches = Vec::new();
1451 while let Some(batch) = reader.next_record_batch().await.unwrap() {
1452 normal_batches.push(batch);
1453 }
1454
1455 let custom_sequence = 12345u64;
1457 let file_meta = handle.meta_ref();
1458 let mut override_file_meta = file_meta.clone();
1459 override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1460 let override_handle = FileHandle::new(
1461 override_file_meta,
1462 Arc::new(crate::sst::file_purger::NoopFilePurger),
1463 );
1464
1465 let builder = ParquetReaderBuilder::new(
1466 FILE_DIR.to_string(),
1467 PathType::Bare,
1468 override_handle,
1469 object_store.clone(),
1470 );
1471 let mut reader = builder.build().await.unwrap().unwrap();
1472 let mut override_batches = Vec::new();
1473 while let Some(batch) = reader.next_record_batch().await.unwrap() {
1474 override_batches.push(batch);
1475 }
1476
1477 assert_eq!(normal_batches.len(), override_batches.len());
1479 for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1480 let expected_batch = {
1481 let mut columns = normal.columns().to_vec();
1482 let num_cols = columns.len();
1483 columns[num_cols - 2] =
1484 Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
1485 RecordBatch::try_new(normal.schema(), columns).unwrap()
1486 };
1487
1488 assert_eq!(*override_batch, expected_batch);
1490 }
1491 }
1492
1493 #[tokio::test]
1494 async fn test_write_flat_read_with_inverted_index() {
1495 let mut env = TestEnv::new().await;
1496 let object_store = env.init_object_store_manager();
1497 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1498 let metadata = Arc::new(sst_region_metadata());
1499 let row_group_size = 100;
1500
1501 let flat_batches = vec![
1509 new_record_batch_by_range(&["a", "d"], 0, 50),
1510 new_record_batch_by_range(&["b", "d"], 50, 100),
1511 new_record_batch_by_range(&["c", "d"], 100, 150),
1512 new_record_batch_by_range(&["c", "f"], 150, 200),
1513 ];
1514
1515 let flat_source = new_flat_source_from_record_batches(flat_batches);
1516
1517 let write_opts = WriteOptions {
1518 row_group_size,
1519 ..Default::default()
1520 };
1521
1522 let indexer_builder = create_test_indexer_builder(
1523 &env,
1524 object_store.clone(),
1525 file_path.clone(),
1526 metadata.clone(),
1527 row_group_size,
1528 );
1529
1530 let info = write_flat_sst(
1531 object_store.clone(),
1532 metadata.clone(),
1533 indexer_builder,
1534 file_path.clone(),
1535 flat_source,
1536 &write_opts,
1537 )
1538 .await;
1539 assert_eq!(200, info.num_rows);
1540 assert!(info.file_size > 0);
1541 assert!(info.index_metadata.file_size > 0);
1542
1543 let handle = create_file_handle_from_sst_info(&info, &metadata);
1544
1545 let cache = create_test_cache();
1546
1547 let preds = vec![col("tag_0").eq(lit("b"))];
1550 let inverted_index_applier = InvertedIndexApplierBuilder::new(
1551 FILE_DIR.to_string(),
1552 PathType::Bare,
1553 object_store.clone(),
1554 &metadata,
1555 HashSet::from_iter([0]),
1556 env.get_puffin_manager(),
1557 )
1558 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1559 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1560 .build(&preds)
1561 .unwrap()
1562 .map(Arc::new);
1563
1564 let builder = ParquetReaderBuilder::new(
1565 FILE_DIR.to_string(),
1566 PathType::Bare,
1567 handle.clone(),
1568 object_store.clone(),
1569 )
1570 .predicate(Some(Predicate::new(preds)))
1571 .inverted_index_appliers([inverted_index_applier.clone(), None])
1572 .cache(CacheStrategy::EnableAll(cache.clone()));
1573
1574 let mut metrics = ReaderMetrics::default();
1575 let (_context, selection) = builder
1576 .build_reader_input(&mut metrics)
1577 .await
1578 .unwrap()
1579 .unwrap();
1580
1581 assert_eq!(selection.row_group_count(), 1);
1583 assert_eq!(50, selection.get(0).unwrap().row_count());
1584
1585 assert_eq!(metrics.filter_metrics.rg_total, 2);
1587 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
1588 assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
1589 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
1590 }
1591
1592 #[tokio::test]
1593 async fn test_write_flat_read_with_bloom_filter() {
1594 let mut env = TestEnv::new().await;
1595 let object_store = env.init_object_store_manager();
1596 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1597 let metadata = Arc::new(sst_region_metadata());
1598 let row_group_size = 100;
1599
1600 let flat_batches = vec![
1608 new_record_batch_by_range(&["a", "d"], 0, 50),
1609 new_record_batch_by_range(&["b", "e"], 50, 100),
1610 new_record_batch_by_range(&["c", "d"], 100, 150),
1611 new_record_batch_by_range(&["c", "f"], 150, 200),
1612 ];
1613
1614 let flat_source = new_flat_source_from_record_batches(flat_batches);
1615
1616 let write_opts = WriteOptions {
1617 row_group_size,
1618 ..Default::default()
1619 };
1620
1621 let indexer_builder = create_test_indexer_builder(
1622 &env,
1623 object_store.clone(),
1624 file_path.clone(),
1625 metadata.clone(),
1626 row_group_size,
1627 );
1628
1629 let info = write_flat_sst(
1630 object_store.clone(),
1631 metadata.clone(),
1632 indexer_builder,
1633 file_path.clone(),
1634 flat_source,
1635 &write_opts,
1636 )
1637 .await;
1638 assert_eq!(200, info.num_rows);
1639 assert!(info.file_size > 0);
1640 assert!(info.index_metadata.file_size > 0);
1641
1642 let handle = create_file_handle_from_sst_info(&info, &metadata);
1643
1644 let cache = create_test_cache();
1645
1646 let preds = vec![
1649 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1650 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1651 col("tag_1").eq(lit("d")),
1652 ];
1653 let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1654 FILE_DIR.to_string(),
1655 PathType::Bare,
1656 object_store.clone(),
1657 &metadata,
1658 env.get_puffin_manager(),
1659 )
1660 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1661 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1662 .build(&preds)
1663 .unwrap()
1664 .map(Arc::new);
1665
1666 let builder = ParquetReaderBuilder::new(
1667 FILE_DIR.to_string(),
1668 PathType::Bare,
1669 handle.clone(),
1670 object_store.clone(),
1671 )
1672 .predicate(Some(Predicate::new(preds)))
1673 .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1674 .cache(CacheStrategy::EnableAll(cache.clone()));
1675
1676 let mut metrics = ReaderMetrics::default();
1677 let (_context, selection) = builder
1678 .build_reader_input(&mut metrics)
1679 .await
1680 .unwrap()
1681 .unwrap();
1682
1683 assert_eq!(selection.row_group_count(), 2);
1685 assert_eq!(50, selection.get(0).unwrap().row_count());
1686 assert_eq!(50, selection.get(1).unwrap().row_count());
1687
1688 assert_eq!(metrics.filter_metrics.rg_total, 2);
1690 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1691 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1692 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1693 }
1694
1695 #[tokio::test]
1696 async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
1697 let mut env = TestEnv::new().await;
1698 let object_store = env.init_object_store_manager();
1699 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1700 let metadata = Arc::new(sst_region_metadata());
1701 let row_group_size = 10;
1702
1703 let flat_source = new_flat_source_from_record_batches(vec![
1704 new_record_batch_by_range(&["a", "d"], 0, 3),
1705 new_record_batch_by_range(&["b", "d"], 3, 10),
1706 ]);
1707 let write_opts = WriteOptions {
1708 row_group_size,
1709 ..Default::default()
1710 };
1711 let indexer_builder = create_test_indexer_builder(
1712 &env,
1713 object_store.clone(),
1714 file_path.clone(),
1715 metadata.clone(),
1716 row_group_size,
1717 );
1718 let info = write_flat_sst(
1719 object_store.clone(),
1720 metadata.clone(),
1721 indexer_builder,
1722 file_path,
1723 flat_source,
1724 &write_opts,
1725 )
1726 .await;
1727 let handle = create_file_handle_from_sst_info(&info, &metadata);
1728
1729 let builder =
1730 ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1731 .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1732
1733 let mut metrics = ReaderMetrics::default();
1734 let (context, _) = builder
1735 .build_reader_input(&mut metrics)
1736 .await
1737 .unwrap()
1738 .unwrap();
1739 let selection = RowGroupSelection::from_row_ranges(
1740 vec![(0, std::iter::once(0..6).collect())],
1741 row_group_size,
1742 );
1743
1744 let mut reader = ParquetReader::new(Arc::new(context), selection)
1745 .await
1746 .unwrap();
1747 check_record_batch_reader_result(
1748 &mut reader,
1749 &[new_record_batch_by_range(&["a", "d"], 0, 3)],
1750 )
1751 .await;
1752 }
1753
1754 #[tokio::test]
1755 async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
1756 let mut env = TestEnv::new().await;
1757 let object_store = env.init_object_store_manager();
1758 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1759 let metadata = Arc::new(sst_region_metadata());
1760 let row_group_size = 8;
1761
1762 let flat_source = new_flat_source_from_record_batches(vec![
1763 new_record_batch_by_range(&["a", "d"], 0, 2),
1764 new_record_batch_by_range(&["b", "d"], 2, 4),
1765 new_record_batch_by_range(&["a", "d"], 4, 6),
1766 new_record_batch_by_range(&["c", "d"], 6, 8),
1767 ]);
1768 let write_opts = WriteOptions {
1769 row_group_size,
1770 ..Default::default()
1771 };
1772 let indexer_builder = create_test_indexer_builder(
1773 &env,
1774 object_store.clone(),
1775 file_path.clone(),
1776 metadata.clone(),
1777 row_group_size,
1778 );
1779 let info = write_flat_sst(
1780 object_store.clone(),
1781 metadata.clone(),
1782 indexer_builder,
1783 file_path,
1784 flat_source,
1785 &write_opts,
1786 )
1787 .await;
1788 let handle = create_file_handle_from_sst_info(&info, &metadata);
1789
1790 let builder =
1791 ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1792 .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1793
1794 let mut metrics = ReaderMetrics::default();
1795 let (context, _) = builder
1796 .build_reader_input(&mut metrics)
1797 .await
1798 .unwrap()
1799 .unwrap();
1800 let selection = RowGroupSelection::from_row_ranges(
1801 vec![(0, std::iter::once(0..8).collect())],
1802 row_group_size,
1803 );
1804
1805 let mut reader = ParquetReader::new(Arc::new(context), selection)
1806 .await
1807 .unwrap();
1808 check_record_batch_reader_result(
1809 &mut reader,
1810 &[new_record_batch_from_rows(&[
1811 ("a", "d", 0),
1812 ("a", "d", 1),
1813 ("a", "d", 4),
1814 ("a", "d", 5),
1815 ])],
1816 )
1817 .await;
1818 }
1819
1820 #[tokio::test]
1821 async fn test_write_flat_read_with_inverted_index_sparse() {
1822 common_telemetry::init_default_ut_logging();
1823
1824 let mut env = TestEnv::new().await;
1825 let object_store = env.init_object_store_manager();
1826 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1827 let metadata = Arc::new(sst_region_metadata_with_encoding(
1828 PrimaryKeyEncoding::Sparse,
1829 ));
1830 let row_group_size = 100;
1831
1832 let flat_batches = vec![
1840 new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1841 new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1842 new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1843 new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1844 ];
1845
1846 let flat_source = new_flat_source_from_record_batches(flat_batches);
1847
1848 let write_opts = WriteOptions {
1849 row_group_size,
1850 ..Default::default()
1851 };
1852
1853 let indexer_builder = create_test_indexer_builder(
1854 &env,
1855 object_store.clone(),
1856 file_path.clone(),
1857 metadata.clone(),
1858 row_group_size,
1859 );
1860
1861 let info = write_flat_sst(
1862 object_store.clone(),
1863 metadata.clone(),
1864 indexer_builder,
1865 file_path.clone(),
1866 flat_source,
1867 &write_opts,
1868 )
1869 .await;
1870 assert_eq!(200, info.num_rows);
1871 assert!(info.file_size > 0);
1872 assert!(info.index_metadata.file_size > 0);
1873
1874 let handle = create_file_handle_from_sst_info(&info, &metadata);
1875
1876 let cache = create_test_cache();
1877
1878 let preds = vec![col("tag_0").eq(lit("b"))];
1881 let inverted_index_applier = InvertedIndexApplierBuilder::new(
1882 FILE_DIR.to_string(),
1883 PathType::Bare,
1884 object_store.clone(),
1885 &metadata,
1886 HashSet::from_iter([0]),
1887 env.get_puffin_manager(),
1888 )
1889 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1890 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1891 .build(&preds)
1892 .unwrap()
1893 .map(Arc::new);
1894
1895 let builder = ParquetReaderBuilder::new(
1896 FILE_DIR.to_string(),
1897 PathType::Bare,
1898 handle.clone(),
1899 object_store.clone(),
1900 )
1901 .predicate(Some(Predicate::new(preds)))
1902 .inverted_index_appliers([inverted_index_applier.clone(), None])
1903 .cache(CacheStrategy::EnableAll(cache.clone()));
1904
1905 let mut metrics = ReaderMetrics::default();
1906 let (_context, selection) = builder
1907 .build_reader_input(&mut metrics)
1908 .await
1909 .unwrap()
1910 .unwrap();
1911
1912 assert_eq!(selection.row_group_count(), 1);
1914 assert_eq!(50, selection.get(0).unwrap().row_count());
1915
1916 assert_eq!(metrics.filter_metrics.rg_total, 2);
1920 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1922 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1923 }
1924
1925 #[tokio::test]
1926 async fn test_write_flat_read_with_bloom_filter_sparse() {
1927 let mut env = TestEnv::new().await;
1928 let object_store = env.init_object_store_manager();
1929 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1930 let metadata = Arc::new(sst_region_metadata_with_encoding(
1931 PrimaryKeyEncoding::Sparse,
1932 ));
1933 let row_group_size = 100;
1934
1935 let flat_batches = vec![
1943 new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1944 new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
1945 new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1946 new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1947 ];
1948
1949 let flat_source = new_flat_source_from_record_batches(flat_batches);
1950
1951 let write_opts = WriteOptions {
1952 row_group_size,
1953 ..Default::default()
1954 };
1955
1956 let indexer_builder = create_test_indexer_builder(
1957 &env,
1958 object_store.clone(),
1959 file_path.clone(),
1960 metadata.clone(),
1961 row_group_size,
1962 );
1963
1964 let info = write_flat_sst(
1965 object_store.clone(),
1966 metadata.clone(),
1967 indexer_builder,
1968 file_path.clone(),
1969 flat_source,
1970 &write_opts,
1971 )
1972 .await;
1973 assert_eq!(200, info.num_rows);
1974 assert!(info.file_size > 0);
1975 assert!(info.index_metadata.file_size > 0);
1976
1977 let handle = create_file_handle_from_sst_info(&info, &metadata);
1978
1979 let cache = create_test_cache();
1980
1981 let preds = vec![
1984 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1985 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1986 col("tag_1").eq(lit("d")),
1987 ];
1988 let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1989 FILE_DIR.to_string(),
1990 PathType::Bare,
1991 object_store.clone(),
1992 &metadata,
1993 env.get_puffin_manager(),
1994 )
1995 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1996 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1997 .build(&preds)
1998 .unwrap()
1999 .map(Arc::new);
2000
2001 let builder = ParquetReaderBuilder::new(
2002 FILE_DIR.to_string(),
2003 PathType::Bare,
2004 handle.clone(),
2005 object_store.clone(),
2006 )
2007 .predicate(Some(Predicate::new(preds)))
2008 .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
2009 .cache(CacheStrategy::EnableAll(cache.clone()));
2010
2011 let mut metrics = ReaderMetrics::default();
2012 let (_context, selection) = builder
2013 .build_reader_input(&mut metrics)
2014 .await
2015 .unwrap()
2016 .unwrap();
2017
2018 assert_eq!(selection.row_group_count(), 2);
2020 assert_eq!(50, selection.get(0).unwrap().row_count());
2021 assert_eq!(50, selection.get(1).unwrap().row_count());
2022
2023 assert_eq!(metrics.filter_metrics.rg_total, 2);
2025 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2026 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
2027 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
2028 }
2029
2030 fn fulltext_region_metadata() -> RegionMetadata {
2033 let mut builder = RegionMetadataBuilder::new(REGION_ID);
2034 builder
2035 .push_column_metadata(ColumnMetadata {
2036 column_schema: ColumnSchema::new(
2037 "tag_0".to_string(),
2038 ConcreteDataType::string_datatype(),
2039 true,
2040 ),
2041 semantic_type: SemanticType::Tag,
2042 column_id: 0,
2043 })
2044 .push_column_metadata(ColumnMetadata {
2045 column_schema: ColumnSchema::new(
2046 "text_bloom".to_string(),
2047 ConcreteDataType::string_datatype(),
2048 true,
2049 )
2050 .with_fulltext_options(FulltextOptions {
2051 enable: true,
2052 analyzer: FulltextAnalyzer::English,
2053 case_sensitive: false,
2054 backend: FulltextBackend::Bloom,
2055 granularity: 1,
2056 false_positive_rate_in_10000: 50,
2057 })
2058 .unwrap(),
2059 semantic_type: SemanticType::Field,
2060 column_id: 1,
2061 })
2062 .push_column_metadata(ColumnMetadata {
2063 column_schema: ColumnSchema::new(
2064 "text_tantivy".to_string(),
2065 ConcreteDataType::string_datatype(),
2066 true,
2067 )
2068 .with_fulltext_options(FulltextOptions {
2069 enable: true,
2070 analyzer: FulltextAnalyzer::English,
2071 case_sensitive: false,
2072 backend: FulltextBackend::Tantivy,
2073 granularity: 1,
2074 false_positive_rate_in_10000: 50,
2075 })
2076 .unwrap(),
2077 semantic_type: SemanticType::Field,
2078 column_id: 2,
2079 })
2080 .push_column_metadata(ColumnMetadata {
2081 column_schema: ColumnSchema::new(
2082 "field_0".to_string(),
2083 ConcreteDataType::uint64_datatype(),
2084 true,
2085 ),
2086 semantic_type: SemanticType::Field,
2087 column_id: 3,
2088 })
2089 .push_column_metadata(ColumnMetadata {
2090 column_schema: ColumnSchema::new(
2091 "ts".to_string(),
2092 ConcreteDataType::timestamp_millisecond_datatype(),
2093 false,
2094 ),
2095 semantic_type: SemanticType::Timestamp,
2096 column_id: 4,
2097 })
2098 .primary_key(vec![0]);
2099 builder.build().unwrap()
2100 }
2101
2102 fn new_fulltext_record_batch_by_range(
2104 tag: &str,
2105 text_bloom: &str,
2106 text_tantivy: &str,
2107 start: usize,
2108 end: usize,
2109 ) -> RecordBatch {
2110 assert!(end >= start);
2111 let metadata = Arc::new(fulltext_region_metadata());
2112 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2113
2114 let num_rows = end - start;
2115 let mut columns = Vec::new();
2116
2117 let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
2119 for _ in 0..num_rows {
2120 tag_builder.append_value(tag);
2121 }
2122 columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
2123
2124 let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
2126 columns.push(Arc::new(StringArray::from(text_bloom_values)));
2127
2128 let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
2130 columns.push(Arc::new(StringArray::from(text_tantivy_values)));
2131
2132 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
2134 columns.push(Arc::new(UInt64Array::from(field_values)));
2135
2136 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
2138 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
2139
2140 let pk = new_primary_key(&[tag]);
2142 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
2143 for _ in 0..num_rows {
2144 pk_builder.append(&pk).unwrap();
2145 }
2146 columns.push(Arc::new(pk_builder.finish()));
2147
2148 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
2150
2151 columns.push(Arc::new(UInt8Array::from_value(
2153 OpType::Put as u8,
2154 num_rows,
2155 )));
2156
2157 RecordBatch::try_new(flat_schema, columns).unwrap()
2158 }
2159
2160 #[tokio::test]
2161 async fn test_write_flat_read_with_fulltext_index() {
2162 let mut env = TestEnv::new().await;
2163 let object_store = env.init_object_store_manager();
2164 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
2165 let metadata = Arc::new(fulltext_region_metadata());
2166 let row_group_size = 50;
2167
2168 let flat_batches = vec![
2174 new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
2175 new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
2176 new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
2177 new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
2178 ];
2179
2180 let flat_source = new_flat_source_from_record_batches(flat_batches);
2181
2182 let write_opts = WriteOptions {
2183 row_group_size,
2184 ..Default::default()
2185 };
2186
2187 let indexer_builder = create_test_indexer_builder(
2188 &env,
2189 object_store.clone(),
2190 file_path.clone(),
2191 metadata.clone(),
2192 row_group_size,
2193 );
2194
2195 let mut info = write_flat_sst(
2196 object_store.clone(),
2197 metadata.clone(),
2198 indexer_builder,
2199 file_path.clone(),
2200 flat_source,
2201 &write_opts,
2202 )
2203 .await;
2204 assert_eq!(200, info.num_rows);
2205 assert!(info.file_size > 0);
2206 assert!(info.index_metadata.file_size > 0);
2207
2208 assert!(info.index_metadata.fulltext_index.index_size > 0);
2210 assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
2211 info.index_metadata.fulltext_index.columns.sort_unstable();
2213 assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
2214
2215 assert_eq!(
2216 (
2217 Timestamp::new_millisecond(0),
2218 Timestamp::new_millisecond(199)
2219 ),
2220 info.time_range
2221 );
2222
2223 let handle = create_file_handle_from_sst_info(&info, &metadata);
2224
2225 let cache = create_test_cache();
2226
2227 let matches_func = || {
2229 Arc::new(
2230 ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
2231 .provide(Default::default()),
2232 )
2233 };
2234
2235 let matches_term_func = || {
2236 Arc::new(
2237 ScalarFunctionFactory::from(
2238 Arc::new(MatchesTermFunction::default()) as FunctionRef,
2239 )
2240 .provide(Default::default()),
2241 )
2242 };
2243
2244 let preds = vec![Expr::ScalarFunction(ScalarFunction {
2247 args: vec![col("text_bloom"), "hello".lit()],
2248 func: matches_term_func(),
2249 })];
2250
2251 let fulltext_applier = FulltextIndexApplierBuilder::new(
2252 FILE_DIR.to_string(),
2253 PathType::Bare,
2254 object_store.clone(),
2255 env.get_puffin_manager(),
2256 &metadata,
2257 )
2258 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2259 .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2260 .build(&preds)
2261 .unwrap()
2262 .map(Arc::new);
2263
2264 let builder = ParquetReaderBuilder::new(
2265 FILE_DIR.to_string(),
2266 PathType::Bare,
2267 handle.clone(),
2268 object_store.clone(),
2269 )
2270 .predicate(Some(Predicate::new(preds)))
2271 .fulltext_index_appliers([None, fulltext_applier.clone()])
2272 .cache(CacheStrategy::EnableAll(cache.clone()));
2273
2274 let mut metrics = ReaderMetrics::default();
2275 let (_context, selection) = builder
2276 .build_reader_input(&mut metrics)
2277 .await
2278 .unwrap()
2279 .unwrap();
2280
2281 assert_eq!(selection.row_group_count(), 2);
2283 assert_eq!(50, selection.get(0).unwrap().row_count());
2284 assert_eq!(50, selection.get(1).unwrap().row_count());
2285
2286 assert_eq!(metrics.filter_metrics.rg_total, 4);
2288 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2289 assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2290 assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2291
2292 let preds = vec![Expr::ScalarFunction(ScalarFunction {
2295 args: vec![col("text_tantivy"), "lazy".lit()],
2296 func: matches_func(),
2297 })];
2298
2299 let fulltext_applier = FulltextIndexApplierBuilder::new(
2300 FILE_DIR.to_string(),
2301 PathType::Bare,
2302 object_store.clone(),
2303 env.get_puffin_manager(),
2304 &metadata,
2305 )
2306 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2307 .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2308 .build(&preds)
2309 .unwrap()
2310 .map(Arc::new);
2311
2312 let builder = ParquetReaderBuilder::new(
2313 FILE_DIR.to_string(),
2314 PathType::Bare,
2315 handle.clone(),
2316 object_store.clone(),
2317 )
2318 .predicate(Some(Predicate::new(preds)))
2319 .fulltext_index_appliers([None, fulltext_applier.clone()])
2320 .cache(CacheStrategy::EnableAll(cache.clone()));
2321
2322 let mut metrics = ReaderMetrics::default();
2323 let (_context, selection) = builder
2324 .build_reader_input(&mut metrics)
2325 .await
2326 .unwrap()
2327 .unwrap();
2328
2329 assert_eq!(selection.row_group_count(), 2);
2331 assert_eq!(50, selection.get(2).unwrap().row_count());
2332 assert_eq!(50, selection.get(3).unwrap().row_count());
2333
2334 assert_eq!(metrics.filter_metrics.rg_total, 4);
2336 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2337 assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2338 assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2339 }
2340}