mito2/sst/index/fulltext_index/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::region_request::PathType;
24use store_api::storage::{ColumnId, ConcreteDataType};
25
26use crate::cache::file_cache::FileCacheRef;
27use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
28use crate::error::Result;
29use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
30use crate::sst::index::puffin_manager::PuffinManagerFactory;
31
32/// A request for fulltext index.
33///
34/// It contains all the queries and terms for a column.
35#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
36pub struct FulltextRequest {
37    pub queries: Vec<FulltextQuery>,
38    pub terms: Vec<FulltextTerm>,
39}
40
41impl FulltextRequest {
42    /// Convert terms to a query string.
43    ///
44    /// For example, if the terms are ["foo", "bar"], the query string will be `r#"+"foo" +"bar""#`.
45    /// Need to escape the `"` in the term.
46    ///
47    /// `skip_lowercased` is used for the situation that lowercased terms are not indexed.
48    pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
49        let mut query = String::new();
50        for term in &self.terms {
51            if skip_lowercased && term.col_lowered {
52                continue;
53            }
54            // Escape the `"` in the term.
55            let escaped_term = term.term.replace("\"", "\\\"");
56            if query.is_empty() {
57                query = format!("+\"{escaped_term}\"");
58            } else {
59                query.push_str(&format!(" +\"{escaped_term}\""));
60            }
61        }
62        FulltextQuery(query)
63    }
64}
65
66/// A query to be matched in fulltext index.
67///
68/// `query` is the query to be matched, e.g. "+foo -bar" in `SELECT * FROM t WHERE matches(text, "+foo -bar")`.
69#[derive(Debug, Clone, PartialEq, Eq, Hash)]
70pub struct FulltextQuery(pub String);
71
72/// A term to be matched in fulltext index.
73///
74/// `term` is the term to be matched, e.g. "foo" in `SELECT * FROM t WHERE matches_term(text, "foo")`.
75/// `col_lowered` indicates whether the column is lowercased, e.g. `col_lowered = true` when `matches_term(lower(text), "foo")`.
76#[derive(Debug, Clone, PartialEq, Eq, Hash)]
77pub struct FulltextTerm {
78    pub col_lowered: bool,
79    pub term: String,
80}
81
82/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
83pub struct FulltextIndexApplierBuilder<'a> {
84    table_dir: String,
85    path_type: PathType,
86    store: ObjectStore,
87    puffin_manager_factory: PuffinManagerFactory,
88    metadata: &'a RegionMetadata,
89    file_cache: Option<FileCacheRef>,
90    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
91    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
92}
93
94impl<'a> FulltextIndexApplierBuilder<'a> {
95    /// Creates a new `FulltextIndexApplierBuilder`.
96    pub fn new(
97        table_dir: String,
98        path_type: PathType,
99        store: ObjectStore,
100        puffin_manager_factory: PuffinManagerFactory,
101        metadata: &'a RegionMetadata,
102    ) -> Self {
103        Self {
104            table_dir,
105            path_type,
106            store,
107            puffin_manager_factory,
108            metadata,
109            file_cache: None,
110            puffin_metadata_cache: None,
111            bloom_filter_cache: None,
112        }
113    }
114
115    /// Sets the file cache to be used by the `FulltextIndexApplier`.
116    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
117        self.file_cache = file_cache;
118        self
119    }
120
121    /// Sets the puffin metadata cache to be used by the `FulltextIndexApplier`.
122    pub fn with_puffin_metadata_cache(
123        mut self,
124        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
125    ) -> Self {
126        self.puffin_metadata_cache = puffin_metadata_cache;
127        self
128    }
129
130    /// Sets the bloom filter cache to be used by the `FulltextIndexApplier`.
131    pub fn with_bloom_filter_cache(
132        mut self,
133        bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
134    ) -> Self {
135        self.bloom_filter_cache = bloom_filter_cache;
136        self
137    }
138
139    /// Builds `SstIndexApplier` from the given expressions.
140    pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
141        let mut requests = BTreeMap::new();
142        for expr in exprs {
143            Self::extract_requests(expr, self.metadata, &mut requests);
144        }
145
146        // Check if any requests have queries or terms
147        let has_requests = requests
148            .iter()
149            .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
150
151        Ok(has_requests.then(|| {
152            FulltextIndexApplier::new(
153                self.table_dir,
154                self.path_type,
155                self.store,
156                requests,
157                self.puffin_manager_factory,
158            )
159            .with_file_cache(self.file_cache)
160            .with_puffin_metadata_cache(self.puffin_metadata_cache)
161            .with_bloom_filter_cache(self.bloom_filter_cache)
162        }))
163    }
164
165    fn extract_requests(
166        expr: &Expr,
167        metadata: &'a RegionMetadata,
168        requests: &mut BTreeMap<ColumnId, FulltextRequest>,
169    ) {
170        match expr {
171            Expr::BinaryExpr(BinaryExpr {
172                left,
173                op: Operator::And,
174                right,
175            }) => {
176                Self::extract_requests(left, metadata, requests);
177                Self::extract_requests(right, metadata, requests);
178            }
179            Expr::ScalarFunction(func) => {
180                if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
181                    requests.entry(column_id).or_default().queries.push(query);
182                } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
183                    requests.entry(column_id).or_default().terms.push(term);
184                }
185            }
186            _ => {}
187        }
188    }
189
190    fn expr_to_query(
191        metadata: &RegionMetadata,
192        f: &ScalarFunction,
193    ) -> Option<(ColumnId, FulltextQuery)> {
194        if f.name() != "matches" {
195            return None;
196        }
197        if f.args.len() != 2 {
198            return None;
199        }
200
201        let Expr::Column(c) = &f.args[0] else {
202            return None;
203        };
204        let column = metadata.column_by_name(&c.name)?;
205
206        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
207            return None;
208        }
209
210        let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else {
211            return None;
212        };
213
214        Some((column.column_id, FulltextQuery(query.to_string())))
215    }
216
217    fn expr_to_term(
218        metadata: &RegionMetadata,
219        f: &ScalarFunction,
220    ) -> Option<(ColumnId, FulltextTerm)> {
221        if f.name() != "matches_term" {
222            return None;
223        }
224        if f.args.len() != 2 {
225            return None;
226        }
227
228        let mut lowered = false;
229        let column;
230        match &f.args[0] {
231            Expr::Column(c) => {
232                column = c;
233            }
234            Expr::ScalarFunction(f) => {
235                let lower_arg = Self::extract_lower_arg(f)?;
236                lowered = true;
237                if let Expr::Column(c) = lower_arg {
238                    column = c;
239                } else {
240                    return None;
241                }
242            }
243            _ => return None,
244        }
245
246        let column = metadata.column_by_name(&column.name)?;
247        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
248            return None;
249        }
250
251        let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else {
252            return None;
253        };
254
255        Some((
256            column.column_id,
257            FulltextTerm {
258                col_lowered: lowered,
259                term: term.to_string(),
260            },
261        ))
262    }
263
264    fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
265        if lower_func.args.len() != 1 {
266            return None;
267        }
268
269        if lower_func.name() != "lower" {
270            return None;
271        }
272
273        if lower_func.args.len() != 1 {
274            return None;
275        }
276
277        Some(&lower_func.args[0])
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use std::sync::Arc;
284
285    use api::v1::SemanticType;
286    use common_function::function::FunctionRef;
287    use common_function::function_factory::ScalarFunctionFactory;
288    use common_function::scalars::matches::MatchesFunction;
289    use common_function::scalars::matches_term::MatchesTermFunction;
290    use datafusion::functions::string::lower;
291    use datafusion_common::Column;
292    use datafusion_expr::expr::ScalarFunction;
293    use datafusion_expr::ScalarUDF;
294    use datatypes::schema::ColumnSchema;
295    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
296    use store_api::storage::RegionId;
297
298    use super::*;
299
300    fn mock_metadata() -> RegionMetadata {
301        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
302        builder
303            .push_column_metadata(ColumnMetadata {
304                column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
305                semantic_type: SemanticType::Field,
306                column_id: 1,
307            })
308            .push_column_metadata(ColumnMetadata {
309                column_schema: ColumnSchema::new(
310                    "ts",
311                    ConcreteDataType::timestamp_millisecond_datatype(),
312                    false,
313                ),
314                semantic_type: SemanticType::Timestamp,
315                column_id: 2,
316            });
317
318        builder.build().unwrap()
319    }
320
321    fn matches_func() -> Arc<ScalarUDF> {
322        Arc::new(
323            ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef)
324                .provide(Default::default()),
325        )
326    }
327
328    fn matches_term_func() -> Arc<ScalarUDF> {
329        Arc::new(
330            ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef)
331                .provide(Default::default()),
332        )
333    }
334
335    #[test]
336    fn test_expr_to_query_basic() {
337        let metadata = mock_metadata();
338
339        let func = ScalarFunction {
340            args: vec![
341                Expr::Column(Column::from_name("text")),
342                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
343            ],
344            func: matches_func(),
345        };
346
347        let (column_id, query) =
348            FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).unwrap();
349        assert_eq!(column_id, 1);
350        assert_eq!(query, FulltextQuery("foo".to_string()));
351    }
352
353    #[test]
354    fn test_expr_to_query_wrong_num_args() {
355        let metadata = mock_metadata();
356
357        let func = ScalarFunction {
358            args: vec![Expr::Column(Column::from_name("text"))],
359            func: matches_func(),
360        };
361
362        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
363    }
364
365    #[test]
366    fn test_expr_to_query_not_found_column() {
367        let metadata = mock_metadata();
368
369        let func = ScalarFunction {
370            args: vec![
371                Expr::Column(Column::from_name("not_found")),
372                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
373            ],
374            func: matches_func(),
375        };
376
377        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
378    }
379
380    #[test]
381    fn test_expr_to_query_column_wrong_data_type() {
382        let metadata = mock_metadata();
383
384        let func = ScalarFunction {
385            args: vec![
386                Expr::Column(Column::from_name("ts")),
387                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
388            ],
389            func: matches_func(),
390        };
391
392        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
393    }
394
395    #[test]
396    fn test_expr_to_query_pattern_not_string() {
397        let metadata = mock_metadata();
398
399        let func = ScalarFunction {
400            args: vec![
401                Expr::Column(Column::from_name("text")),
402                Expr::Literal(ScalarValue::Int64(Some(42))),
403            ],
404            func: matches_func(),
405        };
406
407        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
408    }
409
410    #[test]
411    fn test_expr_to_term_basic() {
412        let metadata = mock_metadata();
413
414        let func = ScalarFunction {
415            args: vec![
416                Expr::Column(Column::from_name("text")),
417                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
418            ],
419            func: matches_term_func(),
420        };
421
422        let (column_id, term) =
423            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
424        assert_eq!(column_id, 1);
425        assert_eq!(
426            term,
427            FulltextTerm {
428                col_lowered: false,
429                term: "foo".to_string(),
430            }
431        );
432    }
433
434    #[test]
435    fn test_expr_to_term_with_lower() {
436        let metadata = mock_metadata();
437
438        let lower_func_expr = ScalarFunction {
439            args: vec![Expr::Column(Column::from_name("text"))],
440            func: lower(),
441        };
442
443        let func = ScalarFunction {
444            args: vec![
445                Expr::ScalarFunction(lower_func_expr),
446                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
447            ],
448            func: matches_term_func(),
449        };
450
451        let (column_id, term) =
452            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
453        assert_eq!(column_id, 1);
454        assert_eq!(
455            term,
456            FulltextTerm {
457                col_lowered: true,
458                term: "foo".to_string(),
459            }
460        );
461    }
462
463    #[test]
464    fn test_expr_to_term_wrong_num_args() {
465        let metadata = mock_metadata();
466
467        let func = ScalarFunction {
468            args: vec![Expr::Column(Column::from_name("text"))],
469            func: matches_term_func(),
470        };
471
472        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
473    }
474
475    #[test]
476    fn test_expr_to_term_wrong_function_name() {
477        let metadata = mock_metadata();
478
479        let func = ScalarFunction {
480            args: vec![
481                Expr::Column(Column::from_name("text")),
482                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
483            ],
484            func: matches_func(), // Using 'matches' instead of 'matches_term'
485        };
486
487        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
488    }
489
490    #[test]
491    fn test_extract_lower_arg() {
492        let func = ScalarFunction {
493            args: vec![Expr::Column(Column::from_name("text"))],
494            func: lower(),
495        };
496
497        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
498        match arg {
499            Expr::Column(c) => {
500                assert_eq!(c.name, "text");
501            }
502            _ => panic!("Expected Column expression"),
503        }
504    }
505
506    #[test]
507    fn test_extract_lower_arg_wrong_function() {
508        let func = ScalarFunction {
509            args: vec![Expr::Column(Column::from_name("text"))],
510            func: matches_func(), // Not 'lower'
511        };
512
513        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
514    }
515
516    #[test]
517    fn test_extract_requests() {
518        let metadata = mock_metadata();
519
520        // Create a matches expression
521        let matches_expr = Expr::ScalarFunction(ScalarFunction {
522            args: vec![
523                Expr::Column(Column::from_name("text")),
524                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
525            ],
526            func: matches_func(),
527        });
528
529        let mut requests = BTreeMap::new();
530        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);
531
532        assert_eq!(requests.len(), 1);
533        let request = requests.get(&1).unwrap();
534        assert_eq!(request.queries.len(), 1);
535        assert_eq!(request.terms.len(), 0);
536        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
537    }
538
539    #[test]
540    fn test_extract_multiple_requests() {
541        let metadata = mock_metadata();
542
543        // Create a matches expression
544        let matches_expr = Expr::ScalarFunction(ScalarFunction {
545            args: vec![
546                Expr::Column(Column::from_name("text")),
547                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
548            ],
549            func: matches_func(),
550        });
551
552        // Create a matches_term expression
553        let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
554            args: vec![
555                Expr::Column(Column::from_name("text")),
556                Expr::Literal(ScalarValue::Utf8(Some("bar".to_string()))),
557            ],
558            func: matches_term_func(),
559        });
560
561        // Create a binary expression combining both
562        let binary_expr = Expr::BinaryExpr(BinaryExpr {
563            left: Box::new(matches_expr),
564            op: Operator::And,
565            right: Box::new(matches_term_expr),
566        });
567
568        let mut requests = BTreeMap::new();
569        FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);
570
571        assert_eq!(requests.len(), 1);
572        let request = requests.get(&1).unwrap();
573        assert_eq!(request.queries.len(), 1);
574        assert_eq!(request.terms.len(), 1);
575        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
576        assert_eq!(
577            request.terms[0],
578            FulltextTerm {
579                col_lowered: false,
580                term: "bar".to_string(),
581            }
582        );
583    }
584
585    #[test]
586    fn test_terms_as_query() {
587        // Test with empty terms
588        let request = FulltextRequest::default();
589        assert_eq!(request.terms_as_query(false), FulltextQuery(String::new()));
590        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));
591
592        // Test with a single term (not lowercased)
593        let mut request = FulltextRequest::default();
594        request.terms.push(FulltextTerm {
595            col_lowered: false,
596            term: "foo".to_string(),
597        });
598        assert_eq!(
599            request.terms_as_query(false),
600            FulltextQuery("+\"foo\"".to_string())
601        );
602        assert_eq!(
603            request.terms_as_query(true),
604            FulltextQuery("+\"foo\"".to_string())
605        );
606
607        // Test with a single lowercased term and skip_lowercased=true
608        let mut request = FulltextRequest::default();
609        request.terms.push(FulltextTerm {
610            col_lowered: true,
611            term: "foo".to_string(),
612        });
613        assert_eq!(
614            request.terms_as_query(false),
615            FulltextQuery("+\"foo\"".to_string())
616        );
617        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new())); // Should skip lowercased term
618
619        // Test with multiple terms, mix of lowercased and not
620        let mut request = FulltextRequest::default();
621        request.terms.push(FulltextTerm {
622            col_lowered: false,
623            term: "foo".to_string(),
624        });
625        request.terms.push(FulltextTerm {
626            col_lowered: true,
627            term: "bar".to_string(),
628        });
629        assert_eq!(
630            request.terms_as_query(false),
631            FulltextQuery("+\"foo\" +\"bar\"".to_string())
632        );
633        assert_eq!(
634            request.terms_as_query(true),
635            FulltextQuery("+\"foo\"".to_string()) // Only the non-lowercased term
636        );
637
638        // Test with term containing quotes that need escaping
639        let mut request = FulltextRequest::default();
640        request.terms.push(FulltextTerm {
641            col_lowered: false,
642            term: "foo\"bar".to_string(),
643        });
644        assert_eq!(
645            request.terms_as_query(false),
646            FulltextQuery("+\"foo\\\"bar\"".to_string())
647        );
648
649        // Test with a complex mix of terms
650        let mut request = FulltextRequest::default();
651        request.terms.push(FulltextTerm {
652            col_lowered: false,
653            term: "foo".to_string(),
654        });
655        request.terms.push(FulltextTerm {
656            col_lowered: true,
657            term: "bar\"quoted\"".to_string(),
658        });
659        request.terms.push(FulltextTerm {
660            col_lowered: false,
661            term: "baz\\escape".to_string(),
662        });
663        assert_eq!(
664            request.terms_as_query(false),
665            FulltextQuery("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"".to_string())
666        );
667        assert_eq!(
668            request.terms_as_query(true),
669            FulltextQuery("+\"foo\" +\"baz\\escape\"".to_string()) // Skips the lowercased term
670        );
671    }
672}