mito2/sst/index/fulltext_index/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
24
25use crate::cache::file_cache::FileCacheRef;
26use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
27use crate::error::Result;
28use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
29use crate::sst::index::puffin_manager::PuffinManagerFactory;
30
/// A request for fulltext index.
///
/// It contains all the queries and terms collected for a single column.
/// `FulltextIndexApplierBuilder` gathers one request per column id and pushes an entry
/// for every supported `matches` / `matches_term` predicate it finds.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextRequest {
    /// Queries taken from `matches(column, query)` predicates on the column.
    pub queries: Vec<FulltextQuery>,
    /// Terms taken from `matches_term(column, term)` predicates on the column.
    pub terms: Vec<FulltextTerm>,
}
39
40impl FulltextRequest {
41    /// Convert terms to a query string.
42    ///
43    /// For example, if the terms are ["foo", "bar"], the query string will be `r#"+"foo" +"bar""#`.
44    /// Need to escape the `"` in the term.
45    ///
46    /// `skip_lowercased` is used for the situation that lowercased terms are not indexed.
47    pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
48        let mut query = String::new();
49        for term in &self.terms {
50            if skip_lowercased && term.col_lowered {
51                continue;
52            }
53            // Escape the `"` in the term.
54            let escaped_term = term.term.replace("\"", "\\\"");
55            if query.is_empty() {
56                query = format!("+\"{escaped_term}\"");
57            } else {
58                query.push_str(&format!(" +\"{escaped_term}\""));
59            }
60        }
61        FulltextQuery(query)
62    }
63}
64
/// A query to be matched in fulltext index.
///
/// The wrapped string is the query to be matched, e.g. "+foo -bar" in
/// `SELECT * FROM t WHERE matches(text, "+foo -bar")`. It is also the type returned by
/// `FulltextRequest::terms_as_query`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextQuery(pub String);
70
/// A term to be matched in fulltext index.
///
/// `term` is the term to be matched, e.g. "foo" in `SELECT * FROM t WHERE matches_term(text, "foo")`.
/// `col_lowered` indicates whether the column is lowercased, e.g. `col_lowered = true` when `matches_term(lower(text), "foo")`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextTerm {
    /// `true` when the column argument was wrapped in `lower(...)`.
    pub col_lowered: bool,
    /// The term literal exactly as written in the predicate.
    pub term: String,
}
80
/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
///
/// It collects fulltext requests from filter expressions and hands them, together with
/// the settings below, to the applier in `build`.
pub struct FulltextIndexApplierBuilder<'a> {
    /// Region directory, forwarded to the applier as-is.
    region_dir: String,
    /// Region id, forwarded to the applier as-is.
    region_id: RegionId,
    /// Object store, forwarded to the applier as-is.
    store: ObjectStore,
    /// Puffin manager factory, forwarded to the applier as-is.
    puffin_manager_factory: PuffinManagerFactory,
    /// Region metadata used to resolve column names in expressions to column ids and
    /// data types.
    metadata: &'a RegionMetadata,
    /// Optional file cache; `None` unless set through `with_file_cache`.
    file_cache: Option<FileCacheRef>,
    /// Optional puffin metadata cache; `None` unless set through
    /// `with_puffin_metadata_cache`.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Optional bloom filter index cache; `None` unless set through
    /// `with_bloom_filter_cache`.
    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
}
92
93impl<'a> FulltextIndexApplierBuilder<'a> {
94    /// Creates a new `FulltextIndexApplierBuilder`.
95    pub fn new(
96        region_dir: String,
97        region_id: RegionId,
98        store: ObjectStore,
99        puffin_manager_factory: PuffinManagerFactory,
100        metadata: &'a RegionMetadata,
101    ) -> Self {
102        Self {
103            region_dir,
104            region_id,
105            store,
106            puffin_manager_factory,
107            metadata,
108            file_cache: None,
109            puffin_metadata_cache: None,
110            bloom_filter_cache: None,
111        }
112    }
113
114    /// Sets the file cache to be used by the `FulltextIndexApplier`.
115    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
116        self.file_cache = file_cache;
117        self
118    }
119
120    /// Sets the puffin metadata cache to be used by the `FulltextIndexApplier`.
121    pub fn with_puffin_metadata_cache(
122        mut self,
123        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
124    ) -> Self {
125        self.puffin_metadata_cache = puffin_metadata_cache;
126        self
127    }
128
129    /// Sets the bloom filter cache to be used by the `FulltextIndexApplier`.
130    pub fn with_bloom_filter_cache(
131        mut self,
132        bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
133    ) -> Self {
134        self.bloom_filter_cache = bloom_filter_cache;
135        self
136    }
137
138    /// Builds `SstIndexApplier` from the given expressions.
139    pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
140        let mut requests = BTreeMap::new();
141        for expr in exprs {
142            Self::extract_requests(expr, self.metadata, &mut requests);
143        }
144
145        // Check if any requests have queries or terms
146        let has_requests = requests
147            .iter()
148            .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
149
150        Ok(has_requests.then(|| {
151            FulltextIndexApplier::new(
152                self.region_dir,
153                self.region_id,
154                self.store,
155                requests,
156                self.puffin_manager_factory,
157            )
158            .with_file_cache(self.file_cache)
159            .with_puffin_metadata_cache(self.puffin_metadata_cache)
160            .with_bloom_filter_cache(self.bloom_filter_cache)
161        }))
162    }
163
164    fn extract_requests(
165        expr: &Expr,
166        metadata: &'a RegionMetadata,
167        requests: &mut BTreeMap<ColumnId, FulltextRequest>,
168    ) {
169        match expr {
170            Expr::BinaryExpr(BinaryExpr {
171                left,
172                op: Operator::And,
173                right,
174            }) => {
175                Self::extract_requests(left, metadata, requests);
176                Self::extract_requests(right, metadata, requests);
177            }
178            Expr::ScalarFunction(func) => {
179                if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
180                    requests.entry(column_id).or_default().queries.push(query);
181                } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
182                    requests.entry(column_id).or_default().terms.push(term);
183                }
184            }
185            _ => {}
186        }
187    }
188
189    fn expr_to_query(
190        metadata: &RegionMetadata,
191        f: &ScalarFunction,
192    ) -> Option<(ColumnId, FulltextQuery)> {
193        if f.name() != "matches" {
194            return None;
195        }
196        if f.args.len() != 2 {
197            return None;
198        }
199
200        let Expr::Column(c) = &f.args[0] else {
201            return None;
202        };
203        let column = metadata.column_by_name(&c.name)?;
204
205        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
206            return None;
207        }
208
209        let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else {
210            return None;
211        };
212
213        Some((column.column_id, FulltextQuery(query.to_string())))
214    }
215
216    fn expr_to_term(
217        metadata: &RegionMetadata,
218        f: &ScalarFunction,
219    ) -> Option<(ColumnId, FulltextTerm)> {
220        if f.name() != "matches_term" {
221            return None;
222        }
223        if f.args.len() != 2 {
224            return None;
225        }
226
227        let mut lowered = false;
228        let column;
229        match &f.args[0] {
230            Expr::Column(c) => {
231                column = c;
232            }
233            Expr::ScalarFunction(f) => {
234                let lower_arg = Self::extract_lower_arg(f)?;
235                lowered = true;
236                if let Expr::Column(c) = lower_arg {
237                    column = c;
238                } else {
239                    return None;
240                }
241            }
242            _ => return None,
243        }
244
245        let column = metadata.column_by_name(&column.name)?;
246        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
247            return None;
248        }
249
250        let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else {
251            return None;
252        };
253
254        Some((
255            column.column_id,
256            FulltextTerm {
257                col_lowered: lowered,
258                term: term.to_string(),
259            },
260        ))
261    }
262
263    fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
264        if lower_func.args.len() != 1 {
265            return None;
266        }
267
268        if lower_func.name() != "lower" {
269            return None;
270        }
271
272        if lower_func.args.len() != 1 {
273            return None;
274        }
275
276        Some(&lower_func.args[0])
277    }
278}
279
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use datafusion::functions::string::lower;
    use datafusion_common::Column;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::ScalarUDF;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region metadata with a nullable string field `text` (id 1) and a timestamp column
    /// `ts` (id 2).
    fn mock_metadata() -> RegionMetadata {
        let text = ColumnMetadata {
            column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 1,
        };
        let ts = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 2,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder.push_column_metadata(text).push_column_metadata(ts);
        builder.build().unwrap()
    }

    /// Wraps a function into the UDF used by the expressions under test.
    fn udf_of(function: FunctionRef) -> Arc<ScalarUDF> {
        Arc::new(ScalarFunctionFactory::from(function).provide(Default::default()))
    }

    fn matches_func() -> Arc<ScalarUDF> {
        udf_of(Arc::new(MatchesFunction))
    }

    fn matches_term_func() -> Arc<ScalarUDF> {
        udf_of(Arc::new(MatchesTermFunction))
    }

    /// A column reference expression.
    fn col(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// A non-null `Utf8` literal expression.
    fn utf8(value: &str) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
    }

    /// A call of `func` with `args`.
    fn call(func: Arc<ScalarUDF>, args: Vec<Expr>) -> ScalarFunction {
        ScalarFunction { args, func }
    }

    /// A fulltext term with the given text and lowercase flag.
    fn term(text: &str, col_lowered: bool) -> FulltextTerm {
        FulltextTerm {
            col_lowered,
            term: text.to_string(),
        }
    }

    /// A request holding only the given terms.
    fn request_with_terms(terms: Vec<FulltextTerm>) -> FulltextRequest {
        FulltextRequest {
            queries: Vec::new(),
            terms,
        }
    }

    /// The query string rendered from the terms of `request`.
    fn rendered(request: &FulltextRequest, skip_lowercased: bool) -> String {
        request.terms_as_query(skip_lowercased).0
    }

    #[test]
    fn test_expr_to_query_basic() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![col("text"), utf8("foo")]);

        let result = FulltextIndexApplierBuilder::expr_to_query(&metadata, &func);
        assert_eq!(result, Some((1, FulltextQuery("foo".to_string()))));
    }

    #[test]
    fn test_expr_to_query_wrong_num_args() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![col("text")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_not_found_column() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![col("not_found"), utf8("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_column_wrong_data_type() {
        let metadata = mock_metadata();
        let func = call(matches_func(), vec![col("ts"), utf8("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_pattern_not_string() {
        let metadata = mock_metadata();
        let func = call(
            matches_func(),
            vec![col("text"), Expr::Literal(ScalarValue::Int64(Some(42)))],
        );

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_basic() {
        let metadata = mock_metadata();
        let func = call(matches_term_func(), vec![col("text"), utf8("foo")]);

        let result = FulltextIndexApplierBuilder::expr_to_term(&metadata, &func);
        assert_eq!(result, Some((1, term("foo", false))));
    }

    #[test]
    fn test_expr_to_term_with_lower() {
        let metadata = mock_metadata();
        let lowered_text = Expr::ScalarFunction(call(lower(), vec![col("text")]));
        let func = call(matches_term_func(), vec![lowered_text, utf8("foo")]);

        let result = FulltextIndexApplierBuilder::expr_to_term(&metadata, &func);
        assert_eq!(result, Some((1, term("foo", true))));
    }

    #[test]
    fn test_expr_to_term_wrong_num_args() {
        let metadata = mock_metadata();
        let func = call(matches_term_func(), vec![col("text")]);

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_function_name() {
        let metadata = mock_metadata();
        // `matches` is deliberately used in place of `matches_term`.
        let func = call(matches_func(), vec![col("text"), utf8("foo")]);

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_extract_lower_arg() {
        let func = call(lower(), vec![col("text")]);

        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
        let Expr::Column(c) = arg else {
            panic!("Expected Column expression");
        };
        assert_eq!(c.name, "text");
    }

    #[test]
    fn test_extract_lower_arg_wrong_function() {
        // Right arity, but the function is not `lower`.
        let func = call(matches_func(), vec![col("text")]);

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_requests() {
        let metadata = mock_metadata();
        let matches_expr =
            Expr::ScalarFunction(call(matches_func(), vec![col("text"), utf8("foo")]));

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);

        let expected = FulltextRequest {
            queries: vec![FulltextQuery("foo".to_string())],
            terms: Vec::new(),
        };
        assert_eq!(requests.len(), 1);
        assert_eq!(requests.get(&1), Some(&expected));
    }

    #[test]
    fn test_extract_multiple_requests() {
        let metadata = mock_metadata();

        // `matches(text, "foo") AND matches_term(text, "bar")`
        let conjunction = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::ScalarFunction(call(
                matches_func(),
                vec![col("text"), utf8("foo")],
            ))),
            op: Operator::And,
            right: Box::new(Expr::ScalarFunction(call(
                matches_term_func(),
                vec![col("text"), utf8("bar")],
            ))),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&conjunction, &metadata, &mut requests);

        let expected = FulltextRequest {
            queries: vec![FulltextQuery("foo".to_string())],
            terms: vec![term("bar", false)],
        };
        assert_eq!(requests.len(), 1);
        assert_eq!(requests.get(&1), Some(&expected));
    }

    #[test]
    fn test_terms_as_query() {
        // No terms at all.
        let empty = FulltextRequest::default();
        assert_eq!(rendered(&empty, false), "");
        assert_eq!(rendered(&empty, true), "");

        // One term on a column that is not lowercased.
        let plain = request_with_terms(vec![term("foo", false)]);
        assert_eq!(rendered(&plain, false), r#"+"foo""#);
        assert_eq!(rendered(&plain, true), r#"+"foo""#);

        // One term on a lowercased column: skipped only when asked to.
        let lowered = request_with_terms(vec![term("foo", true)]);
        assert_eq!(rendered(&lowered, false), r#"+"foo""#);
        assert_eq!(rendered(&lowered, true), "");

        // A mix of lowercased and non-lowercased columns.
        let mixed = request_with_terms(vec![term("foo", false), term("bar", true)]);
        assert_eq!(rendered(&mixed, false), r#"+"foo" +"bar""#);
        assert_eq!(rendered(&mixed, true), r#"+"foo""#);

        // Quotes inside a term are escaped.
        let quoted = request_with_terms(vec![term(r#"foo"bar"#, false)]);
        assert_eq!(rendered(&quoted, false), r#"+"foo\"bar""#);

        // Everything combined; backslashes are passed through untouched.
        let complex = request_with_terms(vec![
            term("foo", false),
            term(r#"bar"quoted""#, true),
            term(r"baz\escape", false),
        ]);
        assert_eq!(
            rendered(&complex, false),
            r#"+"foo" +"bar\"quoted\"" +"baz\escape""#
        );
        assert_eq!(rendered(&complex, true), r#"+"foo" +"baz\escape""#);
    }
}