1use std::collections::HashMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
24
25use crate::cache::file_cache::FileCacheRef;
26use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
27use crate::error::Result;
28use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
29use crate::sst::index::puffin_manager::PuffinManagerFactory;
30
31#[derive(Default, Debug)]
35pub struct FulltextRequest {
36 pub queries: Vec<FulltextQuery>,
37 pub terms: Vec<FulltextTerm>,
38}
39
40impl FulltextRequest {
41 pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
48 let mut query = String::new();
49 for term in &self.terms {
50 if skip_lowercased && term.col_lowered {
51 continue;
52 }
53 let escaped_term = term.term.replace("\"", "\\\"");
55 if query.is_empty() {
56 query = format!("+\"{escaped_term}\"");
57 } else {
58 query.push_str(&format!(" +\"{escaped_term}\""));
59 }
60 }
61 FulltextQuery(query)
62 }
63}
64
/// A full-text query string.
///
/// In this module it is either taken verbatim from the string literal passed
/// to `matches`, or synthesized from terms by
/// `FulltextRequest::terms_as_query`.
#[derive(PartialEq, Eq, Debug)]
pub struct FulltextQuery(pub String);
70
/// A single term to look for in a column.
#[derive(PartialEq, Eq, Debug)]
pub struct FulltextTerm {
    /// `true` when the term was extracted from a `matches_term` call whose
    /// column argument was wrapped in `lower(...)`.
    pub col_lowered: bool,
    /// The term text, exactly as written in the expression literal.
    pub term: String,
}
80
81pub struct FulltextIndexApplierBuilder<'a> {
83 region_dir: String,
84 region_id: RegionId,
85 store: ObjectStore,
86 puffin_manager_factory: PuffinManagerFactory,
87 metadata: &'a RegionMetadata,
88 file_cache: Option<FileCacheRef>,
89 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
90 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
91}
92
93impl<'a> FulltextIndexApplierBuilder<'a> {
94 pub fn new(
96 region_dir: String,
97 region_id: RegionId,
98 store: ObjectStore,
99 puffin_manager_factory: PuffinManagerFactory,
100 metadata: &'a RegionMetadata,
101 ) -> Self {
102 Self {
103 region_dir,
104 region_id,
105 store,
106 puffin_manager_factory,
107 metadata,
108 file_cache: None,
109 puffin_metadata_cache: None,
110 bloom_filter_cache: None,
111 }
112 }
113
114 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
116 self.file_cache = file_cache;
117 self
118 }
119
120 pub fn with_puffin_metadata_cache(
122 mut self,
123 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
124 ) -> Self {
125 self.puffin_metadata_cache = puffin_metadata_cache;
126 self
127 }
128
129 pub fn with_bloom_filter_cache(
131 mut self,
132 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
133 ) -> Self {
134 self.bloom_filter_cache = bloom_filter_cache;
135 self
136 }
137
138 pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
140 let mut requests = HashMap::new();
141 for expr in exprs {
142 Self::extract_requests(expr, self.metadata, &mut requests);
143 }
144
145 let has_requests = requests
147 .iter()
148 .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
149
150 Ok(has_requests.then(|| {
151 FulltextIndexApplier::new(
152 self.region_dir,
153 self.region_id,
154 self.store,
155 requests,
156 self.puffin_manager_factory,
157 )
158 .with_file_cache(self.file_cache)
159 .with_puffin_metadata_cache(self.puffin_metadata_cache)
160 .with_bloom_filter_cache(self.bloom_filter_cache)
161 }))
162 }
163
164 fn extract_requests(
165 expr: &Expr,
166 metadata: &'a RegionMetadata,
167 requests: &mut HashMap<ColumnId, FulltextRequest>,
168 ) {
169 match expr {
170 Expr::BinaryExpr(BinaryExpr {
171 left,
172 op: Operator::And,
173 right,
174 }) => {
175 Self::extract_requests(left, metadata, requests);
176 Self::extract_requests(right, metadata, requests);
177 }
178 Expr::ScalarFunction(func) => {
179 if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
180 requests.entry(column_id).or_default().queries.push(query);
181 } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
182 requests.entry(column_id).or_default().terms.push(term);
183 }
184 }
185 _ => {}
186 }
187 }
188
189 fn expr_to_query(
190 metadata: &RegionMetadata,
191 f: &ScalarFunction,
192 ) -> Option<(ColumnId, FulltextQuery)> {
193 if f.name() != "matches" {
194 return None;
195 }
196 if f.args.len() != 2 {
197 return None;
198 }
199
200 let Expr::Column(c) = &f.args[0] else {
201 return None;
202 };
203 let column = metadata.column_by_name(&c.name)?;
204
205 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
206 return None;
207 }
208
209 let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else {
210 return None;
211 };
212
213 Some((column.column_id, FulltextQuery(query.to_string())))
214 }
215
216 fn expr_to_term(
217 metadata: &RegionMetadata,
218 f: &ScalarFunction,
219 ) -> Option<(ColumnId, FulltextTerm)> {
220 if f.name() != "matches_term" {
221 return None;
222 }
223 if f.args.len() != 2 {
224 return None;
225 }
226
227 let mut lowered = false;
228 let column;
229 match &f.args[0] {
230 Expr::Column(c) => {
231 column = c;
232 }
233 Expr::ScalarFunction(f) => {
234 let lower_arg = Self::extract_lower_arg(f)?;
235 lowered = true;
236 if let Expr::Column(c) = lower_arg {
237 column = c;
238 } else {
239 return None;
240 }
241 }
242 _ => return None,
243 }
244
245 let column = metadata.column_by_name(&column.name)?;
246 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
247 return None;
248 }
249
250 let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else {
251 return None;
252 };
253
254 Some((
255 column.column_id,
256 FulltextTerm {
257 col_lowered: lowered,
258 term: term.to_string(),
259 },
260 ))
261 }
262
263 fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
264 if lower_func.args.len() != 1 {
265 return None;
266 }
267
268 if lower_func.name() != "lower" {
269 return None;
270 }
271
272 if lower_func.args.len() != 1 {
273 return None;
274 }
275
276 Some(&lower_func.args[0])
277 }
278}
279
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_function::function_registry::FUNCTION_REGISTRY;
    use common_function::scalars::udf::create_udf;
    use datafusion::functions::string::lower;
    use datafusion_common::Column;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::ScalarUDF;
    use datatypes::schema::ColumnSchema;
    use session::context::QueryContext;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region metadata with a nullable string field `text` (column id 1) and
    /// a millisecond timestamp column `ts` (column id 2).
    fn mock_metadata() -> RegionMetadata {
        let text = ColumnMetadata {
            column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 1,
        };
        let ts = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 2,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder.push_column_metadata(text).push_column_metadata(ts);
        builder.build().unwrap()
    }

    /// Wraps the registered function called `name` into a DataFusion UDF.
    fn registry_udf(name: &str) -> Arc<ScalarUDF> {
        let function = FUNCTION_REGISTRY.get_function(name).unwrap();
        Arc::new(create_udf(
            function,
            QueryContext::arc(),
            Default::default(),
        ))
    }

    fn matches_func() -> Arc<ScalarUDF> {
        registry_udf("matches")
    }

    fn matches_term_func() -> Arc<ScalarUDF> {
        registry_udf("matches_term")
    }

    /// Column reference expression.
    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// Non-null `Utf8` literal expression.
    fn utf8(value: &str) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
    }

    /// Scalar function call of `func` with `args`.
    fn call(func: Arc<ScalarUDF>, args: Vec<Expr>) -> ScalarFunction {
        ScalarFunction { args, func }
    }

    fn query(text: &str) -> FulltextQuery {
        FulltextQuery(text.to_string())
    }

    fn term(col_lowered: bool, text: &str) -> FulltextTerm {
        FulltextTerm {
            col_lowered,
            term: text.to_string(),
        }
    }

    /// Request holding only the given terms.
    fn request_with_terms(terms: Vec<FulltextTerm>) -> FulltextRequest {
        FulltextRequest {
            terms,
            ..Default::default()
        }
    }

    #[test]
    fn test_expr_to_query_basic() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![column("text"), utf8("foo")]);

        let extracted = FulltextIndexApplierBuilder::expr_to_query(&metadata, &func);
        assert_eq!(extracted, Some((1, query("foo"))));
    }

    #[test]
    fn test_expr_to_query_wrong_num_args() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![column("text")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_not_found_column() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![column("not_found"), utf8("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_column_wrong_data_type() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![column("ts"), utf8("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_pattern_not_string() {
        let metadata = mock_metadata();
        let func = call(
            matches_func(),
            vec![column("text"), Expr::Literal(ScalarValue::Int64(Some(42)))],
        );

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_basic() {
        let metadata = mock_metadata();
        let func = call(matches_term_func(), vec![column("text"), utf8("foo")]);

        let extracted = FulltextIndexApplierBuilder::expr_to_term(&metadata, &func);
        assert_eq!(extracted, Some((1, term(false, "foo"))));
    }

    #[test]
    fn test_expr_to_term_with_lower() {
        let metadata = mock_metadata();
        let lowered_column = Expr::ScalarFunction(call(lower(), vec![column("text")]));
        let func = call(matches_term_func(), vec![lowered_column, utf8("foo")]);

        let extracted = FulltextIndexApplierBuilder::expr_to_term(&metadata, &func);
        assert_eq!(extracted, Some((1, term(true, "foo"))));
    }

    #[test]
    fn test_expr_to_term_wrong_num_args() {
        let metadata = mock_metadata();
        let func = call(matches_term_func(), vec![column("text")]);

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_function_name() {
        let metadata = mock_metadata();
        // Deliberately `matches` rather than `matches_term`.
        let func = call(matches_func(), vec![column("text"), utf8("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_extract_lower_arg() {
        let func = call(lower(), vec![column("text")]);

        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
        let Expr::Column(c) = arg else {
            panic!("Expected Column expression");
        };
        assert_eq!(c.name, "text");
    }

    #[test]
    fn test_extract_lower_arg_wrong_function() {
        // Deliberately `matches` rather than `lower`.
        let func = call(matches_func(), vec![column("text")]);

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_requests() {
        let metadata = mock_metadata();
        let matches_expr =
            Expr::ScalarFunction(call(matches_func(), vec![column("text"), utf8("foo")]));

        let mut requests = HashMap::new();
        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = &requests[&1];
        assert_eq!(request.queries, vec![query("foo")]);
        assert!(request.terms.is_empty());
    }

    #[test]
    fn test_extract_multiple_requests() {
        let metadata = mock_metadata();

        let matches_expr =
            Expr::ScalarFunction(call(matches_func(), vec![column("text"), utf8("foo")]));
        let matches_term_expr =
            Expr::ScalarFunction(call(matches_term_func(), vec![column("text"), utf8("bar")]));
        let binary_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::And,
            right: Box::new(matches_term_expr),
        });

        let mut requests = HashMap::new();
        FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = &requests[&1];
        assert_eq!(request.queries, vec![query("foo")]);
        assert_eq!(request.terms, vec![term(false, "bar")]);
    }

    #[test]
    fn test_terms_as_query() {
        // No terms at all.
        let request = FulltextRequest::default();
        assert_eq!(request.terms_as_query(false), query(""));
        assert_eq!(request.terms_as_query(true), query(""));

        // A single term on the plain column.
        let request = request_with_terms(vec![term(false, "foo")]);
        assert_eq!(request.terms_as_query(false), query("+\"foo\""));
        assert_eq!(request.terms_as_query(true), query("+\"foo\""));

        // A single term on the lowered column.
        let request = request_with_terms(vec![term(true, "foo")]);
        assert_eq!(request.terms_as_query(false), query("+\"foo\""));
        assert_eq!(request.terms_as_query(true), query(""));

        // One plain and one lowered term.
        let request = request_with_terms(vec![term(false, "foo"), term(true, "bar")]);
        assert_eq!(request.terms_as_query(false), query("+\"foo\" +\"bar\""));
        assert_eq!(request.terms_as_query(true), query("+\"foo\""));

        // A double quote inside a term is escaped.
        let request = request_with_terms(vec![term(false, "foo\"bar")]);
        assert_eq!(request.terms_as_query(false), query("+\"foo\\\"bar\""));

        // Mixed terms containing quotes and a backslash.
        let request = request_with_terms(vec![
            term(false, "foo"),
            term(true, "bar\"quoted\""),
            term(false, "baz\\escape"),
        ]);
        assert_eq!(
            request.terms_as_query(false),
            query("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"")
        );
        assert_eq!(
            request.terms_as_query(true),
            query("+\"foo\" +\"baz\\escape\"")
        );
    }
}