mito2/memtable/partition_tree/
primary_key_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::value::Value;
21use store_api::metadata::RegionMetadataRef;
22use store_api::storage::ColumnId;
23
24use crate::error::Result;
25use crate::memtable::partition_tree::partition::Partition;
26use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
27
/// State common to the dense and sparse primary key filters in this file:
/// the region metadata plus the filters evaluated against each decoded key.
#[derive(Clone)]
struct PrimaryKeyFilterInner {
    /// Region metadata, used to resolve a filter's column name to the column's
    /// id, semantic type and data type.
    metadata: RegionMetadataRef,
    /// Filters to evaluate. Held behind an `Arc`, so cloning this struct does
    /// not copy the evaluators.
    filters: Arc<Vec<SimpleFilterEvaluator>>,
}
33
34impl PrimaryKeyFilterInner {
35    fn evaluate_filters(
36        &self,
37        mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
38    ) -> bool {
39        if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
40            return true;
41        }
42
43        let mut result = true;
44        for filter in self.filters.iter() {
45            if Partition::is_partition_column(filter.column_name()) {
46                continue;
47            }
48            let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
49                continue;
50            };
51            // ignore filters that are not referencing primary key columns
52            if column.semantic_type != SemanticType::Tag {
53                continue;
54            }
55
56            let value = match decode_fn(column.column_id, &self.metadata) {
57                Ok(v) => v,
58                Err(e) => {
59                    common_telemetry::error!(e; "Failed to decode primary key");
60                    return true;
61                }
62            };
63
64            // TODO(yingwen): `evaluate_scalar()` creates temporary arrays to compare scalars. We
65            // can compare the bytes directly without allocation and matching types as we use
66            // comparable encoding.
67            // Safety: arrow schema and datatypes are constructed from the same source.
68            let scalar_value = value
69                .try_to_scalar_value(&column.column_schema.data_type)
70                .unwrap();
71            result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
72        }
73
74        result
75    }
76}
77
/// Dense primary key filter.
///
/// Tests filters against primary keys decoded with a [`DensePrimaryKeyCodec`].
#[derive(Clone)]
pub struct DensePrimaryKeyFilter {
    /// Region metadata and filters to evaluate.
    inner: PrimaryKeyFilterInner,
    /// Codec used to decode a single value out of an encoded key.
    codec: DensePrimaryKeyCodec,
    /// Scratch buffer passed to the codec while decoding; cleared at the start
    /// of every `matches()` call.
    offsets_buf: Vec<usize>,
}
85
86impl DensePrimaryKeyFilter {
87    pub(crate) fn new(
88        metadata: RegionMetadataRef,
89        filters: Arc<Vec<SimpleFilterEvaluator>>,
90        codec: DensePrimaryKeyCodec,
91    ) -> Self {
92        Self {
93            inner: PrimaryKeyFilterInner { metadata, filters },
94            codec,
95            offsets_buf: Vec::new(),
96        }
97    }
98}
99
100impl PrimaryKeyFilter for DensePrimaryKeyFilter {
101    fn matches(&mut self, pk: &[u8]) -> bool {
102        self.offsets_buf.clear();
103        self.inner.evaluate_filters(|column_id, metadata| {
104            // index of tag column in primary key
105            // Safety: A tag column is always in primary key.
106            let index = metadata.primary_key_index(column_id).unwrap();
107            self.codec.decode_value_at(pk, index, &mut self.offsets_buf)
108        })
109    }
110}
111
/// Sparse primary key filter.
///
/// Tests filters against primary keys decoded with a [`SparsePrimaryKeyCodec`].
#[derive(Clone)]
pub struct SparsePrimaryKeyFilter {
    /// Region metadata and filters to evaluate.
    inner: PrimaryKeyFilterInner,
    /// Codec used to locate and decode a column's value in an encoded key.
    codec: SparsePrimaryKeyCodec,
    /// Scratch map from column id to offset, passed to the codec when looking
    /// up a column; cleared at the start of every `matches()` call.
    offsets_map: HashMap<ColumnId, usize>,
}
119
120impl SparsePrimaryKeyFilter {
121    pub(crate) fn new(
122        metadata: RegionMetadataRef,
123        filters: Arc<Vec<SimpleFilterEvaluator>>,
124        codec: SparsePrimaryKeyCodec,
125    ) -> Self {
126        Self {
127            inner: PrimaryKeyFilterInner { metadata, filters },
128            codec,
129            offsets_map: HashMap::new(),
130        }
131    }
132}
133
134impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
135    fn matches(&mut self, pk: &[u8]) -> bool {
136        self.offsets_map.clear();
137        self.inner.evaluate_filters(|column_id, _| {
138            if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
139                self.codec.decode_value_at(pk, offset, column_id)
140            } else {
141                Ok(Value::Null)
142            }
143        })
144    }
145}
146
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion::logical_expr::BinaryExpr;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{Expr, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Value stored in the `pod` and `container` tags of the test row.
    const POD: &str = "greptime-frontend-6989d9899-22222";
    /// Same as `POD` except for the last character.
    const OTHER_POD: &str = "greptime-frontend-6989d9899-22223";
    /// Name of a column that is not part of the test region.
    const MISSING_COLUMN: &str = "non-exist-label";

    /// Builds the metadata of a nullable string tag column.
    fn tag_column(name: &str, column_id: ColumnId) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id,
        }
    }

    /// Region with three string tags (`pod`, `namespace`, `container`), one
    /// float field and a nanosecond timestamp. The primary key is the three tags.
    fn setup_metadata() -> RegionMetadataRef {
        let value_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "greptime_value",
                ConcreteDataType::float64_datatype(),
                false,
            ),
            semantic_type: SemanticType::Field,
            column_id: 4,
        };
        let timestamp_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "greptime_timestamp",
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 5,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(tag_column("pod", 1))
            .push_column_metadata(tag_column("namespace", 2))
            .push_column_metadata(tag_column("container", 3))
            .push_column_metadata(value_column)
            .push_column_metadata(timestamp_column)
            .primary_key(vec![1, 2, 3]);
        Arc::new(builder.build().unwrap())
    }

    /// Tag values of the single row used by every test, keyed by column id.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String(POD)),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String(POD)),
        ]
    }

    /// Builds the evaluator for `column_name = value`.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        let column = Expr::Column(Column::from_name(column_name));
        let literal = Expr::Literal(ScalarValue::Utf8(Some(value.to_string())));
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(column),
            op: Operator::Eq,
            right: Box::new(literal),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    /// Encodes `row` with the sparse codec.
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let mut encoded = Vec::new();
        SparsePrimaryKeyCodec::new(metadata)
            .encode_to_vec(row.into_iter(), &mut encoded)
            .unwrap();
        encoded
    }

    /// Encodes the values of `row` (column ids dropped) with the dense codec.
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let mut encoded = Vec::new();
        let values = row.into_iter().map(|(_, value)| value);
        DensePrimaryKeyCodec::new(metadata)
            .encode_to_vec(values, &mut encoded)
            .unwrap();
        encoded
    }

    /// Runs `column_name = value` through a sparse filter on the test row.
    fn sparse_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        filter.matches(&pk)
    }

    /// Runs `column_name = value` through a dense filter on the test row.
    fn dense_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        filter.matches(&pk)
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        assert!(sparse_matches("pod", POD));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        assert!(!sparse_matches("pod", OTHER_POD));
    }

    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        // A filter on a column the region does not have is ignored.
        assert!(sparse_matches(MISSING_COLUMN, POD));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        assert!(dense_matches("pod", POD));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        assert!(!dense_matches("pod", OTHER_POD));
    }

    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        // A filter on a column the region does not have is ignored.
        assert!(dense_matches(MISSING_COLUMN, POD));
    }
}