1use std::collections::BTreeMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
24
25use crate::cache::file_cache::FileCacheRef;
26use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
27use crate::error::Result;
28use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
29use crate::sst::index::puffin_manager::PuffinManagerFactory;
30
/// Full-text search conditions collected for a single column.
///
/// `queries` holds the query strings taken from `matches(column, query)` calls and
/// `terms` holds the literals taken from `matches_term(column, term)` calls.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextRequest {
    /// Query strings, in the order they were extracted.
    pub queries: Vec<FulltextQuery>,
    /// Terms, in the order they were extracted.
    pub terms: Vec<FulltextTerm>,
}
39
40impl FulltextRequest {
41 pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
48 let mut query = String::new();
49 for term in &self.terms {
50 if skip_lowercased && term.col_lowered {
51 continue;
52 }
53 let escaped_term = term.term.replace("\"", "\\\"");
55 if query.is_empty() {
56 query = format!("+\"{escaped_term}\"");
57 } else {
58 query.push_str(&format!(" +\"{escaped_term}\""));
59 }
60 }
61 FulltextQuery(query)
62 }
63}
64
/// A full-text query string.
///
/// Built either from the literal second argument of a `matches` call or by
/// `FulltextRequest::terms_as_query`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextQuery(pub String);
70
/// A single term taken from a `matches_term` call.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextTerm {
    /// `true` when the column argument was wrapped in `lower(...)`.
    pub col_lowered: bool,
    /// The literal term, exactly as written in the expression.
    pub term: String,
}
80
/// Builder that turns filter expressions into a [`FulltextIndexApplier`].
///
/// The region/storage fields are handed to the applier unchanged; `metadata` is used to
/// resolve column names found in the expressions.
pub struct FulltextIndexApplierBuilder<'a> {
    /// Region directory, passed through to the applier.
    region_dir: String,
    /// Region id, passed through to the applier.
    region_id: RegionId,
    /// Object store, passed through to the applier.
    store: ObjectStore,
    /// Puffin manager factory, passed through to the applier.
    puffin_manager_factory: PuffinManagerFactory,
    /// Region metadata used to look up columns by name.
    metadata: &'a RegionMetadata,
    /// Optional file cache; `None` unless set via `with_file_cache`.
    file_cache: Option<FileCacheRef>,
    /// Optional puffin metadata cache; `None` unless set via `with_puffin_metadata_cache`.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Optional bloom filter index cache; `None` unless set via `with_bloom_filter_cache`.
    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
}
92
93impl<'a> FulltextIndexApplierBuilder<'a> {
94 pub fn new(
96 region_dir: String,
97 region_id: RegionId,
98 store: ObjectStore,
99 puffin_manager_factory: PuffinManagerFactory,
100 metadata: &'a RegionMetadata,
101 ) -> Self {
102 Self {
103 region_dir,
104 region_id,
105 store,
106 puffin_manager_factory,
107 metadata,
108 file_cache: None,
109 puffin_metadata_cache: None,
110 bloom_filter_cache: None,
111 }
112 }
113
114 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
116 self.file_cache = file_cache;
117 self
118 }
119
120 pub fn with_puffin_metadata_cache(
122 mut self,
123 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
124 ) -> Self {
125 self.puffin_metadata_cache = puffin_metadata_cache;
126 self
127 }
128
129 pub fn with_bloom_filter_cache(
131 mut self,
132 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
133 ) -> Self {
134 self.bloom_filter_cache = bloom_filter_cache;
135 self
136 }
137
138 pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
140 let mut requests = BTreeMap::new();
141 for expr in exprs {
142 Self::extract_requests(expr, self.metadata, &mut requests);
143 }
144
145 let has_requests = requests
147 .iter()
148 .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
149
150 Ok(has_requests.then(|| {
151 FulltextIndexApplier::new(
152 self.region_dir,
153 self.region_id,
154 self.store,
155 requests,
156 self.puffin_manager_factory,
157 )
158 .with_file_cache(self.file_cache)
159 .with_puffin_metadata_cache(self.puffin_metadata_cache)
160 .with_bloom_filter_cache(self.bloom_filter_cache)
161 }))
162 }
163
164 fn extract_requests(
165 expr: &Expr,
166 metadata: &'a RegionMetadata,
167 requests: &mut BTreeMap<ColumnId, FulltextRequest>,
168 ) {
169 match expr {
170 Expr::BinaryExpr(BinaryExpr {
171 left,
172 op: Operator::And,
173 right,
174 }) => {
175 Self::extract_requests(left, metadata, requests);
176 Self::extract_requests(right, metadata, requests);
177 }
178 Expr::ScalarFunction(func) => {
179 if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
180 requests.entry(column_id).or_default().queries.push(query);
181 } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
182 requests.entry(column_id).or_default().terms.push(term);
183 }
184 }
185 _ => {}
186 }
187 }
188
189 fn expr_to_query(
190 metadata: &RegionMetadata,
191 f: &ScalarFunction,
192 ) -> Option<(ColumnId, FulltextQuery)> {
193 if f.name() != "matches" {
194 return None;
195 }
196 if f.args.len() != 2 {
197 return None;
198 }
199
200 let Expr::Column(c) = &f.args[0] else {
201 return None;
202 };
203 let column = metadata.column_by_name(&c.name)?;
204
205 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
206 return None;
207 }
208
209 let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else {
210 return None;
211 };
212
213 Some((column.column_id, FulltextQuery(query.to_string())))
214 }
215
216 fn expr_to_term(
217 metadata: &RegionMetadata,
218 f: &ScalarFunction,
219 ) -> Option<(ColumnId, FulltextTerm)> {
220 if f.name() != "matches_term" {
221 return None;
222 }
223 if f.args.len() != 2 {
224 return None;
225 }
226
227 let mut lowered = false;
228 let column;
229 match &f.args[0] {
230 Expr::Column(c) => {
231 column = c;
232 }
233 Expr::ScalarFunction(f) => {
234 let lower_arg = Self::extract_lower_arg(f)?;
235 lowered = true;
236 if let Expr::Column(c) = lower_arg {
237 column = c;
238 } else {
239 return None;
240 }
241 }
242 _ => return None,
243 }
244
245 let column = metadata.column_by_name(&column.name)?;
246 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
247 return None;
248 }
249
250 let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else {
251 return None;
252 };
253
254 Some((
255 column.column_id,
256 FulltextTerm {
257 col_lowered: lowered,
258 term: term.to_string(),
259 },
260 ))
261 }
262
263 fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
264 if lower_func.args.len() != 1 {
265 return None;
266 }
267
268 if lower_func.name() != "lower" {
269 return None;
270 }
271
272 if lower_func.args.len() != 1 {
273 return None;
274 }
275
276 Some(&lower_func.args[0])
277 }
278}
279
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use datafusion::functions::string::lower;
    use datafusion_common::Column;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::ScalarUDF;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region metadata with a nullable string field `text` (id 1) and a millisecond
    /// timestamp column `ts` (id 2).
    fn mock_metadata() -> RegionMetadata {
        let text_column = ColumnMetadata {
            column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 1,
        };
        let ts_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 2,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(text_column)
            .push_column_metadata(ts_column);
        builder.build().unwrap()
    }

    /// Wraps a function implementation into a DataFusion UDF.
    fn udf_of(function: FunctionRef) -> Arc<ScalarUDF> {
        Arc::new(ScalarFunctionFactory::from(function).provide(Default::default()))
    }

    fn matches_func() -> Arc<ScalarUDF> {
        udf_of(Arc::new(MatchesFunction) as FunctionRef)
    }

    fn matches_term_func() -> Arc<ScalarUDF> {
        udf_of(Arc::new(MatchesTermFunction) as FunctionRef)
    }

    /// Column reference expression for `name`.
    fn col_expr(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// Non-null `Utf8` literal expression.
    fn utf8_lit(value: &str) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
    }

    /// Scalar function call of `func` with `args`.
    fn scalar_fn(args: Vec<Expr>, func: Arc<ScalarUDF>) -> ScalarFunction {
        ScalarFunction { args, func }
    }

    /// Term with the given `col_lowered` flag.
    fn term_of(col_lowered: bool, term: &str) -> FulltextTerm {
        FulltextTerm {
            col_lowered,
            term: term.to_string(),
        }
    }

    #[test]
    fn test_expr_to_query_basic() {
        let metadata = mock_metadata();
        let func = scalar_fn(vec![col_expr("text"), utf8_lit("foo")], matches_func());

        let parsed = FulltextIndexApplierBuilder::expr_to_query(&metadata, &func);
        assert_eq!(parsed, Some((1, FulltextQuery("foo".to_string()))));
    }

    #[test]
    fn test_expr_to_query_wrong_num_args() {
        let metadata = mock_metadata();
        let func = scalar_fn(vec![col_expr("text")], matches_func());

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_not_found_column() {
        let metadata = mock_metadata();
        let func = scalar_fn(vec![col_expr("not_found"), utf8_lit("foo")], matches_func());

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_column_wrong_data_type() {
        let metadata = mock_metadata();
        let func = scalar_fn(vec![col_expr("ts"), utf8_lit("foo")], matches_func());

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_pattern_not_string() {
        let metadata = mock_metadata();
        let func = scalar_fn(
            vec![
                col_expr("text"),
                Expr::Literal(ScalarValue::Int64(Some(42))),
            ],
            matches_func(),
        );

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_basic() {
        let metadata = mock_metadata();
        let func = scalar_fn(vec![col_expr("text"), utf8_lit("foo")], matches_term_func());

        let parsed = FulltextIndexApplierBuilder::expr_to_term(&metadata, &func);
        assert_eq!(parsed, Some((1, term_of(false, "foo"))));
    }

    #[test]
    fn test_expr_to_term_with_lower() {
        let metadata = mock_metadata();

        let lowered_column = Expr::ScalarFunction(scalar_fn(vec![col_expr("text")], lower()));
        let func = scalar_fn(vec![lowered_column, utf8_lit("foo")], matches_term_func());

        let parsed = FulltextIndexApplierBuilder::expr_to_term(&metadata, &func);
        assert_eq!(parsed, Some((1, term_of(true, "foo"))));
    }

    #[test]
    fn test_expr_to_term_wrong_num_args() {
        let metadata = mock_metadata();
        let func = scalar_fn(vec![col_expr("text")], matches_term_func());

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_function_name() {
        let metadata = mock_metadata();
        // `matches` is deliberately used where `matches_term` is expected.
        let func = scalar_fn(vec![col_expr("text"), utf8_lit("foo")], matches_func());

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_extract_lower_arg() {
        let func = scalar_fn(vec![col_expr("text")], lower());

        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
        let Expr::Column(c) = arg else {
            panic!("Expected Column expression");
        };
        assert_eq!(c.name, "text");
    }

    #[test]
    fn test_extract_lower_arg_wrong_function() {
        // `matches` is deliberately used where `lower` is expected.
        let func = scalar_fn(vec![col_expr("text")], matches_func());

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_requests() {
        let metadata = mock_metadata();

        let matches_expr = Expr::ScalarFunction(scalar_fn(
            vec![col_expr("text"), utf8_lit("foo")],
            matches_func(),
        ));

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries, vec![FulltextQuery("foo".to_string())]);
        assert!(request.terms.is_empty());
    }

    #[test]
    fn test_extract_multiple_requests() {
        let metadata = mock_metadata();

        let query_side = Expr::ScalarFunction(scalar_fn(
            vec![col_expr("text"), utf8_lit("foo")],
            matches_func(),
        ));
        let term_side = Expr::ScalarFunction(scalar_fn(
            vec![col_expr("text"), utf8_lit("bar")],
            matches_term_func(),
        ));
        let conjunction = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(query_side),
            op: Operator::And,
            right: Box::new(term_side),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&conjunction, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries, vec![FulltextQuery("foo".to_string())]);
        assert_eq!(request.terms, vec![term_of(false, "bar")]);
    }

    #[test]
    fn test_terms_as_query() {
        // Each case: terms as (col_lowered, term), the expected query when nothing is
        // skipped, and the expected query when lowered terms are skipped (`None` when
        // that variant is not checked).
        let cases: Vec<(Vec<(bool, &str)>, &str, Option<&str>)> = vec![
            (vec![], "", Some("")),
            (vec![(false, "foo")], r#"+"foo""#, Some(r#"+"foo""#)),
            (vec![(true, "foo")], r#"+"foo""#, Some("")),
            (
                vec![(false, "foo"), (true, "bar")],
                r#"+"foo" +"bar""#,
                Some(r#"+"foo""#),
            ),
            (vec![(false, r#"foo"bar"#)], r#"+"foo\"bar""#, None),
            (
                vec![
                    (false, "foo"),
                    (true, r#"bar"quoted""#),
                    (false, r"baz\escape"),
                ],
                r#"+"foo" +"bar\"quoted\"" +"baz\escape""#,
                Some(r#"+"foo" +"baz\escape""#),
            ),
        ];

        for (terms, expected_all, expected_skipping) in cases {
            let request = FulltextRequest {
                terms: terms
                    .into_iter()
                    .map(|(col_lowered, term)| term_of(col_lowered, term))
                    .collect(),
                ..Default::default()
            };

            assert_eq!(
                request.terms_as_query(false),
                FulltextQuery(expected_all.to_string())
            );
            if let Some(expected) = expected_skipping {
                assert_eq!(
                    request.terms_as_query(true),
                    FulltextQuery(expected.to_string())
                );
            }
        }
    }
}