1use std::collections::BTreeMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::region_request::PathType;
24use store_api::storage::{ColumnId, ConcreteDataType};
25
26use crate::cache::file_cache::FileCacheRef;
27use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
28use crate::error::Result;
29use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
30use crate::sst::index::puffin_manager::PuffinManagerFactory;
31
32#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
36pub struct FulltextRequest {
37 pub queries: Vec<FulltextQuery>,
38 pub terms: Vec<FulltextTerm>,
39}
40
41impl FulltextRequest {
42 pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
49 let mut query = String::new();
50 for term in &self.terms {
51 if skip_lowercased && term.col_lowered {
52 continue;
53 }
54 let escaped_term = term.term.replace("\"", "\\\"");
56 if query.is_empty() {
57 query = format!("+\"{escaped_term}\"");
58 } else {
59 query.push_str(&format!(" +\"{escaped_term}\""));
60 }
61 }
62 FulltextQuery(query)
63 }
64}
65
/// A full-text query string.
///
/// Produced either directly from the literal argument of `matches(col, "...")`
/// or by rendering a set of terms into query syntax.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FulltextQuery(pub String);
71
/// A single term to look up, taken from `matches_term(col, "...")`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FulltextTerm {
    /// `true` when the column argument was wrapped in `lower(...)`, i.e. the
    /// predicate was `matches_term(lower(col), "...")`.
    pub col_lowered: bool,
    /// The literal term text.
    pub term: String,
}
81
82pub struct FulltextIndexApplierBuilder<'a> {
84 table_dir: String,
85 path_type: PathType,
86 store: ObjectStore,
87 puffin_manager_factory: PuffinManagerFactory,
88 metadata: &'a RegionMetadata,
89 file_cache: Option<FileCacheRef>,
90 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
91 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
92}
93
94impl<'a> FulltextIndexApplierBuilder<'a> {
95 pub fn new(
97 table_dir: String,
98 path_type: PathType,
99 store: ObjectStore,
100 puffin_manager_factory: PuffinManagerFactory,
101 metadata: &'a RegionMetadata,
102 ) -> Self {
103 Self {
104 table_dir,
105 path_type,
106 store,
107 puffin_manager_factory,
108 metadata,
109 file_cache: None,
110 puffin_metadata_cache: None,
111 bloom_filter_cache: None,
112 }
113 }
114
115 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
117 self.file_cache = file_cache;
118 self
119 }
120
121 pub fn with_puffin_metadata_cache(
123 mut self,
124 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
125 ) -> Self {
126 self.puffin_metadata_cache = puffin_metadata_cache;
127 self
128 }
129
130 pub fn with_bloom_filter_cache(
132 mut self,
133 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
134 ) -> Self {
135 self.bloom_filter_cache = bloom_filter_cache;
136 self
137 }
138
139 pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
141 let mut requests = BTreeMap::new();
142 for expr in exprs {
143 Self::extract_requests(expr, self.metadata, &mut requests);
144 }
145
146 let has_requests = requests
148 .iter()
149 .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
150
151 Ok(has_requests.then(|| {
152 FulltextIndexApplier::new(
153 self.table_dir,
154 self.path_type,
155 self.store,
156 requests,
157 self.puffin_manager_factory,
158 )
159 .with_file_cache(self.file_cache)
160 .with_puffin_metadata_cache(self.puffin_metadata_cache)
161 .with_bloom_filter_cache(self.bloom_filter_cache)
162 }))
163 }
164
165 fn extract_requests(
166 expr: &Expr,
167 metadata: &'a RegionMetadata,
168 requests: &mut BTreeMap<ColumnId, FulltextRequest>,
169 ) {
170 match expr {
171 Expr::BinaryExpr(BinaryExpr {
172 left,
173 op: Operator::And,
174 right,
175 }) => {
176 Self::extract_requests(left, metadata, requests);
177 Self::extract_requests(right, metadata, requests);
178 }
179 Expr::ScalarFunction(func) => {
180 if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
181 requests.entry(column_id).or_default().queries.push(query);
182 } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
183 requests.entry(column_id).or_default().terms.push(term);
184 }
185 }
186 _ => {}
187 }
188 }
189
190 fn expr_to_query(
191 metadata: &RegionMetadata,
192 f: &ScalarFunction,
193 ) -> Option<(ColumnId, FulltextQuery)> {
194 if f.name() != "matches" {
195 return None;
196 }
197 if f.args.len() != 2 {
198 return None;
199 }
200
201 let Expr::Column(c) = &f.args[0] else {
202 return None;
203 };
204 let column = metadata.column_by_name(&c.name)?;
205
206 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
207 return None;
208 }
209
210 let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else {
211 return None;
212 };
213
214 Some((column.column_id, FulltextQuery(query.to_string())))
215 }
216
217 fn expr_to_term(
218 metadata: &RegionMetadata,
219 f: &ScalarFunction,
220 ) -> Option<(ColumnId, FulltextTerm)> {
221 if f.name() != "matches_term" {
222 return None;
223 }
224 if f.args.len() != 2 {
225 return None;
226 }
227
228 let mut lowered = false;
229 let column;
230 match &f.args[0] {
231 Expr::Column(c) => {
232 column = c;
233 }
234 Expr::ScalarFunction(f) => {
235 let lower_arg = Self::extract_lower_arg(f)?;
236 lowered = true;
237 if let Expr::Column(c) = lower_arg {
238 column = c;
239 } else {
240 return None;
241 }
242 }
243 _ => return None,
244 }
245
246 let column = metadata.column_by_name(&column.name)?;
247 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
248 return None;
249 }
250
251 let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else {
252 return None;
253 };
254
255 Some((
256 column.column_id,
257 FulltextTerm {
258 col_lowered: lowered,
259 term: term.to_string(),
260 },
261 ))
262 }
263
264 fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
265 if lower_func.args.len() != 1 {
266 return None;
267 }
268
269 if lower_func.name() != "lower" {
270 return None;
271 }
272
273 if lower_func.args.len() != 1 {
274 return None;
275 }
276
277 Some(&lower_func.args[0])
278 }
279}
280
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use datafusion::functions::string::lower;
    use datafusion_common::Column;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::ScalarUDF;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region metadata with a string field `text` (column id 1) and a
    /// millisecond timestamp column `ts` (column id 2).
    fn mock_metadata() -> RegionMetadata {
        let text_column = ColumnMetadata {
            column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 1,
        };
        let ts_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 2,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(text_column)
            .push_column_metadata(ts_column);
        builder.build().unwrap()
    }

    /// The `matches` UDF.
    fn matches_func() -> Arc<ScalarUDF> {
        let factory = ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef);
        Arc::new(factory.provide(Default::default()))
    }

    /// The `matches_term` UDF.
    fn matches_term_func() -> Arc<ScalarUDF> {
        let factory = ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef);
        Arc::new(factory.provide(Default::default()))
    }

    /// A column reference expression for `name`.
    fn col_expr(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// A non-null UTF-8 literal expression.
    fn utf8_lit(value: &str) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
    }

    /// A scalar function call of `func` over `args`.
    fn scalar_fn(func: Arc<ScalarUDF>, args: Vec<Expr>) -> ScalarFunction {
        ScalarFunction { args, func }
    }

    /// A request holding the given `(col_lowered, term)` pairs and no queries.
    fn request_with_terms(terms: &[(bool, &str)]) -> FulltextRequest {
        let mut request = FulltextRequest::default();
        for (col_lowered, term) in terms {
            request.terms.push(FulltextTerm {
                col_lowered: *col_lowered,
                term: term.to_string(),
            });
        }
        request
    }

    #[test]
    fn test_expr_to_query_basic() {
        let metadata = mock_metadata();
        let func = scalar_fn(matches_func(), vec![col_expr("text"), utf8_lit("foo")]);

        let (column_id, query) =
            FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(query, FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_expr_to_query_wrong_num_args() {
        let metadata = mock_metadata();
        let func = scalar_fn(matches_func(), vec![col_expr("text")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_not_found_column() {
        let metadata = mock_metadata();
        let func = scalar_fn(matches_func(), vec![col_expr("not_found"), utf8_lit("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_column_wrong_data_type() {
        let metadata = mock_metadata();
        let func = scalar_fn(matches_func(), vec![col_expr("ts"), utf8_lit("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_pattern_not_string() {
        let metadata = mock_metadata();
        let func = scalar_fn(
            matches_func(),
            vec![
                col_expr("text"),
                Expr::Literal(ScalarValue::Int64(Some(42))),
            ],
        );

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_basic() {
        let metadata = mock_metadata();
        let func = scalar_fn(matches_term_func(), vec![col_expr("text"), utf8_lit("foo")]);

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: false,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_with_lower() {
        let metadata = mock_metadata();
        let lowered_column = Expr::ScalarFunction(scalar_fn(lower(), vec![col_expr("text")]));
        let func = scalar_fn(matches_term_func(), vec![lowered_column, utf8_lit("foo")]);

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: true,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_wrong_num_args() {
        let metadata = mock_metadata();
        let func = scalar_fn(matches_term_func(), vec![col_expr("text")]);

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_function_name() {
        let metadata = mock_metadata();
        // `matches` is not `matches_term`, so the term parser must reject it.
        let func = scalar_fn(matches_func(), vec![col_expr("text"), utf8_lit("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_extract_lower_arg() {
        let func = scalar_fn(lower(), vec![col_expr("text")]);

        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
        let Expr::Column(c) = arg else {
            panic!("Expected Column expression");
        };
        assert_eq!(c.name, "text");
    }

    #[test]
    fn test_extract_lower_arg_wrong_function() {
        // A single-argument call to something other than `lower`.
        let func = scalar_fn(matches_func(), vec![col_expr("text")]);

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_requests() {
        let metadata = mock_metadata();
        let matches_expr = Expr::ScalarFunction(scalar_fn(
            matches_func(),
            vec![col_expr("text"), utf8_lit("foo")],
        ));

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 0);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_extract_multiple_requests() {
        let metadata = mock_metadata();

        let matches_expr = Expr::ScalarFunction(scalar_fn(
            matches_func(),
            vec![col_expr("text"), utf8_lit("foo")],
        ));
        let matches_term_expr = Expr::ScalarFunction(scalar_fn(
            matches_term_func(),
            vec![col_expr("text"), utf8_lit("bar")],
        ));
        // `matches(text, 'foo') AND matches_term(text, 'bar')`
        let binary_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::And,
            right: Box::new(matches_term_expr),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);

        // Both predicates target the same column, so they share one request.
        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 1);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
        assert_eq!(
            request.terms[0],
            FulltextTerm {
                col_lowered: false,
                term: "bar".to_string(),
            }
        );
    }

    #[test]
    fn test_terms_as_query() {
        // No terms at all.
        let request = request_with_terms(&[]);
        assert_eq!(request.terms_as_query(false), FulltextQuery(String::new()));
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));

        // A single term on a non-lowered column.
        let request = request_with_terms(&[(false, "foo")]);
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string())
        );

        // A single term on a lowered column is dropped when skipping.
        let request = request_with_terms(&[(true, "foo")]);
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));

        // Mixed lowered and non-lowered terms.
        let request = request_with_terms(&[(false, "foo"), (true, "bar")]);
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string())
        );

        // Double quotes inside a term are escaped.
        let request = request_with_terms(&[(false, "foo\"bar")]);
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\\\"bar\"".to_string())
        );

        // Several terms with quotes and a backslash.
        let request = request_with_terms(&[
            (false, "foo"),
            (true, "bar\"quoted\""),
            (false, "baz\\escape"),
        ]);
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\" +\"baz\\escape\"".to_string())
        );
    }
}