mito2/sst/index/fulltext_index/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
24
25use crate::cache::file_cache::FileCacheRef;
26use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
27use crate::error::Result;
28use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
29use crate::sst::index::puffin_manager::PuffinManagerFactory;
30
31/// A request for fulltext index.
32///
33/// It contains all the queries and terms for a column.
34#[derive(Default, Debug)]
35pub struct FulltextRequest {
36    pub queries: Vec<FulltextQuery>,
37    pub terms: Vec<FulltextTerm>,
38}
39
40impl FulltextRequest {
41    /// Convert terms to a query string.
42    ///
43    /// For example, if the terms are ["foo", "bar"], the query string will be `r#"+"foo" +"bar""#`.
44    /// Need to escape the `"` in the term.
45    ///
46    /// `skip_lowercased` is used for the situation that lowercased terms are not indexed.
47    pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
48        let mut query = String::new();
49        for term in &self.terms {
50            if skip_lowercased && term.col_lowered {
51                continue;
52            }
53            // Escape the `"` in the term.
54            let escaped_term = term.term.replace("\"", "\\\"");
55            if query.is_empty() {
56                query = format!("+\"{escaped_term}\"");
57            } else {
58                query.push_str(&format!(" +\"{escaped_term}\""));
59            }
60        }
61        FulltextQuery(query)
62    }
63}
64
/// A fulltext query string to be matched against the fulltext index.
///
/// The wrapped string is the query itself, e.g. `"+foo -bar"` in
/// `SELECT * FROM t WHERE matches(text, "+foo -bar")`.
#[derive(PartialEq, Eq, Debug)]
pub struct FulltextQuery(pub String);
70
/// A single term to be matched in the fulltext index.
///
/// `term` is the term itself, e.g. `"foo"` in
/// `SELECT * FROM t WHERE matches_term(text, "foo")`.
/// `col_lowered` records whether the column was wrapped in `lower(..)`; it is `true`
/// for `matches_term(lower(text), "foo")`.
#[derive(PartialEq, Eq, Debug)]
pub struct FulltextTerm {
    /// Whether the matched column was wrapped in `lower(..)`.
    pub col_lowered: bool,
    /// The literal term to match.
    pub term: String,
}
80
/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
///
/// It holds the handles passed to `FulltextIndexApplier::new`, plus the optional caches.
/// `build` then scans filter expressions for fulltext predicates.
pub struct FulltextIndexApplierBuilder<'a> {
    /// Region directory, passed through to the applier.
    region_dir: String,
    /// Region id, passed through to the applier.
    region_id: RegionId,
    /// Object store handle, passed through to the applier.
    store: ObjectStore,
    /// Puffin manager factory, passed through to the applier.
    puffin_manager_factory: PuffinManagerFactory,
    /// Region metadata, used to resolve column names to ids and to check column data types.
    metadata: &'a RegionMetadata,
    /// Optional file cache; `None` unless set via `with_file_cache`.
    file_cache: Option<FileCacheRef>,
    /// Optional puffin metadata cache; `None` unless set via `with_puffin_metadata_cache`.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Optional bloom filter index cache; `None` unless set via `with_bloom_filter_cache`.
    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
}
92
93impl<'a> FulltextIndexApplierBuilder<'a> {
94    /// Creates a new `FulltextIndexApplierBuilder`.
95    pub fn new(
96        region_dir: String,
97        region_id: RegionId,
98        store: ObjectStore,
99        puffin_manager_factory: PuffinManagerFactory,
100        metadata: &'a RegionMetadata,
101    ) -> Self {
102        Self {
103            region_dir,
104            region_id,
105            store,
106            puffin_manager_factory,
107            metadata,
108            file_cache: None,
109            puffin_metadata_cache: None,
110            bloom_filter_cache: None,
111        }
112    }
113
114    /// Sets the file cache to be used by the `FulltextIndexApplier`.
115    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
116        self.file_cache = file_cache;
117        self
118    }
119
120    /// Sets the puffin metadata cache to be used by the `FulltextIndexApplier`.
121    pub fn with_puffin_metadata_cache(
122        mut self,
123        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
124    ) -> Self {
125        self.puffin_metadata_cache = puffin_metadata_cache;
126        self
127    }
128
129    /// Sets the bloom filter cache to be used by the `FulltextIndexApplier`.
130    pub fn with_bloom_filter_cache(
131        mut self,
132        bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
133    ) -> Self {
134        self.bloom_filter_cache = bloom_filter_cache;
135        self
136    }
137
138    /// Builds `SstIndexApplier` from the given expressions.
139    pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
140        let mut requests = HashMap::new();
141        for expr in exprs {
142            Self::extract_requests(expr, self.metadata, &mut requests);
143        }
144
145        // Check if any requests have queries or terms
146        let has_requests = requests
147            .iter()
148            .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
149
150        Ok(has_requests.then(|| {
151            FulltextIndexApplier::new(
152                self.region_dir,
153                self.region_id,
154                self.store,
155                requests,
156                self.puffin_manager_factory,
157            )
158            .with_file_cache(self.file_cache)
159            .with_puffin_metadata_cache(self.puffin_metadata_cache)
160            .with_bloom_filter_cache(self.bloom_filter_cache)
161        }))
162    }
163
164    fn extract_requests(
165        expr: &Expr,
166        metadata: &'a RegionMetadata,
167        requests: &mut HashMap<ColumnId, FulltextRequest>,
168    ) {
169        match expr {
170            Expr::BinaryExpr(BinaryExpr {
171                left,
172                op: Operator::And,
173                right,
174            }) => {
175                Self::extract_requests(left, metadata, requests);
176                Self::extract_requests(right, metadata, requests);
177            }
178            Expr::ScalarFunction(func) => {
179                if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
180                    requests.entry(column_id).or_default().queries.push(query);
181                } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
182                    requests.entry(column_id).or_default().terms.push(term);
183                }
184            }
185            _ => {}
186        }
187    }
188
189    fn expr_to_query(
190        metadata: &RegionMetadata,
191        f: &ScalarFunction,
192    ) -> Option<(ColumnId, FulltextQuery)> {
193        if f.name() != "matches" {
194            return None;
195        }
196        if f.args.len() != 2 {
197            return None;
198        }
199
200        let Expr::Column(c) = &f.args[0] else {
201            return None;
202        };
203        let column = metadata.column_by_name(&c.name)?;
204
205        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
206            return None;
207        }
208
209        let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else {
210            return None;
211        };
212
213        Some((column.column_id, FulltextQuery(query.to_string())))
214    }
215
216    fn expr_to_term(
217        metadata: &RegionMetadata,
218        f: &ScalarFunction,
219    ) -> Option<(ColumnId, FulltextTerm)> {
220        if f.name() != "matches_term" {
221            return None;
222        }
223        if f.args.len() != 2 {
224            return None;
225        }
226
227        let mut lowered = false;
228        let column;
229        match &f.args[0] {
230            Expr::Column(c) => {
231                column = c;
232            }
233            Expr::ScalarFunction(f) => {
234                let lower_arg = Self::extract_lower_arg(f)?;
235                lowered = true;
236                if let Expr::Column(c) = lower_arg {
237                    column = c;
238                } else {
239                    return None;
240                }
241            }
242            _ => return None,
243        }
244
245        let column = metadata.column_by_name(&column.name)?;
246        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
247            return None;
248        }
249
250        let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else {
251            return None;
252        };
253
254        Some((
255            column.column_id,
256            FulltextTerm {
257                col_lowered: lowered,
258                term: term.to_string(),
259            },
260        ))
261    }
262
263    fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
264        if lower_func.args.len() != 1 {
265            return None;
266        }
267
268        if lower_func.name() != "lower" {
269            return None;
270        }
271
272        if lower_func.args.len() != 1 {
273            return None;
274        }
275
276        Some(&lower_func.args[0])
277    }
278}
279
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_function::function_registry::FUNCTION_REGISTRY;
    use common_function::scalars::udf::create_udf;
    use datafusion::functions::string::lower;
    use datafusion_common::Column;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::ScalarUDF;
    use datatypes::schema::ColumnSchema;
    use session::context::QueryContext;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Builds region metadata with a string field `text` (id 1) and a millisecond
    /// timestamp column `ts` (id 2).
    fn mock_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });

        builder.build().unwrap()
    }

    /// Creates the `matches` UDF from the global function registry.
    fn matches_func() -> Arc<ScalarUDF> {
        Arc::new(create_udf(
            FUNCTION_REGISTRY.get_function("matches").unwrap(),
            QueryContext::arc(),
            Default::default(),
        ))
    }

    /// Creates the `matches_term` UDF from the global function registry.
    fn matches_term_func() -> Arc<ScalarUDF> {
        Arc::new(create_udf(
            FUNCTION_REGISTRY.get_function("matches_term").unwrap(),
            QueryContext::arc(),
            Default::default(),
        ))
    }

    #[test]
    fn test_expr_to_query_basic() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_func(),
        };

        let (column_id, query) =
            FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(query, FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_expr_to_query_wrong_num_args() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_not_found_column() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("not_found")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_column_wrong_data_type() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("ts")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_pattern_not_string() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Int64(Some(42))),
            ],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_basic() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_term_func(),
        };

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: false,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_with_lower() {
        let metadata = mock_metadata();

        let lower_func_expr = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: lower(),
        };

        let func = ScalarFunction {
            args: vec![
                Expr::ScalarFunction(lower_func_expr),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_term_func(),
        };

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: true,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_wrong_num_args() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_function_name() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_func(), // Using 'matches' instead of 'matches_term'
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_column_wrong_data_type() {
        let metadata = mock_metadata();

        // `ts` is a timestamp column, so it cannot carry a fulltext term.
        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("ts")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_lower_of_non_column() {
        let metadata = mock_metadata();

        // `lower` wraps a literal rather than a column, so no column can be resolved.
        let lower_func_expr = ScalarFunction {
            args: vec![Expr::Literal(ScalarValue::Utf8(Some("TEXT".to_string())))],
            func: lower(),
        };

        let func = ScalarFunction {
            args: vec![
                Expr::ScalarFunction(lower_func_expr),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_extract_lower_arg() {
        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: lower(),
        };

        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
        match arg {
            Expr::Column(c) => {
                assert_eq!(c.name, "text");
            }
            _ => panic!("Expected Column expression"),
        }
    }

    #[test]
    fn test_extract_lower_arg_wrong_function() {
        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_func(), // Not 'lower'
        };

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_lower_arg_wrong_num_args() {
        let func = ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Column(Column::from_name("ts")),
            ],
            func: lower(),
        };

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_requests() {
        let metadata = mock_metadata();

        // Create a matches expression
        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_func(),
        });

        let mut requests = HashMap::new();
        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 0);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_extract_multiple_requests() {
        let metadata = mock_metadata();

        // Create a matches expression
        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_func(),
        });

        // Create a matches_term expression
        let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("bar".to_string()))),
            ],
            func: matches_term_func(),
        });

        // Create a binary expression combining both
        let binary_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::And,
            right: Box::new(matches_term_expr),
        });

        let mut requests = HashMap::new();
        FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 1);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
        assert_eq!(
            request.terms[0],
            FulltextTerm {
                col_lowered: false,
                term: "bar".to_string(),
            }
        );
    }

    #[test]
    fn test_extract_requests_ignores_or() {
        let metadata = mock_metadata();

        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
            ],
            func: matches_func(),
        });

        let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![
                Expr::Column(Column::from_name("text")),
                Expr::Literal(ScalarValue::Utf8(Some("bar".to_string()))),
            ],
            func: matches_term_func(),
        });

        // Neither side of an `OR` has to hold on its own, so nothing may be extracted.
        let or_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::Or,
            right: Box::new(matches_term_expr),
        });

        let mut requests = HashMap::new();
        FulltextIndexApplierBuilder::extract_requests(&or_expr, &metadata, &mut requests);

        assert!(requests.is_empty());
    }

    #[test]
    fn test_terms_as_query() {
        // Test with empty terms
        let request = FulltextRequest::default();
        assert_eq!(request.terms_as_query(false), FulltextQuery(String::new()));
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));

        // Test with a single term (not lowercased)
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string())
        );

        // Test with a single lowercased term and skip_lowercased=true
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "foo".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new())); // Should skip lowercased term

        // Test with multiple terms, mix of lowercased and not
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "bar".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string()) // Only the non-lowercased term
        );

        // Test with term containing quotes that need escaping
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo\"bar".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\\\"bar\"".to_string())
        );

        // Test with a complex mix of terms
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "bar\"quoted\"".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "baz\\escape".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\" +\"baz\\escape\"".to_string()) // Skips the lowercased term
        );
    }
}