use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::FileId;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;

pub mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

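/// Key of the serialized region metadata in the parquet file's key-value metadata.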
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

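/// Default batch size to read parquet files.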
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
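/// Default number of rows in a row group.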
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

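/// Parquet SST write options.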
#[derive(Debug, Clone)]
pub struct WriteOptions {
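    /// Buffer size for the underlying async writer.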
    pub write_buffer_size: ReadableSize,
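    /// Number of rows in a row group.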
    pub row_group_size: usize,
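    /// Max size of a single output file, or `None` for no limit. The size is only
    /// checked when a row group is flushed, so this behaves as a soft limit.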
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

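/// Info of a written SST file.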
#[derive(Debug, Default)]
pub struct SstInfo {
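    /// Id of the SST file.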
    pub file_id: FileId,
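    /// Time range of the file, i.e. the min and max timestamps of its rows.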
    pub time_range: FileTimeRange,
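    /// Size of the file in bytes.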
    pub file_size: u64,
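    /// Max uncompressed size among the row groups of the file, in bytes.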
    pub max_row_group_uncompressed_size: u64,
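    /// Number of rows in the file.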
    pub num_rows: usize,
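    /// Number of row groups in the file.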
    pub num_row_groups: u64,
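    /// Parquet metadata of the file, if available.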
    pub file_metadata: Option<Arc<ParquetMetaData>>,
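    /// Output of the index build for this file.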
    pub index_metadata: IndexOutput,
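    /// Number of series (distinct primary keys) in the file.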
    pub num_series: u64,
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
    use datatypes::arrow;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
    use datatypes::arrow::util::pretty::pretty_format_batches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
    use object_store::ObjectStore;
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::{KeyValue, PageIndexPolicy};
    use parquet::file::properties::WriterProperties;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::{ColumnSchema, RegionId};
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
    use crate::cache::test_util::assert_parquet_metadata_equal;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::config::IndexConfig;
    use crate::read::FlatSource;
    use crate::region::options::{IndexOptions, InvertedIndexOptions};
    use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
    use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
    use crate::sst::parquet::flat_format::FlatWriteFormat;
    use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{
        DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
    };
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
        new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
        new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    const FILE_DIR: &str = "/";
    const REGION_ID: RegionId = RegionId::new(0, 0);

    #[derive(Clone)]
    struct FixedPathProvider {
        region_file_id: RegionFileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            location::index_file_path(FILE_DIR, index_id, PathType::Bare)
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
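        // Uses a small row group size so the file has multiple row groups.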
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
                new_record_batch_by_range(&["b", "f"], 0, 40),
                new_record_batch_by_range(&["b", "h"], 100, 150),
                new_record_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

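        // Enables the page cache and reads the file several times.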
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap().unwrap();
            check_record_batch_reader_result(
                &mut reader,
                &[
                    new_record_batch_by_range(&["a", "d"], 0, 50),
                    new_record_batch_by_range(&["a", "d"], 50, 60),
                    new_record_batch_by_range(&["b", "f"], 0, 40),
                    new_record_batch_by_range(&["b", "h"], 100, 150),
                    new_record_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        let parquet_meta = sst_info.file_metadata.unwrap();
        let get_ranges = |row_group_idx: usize| {
            let row_group = parquet_meta.row_group(row_group_idx);
            let mut ranges = Vec::with_capacity(row_group.num_columns());
            for i in 0..row_group.num_columns() {
                let (start, length) = row_group.column(i).byte_range();
                ranges.push(start..start + length);
            }

            ranges
        };

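        // The file has 4 row groups (200 rows / 50 rows per group); pages of each should be cached.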
        for i in 0..4 {
            let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
            assert!(cache.get_pages(&page_key).is_some());
        }
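        // A row group that was never read has no cached pages.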
        let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

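        // Writes the SST and keeps the parquet metadata returned by the writer.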
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        let sst_info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

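        // Reads the metadata back from the file and compares it with the writer's copy.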
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        let reader_metadata = reader.parquet_metadata();
        let cached_writer_metadata =
            crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
                .unwrap()
                .parquet_metadata();

        assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

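        // Filters by tag_0 = "a".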
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new("a".lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_by_range(&["a", "d"], 0, 50),
                new_record_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "z"], 0, 0),
            new_record_batch_by_range(&["a", "z"], 100, 100),
            new_record_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;
        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["a", "z"], 200, 230)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            FixedPathProvider {
                region_file_id: handle.file_id(),
            },
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

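        // Filters by field_0 >= 150.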
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(150u64.lit()),
        })]));

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        )
        .predicate(predicate);
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "h"], 150, 200)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR, PathType::Bare);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_record_batch_with_binary(&["a"], 0, 60);
        let arrays: Vec<_> = batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store,
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[
                new_record_batch_with_binary(&["a"], 0, 50),
                new_record_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 1000),
            new_record_batch_by_range(&["b", "f"], 0, 1000),
            new_record_batch_by_range(&["c", "g"], 0, 1000),
            new_record_batch_by_range(&["b", "h"], 100, 200),
            new_record_batch_by_range(&["b", "h"], 200, 300),
            new_record_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_flat_source_from_record_batches(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            table_dir: "test".to_string(),
            path_type: PathType::Bare,
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            path_provider,
            &mut metrics,
        )
        .await;

        let files = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder = ParquetReaderBuilder::new(
                "test".to_string(),
                PathType::Bare,
                file_handle,
                object_store.clone(),
            );
            let mut reader = builder.build().await.unwrap().unwrap();
            while let Some(batch) = reader.next_record_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }

    #[tokio::test]
    async fn test_write_read_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
            },
            Arc::new(NoopFilePurger),
        );

        let cache = Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        );
        let index_result_cache = cache.index_result_cache().unwrap();

        let build_inverted_index_applier = |exprs: &[Expr]| {
            InvertedIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                HashSet::from_iter([0]),
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_inverted_index_cache(cache.inverted_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

        let build_bloom_filter_applier = |exprs: &[Expr]| {
            BloomFilterIndexApplierBuilder::new(
                FILE_DIR.to_string(),
                PathType::Bare,
                object_store.clone(),
                &metadata,
                env.get_puffin_manager(),
            )
            .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
            .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
            .build(exprs)
            .unwrap()
            .map(Arc::new)
        };

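        // The source is sorted by primary key and time, so with a row group size
        // of 50 the file layout is:
        //   row group 0: (a, d) 0..20, (b, d) 0..20, (c, d) 0..10
        //   row group 1: (c, d) 10..20, (c, f) 0..40
        //   row group 2: (c, h) 100..150
        //   row group 3: (c, h) 150..200
        // `tag_0 = "b"` only matches rows inside row group 0, so min-max pruning
        // drops the other three row groups and the inverted index trims the rest
        // of row group 0.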
        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();
        let mut reader = ParquetReader::new(Arc::new(context), selection)
            .await
            .unwrap();
        check_record_batch_reader_result(
            &mut reader,
            &[new_record_batch_by_range(&["b", "d"], 0, 20)],
        )
        .await;

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
        let cached = index_result_cache
            .get(
                inverted_index_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(0));
        assert!(cached.contains_row_group(1));
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));

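        // `ts >= 50 AND ts < 200` prunes row groups 0 and 1 by min-max stats,
        // while `tag_1 = "d"` prunes row groups 2 and 3 (their tag_1 is "h")
        // via the bloom filter, so no row group is left to read.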
        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let inverted_index_applier = build_inverted_index_applier(&preds);
        let bloom_filter_applier = build_bloom_filter_applier(&preds);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let read_input = builder.build_reader_input(&mut metrics).await.unwrap();
        assert!(read_input.is_none());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
        let cached = index_result_cache
            .get(
                bloom_filter_applier.unwrap().predicate_key(),
                handle.file_id().file_id(),
            )
            .unwrap();
        assert!(cached.contains_row_group(2));
        assert!(cached.contains_row_group(3));
        assert!(!cached.contains_row_group(0));
        assert!(!cached.contains_row_group(1));

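        // `tag_1 = "d"` matches rows in row groups 0 and 1 only: the bloom filter
        // prunes row groups 2 and 3 entirely (100 rows) and the (c, f) rows
        // (40 rows) inside row group 1.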
985 let preds = vec![col("tag_1").eq(lit("d"))];
1006 let inverted_index_applier = build_inverted_index_applier(&preds);
1007 let bloom_filter_applier = build_bloom_filter_applier(&preds);
1008
1009 let builder = ParquetReaderBuilder::new(
1010 FILE_DIR.to_string(),
1011 PathType::Bare,
1012 handle.clone(),
1013 object_store.clone(),
1014 )
1015 .flat_format(true)
1016 .predicate(Some(Predicate::new(preds)))
1017 .inverted_index_appliers([inverted_index_applier.clone(), None])
1018 .bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
1019 .cache(CacheStrategy::EnableAll(cache.clone()));
1020
1021 let mut metrics = ReaderMetrics::default();
1022 let (context, selection) = builder
1023 .build_reader_input(&mut metrics)
1024 .await
1025 .unwrap()
1026 .unwrap();
1027 let mut reader = ParquetReader::new(Arc::new(context), selection)
1028 .await
1029 .unwrap();
1030 check_record_batch_reader_result(
1031 &mut reader,
1032 &[
1033 new_record_batch_by_range(&["a", "d"], 0, 20),
1034 new_record_batch_by_range(&["b", "d"], 0, 20),
1035 new_record_batch_by_range(&["c", "d"], 0, 10),
1036 new_record_batch_by_range(&["c", "d"], 10, 20),
1037 ],
1038 )
1039 .await;
1040
1041 assert_eq!(metrics.filter_metrics.rg_total, 4);
1042 assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
1043 assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
1044 assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
1045 let cached = index_result_cache
1046 .get(
1047 bloom_filter_applier.unwrap().predicate_key(),
1048 handle.file_id().file_id(),
1049 )
1050 .unwrap();
1051 assert!(cached.contains_row_group(0));
1052 assert!(cached.contains_row_group(1));
1053 assert!(cached.contains_row_group(2));
1054 assert!(cached.contains_row_group(3));
1055 }
1056
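    /// Builds a record batch in the flat SST schema whose field column is binary.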
    fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
        assert!(end >= start);
        let metadata = build_test_binary_test_region_metadata();
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_0_builder.append_value(tags[0]);
        }
        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);

        let values = (0..num_rows)
            .map(|_| "some data".as_bytes())
            .collect::<Vec<_>>();
        columns.push(
            Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
                values,
            )) as ArrayRef,
        );

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        let pk = new_primary_key(tags);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

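    /// Drains `reader` and asserts that its batches match `expected`.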
    async fn check_record_batch_reader_result(
        reader: &mut ParquetReader,
        expected: &[RecordBatch],
    ) {
        let mut actual = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            actual.push(batch);
        }
        assert_eq!(
            pretty_format_batches(expected).unwrap().to_string(),
            pretty_format_batches(&actual).unwrap().to_string()
        );
        assert!(reader.next_record_batch().await.unwrap().is_none());
    }

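    /// Builds a record batch in the sparse flat SST schema for rows in `[start, end)`.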
    fn new_record_batch_by_range_sparse(
        tags: &[&str],
        start: usize,
        end: usize,
        metadata: &Arc<RegionMetadata>,
    ) -> RecordBatch {
        assert!(end >= start);
        let flat_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
        );

        let num_rows = end - start;
        let mut columns: Vec<ArrayRef> = Vec::new();

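        // Under sparse encoding the schema has no per-tag columns; tags live only
        // in the encoded primary key.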
        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);

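        // Uses a fixed table id and tsid to build the sparse primary key.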
        let table_id = 1u32;
        let tsid = 100u64;
        let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);

        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()) as ArrayRef);

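        // Sequence column.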
        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);

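        // Op type column.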
        columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

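    /// Creates an indexer builder for tests, using an inverted index segment row count of 1.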
    fn create_test_indexer_builder(
        env: &TestEnv,
        object_store: ObjectStore,
        file_path: RegionFilePathFactory,
        metadata: Arc<RegionMetadata>,
        row_group_size: usize,
    ) -> IndexerBuilderImpl {
        let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
        let intermediate_manager = env.get_intermediate_manager();

        IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
    }

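    /// Writes `flat_source` as a flat-format SST and returns the info of the first output file.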
    async fn write_flat_sst(
        object_store: ObjectStore,
        metadata: Arc<RegionMetadata>,
        indexer_builder: IndexerBuilderImpl,
        file_path: RegionFilePathFactory,
        flat_source: FlatSource,
        write_opts: &WriteOptions,
    ) -> SstInfo {
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store,
            metadata,
            IndexConfig::default(),
            indexer_builder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat(flat_source, None, write_opts)
            .await
            .unwrap()
            .remove(0)
    }

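    /// Creates a `FileHandle` from the info of a written SST.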
    fn create_file_handle_from_sst_info(
        info: &SstInfo,
        metadata: &Arc<RegionMetadata>,
    ) -> FileHandle {
        FileHandle::new(
            FileMeta {
                region_id: metadata.region_id,
                file_id: info.file_id,
                time_range: info.time_range,
                level: 0,
                file_size: info.file_size,
                max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
                available_indexes: info.index_metadata.build_available_indexes(),
                indexes: info.index_metadata.build_indexes(),
                index_file_size: info.index_metadata.file_size,
                index_version: 0,
                num_row_groups: info.num_row_groups,
                num_rows: info.num_rows as u64,
                sequence: None,
                partition_expr: match &metadata.partition_expr {
                    Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                        .expect("partition expression should be valid JSON"),
                    None => None,
                },
                num_series: 0,
            },
            Arc::new(NoopFilePurger),
        )
    }

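    /// Creates a cache manager with all index-related caches enabled.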
    fn create_test_cache() -> Arc<CacheManager> {
        Arc::new(
            CacheManager::builder()
                .index_result_cache_size(1024 * 1024)
                .index_metadata_size(1024 * 1024)
                .index_content_page_size(1024 * 1024)
                .index_content_size(1024 * 1024)
                .puffin_metadata_size(1024 * 1024)
                .build(),
        )
    }

    #[tokio::test]
    async fn test_write_flat_with_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 50;

        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 20),
            new_record_batch_by_range(&["b", "d"], 0, 20),
            new_record_batch_by_range(&["c", "d"], 0, 20),
            new_record_batch_by_range(&["c", "f"], 0, 40),
            new_record_batch_by_range(&["c", "h"], 100, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let puffin_manager = env
            .get_puffin_manager()
            .build(object_store.clone(), file_path.clone());
        let intermediate_manager = env.get_intermediate_manager();

        let indexer_builder = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    segment_row_count: 1,
                    ..Default::default()
                },
            },
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            indexer_builder,
            file_path.clone(),
            &mut metrics,
        )
        .await;

        let info = writer
            .write_all_flat(flat_source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.inverted_index.index_size > 0);
        assert_eq!(info.index_metadata.inverted_index.row_count, 200);
        assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);

        assert!(info.index_metadata.bloom_filter.index_size > 0);
        assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
        assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );
    }

    #[tokio::test]
    async fn test_read_with_override_sequence() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            region_file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());

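        // Writes batches whose sequence column is all zeros.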
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
            new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
        ]);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            IndexConfig::default(),
            NoopIndexBuilder,
            file_path,
            &mut metrics,
        )
        .await;

        writer
            .write_all_flat_as_primary_key(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

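        // First read: no sequence override.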
        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut normal_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            normal_batches.push(batch);
        }

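        // Second read: the handle's file meta carries an override sequence.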
        let custom_sequence = 12345u64;
        let file_meta = handle.meta_ref();
        let mut override_file_meta = file_meta.clone();
        override_file_meta.sequence = Some(std::num::NonZero::new(custom_sequence).unwrap());
        let override_handle = FileHandle::new(
            override_file_meta,
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            override_handle,
            object_store.clone(),
        );
        let mut reader = builder.build().await.unwrap().unwrap();
        let mut override_batches = Vec::new();
        while let Some(batch) = reader.next_record_batch().await.unwrap() {
            override_batches.push(batch);
        }

        assert_eq!(normal_batches.len(), override_batches.len());
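        // Each batch should match once the sequence column is replaced by the override value.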
        for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
            let expected_batch = {
                let mut columns = normal.columns().to_vec();
                let num_cols = columns.len();
                columns[num_cols - 2] =
                    Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
                RecordBatch::try_new(normal.schema(), columns).unwrap()
            };

            assert_eq!(*override_batch, expected_batch);
        }
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

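        // With a row group size of 100 the file has 2 row groups:
        //   row group 0: (a, d) 0..50, (b, d) 50..100
        //   row group 1: (c, d) 100..150, (c, f) 150..200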
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "d"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata());
        let row_group_size = 100;

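        // With a row group size of 100 the file has 2 row groups:
        //   row group 0: (a, d) 0..50, (b, e) 50..100
        //   row group 1: (c, d) 100..150, (c, f) 150..200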
        let flat_batches = vec![
            new_record_batch_by_range(&["a", "d"], 0, 50),
            new_record_batch_by_range(&["b", "e"], 50, 100),
            new_record_batch_by_range(&["c", "d"], 100, 150),
            new_record_batch_by_range(&["c", "f"], 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

    #[tokio::test]
    async fn test_write_flat_read_with_inverted_index_sparse() {
        common_telemetry::init_default_ut_logging();

        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

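        // Same layout as the dense case, but with sparse primary key encoding:
        //   row group 0: (a, d) 0..50, (b, d) 50..100
        //   row group 1: (c, d) 100..150, (c, f) 150..200
        // Sparse files have no per-tag columns, so min-max pruning on tag_0
        // cannot help; only the inverted index prunes data.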
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        let preds = vec![col("tag_0").eq(lit("b"))];
        let inverted_index_applier = InvertedIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            HashSet::from_iter([0]),
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_inverted_index_cache(cache.inverted_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .inverted_index_appliers([inverted_index_applier.clone(), None])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 1);
        assert_eq!(50, selection.get(0).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
        assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
    }

    #[tokio::test]
    async fn test_write_flat_read_with_bloom_filter_sparse() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let row_group_size = 100;

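        // Same layout as the dense bloom filter case, with sparse encoding:
        //   row group 0: (a, d) 0..50, (b, e) 50..100
        //   row group 1: (c, d) 100..150, (c, f) 150..200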
        let flat_batches = vec![
            new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
            new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
            new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
            new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        let preds = vec![
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
            col("tag_1").eq(lit("d")),
        ];
        let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            &metadata,
            env.get_puffin_manager(),
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 2);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
        assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
    }

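    /// Builds region metadata with a bloom-backed and a tantivy-backed fulltext column.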
    fn fulltext_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(REGION_ID);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_bloom".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Bloom,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "text_tantivy".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                )
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    analyzer: FulltextAnalyzer::English,
                    case_sensitive: false,
                    backend: FulltextBackend::Tantivy,
                    granularity: 1,
                    false_positive_rate_in_10000: 50,
                })
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![0]);
        builder.build().unwrap()
    }

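    /// Builds a record batch with fulltext columns for rows in `[start, end)`.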
    fn new_fulltext_record_batch_by_range(
        tag: &str,
        text_bloom: &str,
        text_tantivy: &str,
        start: usize,
        end: usize,
    ) -> RecordBatch {
        assert!(end >= start);
        let metadata = Arc::new(fulltext_region_metadata());
        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        let num_rows = end - start;
        let mut columns = Vec::new();

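        // Columns follow the flat schema order: tag_0, text_bloom, text_tantivy,
        // field_0, ts, pk, sequence, op type.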
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            tag_builder.append_value(tag);
        }
        columns.push(Arc::new(tag_builder.finish()) as ArrayRef);

        let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
        columns.push(Arc::new(StringArray::from(text_bloom_values)));

        let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
        columns.push(Arc::new(StringArray::from(text_tantivy_values)));

        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
        columns.push(Arc::new(UInt64Array::from(field_values)));

        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));

        let pk = new_primary_key(&[tag]);
        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for _ in 0..num_rows {
            pk_builder.append(&pk).unwrap();
        }
        columns.push(Arc::new(pk_builder.finish()));

        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));

        columns.push(Arc::new(UInt8Array::from_value(
            OpType::Put as u8,
            num_rows,
        )));

        RecordBatch::try_new(flat_schema, columns).unwrap()
    }

    #[tokio::test]
    async fn test_write_flat_read_with_fulltext_index() {
        let mut env = TestEnv::new().await;
        let object_store = env.init_object_store_manager();
        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
        let metadata = Arc::new(fulltext_region_metadata());
        let row_group_size = 50;

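        // 4 row groups of 50 rows each. Rows 0..100 contain "hello world" /
        // "quick brown fox"; rows 100..200 contain "goodbye world" / "lazy dog".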
        let flat_batches = vec![
            new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
            new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
            new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
            new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
        ];

        let flat_source = new_flat_source_from_record_batches(flat_batches);

        let write_opts = WriteOptions {
            row_group_size,
            ..Default::default()
        };

        let indexer_builder = create_test_indexer_builder(
            &env,
            object_store.clone(),
            file_path.clone(),
            metadata.clone(),
            row_group_size,
        );

        let mut info = write_flat_sst(
            object_store.clone(),
            metadata.clone(),
            indexer_builder,
            file_path.clone(),
            flat_source,
            &write_opts,
        )
        .await;
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert!(info.index_metadata.file_size > 0);

        assert!(info.index_metadata.fulltext_index.index_size > 0);
        assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
        info.index_metadata.fulltext_index.columns.sort_unstable();
        assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);

        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let handle = create_file_handle_from_sst_info(&info, &metadata);

        let cache = create_test_cache();

        let matches_func = || {
            Arc::new(
                ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
                    .provide(Default::default()),
            )
        };

        let matches_term_func = || {
            Arc::new(
                ScalarFunctionFactory::from(
                    Arc::new(MatchesTermFunction::default()) as FunctionRef,
                )
                .provide(Default::default()),
            )
        };

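        // matches_term(text_bloom, "hello") only hits the first two row groups.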
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_bloom"), "hello".lit()],
            func: matches_term_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(0).unwrap().row_count());
        assert_eq!(50, selection.get(1).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);

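        // matches(text_tantivy, "lazy") only hits the last two row groups.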
        let preds = vec![Expr::ScalarFunction(ScalarFunction {
            args: vec![col("text_tantivy"), "lazy".lit()],
            func: matches_func(),
        })];

        let fulltext_applier = FulltextIndexApplierBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            object_store.clone(),
            env.get_puffin_manager(),
            &metadata,
        )
        .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
        .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
        .build(&preds)
        .unwrap()
        .map(Arc::new);

        let builder = ParquetReaderBuilder::new(
            FILE_DIR.to_string(),
            PathType::Bare,
            handle.clone(),
            object_store.clone(),
        )
        .flat_format(true)
        .predicate(Some(Predicate::new(preds)))
        .fulltext_index_appliers([None, fulltext_applier.clone()])
        .cache(CacheStrategy::EnableAll(cache.clone()));

        let mut metrics = ReaderMetrics::default();
        let (_context, selection) = builder
            .build_reader_input(&mut metrics)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(selection.row_group_count(), 2);
        assert_eq!(50, selection.get(2).unwrap().row_count());
        assert_eq!(50, selection.get(3).unwrap().row_count());

        assert_eq!(metrics.filter_metrics.rg_total, 4);
        assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
        assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
        assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
    }
}