1use std::collections::BTreeMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::region_request::PathType;
24use store_api::storage::{ColumnId, ConcreteDataType};
25
26use crate::cache::file_cache::FileCacheRef;
27use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
28use crate::error::Result;
29use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
30use crate::sst::index::puffin_manager::PuffinManagerFactory;
31
/// Fulltext index request collected for a single column.
///
/// It gathers the `matches` queries and `matches_term` terms that filter
/// expressions apply to that column.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextRequest {
    /// Query strings taken verbatim from `matches(column, '<query>')` calls.
    pub queries: Vec<FulltextQuery>,
    /// Terms taken from `matches_term(column, '<term>')` calls.
    pub terms: Vec<FulltextTerm>,
}

impl FulltextRequest {
    /// Joins the request's terms into a single query string.
    ///
    /// Each term becomes a `+"<term>"` clause, and clauses are separated by a
    /// single space. Only double quotes inside a term are escaped (as `\"`); every
    /// other character, including backslashes, is copied unchanged.
    /// NOTE(review): `+` is presumably the index query parser's "must match"
    /// operator. Confirm against the parser in use.
    ///
    /// If `skip_lowercased` is true, terms with `col_lowered` set are omitted. If
    /// no term remains, the returned query string is empty.
    pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
        let mut query = String::new();
        let selected = self
            .terms
            .iter()
            .filter(|term| !(skip_lowercased && term.col_lowered));
        for term in selected {
            // Separator only between clauses, never before the first one.
            if !query.is_empty() {
                query.push(' ');
            }
            query.push_str("+\"");
            for ch in term.term.chars() {
                // Escape embedded quotes so they cannot terminate the quoted term early.
                if ch == '"' {
                    query.push('\\');
                }
                query.push(ch);
            }
            query.push('"');
        }
        FulltextQuery(query)
    }
}

/// A fulltext query string.
///
/// It is either taken from a `matches` call or built by
/// [`FulltextRequest::terms_as_query`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextQuery(pub String);

/// A single term taken from a `matches_term` call.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextTerm {
    /// Whether the column argument was wrapped in `lower(...)`.
    pub col_lowered: bool,
    /// The term text, copied from the call's string literal.
    pub term: String,
}
81
82pub struct FulltextIndexApplierBuilder<'a> {
84 table_dir: String,
85 path_type: PathType,
86 store: ObjectStore,
87 puffin_manager_factory: PuffinManagerFactory,
88 metadata: &'a RegionMetadata,
89 file_cache: Option<FileCacheRef>,
90 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
91 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
92}
93
94impl<'a> FulltextIndexApplierBuilder<'a> {
95 pub fn new(
97 table_dir: String,
98 path_type: PathType,
99 store: ObjectStore,
100 puffin_manager_factory: PuffinManagerFactory,
101 metadata: &'a RegionMetadata,
102 ) -> Self {
103 Self {
104 table_dir,
105 path_type,
106 store,
107 puffin_manager_factory,
108 metadata,
109 file_cache: None,
110 puffin_metadata_cache: None,
111 bloom_filter_cache: None,
112 }
113 }
114
115 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
117 self.file_cache = file_cache;
118 self
119 }
120
121 pub fn with_puffin_metadata_cache(
123 mut self,
124 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
125 ) -> Self {
126 self.puffin_metadata_cache = puffin_metadata_cache;
127 self
128 }
129
130 pub fn with_bloom_filter_cache(
132 mut self,
133 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
134 ) -> Self {
135 self.bloom_filter_cache = bloom_filter_cache;
136 self
137 }
138
139 pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
141 let mut requests = BTreeMap::new();
142 for expr in exprs {
143 Self::extract_requests(expr, self.metadata, &mut requests);
144 }
145
146 let has_requests = requests
148 .iter()
149 .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
150
151 Ok(has_requests.then(|| {
152 FulltextIndexApplier::new(
153 self.table_dir,
154 self.path_type,
155 self.store,
156 requests,
157 self.puffin_manager_factory,
158 )
159 .with_file_cache(self.file_cache)
160 .with_puffin_metadata_cache(self.puffin_metadata_cache)
161 .with_bloom_filter_cache(self.bloom_filter_cache)
162 }))
163 }
164
165 fn extract_requests(
166 expr: &Expr,
167 metadata: &'a RegionMetadata,
168 requests: &mut BTreeMap<ColumnId, FulltextRequest>,
169 ) {
170 match expr {
171 Expr::BinaryExpr(BinaryExpr {
172 left,
173 op: Operator::And,
174 right,
175 }) => {
176 Self::extract_requests(left, metadata, requests);
177 Self::extract_requests(right, metadata, requests);
178 }
179 Expr::ScalarFunction(func) => {
180 if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
181 requests.entry(column_id).or_default().queries.push(query);
182 } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
183 requests.entry(column_id).or_default().terms.push(term);
184 }
185 }
186 _ => {}
187 }
188 }
189
190 fn expr_to_query(
191 metadata: &RegionMetadata,
192 f: &ScalarFunction,
193 ) -> Option<(ColumnId, FulltextQuery)> {
194 if f.name() != "matches" {
195 return None;
196 }
197 if f.args.len() != 2 {
198 return None;
199 }
200
201 let Expr::Column(c) = &f.args[0] else {
202 return None;
203 };
204 let column = metadata.column_by_name(&c.name)?;
205
206 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
207 return None;
208 }
209
210 let Expr::Literal(ScalarValue::Utf8(Some(query)), _) = &f.args[1] else {
211 return None;
212 };
213
214 Some((column.column_id, FulltextQuery(query.to_string())))
215 }
216
217 fn expr_to_term(
218 metadata: &RegionMetadata,
219 f: &ScalarFunction,
220 ) -> Option<(ColumnId, FulltextTerm)> {
221 if f.name() != "matches_term" {
222 return None;
223 }
224 if f.args.len() != 2 {
225 return None;
226 }
227
228 let mut lowered = false;
229 let column;
230 match &f.args[0] {
231 Expr::Column(c) => {
232 column = c;
233 }
234 Expr::ScalarFunction(f) => {
235 let lower_arg = Self::extract_lower_arg(f)?;
236 lowered = true;
237 if let Expr::Column(c) = lower_arg {
238 column = c;
239 } else {
240 return None;
241 }
242 }
243 _ => return None,
244 }
245
246 let column = metadata.column_by_name(&column.name)?;
247 if column.column_schema.data_type != ConcreteDataType::string_datatype() {
248 return None;
249 }
250
251 let Expr::Literal(ScalarValue::Utf8(Some(term)), _) = &f.args[1] else {
252 return None;
253 };
254
255 Some((
256 column.column_id,
257 FulltextTerm {
258 col_lowered: lowered,
259 term: term.to_string(),
260 },
261 ))
262 }
263
264 fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
265 if lower_func.args.len() != 1 {
266 return None;
267 }
268
269 if lower_func.name() != "lower" {
270 return None;
271 }
272
273 if lower_func.args.len() != 1 {
274 return None;
275 }
276
277 Some(&lower_func.args[0])
278 }
279}
280
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use datafusion::functions::string::lower;
    use datafusion_common::Column;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Literal, ScalarUDF};
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string field `text` (id 1) and a millisecond timestamp `ts` (id 2).
    fn mock_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });

        builder.build().unwrap()
    }

    /// The `matches` UDF as provided by `common_function`.
    fn matches_func() -> Arc<ScalarUDF> {
        Arc::new(
            ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef)
                .provide(Default::default()),
        )
    }

    /// The `matches_term` UDF as provided by `common_function`.
    fn matches_term_func() -> Arc<ScalarUDF> {
        Arc::new(
            ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef)
                .provide(Default::default()),
        )
    }

    #[test]
    fn test_expr_to_query_basic() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        };

        let (column_id, query) =
            FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(query, FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_expr_to_query_wrong_num_args() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_not_found_column() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("not_found")), "foo".lit()],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_column_wrong_data_type() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("ts")), "foo".lit()],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_pattern_not_string() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), 42.lit()],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_basic() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_term_func(),
        };

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: false,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_with_lower() {
        let metadata = mock_metadata();

        let lower_func_expr = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: lower(),
        };

        let func = ScalarFunction {
            args: vec![Expr::ScalarFunction(lower_func_expr), "foo".lit()],
            func: matches_term_func(),
        };

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: true,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_lower_of_non_column() {
        let metadata = mock_metadata();

        // `lower` wraps a literal rather than a column, so there is nothing to index.
        let lower_func_expr = ScalarFunction {
            args: vec!["TEXT".lit()],
            func: lower(),
        };

        let func = ScalarFunction {
            args: vec![Expr::ScalarFunction(lower_func_expr), "foo".lit()],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_num_args() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_function_name() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_column_wrong_data_type() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("ts")), "foo".lit()],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_term_not_string() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), 42.lit()],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_extract_lower_arg() {
        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: lower(),
        };

        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
        match arg {
            Expr::Column(c) => {
                assert_eq!(c.name, "text");
            }
            _ => panic!("Expected Column expression"),
        }
    }

    #[test]
    fn test_extract_lower_arg_wrong_function() {
        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_lower_arg_wrong_num_args() {
        let no_args = ScalarFunction {
            args: vec![],
            func: lower(),
        };
        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&no_args).is_none());

        let two_args = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "x".lit()],
            func: lower(),
        };
        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&two_args).is_none());
    }

    #[test]
    fn test_extract_requests() {
        let metadata = mock_metadata();

        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 0);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_extract_multiple_requests() {
        let metadata = mock_metadata();

        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        });

        let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "bar".lit()],
            func: matches_term_func(),
        });

        let binary_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::And,
            right: Box::new(matches_term_expr),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 1);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
        assert_eq!(
            request.terms[0],
            FulltextTerm {
                col_lowered: false,
                term: "bar".to_string(),
            }
        );
    }

    #[test]
    fn test_extract_requests_ignores_or() {
        let metadata = mock_metadata();

        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        });

        let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "bar".lit()],
            func: matches_term_func(),
        });

        // Only AND conjunctions are descended into; a disjunction yields no request.
        let or_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::Or,
            right: Box::new(matches_term_expr),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&or_expr, &metadata, &mut requests);

        assert!(requests.is_empty());
    }

    #[test]
    fn test_terms_as_query() {
        // Empty request: no terms, empty query either way.
        let request = FulltextRequest::default();
        assert_eq!(request.terms_as_query(false), FulltextQuery(String::new()));
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));

        // A single non-lowered term is never skipped.
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string())
        );

        // A single lowered term is skipped only when asked to.
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "foo".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));

        // Mixed terms: clauses are space-separated; the lowered one may be skipped.
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "bar".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string())
        );

        // Skipping a leading lowered term must not leave a leading separator.
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "bar".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string())
        );

        // Embedded double quotes are escaped.
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo\"bar".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\\\"bar\"".to_string())
        );

        // Quotes are escaped, backslashes are copied unchanged.
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "bar\"quoted\"".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "baz\\escape".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\" +\"baz\\escape\"".to_string())
        );
    }
}