mito2/sst/index/fulltext_index/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use datafusion_common::ScalarValue;
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::{BinaryExpr, Expr, Operator};
20use object_store::ObjectStore;
21use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
22use store_api::metadata::RegionMetadata;
23use store_api::region_request::PathType;
24use store_api::storage::{ColumnId, ConcreteDataType};
25
26use crate::cache::file_cache::FileCacheRef;
27use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
28use crate::error::Result;
29use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
30use crate::sst::index::puffin_manager::PuffinManagerFactory;
31
/// A request for fulltext index.
///
/// Bundles every query and every term that targets one column.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextRequest {
    /// Free-form queries, e.g. collected from `matches(col, "...")` predicates.
    pub queries: Vec<FulltextQuery>,
    /// Exact terms, e.g. collected from `matches_term(col, "...")` predicates.
    pub terms: Vec<FulltextTerm>,
}

impl FulltextRequest {
    /// Renders the terms as one query string.
    ///
    /// Every kept term is wrapped as `+"<term>"`, with any `"` inside the term
    /// escaped as `\"`, and the clauses are separated by a single space. For
    /// example, the terms ["foo", "bar"] produce `r#"+"foo" +"bar""#`. If no term
    /// is kept, the result is an empty query string.
    ///
    /// When `skip_lowercased` is true, terms with `col_lowered == true` are left
    /// out. This covers the case where lowercased terms are not indexed.
    pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
        let clauses: Vec<String> = self
            .terms
            .iter()
            .filter(|t| !(skip_lowercased && t.col_lowered))
            .map(|t| format!("+\"{}\"", t.term.replace('"', "\\\"")))
            .collect();
        FulltextQuery(clauses.join(" "))
    }
}

/// A query to be matched in fulltext index.
///
/// Wraps the raw query text. For example, "+foo -bar" comes from
/// `SELECT * FROM t WHERE matches(text, "+foo -bar")`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextQuery(pub String);

/// A term to be matched in fulltext index.
///
/// `term` is the literal being searched, e.g. "foo" in
/// `SELECT * FROM t WHERE matches_term(text, "foo")`.
/// `col_lowered` records whether the column was wrapped in `lower(...)`, e.g. it is
/// `true` for `matches_term(lower(text), "foo")`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextTerm {
    pub col_lowered: bool,
    pub term: String,
}
81
82/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
83pub struct FulltextIndexApplierBuilder<'a> {
84    table_dir: String,
85    path_type: PathType,
86    store: ObjectStore,
87    puffin_manager_factory: PuffinManagerFactory,
88    metadata: &'a RegionMetadata,
89    file_cache: Option<FileCacheRef>,
90    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
91    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
92}
93
94impl<'a> FulltextIndexApplierBuilder<'a> {
95    /// Creates a new `FulltextIndexApplierBuilder`.
96    pub fn new(
97        table_dir: String,
98        path_type: PathType,
99        store: ObjectStore,
100        puffin_manager_factory: PuffinManagerFactory,
101        metadata: &'a RegionMetadata,
102    ) -> Self {
103        Self {
104            table_dir,
105            path_type,
106            store,
107            puffin_manager_factory,
108            metadata,
109            file_cache: None,
110            puffin_metadata_cache: None,
111            bloom_filter_cache: None,
112        }
113    }
114
115    /// Sets the file cache to be used by the `FulltextIndexApplier`.
116    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
117        self.file_cache = file_cache;
118        self
119    }
120
121    /// Sets the puffin metadata cache to be used by the `FulltextIndexApplier`.
122    pub fn with_puffin_metadata_cache(
123        mut self,
124        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
125    ) -> Self {
126        self.puffin_metadata_cache = puffin_metadata_cache;
127        self
128    }
129
130    /// Sets the bloom filter cache to be used by the `FulltextIndexApplier`.
131    pub fn with_bloom_filter_cache(
132        mut self,
133        bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
134    ) -> Self {
135        self.bloom_filter_cache = bloom_filter_cache;
136        self
137    }
138
139    /// Builds `SstIndexApplier` from the given expressions.
140    pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
141        let mut requests = BTreeMap::new();
142        for expr in exprs {
143            Self::extract_requests(expr, self.metadata, &mut requests);
144        }
145
146        // Check if any requests have queries or terms
147        let has_requests = requests
148            .iter()
149            .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
150
151        Ok(has_requests.then(|| {
152            FulltextIndexApplier::new(
153                self.table_dir,
154                self.path_type,
155                self.store,
156                requests,
157                self.puffin_manager_factory,
158            )
159            .with_file_cache(self.file_cache)
160            .with_puffin_metadata_cache(self.puffin_metadata_cache)
161            .with_bloom_filter_cache(self.bloom_filter_cache)
162        }))
163    }
164
165    fn extract_requests(
166        expr: &Expr,
167        metadata: &'a RegionMetadata,
168        requests: &mut BTreeMap<ColumnId, FulltextRequest>,
169    ) {
170        match expr {
171            Expr::BinaryExpr(BinaryExpr {
172                left,
173                op: Operator::And,
174                right,
175            }) => {
176                Self::extract_requests(left, metadata, requests);
177                Self::extract_requests(right, metadata, requests);
178            }
179            Expr::ScalarFunction(func) => {
180                if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
181                    requests.entry(column_id).or_default().queries.push(query);
182                } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
183                    requests.entry(column_id).or_default().terms.push(term);
184                }
185            }
186            _ => {}
187        }
188    }
189
190    fn expr_to_query(
191        metadata: &RegionMetadata,
192        f: &ScalarFunction,
193    ) -> Option<(ColumnId, FulltextQuery)> {
194        if f.name() != "matches" {
195            return None;
196        }
197        if f.args.len() != 2 {
198            return None;
199        }
200
201        let Expr::Column(c) = &f.args[0] else {
202            return None;
203        };
204        let column = metadata.column_by_name(&c.name)?;
205
206        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
207            return None;
208        }
209
210        let Expr::Literal(ScalarValue::Utf8(Some(query)), _) = &f.args[1] else {
211            return None;
212        };
213
214        Some((column.column_id, FulltextQuery(query.to_string())))
215    }
216
217    fn expr_to_term(
218        metadata: &RegionMetadata,
219        f: &ScalarFunction,
220    ) -> Option<(ColumnId, FulltextTerm)> {
221        if f.name() != "matches_term" {
222            return None;
223        }
224        if f.args.len() != 2 {
225            return None;
226        }
227
228        let mut lowered = false;
229        let column;
230        match &f.args[0] {
231            Expr::Column(c) => {
232                column = c;
233            }
234            Expr::ScalarFunction(f) => {
235                let lower_arg = Self::extract_lower_arg(f)?;
236                lowered = true;
237                if let Expr::Column(c) = lower_arg {
238                    column = c;
239                } else {
240                    return None;
241                }
242            }
243            _ => return None,
244        }
245
246        let column = metadata.column_by_name(&column.name)?;
247        if column.column_schema.data_type != ConcreteDataType::string_datatype() {
248            return None;
249        }
250
251        let Expr::Literal(ScalarValue::Utf8(Some(term)), _) = &f.args[1] else {
252            return None;
253        };
254
255        Some((
256            column.column_id,
257            FulltextTerm {
258                col_lowered: lowered,
259                term: term.to_string(),
260            },
261        ))
262    }
263
264    fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
265        if lower_func.args.len() != 1 {
266            return None;
267        }
268
269        if lower_func.name() != "lower" {
270            return None;
271        }
272
273        if lower_func.args.len() != 1 {
274            return None;
275        }
276
277        Some(&lower_func.args[0])
278    }
279}
280
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_function::function::FunctionRef;
    use common_function::function_factory::ScalarFunctionFactory;
    use common_function::scalars::matches::MatchesFunction;
    use common_function::scalars::matches_term::MatchesTermFunction;
    use datafusion::functions::string::lower;
    use datafusion_common::Column;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Literal, ScalarUDF};
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string field `text` (id 1) and a timestamp `ts` (id 2).
    fn mock_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });

        builder.build().unwrap()
    }

    fn matches_func() -> Arc<ScalarUDF> {
        Arc::new(
            ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef)
                .provide(Default::default()),
        )
    }

    fn matches_term_func() -> Arc<ScalarUDF> {
        Arc::new(
            ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef)
                .provide(Default::default()),
        )
    }

    #[test]
    fn test_expr_to_query_basic() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        };

        let (column_id, query) =
            FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(query, FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_expr_to_query_wrong_num_args() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_not_found_column() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("not_found")), "foo".lit()],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_column_wrong_data_type() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("ts")), "foo".lit()],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_query_pattern_not_string() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), 42.lit()],
            func: matches_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_basic() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_term_func(),
        };

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: false,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_with_lower() {
        let metadata = mock_metadata();

        let lower_func_expr = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: lower(),
        };

        let func = ScalarFunction {
            args: vec![Expr::ScalarFunction(lower_func_expr), "foo".lit()],
            func: matches_term_func(),
        };

        let (column_id, term) =
            FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
        assert_eq!(column_id, 1);
        assert_eq!(
            term,
            FulltextTerm {
                col_lowered: true,
                term: "foo".to_string(),
            }
        );
    }

    #[test]
    fn test_expr_to_term_wrong_num_args() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_wrong_function_name() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(), // Using 'matches' instead of 'matches_term'
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_column_wrong_data_type() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("ts")), "foo".lit()],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_term_not_string() {
        let metadata = mock_metadata();

        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), 42.lit()],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_lower_of_non_column() {
        let metadata = mock_metadata();

        // `lower('text')` wraps a literal, not a column.
        let lower_func_expr = ScalarFunction {
            args: vec!["text".lit()],
            func: lower(),
        };

        let func = ScalarFunction {
            args: vec![Expr::ScalarFunction(lower_func_expr), "foo".lit()],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_expr_to_term_non_lower_wrapper() {
        let metadata = mock_metadata();

        // Only `lower(...)` is accepted as a wrapper around the column.
        let wrapper = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_func(),
        };

        let func = ScalarFunction {
            args: vec![Expr::ScalarFunction(wrapper), "foo".lit()],
            func: matches_term_func(),
        };

        assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
    }

    #[test]
    fn test_extract_lower_arg() {
        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: lower(),
        };

        let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
        match arg {
            Expr::Column(c) => {
                assert_eq!(c.name, "text");
            }
            _ => panic!("Expected Column expression"),
        }
    }

    #[test]
    fn test_extract_lower_arg_wrong_function() {
        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text"))],
            func: matches_func(), // Not 'lower'
        };

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_lower_arg_wrong_num_args() {
        let func = ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: lower(),
        };

        assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
    }

    #[test]
    fn test_extract_requests() {
        let metadata = mock_metadata();

        // Create a matches expression
        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 0);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
    }

    #[test]
    fn test_extract_multiple_requests() {
        let metadata = mock_metadata();

        // Create a matches expression
        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        });

        // Create a matches_term expression
        let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "bar".lit()],
            func: matches_term_func(),
        });

        // Create a binary expression combining both
        let binary_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::And,
            right: Box::new(matches_term_expr),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(request.queries.len(), 1);
        assert_eq!(request.terms.len(), 1);
        assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
        assert_eq!(
            request.terms[0],
            FulltextTerm {
                col_lowered: false,
                term: "bar".to_string(),
            }
        );
    }

    #[test]
    fn test_extract_requests_nested_and() {
        let metadata = mock_metadata();

        let matches_a = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "a".lit()],
            func: matches_func(),
        });
        let matches_b = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "b".lit()],
            func: matches_func(),
        });
        let lowered_term_c = Expr::ScalarFunction(ScalarFunction {
            args: vec![
                Expr::ScalarFunction(ScalarFunction {
                    args: vec![Expr::Column(Column::from_name("text"))],
                    func: lower(),
                }),
                "c".lit(),
            ],
            func: matches_term_func(),
        });

        // (matches(text, 'a') AND matches(text, 'b')) AND matches_term(lower(text), 'c')
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(matches_a),
                op: Operator::And,
                right: Box::new(matches_b),
            })),
            op: Operator::And,
            right: Box::new(lowered_term_c),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&expr, &metadata, &mut requests);

        assert_eq!(requests.len(), 1);
        let request = requests.get(&1).unwrap();
        assert_eq!(
            request.queries,
            vec![
                FulltextQuery("a".to_string()),
                FulltextQuery("b".to_string())
            ]
        );
        assert_eq!(
            request.terms,
            vec![FulltextTerm {
                col_lowered: true,
                term: "c".to_string(),
            }]
        );
    }

    #[test]
    fn test_extract_requests_ignores_or() {
        let metadata = mock_metadata();

        let matches_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "foo".lit()],
            func: matches_func(),
        });
        let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
            args: vec![Expr::Column(Column::from_name("text")), "bar".lit()],
            func: matches_term_func(),
        });

        // Only AND chains are descended into; an OR yields no requests.
        let or_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(matches_expr),
            op: Operator::Or,
            right: Box::new(matches_term_expr),
        });

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&or_expr, &metadata, &mut requests);

        assert!(requests.is_empty());
    }

    #[test]
    fn test_extract_requests_ignores_unrelated_expr() {
        let metadata = mock_metadata();

        let expr = Expr::Column(Column::from_name("text"));

        let mut requests = BTreeMap::new();
        FulltextIndexApplierBuilder::extract_requests(&expr, &metadata, &mut requests);

        assert!(requests.is_empty());
    }

    #[test]
    fn test_terms_as_query() {
        // Test with empty terms
        let request = FulltextRequest::default();
        assert_eq!(request.terms_as_query(false), FulltextQuery(String::new()));
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));

        // Test with a single term (not lowercased)
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string())
        );

        // Test with a single lowercased term and skip_lowercased=true
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "foo".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\"".to_string())
        );
        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new())); // Should skip lowercased term

        // Test with multiple terms, mix of lowercased and not
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "bar".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\"".to_string()) // Only the non-lowercased term
        );

        // Test with term containing quotes that need escaping
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo\"bar".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\\\"bar\"".to_string())
        );

        // Test with a complex mix of terms
        let mut request = FulltextRequest::default();
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "foo".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: true,
            term: "bar\"quoted\"".to_string(),
        });
        request.terms.push(FulltextTerm {
            col_lowered: false,
            term: "baz\\escape".to_string(),
        });
        assert_eq!(
            request.terms_as_query(false),
            FulltextQuery("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"".to_string())
        );
        assert_eq!(
            request.terms_as_query(true),
            FulltextQuery("+\"foo\" +\"baz\\escape\"".to_string()) // Skips the lowercased term
        );
    }
}