1use std::sync::Arc;
18
19use common_base::readable_size::ReadableSize;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::FileId;
22
23use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
24use crate::sst::file::FileTimeRange;
25use crate::sst::index::IndexOutput;
26
27pub mod file_range;
28pub mod flat_format;
29pub mod format;
30pub(crate) mod helper;
31pub mod metadata;
32pub mod prefilter;
33pub mod push_decoder;
34pub mod read_columns;
35pub mod reader;
36pub mod row_group;
37pub mod row_selection;
38pub(crate) mod stats;
39pub mod writer;
40
/// Key of the parquet key-value metadata entry that holds the region metadata
/// serialized as JSON.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size used when reading parquet SST files.
/// NOTE(review): presumably measured in rows per batch — confirm against the reader.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default maximum number of rows in a parquet row group; this is the value
/// `WriteOptions::default()` uses for `row_group_size`.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;
56
/// Options controlling how parquet SST files are written.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Write buffer size; defaults to `DEFAULT_WRITE_BUFFER_SIZE`.
    pub write_buffer_size: ReadableSize,
    /// Maximum number of rows in a row group; defaults to `DEFAULT_ROW_GROUP_SIZE`.
    pub row_group_size: usize,
    /// Optional size limit for a single output file. When set, one write may
    /// produce several SST files; `None` (the default) sets no such limit.
    /// NOTE(review): presumably measured in bytes — confirm against the writer.
    pub max_file_size: Option<usize>,
}
69
70impl Default for WriteOptions {
71 fn default() -> Self {
72 WriteOptions {
73 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
74 row_group_size: DEFAULT_ROW_GROUP_SIZE,
75 max_file_size: None,
76 }
77 }
78}
79
/// Summary of one SST file produced by a parquet write.
#[derive(Debug, Default)]
pub struct SstInfo {
    /// Id of the written SST file.
    pub file_id: FileId,
    /// Inclusive (min, max) timestamps of the rows in the file.
    pub time_range: FileTimeRange,
    /// Size of the SST file (presumably in bytes — confirm).
    pub file_size: u64,
    /// Largest uncompressed size among the file's row groups.
    pub max_row_group_uncompressed_size: u64,
    /// Number of rows in the file.
    pub num_rows: usize,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file, when the writer provides it.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Result of building the indexes for this file.
    pub index_metadata: IndexOutput,
    /// Number of series in the file.
    pub num_series: u64,
}
103
104#[cfg(test)]
105mod tests {
106 use std::collections::HashSet;
107 use std::sync::Arc;
108
109 use api::v1::{OpType, SemanticType};
110 use common_function::function::FunctionRef;
111 use common_function::function_factory::ScalarFunctionFactory;
112 use common_function::scalars::matches::MatchesFunction;
113 use common_function::scalars::matches_term::MatchesTermFunction;
114 use common_time::Timestamp;
115 use datafusion_common::{Column, ScalarValue};
116 use datafusion_expr::expr::ScalarFunction;
117 use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
118 use datatypes::arrow;
119 use datatypes::arrow::array::{
120 ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
121 TimestampMillisecondArray, UInt8Array, UInt64Array,
122 };
123 use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
124 use datatypes::arrow::util::pretty::pretty_format_batches;
125 use datatypes::prelude::ConcreteDataType;
126 use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
127 use object_store::ObjectStore;
128 use parquet::arrow::AsyncArrowWriter;
129 use parquet::basic::{Compression, Encoding, ZstdLevel};
130 use parquet::file::metadata::{KeyValue, PageIndexPolicy};
131 use parquet::file::properties::WriterProperties;
132 use store_api::codec::PrimaryKeyEncoding;
133 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
134 use store_api::region_request::PathType;
135 use store_api::storage::{ColumnSchema, RegionId};
136 use table::predicate::Predicate;
137 use tokio_util::compat::FuturesAsyncWriteCompatExt;
138
139 use super::*;
140 use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
141 use crate::cache::index::result_cache::PredicateKey;
142 use crate::cache::test_util::assert_parquet_metadata_equal;
143 use crate::cache::{CacheManager, CacheStrategy};
144 use crate::config::IndexConfig;
145 use crate::read::FlatSource;
146 use crate::region::options::{IndexOptions, InvertedIndexOptions};
147 use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
148 use crate::sst::file_purger::NoopFilePurger;
149 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
150 use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
151 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
152 use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
153 use crate::sst::parquet::flat_format::FlatWriteFormat;
154 use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
155 use crate::sst::parquet::row_selection::RowGroupSelection;
156 use crate::sst::parquet::writer::ParquetWriter;
157 use crate::sst::{
158 DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
159 };
160 use crate::test_util::TestEnv;
161 use crate::test_util::sst_util::{
162 build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
163 new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
164 new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
165 sst_region_metadata_with_encoding,
166 };
167
    /// Directory the tests in this module write SST and index files under.
    const FILE_DIR: &str = "/";
    /// Region id for test data in this module.
    const REGION_ID: RegionId = RegionId::new(0, 0);
170
171 #[derive(Clone)]
172 struct FixedPathProvider {
173 region_file_id: RegionFileId,
174 }
175
176 impl FilePathProvider for FixedPathProvider {
177 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
178 location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
179 }
180
181 fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
182 location::index_file_path(FILE_DIR, index_id, PathType::Bare)
183 }
184
185 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
186 location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
187 }
188 }
189
190 struct NoopIndexBuilder;
191
192 #[async_trait::async_trait]
193 impl IndexerBuilder for NoopIndexBuilder {
194 async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
195 Indexer::default()
196 }
197 }
198
199 #[tokio::test]
200 async fn test_write_read() {
201 let mut env = TestEnv::new().await;
202 let object_store = env.init_object_store_manager();
203 let handle = sst_file_handle(0, 1000);
204 let file_path = FixedPathProvider {
205 region_file_id: handle.file_id(),
206 };
207 let metadata = Arc::new(sst_region_metadata());
208 let source = new_flat_source_from_record_batches(vec![
209 new_record_batch_by_range(&["a", "d"], 0, 60),
210 new_record_batch_by_range(&["b", "f"], 0, 40),
211 new_record_batch_by_range(&["b", "h"], 100, 200),
212 ]);
213 let write_opts = WriteOptions {
215 row_group_size: 50,
216 ..Default::default()
217 };
218
219 let mut metrics = Metrics::new(WriteType::Flush);
220 let mut writer = ParquetWriter::new_with_object_store(
221 object_store.clone(),
222 metadata.clone(),
223 IndexConfig::default(),
224 NoopIndexBuilder,
225 file_path,
226 &mut metrics,
227 )
228 .await;
229
230 let info = writer
231 .write_all_flat_as_primary_key(source, None, &write_opts)
232 .await
233 .unwrap()
234 .remove(0);
235 assert_eq!(200, info.num_rows);
236 assert!(info.file_size > 0);
237 assert_eq!(
238 (
239 Timestamp::new_millisecond(0),
240 Timestamp::new_millisecond(199)
241 ),
242 info.time_range
243 );
244
245 let builder = ParquetReaderBuilder::new(
246 FILE_DIR.to_string(),
247 PathType::Bare,
248 handle.clone(),
249 object_store,
250 );
251 let mut reader = builder.build().await.unwrap().unwrap();
252 check_record_batch_reader_result(
253 &mut reader,
254 &[
255 new_record_batch_by_range(&["a", "d"], 0, 50),
256 new_record_batch_by_range(&["a", "d"], 50, 60),
257 new_record_batch_by_range(&["b", "f"], 0, 40),
258 new_record_batch_by_range(&["b", "h"], 100, 150),
259 new_record_batch_by_range(&["b", "h"], 150, 200),
260 ],
261 )
262 .await;
263 }
264
265 #[tokio::test]
266 async fn test_read_with_cache() {
267 let mut env = TestEnv::new().await;
268 let object_store = env.init_object_store_manager();
269 let handle = sst_file_handle(0, 1000);
270 let metadata = Arc::new(sst_region_metadata());
271 let source = new_flat_source_from_record_batches(vec![
272 new_record_batch_by_range(&["a", "d"], 0, 60),
273 new_record_batch_by_range(&["b", "f"], 0, 40),
274 new_record_batch_by_range(&["b", "h"], 100, 200),
275 ]);
276 let write_opts = WriteOptions {
278 row_group_size: 50,
279 ..Default::default()
280 };
281 let mut metrics = Metrics::new(WriteType::Flush);
283 let mut writer = ParquetWriter::new_with_object_store(
284 object_store.clone(),
285 metadata.clone(),
286 IndexConfig::default(),
287 NoopIndexBuilder,
288 FixedPathProvider {
289 region_file_id: handle.file_id(),
290 },
291 &mut metrics,
292 )
293 .await;
294
295 let sst_info = writer
296 .write_all_flat_as_primary_key(source, None, &write_opts)
297 .await
298 .unwrap()
299 .remove(0);
300
301 let cache = CacheStrategy::EnableAll(Arc::new(
303 CacheManager::builder()
304 .page_cache_size(64 * 1024 * 1024)
305 .build(),
306 ));
307 let builder = ParquetReaderBuilder::new(
308 FILE_DIR.to_string(),
309 PathType::Bare,
310 handle.clone(),
311 object_store,
312 )
313 .cache(cache.clone());
314 for _ in 0..3 {
315 let mut reader = builder.build().await.unwrap().unwrap();
316 check_record_batch_reader_result(
317 &mut reader,
318 &[
319 new_record_batch_by_range(&["a", "d"], 0, 50),
320 new_record_batch_by_range(&["a", "d"], 50, 60),
321 new_record_batch_by_range(&["b", "f"], 0, 40),
322 new_record_batch_by_range(&["b", "h"], 100, 150),
323 new_record_batch_by_range(&["b", "h"], 150, 200),
324 ],
325 )
326 .await;
327 }
328
329 let parquet_meta = sst_info.file_metadata.unwrap();
330 let get_ranges = |row_group_idx: usize| {
331 let row_group = parquet_meta.row_group(row_group_idx);
332 let mut ranges = Vec::with_capacity(row_group.num_columns());
333 for i in 0..row_group.num_columns() {
334 let (start, length) = row_group.column(i).byte_range();
335 ranges.push(start..start + length);
336 }
337
338 ranges
339 };
340
341 for i in 0..4 {
343 let lookup = cache
344 .get_page_ranges(handle.file_id().file_id(), i, &get_ranges(i))
345 .unwrap();
346 assert!(lookup.is_fully_cached());
347 }
348 let missing_range = 0..10;
349 let lookup = cache
350 .get_page_ranges(
351 handle.file_id().file_id(),
352 5,
353 std::slice::from_ref(&missing_range),
354 )
355 .unwrap();
356 assert_eq!(vec![0..10], lookup.missing_ranges);
357 }
358
359 #[tokio::test]
360 async fn test_parquet_metadata_eq() {
361 let mut env = crate::test_util::TestEnv::new().await;
363 let object_store = env.init_object_store_manager();
364 let handle = sst_file_handle(0, 1000);
365 let metadata = Arc::new(sst_region_metadata());
366 let source = new_flat_source_from_record_batches(vec![
367 new_record_batch_by_range(&["a", "d"], 0, 60),
368 new_record_batch_by_range(&["b", "f"], 0, 40),
369 new_record_batch_by_range(&["b", "h"], 100, 200),
370 ]);
371 let write_opts = WriteOptions {
372 row_group_size: 50,
373 ..Default::default()
374 };
375
376 let mut metrics = Metrics::new(WriteType::Flush);
379 let mut writer = ParquetWriter::new_with_object_store(
380 object_store.clone(),
381 metadata.clone(),
382 IndexConfig::default(),
383 NoopIndexBuilder,
384 FixedPathProvider {
385 region_file_id: handle.file_id(),
386 },
387 &mut metrics,
388 )
389 .await;
390
391 let sst_info = writer
392 .write_all_flat_as_primary_key(source, None, &write_opts)
393 .await
394 .unwrap()
395 .remove(0);
396 let writer_metadata = sst_info.file_metadata.unwrap();
397
398 let builder = ParquetReaderBuilder::new(
400 FILE_DIR.to_string(),
401 PathType::Bare,
402 handle.clone(),
403 object_store,
404 )
405 .page_index_policy(PageIndexPolicy::Optional);
406 let reader = builder.build().await.unwrap().unwrap();
407 let reader_metadata = reader.parquet_metadata();
408 let cached_writer_metadata =
409 crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
410 .unwrap()
411 .parquet_metadata();
412
413 assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
414 }
415
416 #[tokio::test]
417 async fn test_read_with_tag_filter() {
418 let mut env = TestEnv::new().await;
419 let object_store = env.init_object_store_manager();
420 let handle = sst_file_handle(0, 1000);
421 let metadata = Arc::new(sst_region_metadata());
422 let source = new_flat_source_from_record_batches(vec![
423 new_record_batch_by_range(&["a", "d"], 0, 60),
424 new_record_batch_by_range(&["b", "f"], 0, 40),
425 new_record_batch_by_range(&["b", "h"], 100, 200),
426 ]);
427 let write_opts = WriteOptions {
429 row_group_size: 50,
430 ..Default::default()
431 };
432 let mut metrics = Metrics::new(WriteType::Flush);
434 let mut writer = ParquetWriter::new_with_object_store(
435 object_store.clone(),
436 metadata.clone(),
437 IndexConfig::default(),
438 NoopIndexBuilder,
439 FixedPathProvider {
440 region_file_id: handle.file_id(),
441 },
442 &mut metrics,
443 )
444 .await;
445 writer
446 .write_all_flat_as_primary_key(source, None, &write_opts)
447 .await
448 .unwrap()
449 .remove(0);
450
451 let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
453 left: Box::new(Expr::Column(Column::from_name("tag_0"))),
454 op: Operator::Eq,
455 right: Box::new("a".lit()),
456 })]));
457
458 let builder = ParquetReaderBuilder::new(
459 FILE_DIR.to_string(),
460 PathType::Bare,
461 handle.clone(),
462 object_store,
463 )
464 .predicate(predicate);
465 let mut reader = builder.build().await.unwrap().unwrap();
466 check_record_batch_reader_result(
467 &mut reader,
468 &[
469 new_record_batch_by_range(&["a", "d"], 0, 50),
470 new_record_batch_by_range(&["a", "d"], 50, 60),
471 ],
472 )
473 .await;
474 }
475
476 #[tokio::test]
477 async fn test_read_empty_batch() {
478 let mut env = TestEnv::new().await;
479 let object_store = env.init_object_store_manager();
480 let handle = sst_file_handle(0, 1000);
481 let metadata = Arc::new(sst_region_metadata());
482 let source = new_flat_source_from_record_batches(vec![
483 new_record_batch_by_range(&["a", "z"], 0, 0),
484 new_record_batch_by_range(&["a", "z"], 100, 100),
485 new_record_batch_by_range(&["a", "z"], 200, 230),
486 ]);
487 let write_opts = WriteOptions {
489 row_group_size: 50,
490 ..Default::default()
491 };
492 let mut metrics = Metrics::new(WriteType::Flush);
494 let mut writer = ParquetWriter::new_with_object_store(
495 object_store.clone(),
496 metadata.clone(),
497 IndexConfig::default(),
498 NoopIndexBuilder,
499 FixedPathProvider {
500 region_file_id: handle.file_id(),
501 },
502 &mut metrics,
503 )
504 .await;
505 writer
506 .write_all_flat_as_primary_key(source, None, &write_opts)
507 .await
508 .unwrap()
509 .remove(0);
510
511 let builder = ParquetReaderBuilder::new(
512 FILE_DIR.to_string(),
513 PathType::Bare,
514 handle.clone(),
515 object_store,
516 );
517 let mut reader = builder.build().await.unwrap().unwrap();
518 check_record_batch_reader_result(
519 &mut reader,
520 &[new_record_batch_by_range(&["a", "z"], 200, 230)],
521 )
522 .await;
523 }
524
525 #[tokio::test]
526 async fn test_read_with_field_filter() {
527 let mut env = TestEnv::new().await;
528 let object_store = env.init_object_store_manager();
529 let handle = sst_file_handle(0, 1000);
530 let metadata = Arc::new(sst_region_metadata());
531 let source = new_flat_source_from_record_batches(vec![
532 new_record_batch_by_range(&["a", "d"], 0, 60),
533 new_record_batch_by_range(&["b", "f"], 0, 40),
534 new_record_batch_by_range(&["b", "h"], 100, 200),
535 ]);
536 let write_opts = WriteOptions {
538 row_group_size: 50,
539 ..Default::default()
540 };
541 let mut metrics = Metrics::new(WriteType::Flush);
543 let mut writer = ParquetWriter::new_with_object_store(
544 object_store.clone(),
545 metadata.clone(),
546 IndexConfig::default(),
547 NoopIndexBuilder,
548 FixedPathProvider {
549 region_file_id: handle.file_id(),
550 },
551 &mut metrics,
552 )
553 .await;
554
555 writer
556 .write_all_flat_as_primary_key(source, None, &write_opts)
557 .await
558 .unwrap()
559 .remove(0);
560
561 let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
563 left: Box::new(Expr::Column(Column::from_name("field_0"))),
564 op: Operator::GtEq,
565 right: Box::new(150u64.lit()),
566 })]));
567
568 let builder = ParquetReaderBuilder::new(
569 FILE_DIR.to_string(),
570 PathType::Bare,
571 handle.clone(),
572 object_store,
573 )
574 .predicate(predicate);
575 let mut reader = builder.build().await.unwrap().unwrap();
576 check_record_batch_reader_result(
577 &mut reader,
578 &[new_record_batch_by_range(&["b", "h"], 150, 200)],
579 )
580 .await;
581 }
582
583 #[tokio::test]
584 async fn test_read_large_binary() {
585 let mut env = TestEnv::new().await;
586 let object_store = env.init_object_store_manager();
587 let handle = sst_file_handle(0, 1000);
588 let file_path = handle.file_path(FILE_DIR, PathType::Bare);
589
590 let write_opts = WriteOptions {
591 row_group_size: 50,
592 ..Default::default()
593 };
594
595 let metadata = build_test_binary_test_region_metadata();
596 let json = metadata.to_json().unwrap();
597 let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
598
599 let props_builder = WriterProperties::builder()
600 .set_key_value_metadata(Some(vec![key_value_meta]))
601 .set_compression(Compression::ZSTD(ZstdLevel::default()))
602 .set_encoding(Encoding::PLAIN)
603 .set_max_row_group_row_count(Some(write_opts.row_group_size));
604
605 let writer_props = props_builder.build();
606
607 let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
608 let fields: Vec<_> = write_format
609 .arrow_schema()
610 .fields()
611 .into_iter()
612 .map(|field| {
613 let data_type = field.data_type().clone();
614 if data_type == DataType::Binary {
615 Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
616 } else {
617 Field::new(field.name(), data_type, field.is_nullable())
618 }
619 })
620 .collect();
621
622 let arrow_schema = Arc::new(Schema::new(fields));
623
624 assert_eq!(
626 &DataType::LargeBinary,
627 arrow_schema.field_with_name("field_0").unwrap().data_type()
628 );
629 let mut writer = AsyncArrowWriter::try_new(
630 object_store
631 .writer_with(&file_path)
632 .concurrent(DEFAULT_WRITE_CONCURRENCY)
633 .await
634 .map(|w| w.into_futures_async_write().compat_write())
635 .unwrap(),
636 arrow_schema.clone(),
637 Some(writer_props),
638 )
639 .unwrap();
640
641 let batch = new_record_batch_with_binary(&["a"], 0, 60);
642 let arrays: Vec<_> = batch
643 .columns()
644 .iter()
645 .map(|array| {
646 let data_type = array.data_type().clone();
647 if data_type == DataType::Binary {
648 arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
649 } else {
650 array.clone()
651 }
652 })
653 .collect();
654 let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
655
656 writer.write(&result).await.unwrap();
657 writer.close().await.unwrap();
658
659 let builder = ParquetReaderBuilder::new(
660 FILE_DIR.to_string(),
661 PathType::Bare,
662 handle.clone(),
663 object_store,
664 );
665 let mut reader = builder.build().await.unwrap().unwrap();
666 check_record_batch_reader_result(
667 &mut reader,
668 &[
669 new_record_batch_with_binary(&["a"], 0, 50),
670 new_record_batch_with_binary(&["a"], 50, 60),
671 ],
672 )
673 .await;
674 }
675
676 #[tokio::test]
677 async fn test_write_multiple_files() {
678 common_telemetry::init_default_ut_logging();
679 let mut env = TestEnv::new().await;
681 let object_store = env.init_object_store_manager();
682 let metadata = Arc::new(sst_region_metadata());
683 let batches = vec![
684 new_record_batch_by_range(&["a", "d"], 0, 1000),
685 new_record_batch_by_range(&["b", "f"], 0, 1000),
686 new_record_batch_by_range(&["c", "g"], 0, 1000),
687 new_record_batch_by_range(&["b", "h"], 100, 200),
688 new_record_batch_by_range(&["b", "h"], 200, 300),
689 new_record_batch_by_range(&["b", "h"], 300, 1000),
690 ];
691 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
692
693 let source = new_flat_source_from_record_batches(batches);
694 let write_opts = WriteOptions {
695 row_group_size: 50,
696 max_file_size: Some(1024 * 16),
697 ..Default::default()
698 };
699
700 let path_provider = RegionFilePathFactory {
701 table_dir: "test".to_string(),
702 path_type: PathType::Bare,
703 };
704 let mut metrics = Metrics::new(WriteType::Flush);
705 let mut writer = ParquetWriter::new_with_object_store(
706 object_store.clone(),
707 metadata.clone(),
708 IndexConfig::default(),
709 NoopIndexBuilder,
710 path_provider,
711 &mut metrics,
712 )
713 .await;
714
715 let files = writer
716 .write_all_flat_as_primary_key(source, None, &write_opts)
717 .await
718 .unwrap();
719 assert_eq!(2, files.len());
720
721 let mut rows_read = 0;
722 for f in &files {
723 let file_handle = sst_file_handle_with_file_id(
724 f.file_id,
725 f.time_range.0.value(),
726 f.time_range.1.value(),
727 );
728 let builder = ParquetReaderBuilder::new(
729 "test".to_string(),
730 PathType::Bare,
731 file_handle,
732 object_store.clone(),
733 );
734 let mut reader = builder.build().await.unwrap().unwrap();
735 while let Some(batch) = reader.next_record_batch().await.unwrap() {
736 rows_read += batch.num_rows();
737 }
738 }
739 assert_eq!(total_rows, rows_read);
740 }
741
    /// Writes an SST with an inverted index and a bloom filter index. It then
    /// runs three filtered reads that share one `CacheManager`, checking:
    /// - the rows returned,
    /// - the row-group / row filter metrics,
    /// - the per-predicate entries left in the index result cache.
    ///
    /// The three reads share caches, so their order matters.
    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        // 200 rows in total, so with 50 rows per row group the file has 4 row
        // groups (the reads below confirm this layout):
        //   rg0: [a,d] 0..20, [b,d] 0..20, [c,d] 0..10
        //   rg1: [c,d] 10..20, [c,f] 0..40
        //   rg2: [c,h] 100..150
        //   rg3: [c,h] 150..200
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        // Real index builder (unlike `NoopIndexBuilder`); `segment_row_count: 1`
        // gives the inverted index per-row granularity.
        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        // Inverted index is built on column id 0, bloom filter on column id 1
        // (presumably tag_0 and tag_1 of `sst_region_metadata()` — confirm).
        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        // Build a file handle whose metadata advertises the indexes just written.
        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
                ..Default::default()
            },
            Arc::new(NoopFilePurger),
        );

        // One cache manager shared by all three reads; the index result cache is
        // inspected after each read.
        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        // Builds an inverted index applier for `exprs`, restricted to column id 0.
        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Builds a bloom filter index applier for `exprs`.
        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        // Query 1: tag_0 = "b". Min-max prunes rg1..rg3. The inverted index then
        // drops the 30 non-"b" rows of rg0 (rg0 itself is not fully filtered).
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        // The cached inverted-index result for this predicate covers all 4 row groups.
        let plan = inverted_index_applier
            .as_ref()
            .unwrap()
            .plan_for_sst(&metadata)
            .unwrap()
            .unwrap();
        let cached = index_result_cache
            .get(&plan.predicate_key, handle.file_id().file_id())
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

        // Query 2: 50 <= ts < 200 AND tag_1 = "d". The ts bounds prune rg0/rg1 by
        // min-max, and the bloom filter prunes rg2/rg3 (100 rows, all tag_1 = "h"),
        // so no reader input remains.
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
        assert!(read_input.is_none());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        // Only rg2/rg3 have a cached bloom result for this predicate. Presumably the
        // bloom filter was never evaluated for rg0/rg1 after min-max pruned them.
        let bloom_predicates = bloom_filter_applier
            .as_ref()
            .unwrap()
            .compatible_predicate_for_sst(&metadata)
            .unwrap();
        let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
        let cached = index_result_cache
            .get(&bloom_predicate_key, handle.file_id().file_id())
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

        // Query 3: tag_1 = "d" alone. No min-max pruning is expected; the bloom
        // filter removes rg2/rg3 entirely plus further rows (140 rows in total).
        // The "d" rows of rg0 and rg1 are returned, split at the rg0/rg1 boundary.
        let preds = vec![col("tag_1").eq(lit("d"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 20),
                new_record_batch_by_range(&["b", "d"], 0, 20),
                new_record_batch_by_range(&["c", "d"], 0, 10),
                new_record_batch_by_range(&["c", "d"], 10, 20),
            ],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
        // With nothing pruned by min-max, all 4 row groups now have a cached result.
        let bloom_predicates = bloom_filter_applier
            .as_ref()
            .unwrap()
            .compatible_predicate_for_sst(&metadata)
            .unwrap();
        let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
        let cached = index_result_cache
            .get(&bloom_predicate_key, handle.file_id().file_id())
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
    }
1085
1086 fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
1087 assert!(end >= start);
1088 let metadata = build_test_binary_test_region_metadata();
1089 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1090
1091 let num_rows = end - start;
1092 let mut columns = Vec::new();
1093
1094 let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1095 for _ in 0..num_rows {
1096 tag_0_builder.append_value(tags[0]);
1097 }
1098 columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
1099
1100 let values = (0..num_rows)
1101 .map(|_| "some data".as_bytes())
1102 .collect::<Vec<_>>();
1103 columns.push(
1104 Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
1105 values,
1106 )) as ArrayRef,
1107 );
1108
1109 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1110 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
1111
1112 let pk = new_primary_key(tags);
1113 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1114 for _ in 0..num_rows {
1115 pk_builder.append(&pk).unwrap();
1116 }
1117 columns.push(Arc::new(pk_builder.finish()));
1118
1119 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
1120 columns.push(Arc::new(UInt8Array::from_value(
1121 OpType::Put as u8,
1122 num_rows,
1123 )));
1124
1125 RecordBatch::try_new(flat_schema, columns).unwrap()
1126 }
1127
1128 async fn check_record_batch_reader_result(
1129 reader: &mut ParquetReader,
1130 expected: &[RecordBatch],
1131 ) {
1132 let mut actual = Vec::new();
1133 while let Some(batch) = reader.next_record_batch().await.unwrap() {
1134 actual.push(batch);
1135 }
1136 assert_eq!(
1137 pretty_format_batches(expected).unwrap().to_string(),
1138 pretty_format_batches(&actual).unwrap().to_string()
1139 );
1140 assert!(reader.next_record_batch().await.unwrap().is_none());
1141 }
1142
1143 fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
1144 let metadata = Arc::new(sst_region_metadata());
1145 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1146
1147 let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
1148 let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1149 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1150 let mut field_values = Vec::with_capacity(rows.len());
1151 let mut timestamps = Vec::with_capacity(rows.len());
1152
1153 for (tag_0, tag_1, ts) in rows {
1154 tag_0_builder.append_value(*tag_0);
1155 tag_1_builder.append_value(*tag_1);
1156 pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
1157 field_values.push(*ts as u64);
1158 timestamps.push(*ts);
1159 }
1160
1161 RecordBatch::try_new(
1162 flat_schema,
1163 vec![
1164 Arc::new(tag_0_builder.finish()) as ArrayRef,
1165 Arc::new(tag_1_builder.finish()) as ArrayRef,
1166 Arc::new(UInt64Array::from(field_values)) as ArrayRef,
1167 Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
1168 Arc::new(pk_builder.finish()) as ArrayRef,
1169 Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
1170 Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
1171 ],
1172 )
1173 .unwrap()
1174 }
1175
1176 fn new_record_batch_by_range_sparse(
1179 tags: &[&str],
1180 start: usize,
1181 end: usize,
1182 metadata: &Arc<RegionMetadata>,
1183 ) -> RecordBatch {
1184 assert!(end >= start);
1185 let flat_schema = to_flat_sst_arrow_schema(
1186 metadata,
1187 &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
1188 );
1189
1190 let num_rows = end - start;
1191 let mut columns: Vec<ArrayRef> = Vec::new();
1192
1193 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
1197 columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
1198
1199 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
1201 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
1202
1203 let table_id = 1u32; let tsid = 100u64; let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
1207
1208 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1209 for _ in 0..num_rows {
1210 pk_builder.append(&pk).unwrap();
1211 }
1212 columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
1213
1214 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
1216
1217 columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
1219
1220 RecordBatch::try_new(flat_schema, columns).unwrap()
1221 }
1222
1223 fn create_test_indexer_builder(
1225 env: &TestEnv,
1226 object_store: ObjectStore,
1227 file_path: RegionFilePathFactory,
1228 metadata: Arc<RegionMetadata>,
1229 row_group_size: usize,
1230 ) -> IndexerBuilderImpl {
1231 let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
1232 let intermediate_manager = env.get_intermediate_manager();
1233
1234 IndexerBuilderImpl {
1235 build_type: IndexBuildType::Flush,
1236 metadata,
1237 row_group_size,
1238 puffin_manager,
1239 write_cache_enabled: false,
1240 intermediate_manager,
1241 index_options: IndexOptions {
1242 inverted_index: InvertedIndexOptions {
1243 segment_row_count: 1,
1244 ..Default::default()
1245 },
1246 },
1247 inverted_index_config: Default::default(),
1248 fulltext_index_config: Default::default(),
1249 bloom_filter_index_config: Default::default(),
1250 #[cfg(feature = "vector_index")]
1251 vector_index_config: Default::default(),
1252 }
1253 }
1254
1255 async fn write_flat_sst(
1257 object_store: ObjectStore,
1258 metadata: Arc<RegionMetadata>,
1259 indexer_builder: IndexerBuilderImpl,
1260 file_path: RegionFilePathFactory,
1261 flat_source: FlatSource,
1262 write_opts: &WriteOptions,
1263 ) -> SstInfo {
1264 let mut metrics = Metrics::new(WriteType::Flush);
1265 let mut writer = ParquetWriter::new_with_object_store(
1266 object_store,
1267 metadata,
1268 IndexConfig::default(),
1269 indexer_builder,
1270 file_path,
1271 &mut metrics,
1272 )
1273 .await;
1274
1275 writer
1276 .write_all_flat(flat_source, None, write_opts)
1277 .await
1278 .unwrap()
1279 .remove(0)
1280 }
1281
1282 fn create_file_handle_from_sst_info(
1284 info: &SstInfo,
1285 metadata: &Arc<RegionMetadata>,
1286 ) -> FileHandle {
1287 FileHandle::new(
1288 FileMeta {
1289 region_id: metadata.region_id,
1290 file_id: info.file_id,
1291 time_range: info.time_range,
1292 level: 0,
1293 file_size: info.file_size,
1294 max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
1295 available_indexes: info.index_metadata.build_available_indexes(),
1296 indexes: info.index_metadata.build_indexes(),
1297 index_file_size: info.index_metadata.file_size,
1298 index_version: 0,
1299 num_row_groups: info.num_row_groups,
1300 num_rows: info.num_rows as u64,
1301 sequence: None,
1302 partition_expr: match &metadata.partition_expr {
1303 Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
1304 .expect("partition expression should be valid JSON"),
1305 None => None,
1306 },
1307 num_series: 0,
1308 ..Default::default()
1309 },
1310 Arc::new(NoopFilePurger),
1311 )
1312 }
1313
1314 fn create_test_cache() -> Arc<CacheManager> {
1316 Arc::new(
1317 CacheManager::builder()
1318 .index_result_cache_size(1024 * 1024)
1319 .index_metadata_size(1024 * 1024)
1320 .index_content_page_size(1024 * 1024)
1321 .index_content_size(1024 * 1024)
1322 .puffin_metadata_size(1024 * 1024)
1323 .build(),
1324 )
1325 }
1326
1327 #[tokio::test]
1328 async fn test_write_flat_with_index() {
1329 let mut env = TestEnv::new().await;
1330 let object_store = env.init_object_store_manager();
1331 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1332 let metadata = Arc::new(sst_region_metadata());
1333 let row_group_size = 50;
1334
1335 let flat_batches = vec![
1337 new_record_batch_by_range(&["a", "d"], 0, 20),
1338 new_record_batch_by_range(&["b", "d"], 0, 20),
1339 new_record_batch_by_range(&["c", "d"], 0, 20),
1340 new_record_batch_by_range(&["c", "f"], 0, 40),
1341 new_record_batch_by_range(&["c", "h"], 100, 200),
1342 ];
1343
1344 let flat_source = new_flat_source_from_record_batches(flat_batches);
1345
1346 let write_opts = WriteOptions {
1347 row_group_size,
1348 ..Default::default()
1349 };
1350
1351 let puffin_manager = env
1352 .get_puffin_manager()
1353 .build(object_store.clone(), file_path.clone());
1354 let intermediate_manager = env.get_intermediate_manager();
1355
1356 let indexer_builder = IndexerBuilderImpl {
1357 build_type: IndexBuildType::Flush,
1358 metadata: metadata.clone(),
1359 row_group_size,
1360 puffin_manager,
1361 write_cache_enabled: false,
1362 intermediate_manager,
1363 index_options: IndexOptions {
1364 inverted_index: InvertedIndexOptions {
1365 segment_row_count: 1,
1366 ..Default::default()
1367 },
1368 },
1369 inverted_index_config: Default::default(),
1370 fulltext_index_config: Default::default(),
1371 bloom_filter_index_config: Default::default(),
1372 #[cfg(feature = "vector_index")]
1373 vector_index_config: Default::default(),
1374 };
1375
1376 let mut metrics = Metrics::new(WriteType::Flush);
1377 let mut writer = ParquetWriter::new_with_object_store(
1378 object_store.clone(),
1379 metadata.clone(),
1380 IndexConfig::default(),
1381 indexer_builder,
1382 file_path.clone(),
1383 &mut metrics,
1384 )
1385 .await;
1386
1387 let info = writer
1388 .write_all_flat(flat_source, None, &write_opts)
1389 .await
1390 .unwrap()
1391 .remove(0);
1392 assert_eq!(200, info.num_rows);
1393 assert!(info.file_size > 0);
1394 assert!(info.index_metadata.file_size > 0);
1395
1396 assert!(info.index_metadata.inverted_index.index_size > 0);
1397 assert_eq!(info.index_metadata.inverted_index.row_count, 200);
1398 assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
1399
1400 assert!(info.index_metadata.bloom_filter.index_size > 0);
1401 assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
1402 assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
1403
1404 assert_eq!(
1405 (
1406 Timestamp::new_millisecond(0),
1407 Timestamp::new_millisecond(199)
1408 ),
1409 info.time_range
1410 );
1411 }
1412
1413 #[tokio::test]
1414 async fn test_read_with_override_sequence() {
1415 let mut env = TestEnv::new().await;
1416 let object_store = env.init_object_store_manager();
1417 let handle = sst_file_handle(0, 1000);
1418 let file_path = FixedPathProvider {
1419 region_file_id: handle.file_id(),
1420 };
1421 let metadata = Arc::new(sst_region_metadata());
1422
1423 let source = new_flat_source_from_record_batches(vec![
1425 new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
1426 new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
1427 ]);
1428
1429 let write_opts = WriteOptions {
1430 row_group_size: 50,
1431 ..Default::default()
1432 };
1433
1434 let mut metrics = Metrics::new(WriteType::Flush);
1435 let mut writer = ParquetWriter::new_with_object_store(
1436 object_store.clone(),
1437 metadata.clone(),
1438 IndexConfig::default(),
1439 NoopIndexBuilder,
1440 file_path,
1441 &mut metrics,
1442 )
1443 .await;
1444
1445 writer
1446 .write_all_flat_as_primary_key(source, None, &write_opts)
1447 .await
1448 .unwrap()
1449 .remove(0);
1450
1451 let builder = ParquetReaderBuilder::new(
1453 FILE_DIR.to_string(),
1454 PathType::Bare,
1455 handle.clone(),
1456 object_store.clone(),
1457 );
1458 let mut reader = builder.build().await.unwrap().unwrap();
1459 let mut normal_batches = Vec::new();
1460 while let Some(batch) = reader.next_record_batch().await.unwrap() {
1461 normal_batches.push(batch);
1462 }
1463
1464 let custom_sequence = 12345u64;
1466 let file_meta = handle.meta_ref();
1467 let mut override_file_meta = file_meta.clone();
1468 override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
1469 let override_handle = FileHandle::new(
1470 override_file_meta,
1471 Arc::new(crate::sst::file_purger::NoopFilePurger),
1472 );
1473
1474 let builder = ParquetReaderBuilder::new(
1475 FILE_DIR.to_string(),
1476 PathType::Bare,
1477 override_handle,
1478 object_store.clone(),
1479 );
1480 let mut reader = builder.build().await.unwrap().unwrap();
1481 let mut override_batches = Vec::new();
1482 while let Some(batch) = reader.next_record_batch().await.unwrap() {
1483 override_batches.push(batch);
1484 }
1485
1486 assert_eq!(normal_batches.len(), override_batches.len());
1488 for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
1489 let expected_batch = {
1490 let mut columns = normal.columns().to_vec();
1491 let num_cols = columns.len();
1492 columns[num_cols - 2] =
1493 Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
1494 RecordBatch::try_new(normal.schema(), columns).unwrap()
1495 };
1496
1497 assert_eq!(*override_batch, expected_batch);
1499 }
1500 }
1501
1502 #[tokio::test]
1503 async fn test_write_flat_read_with_inverted_index() {
1504 let mut env = TestEnv::new().await;
1505 let object_store = env.init_object_store_manager();
1506 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1507 let metadata = Arc::new(sst_region_metadata());
1508 let row_group_size = 100;
1509
1510 let flat_batches = vec![
1518 new_record_batch_by_range(&["a", "d"], 0, 50),
1519 new_record_batch_by_range(&["b", "d"], 50, 100),
1520 new_record_batch_by_range(&["c", "d"], 100, 150),
1521 new_record_batch_by_range(&["c", "f"], 150, 200),
1522 ];
1523
1524 let flat_source = new_flat_source_from_record_batches(flat_batches);
1525
1526 let write_opts = WriteOptions {
1527 row_group_size,
1528 ..Default::default()
1529 };
1530
1531 let indexer_builder = create_test_indexer_builder(
1532 &env,
1533 object_store.clone(),
1534 file_path.clone(),
1535 metadata.clone(),
1536 row_group_size,
1537 );
1538
1539 let info = write_flat_sst(
1540 object_store.clone(),
1541 metadata.clone(),
1542 indexer_builder,
1543 file_path.clone(),
1544 flat_source,
1545 &write_opts,
1546 )
1547 .await;
1548 assert_eq!(200, info.num_rows);
1549 assert!(info.file_size > 0);
1550 assert!(info.index_metadata.file_size > 0);
1551
1552 let handle = create_file_handle_from_sst_info(&info, &metadata);
1553
1554 let cache = create_test_cache();
1555
1556 let preds = vec![col("tag_0").eq(lit("b"))];
1559 let inverted_index_applier = InvertedIndexApplierBuilder::new(
1560 FILE_DIR.to_string(),
1561 PathType::Bare,
1562 object_store.clone(),
1563 &metadata,
1564 HashSet::from_iter([0]),
1565 env.get_puffin_manager(),
1566 )
1567 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1568 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1569 .build(&preds)
1570 .unwrap()
1571 .map(Arc::new);
1572
1573 let builder = ParquetReaderBuilder::new(
1574 FILE_DIR.to_string(),
1575 PathType::Bare,
1576 handle.clone(),
1577 object_store.clone(),
1578 )
1579 .predicate(Some(Predicate::new(preds)))
1580 .inverted_index_appliers([inverted_index_applier.clone(), None])
1581 .cache(CacheStrategy::EnableAll(cache.clone()));
1582
1583 let mut metrics = ReaderMetrics::default();
1584 let (_context, selection) = builder
1585 .build_reader_input(&mut metrics)
1586 .await
1587 .unwrap()
1588 .unwrap();
1589
1590 assert_eq!(selection.row_group_count(), 1);
1592 assert_eq!(50, selection.get(0).unwrap().row_count());
1593
1594 assert_eq!(metrics.filter_metrics.rg_total, 2);
1596 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
1597 assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
1598 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
1599 }
1600
1601 #[tokio::test]
1602 async fn test_write_flat_read_with_bloom_filter() {
1603 let mut env = TestEnv::new().await;
1604 let object_store = env.init_object_store_manager();
1605 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1606 let metadata = Arc::new(sst_region_metadata());
1607 let row_group_size = 100;
1608
1609 let flat_batches = vec![
1617 new_record_batch_by_range(&["a", "d"], 0, 50),
1618 new_record_batch_by_range(&["b", "e"], 50, 100),
1619 new_record_batch_by_range(&["c", "d"], 100, 150),
1620 new_record_batch_by_range(&["c", "f"], 150, 200),
1621 ];
1622
1623 let flat_source = new_flat_source_from_record_batches(flat_batches);
1624
1625 let write_opts = WriteOptions {
1626 row_group_size,
1627 ..Default::default()
1628 };
1629
1630 let indexer_builder = create_test_indexer_builder(
1631 &env,
1632 object_store.clone(),
1633 file_path.clone(),
1634 metadata.clone(),
1635 row_group_size,
1636 );
1637
1638 let info = write_flat_sst(
1639 object_store.clone(),
1640 metadata.clone(),
1641 indexer_builder,
1642 file_path.clone(),
1643 flat_source,
1644 &write_opts,
1645 )
1646 .await;
1647 assert_eq!(200, info.num_rows);
1648 assert!(info.file_size > 0);
1649 assert!(info.index_metadata.file_size > 0);
1650
1651 let handle = create_file_handle_from_sst_info(&info, &metadata);
1652
1653 let cache = create_test_cache();
1654
1655 let preds = vec![
1658 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1659 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1660 col("tag_1").eq(lit("d")),
1661 ];
1662 let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1663 FILE_DIR.to_string(),
1664 PathType::Bare,
1665 object_store.clone(),
1666 &metadata,
1667 env.get_puffin_manager(),
1668 )
1669 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1670 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
1671 .build(&preds)
1672 .unwrap()
1673 .map(Arc::new);
1674
1675 let builder = ParquetReaderBuilder::new(
1676 FILE_DIR.to_string(),
1677 PathType::Bare,
1678 handle.clone(),
1679 object_store.clone(),
1680 )
1681 .predicate(Some(Predicate::new(preds)))
1682 .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
1683 .cache(CacheStrategy::EnableAll(cache.clone()));
1684
1685 let mut metrics = ReaderMetrics::default();
1686 let (_context, selection) = builder
1687 .build_reader_input(&mut metrics)
1688 .await
1689 .unwrap()
1690 .unwrap();
1691
1692 assert_eq!(selection.row_group_count(), 2);
1694 assert_eq!(50, selection.get(0).unwrap().row_count());
1695 assert_eq!(50, selection.get(1).unwrap().row_count());
1696
1697 assert_eq!(metrics.filter_metrics.rg_total, 2);
1699 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1700 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
1701 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
1702 }
1703
1704 #[tokio::test]
1705 async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
1706 let mut env = TestEnv::new().await;
1707 let object_store = env.init_object_store_manager();
1708 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1709 let metadata = Arc::new(sst_region_metadata());
1710 let row_group_size = 10;
1711
1712 let flat_source = new_flat_source_from_record_batches(vec![
1713 new_record_batch_by_range(&["a", "d"], 0, 3),
1714 new_record_batch_by_range(&["b", "d"], 3, 10),
1715 ]);
1716 let write_opts = WriteOptions {
1717 row_group_size,
1718 ..Default::default()
1719 };
1720 let indexer_builder = create_test_indexer_builder(
1721 &env,
1722 object_store.clone(),
1723 file_path.clone(),
1724 metadata.clone(),
1725 row_group_size,
1726 );
1727 let info = write_flat_sst(
1728 object_store.clone(),
1729 metadata.clone(),
1730 indexer_builder,
1731 file_path,
1732 flat_source,
1733 &write_opts,
1734 )
1735 .await;
1736 let handle = create_file_handle_from_sst_info(&info, &metadata);
1737
1738 let builder =
1739 ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1740 .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1741
1742 let mut metrics = ReaderMetrics::default();
1743 let (context, _) = builder
1744 .build_reader_input(&mut metrics)
1745 .await
1746 .unwrap()
1747 .unwrap();
1748 let selection = RowGroupSelection::from_row_ranges(
1749 vec![(0, std::iter::once(0..6).collect())],
1750 row_group_size,
1751 );
1752
1753 let mut reader = ParquetReader::new(Arc::new(context), selection)
1754 .await
1755 .unwrap();
1756 check_record_batch_reader_result(
1757 &mut reader,
1758 &[new_record_batch_by_range(&["a", "d"], 0, 3)],
1759 )
1760 .await;
1761 }
1762
1763 #[tokio::test]
1764 async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
1765 let mut env = TestEnv::new().await;
1766 let object_store = env.init_object_store_manager();
1767 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1768 let metadata = Arc::new(sst_region_metadata());
1769 let row_group_size = 8;
1770
1771 let flat_source = new_flat_source_from_record_batches(vec![
1772 new_record_batch_by_range(&["a", "d"], 0, 2),
1773 new_record_batch_by_range(&["b", "d"], 2, 4),
1774 new_record_batch_by_range(&["a", "d"], 4, 6),
1775 new_record_batch_by_range(&["c", "d"], 6, 8),
1776 ]);
1777 let write_opts = WriteOptions {
1778 row_group_size,
1779 ..Default::default()
1780 };
1781 let indexer_builder = create_test_indexer_builder(
1782 &env,
1783 object_store.clone(),
1784 file_path.clone(),
1785 metadata.clone(),
1786 row_group_size,
1787 );
1788 let info = write_flat_sst(
1789 object_store.clone(),
1790 metadata.clone(),
1791 indexer_builder,
1792 file_path,
1793 flat_source,
1794 &write_opts,
1795 )
1796 .await;
1797 let handle = create_file_handle_from_sst_info(&info, &metadata);
1798
1799 let builder =
1800 ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
1801 .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
1802
1803 let mut metrics = ReaderMetrics::default();
1804 let (context, _) = builder
1805 .build_reader_input(&mut metrics)
1806 .await
1807 .unwrap()
1808 .unwrap();
1809 let selection = RowGroupSelection::from_row_ranges(
1810 vec![(0, std::iter::once(0..8).collect())],
1811 row_group_size,
1812 );
1813
1814 let mut reader = ParquetReader::new(Arc::new(context), selection)
1815 .await
1816 .unwrap();
1817 check_record_batch_reader_result(
1818 &mut reader,
1819 &[new_record_batch_from_rows(&[
1820 ("a", "d", 0),
1821 ("a", "d", 1),
1822 ("a", "d", 4),
1823 ("a", "d", 5),
1824 ])],
1825 )
1826 .await;
1827 }
1828
1829 #[tokio::test]
1830 async fn test_write_flat_read_with_inverted_index_sparse() {
1831 common_telemetry::init_default_ut_logging();
1832
1833 let mut env = TestEnv::new().await;
1834 let object_store = env.init_object_store_manager();
1835 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1836 let metadata = Arc::new(sst_region_metadata_with_encoding(
1837 PrimaryKeyEncoding::Sparse,
1838 ));
1839 let row_group_size = 100;
1840
1841 let flat_batches = vec![
1849 new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1850 new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
1851 new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1852 new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1853 ];
1854
1855 let flat_source = new_flat_source_from_record_batches(flat_batches);
1856
1857 let write_opts = WriteOptions {
1858 row_group_size,
1859 ..Default::default()
1860 };
1861
1862 let indexer_builder = create_test_indexer_builder(
1863 &env,
1864 object_store.clone(),
1865 file_path.clone(),
1866 metadata.clone(),
1867 row_group_size,
1868 );
1869
1870 let info = write_flat_sst(
1871 object_store.clone(),
1872 metadata.clone(),
1873 indexer_builder,
1874 file_path.clone(),
1875 flat_source,
1876 &write_opts,
1877 )
1878 .await;
1879 assert_eq!(200, info.num_rows);
1880 assert!(info.file_size > 0);
1881 assert!(info.index_metadata.file_size > 0);
1882
1883 let handle = create_file_handle_from_sst_info(&info, &metadata);
1884
1885 let cache = create_test_cache();
1886
1887 let preds = vec![col("tag_0").eq(lit("b"))];
1890 let inverted_index_applier = InvertedIndexApplierBuilder::new(
1891 FILE_DIR.to_string(),
1892 PathType::Bare,
1893 object_store.clone(),
1894 &metadata,
1895 HashSet::from_iter([0]),
1896 env.get_puffin_manager(),
1897 )
1898 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
1899 .with_inverted_index_cache(cache.inverted_index_cache().cloned())
1900 .build(&preds)
1901 .unwrap()
1902 .map(Arc::new);
1903
1904 let builder = ParquetReaderBuilder::new(
1905 FILE_DIR.to_string(),
1906 PathType::Bare,
1907 handle.clone(),
1908 object_store.clone(),
1909 )
1910 .predicate(Some(Predicate::new(preds)))
1911 .inverted_index_appliers([inverted_index_applier.clone(), None])
1912 .cache(CacheStrategy::EnableAll(cache.clone()));
1913
1914 let mut metrics = ReaderMetrics::default();
1915 let (_context, selection) = builder
1916 .build_reader_input(&mut metrics)
1917 .await
1918 .unwrap()
1919 .unwrap();
1920
1921 assert_eq!(selection.row_group_count(), 1);
1923 assert_eq!(50, selection.get(0).unwrap().row_count());
1924
1925 assert_eq!(metrics.filter_metrics.rg_total, 2);
1929 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
1931 assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
1932 }
1933
1934 #[tokio::test]
1935 async fn test_write_flat_read_with_bloom_filter_sparse() {
1936 let mut env = TestEnv::new().await;
1937 let object_store = env.init_object_store_manager();
1938 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
1939 let metadata = Arc::new(sst_region_metadata_with_encoding(
1940 PrimaryKeyEncoding::Sparse,
1941 ));
1942 let row_group_size = 100;
1943
1944 let flat_batches = vec![
1952 new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
1953 new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
1954 new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
1955 new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
1956 ];
1957
1958 let flat_source = new_flat_source_from_record_batches(flat_batches);
1959
1960 let write_opts = WriteOptions {
1961 row_group_size,
1962 ..Default::default()
1963 };
1964
1965 let indexer_builder = create_test_indexer_builder(
1966 &env,
1967 object_store.clone(),
1968 file_path.clone(),
1969 metadata.clone(),
1970 row_group_size,
1971 );
1972
1973 let info = write_flat_sst(
1974 object_store.clone(),
1975 metadata.clone(),
1976 indexer_builder,
1977 file_path.clone(),
1978 flat_source,
1979 &write_opts,
1980 )
1981 .await;
1982 assert_eq!(200, info.num_rows);
1983 assert!(info.file_size > 0);
1984 assert!(info.index_metadata.file_size > 0);
1985
1986 let handle = create_file_handle_from_sst_info(&info, &metadata);
1987
1988 let cache = create_test_cache();
1989
1990 let preds = vec![
1993 col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
1994 col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
1995 col("tag_1").eq(lit("d")),
1996 ];
1997 let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
1998 FILE_DIR.to_string(),
1999 PathType::Bare,
2000 object_store.clone(),
2001 &metadata,
2002 env.get_puffin_manager(),
2003 )
2004 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2005 .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
2006 .build(&preds)
2007 .unwrap()
2008 .map(Arc::new);
2009
2010 let builder = ParquetReaderBuilder::new(
2011 FILE_DIR.to_string(),
2012 PathType::Bare,
2013 handle.clone(),
2014 object_store.clone(),
2015 )
2016 .predicate(Some(Predicate::new(preds)))
2017 .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
2018 .cache(CacheStrategy::EnableAll(cache.clone()));
2019
2020 let mut metrics = ReaderMetrics::default();
2021 let (_context, selection) = builder
2022 .build_reader_input(&mut metrics)
2023 .await
2024 .unwrap()
2025 .unwrap();
2026
2027 assert_eq!(selection.row_group_count(), 2);
2029 assert_eq!(50, selection.get(0).unwrap().row_count());
2030 assert_eq!(50, selection.get(1).unwrap().row_count());
2031
2032 assert_eq!(metrics.filter_metrics.rg_total, 2);
2034 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2035 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
2036 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
2037 }
2038
2039 fn fulltext_region_metadata() -> RegionMetadata {
2042 let mut builder = RegionMetadataBuilder::new(REGION_ID);
2043 builder
2044 .push_column_metadata(ColumnMetadata {
2045 column_schema: ColumnSchema::new(
2046 "tag_0".to_string(),
2047 ConcreteDataType::string_datatype(),
2048 true,
2049 ),
2050 semantic_type: SemanticType::Tag,
2051 column_id: 0,
2052 })
2053 .push_column_metadata(ColumnMetadata {
2054 column_schema: ColumnSchema::new(
2055 "text_bloom".to_string(),
2056 ConcreteDataType::string_datatype(),
2057 true,
2058 )
2059 .with_fulltext_options(FulltextOptions {
2060 enable: true,
2061 analyzer: FulltextAnalyzer::English,
2062 case_sensitive: false,
2063 backend: FulltextBackend::Bloom,
2064 granularity: 1,
2065 false_positive_rate_in_10000: 50,
2066 })
2067 .unwrap(),
2068 semantic_type: SemanticType::Field,
2069 column_id: 1,
2070 })
2071 .push_column_metadata(ColumnMetadata {
2072 column_schema: ColumnSchema::new(
2073 "text_tantivy".to_string(),
2074 ConcreteDataType::string_datatype(),
2075 true,
2076 )
2077 .with_fulltext_options(FulltextOptions {
2078 enable: true,
2079 analyzer: FulltextAnalyzer::English,
2080 case_sensitive: false,
2081 backend: FulltextBackend::Tantivy,
2082 granularity: 1,
2083 false_positive_rate_in_10000: 50,
2084 })
2085 .unwrap(),
2086 semantic_type: SemanticType::Field,
2087 column_id: 2,
2088 })
2089 .push_column_metadata(ColumnMetadata {
2090 column_schema: ColumnSchema::new(
2091 "field_0".to_string(),
2092 ConcreteDataType::uint64_datatype(),
2093 true,
2094 ),
2095 semantic_type: SemanticType::Field,
2096 column_id: 3,
2097 })
2098 .push_column_metadata(ColumnMetadata {
2099 column_schema: ColumnSchema::new(
2100 "ts".to_string(),
2101 ConcreteDataType::timestamp_millisecond_datatype(),
2102 false,
2103 ),
2104 semantic_type: SemanticType::Timestamp,
2105 column_id: 4,
2106 })
2107 .primary_key(vec![0]);
2108 builder.build().unwrap()
2109 }
2110
2111 fn new_fulltext_record_batch_by_range(
2113 tag: &str,
2114 text_bloom: &str,
2115 text_tantivy: &str,
2116 start: usize,
2117 end: usize,
2118 ) -> RecordBatch {
2119 assert!(end >= start);
2120 let metadata = Arc::new(fulltext_region_metadata());
2121 let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2122
2123 let num_rows = end - start;
2124 let mut columns = Vec::new();
2125
2126 let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
2128 for _ in 0..num_rows {
2129 tag_builder.append_value(tag);
2130 }
2131 columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
2132
2133 let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
2135 columns.push(Arc::new(StringArray::from(text_bloom_values)));
2136
2137 let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
2139 columns.push(Arc::new(StringArray::from(text_tantivy_values)));
2140
2141 let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
2143 columns.push(Arc::new(UInt64Array::from(field_values)));
2144
2145 let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
2147 columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
2148
2149 let pk = new_primary_key(&[tag]);
2151 let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
2152 for _ in 0..num_rows {
2153 pk_builder.append(&pk).unwrap();
2154 }
2155 columns.push(Arc::new(pk_builder.finish()));
2156
2157 columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
2159
2160 columns.push(Arc::new(UInt8Array::from_value(
2162 OpType::Put as u8,
2163 num_rows,
2164 )));
2165
2166 RecordBatch::try_new(flat_schema, columns).unwrap()
2167 }
2168
2169 #[tokio::test]
2170 async fn test_write_flat_read_with_fulltext_index() {
2171 let mut env = TestEnv::new().await;
2172 let object_store = env.init_object_store_manager();
2173 let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
2174 let metadata = Arc::new(fulltext_region_metadata());
2175 let row_group_size = 50;
2176
2177 let flat_batches = vec![
2183 new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
2184 new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
2185 new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
2186 new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
2187 ];
2188
2189 let flat_source = new_flat_source_from_record_batches(flat_batches);
2190
2191 let write_opts = WriteOptions {
2192 row_group_size,
2193 ..Default::default()
2194 };
2195
2196 let indexer_builder = create_test_indexer_builder(
2197 &env,
2198 object_store.clone(),
2199 file_path.clone(),
2200 metadata.clone(),
2201 row_group_size,
2202 );
2203
2204 let mut info = write_flat_sst(
2205 object_store.clone(),
2206 metadata.clone(),
2207 indexer_builder,
2208 file_path.clone(),
2209 flat_source,
2210 &write_opts,
2211 )
2212 .await;
2213 assert_eq!(200, info.num_rows);
2214 assert!(info.file_size > 0);
2215 assert!(info.index_metadata.file_size > 0);
2216
2217 assert!(info.index_metadata.fulltext_index.index_size > 0);
2219 assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
2220 info.index_metadata.fulltext_index.columns.sort_unstable();
2222 assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
2223
2224 assert_eq!(
2225 (
2226 Timestamp::new_millisecond(0),
2227 Timestamp::new_millisecond(199)
2228 ),
2229 info.time_range
2230 );
2231
2232 let handle = create_file_handle_from_sst_info(&info, &metadata);
2233
2234 let cache = create_test_cache();
2235
2236 let matches_func = || {
2238 Arc::new(
2239 ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
2240 .provide(Default::default()),
2241 )
2242 };
2243
2244 let matches_term_func = || {
2245 Arc::new(
2246 ScalarFunctionFactory::from(
2247 Arc::new(MatchesTermFunction::default()) as FunctionRef,
2248 )
2249 .provide(Default::default()),
2250 )
2251 };
2252
2253 let preds = vec![Expr::ScalarFunction(ScalarFunction {
2256 args: vec![col("text_bloom"), "hello".lit()],
2257 func: matches_term_func(),
2258 })];
2259
2260 let fulltext_applier = FulltextIndexApplierBuilder::new(
2261 FILE_DIR.to_string(),
2262 PathType::Bare,
2263 object_store.clone(),
2264 env.get_puffin_manager(),
2265 &metadata,
2266 )
2267 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2268 .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2269 .build(&preds)
2270 .unwrap()
2271 .map(Arc::new);
2272
2273 let builder = ParquetReaderBuilder::new(
2274 FILE_DIR.to_string(),
2275 PathType::Bare,
2276 handle.clone(),
2277 object_store.clone(),
2278 )
2279 .predicate(Some(Predicate::new(preds)))
2280 .fulltext_index_appliers([None, fulltext_applier.clone()])
2281 .cache(CacheStrategy::EnableAll(cache.clone()));
2282
2283 let mut metrics = ReaderMetrics::default();
2284 let (_context, selection) = builder
2285 .build_reader_input(&mut metrics)
2286 .await
2287 .unwrap()
2288 .unwrap();
2289
2290 assert_eq!(selection.row_group_count(), 2);
2292 assert_eq!(50, selection.get(0).unwrap().row_count());
2293 assert_eq!(50, selection.get(1).unwrap().row_count());
2294
2295 assert_eq!(metrics.filter_metrics.rg_total, 4);
2297 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2298 assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2299 assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2300
2301 let preds = vec![Expr::ScalarFunction(ScalarFunction {
2304 args: vec![col("text_tantivy"), "lazy".lit()],
2305 func: matches_func(),
2306 })];
2307
2308 let fulltext_applier = FulltextIndexApplierBuilder::new(
2309 FILE_DIR.to_string(),
2310 PathType::Bare,
2311 object_store.clone(),
2312 env.get_puffin_manager(),
2313 &metadata,
2314 )
2315 .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
2316 .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
2317 .build(&preds)
2318 .unwrap()
2319 .map(Arc::new);
2320
2321 let builder = ParquetReaderBuilder::new(
2322 FILE_DIR.to_string(),
2323 PathType::Bare,
2324 handle.clone(),
2325 object_store.clone(),
2326 )
2327 .predicate(Some(Predicate::new(preds)))
2328 .fulltext_index_appliers([None, fulltext_applier.clone()])
2329 .cache(CacheStrategy::EnableAll(cache.clone()));
2330
2331 let mut metrics = ReaderMetrics::default();
2332 let (_context, selection) = builder
2333 .build_reader_input(&mut metrics)
2334 .await
2335 .unwrap()
2336 .unwrap();
2337
2338 assert_eq!(selection.row_group_count(), 2);
2340 assert_eq!(50, selection.get(2).unwrap().row_count());
2341 assert_eq!(50, selection.get(3).unwrap().row_count());
2342
2343 assert_eq!(metrics.filter_metrics.rg_total, 4);
2345 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
2346 assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
2347 assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
2348 }
2349}