mito_codec/
primary_key_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::value::Value;
21use store_api::metadata::RegionMetadataRef;
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::storage::ColumnId;
24
25use crate::error::Result;
26use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
27
28/// Returns true if this is a partition column for metrics in the memtable.
29pub fn is_partition_column(name: &str) -> bool {
30    name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
31}
32
33#[derive(Clone)]
34struct PrimaryKeyFilterInner {
35    metadata: RegionMetadataRef,
36    filters: Arc<Vec<SimpleFilterEvaluator>>,
37}
38
39impl PrimaryKeyFilterInner {
40    fn evaluate_filters(
41        &self,
42        mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
43    ) -> bool {
44        if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
45            return true;
46        }
47
48        let mut result = true;
49        for filter in self.filters.iter() {
50            if is_partition_column(filter.column_name()) {
51                continue;
52            }
53            let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
54                continue;
55            };
56            // ignore filters that are not referencing primary key columns
57            if column.semantic_type != SemanticType::Tag {
58                continue;
59            }
60
61            let value = match decode_fn(column.column_id, &self.metadata) {
62                Ok(v) => v,
63                Err(e) => {
64                    common_telemetry::error!(e; "Failed to decode primary key");
65                    return true;
66                }
67            };
68
69            // TODO(yingwen): `evaluate_scalar()` creates temporary arrays to compare scalars. We
70            // can compare the bytes directly without allocation and matching types as we use
71            // comparable encoding.
72            // Safety: arrow schema and datatypes are constructed from the same source.
73            let scalar_value = value
74                .try_to_scalar_value(&column.column_schema.data_type)
75                .unwrap();
76            result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
77        }
78
79        result
80    }
81}
82
83/// Dense primary key filter.
84#[derive(Clone)]
85pub struct DensePrimaryKeyFilter {
86    inner: PrimaryKeyFilterInner,
87    codec: DensePrimaryKeyCodec,
88    offsets_buf: Vec<usize>,
89}
90
91impl DensePrimaryKeyFilter {
92    pub(crate) fn new(
93        metadata: RegionMetadataRef,
94        filters: Arc<Vec<SimpleFilterEvaluator>>,
95        codec: DensePrimaryKeyCodec,
96    ) -> Self {
97        Self {
98            inner: PrimaryKeyFilterInner { metadata, filters },
99            codec,
100            offsets_buf: Vec::new(),
101        }
102    }
103}
104
105impl PrimaryKeyFilter for DensePrimaryKeyFilter {
106    fn matches(&mut self, pk: &[u8]) -> bool {
107        self.offsets_buf.clear();
108        self.inner.evaluate_filters(|column_id, metadata| {
109            // index of tag column in primary key
110            // Safety: A tag column is always in primary key.
111            let index = metadata.primary_key_index(column_id).unwrap();
112            self.codec.decode_value_at(pk, index, &mut self.offsets_buf)
113        })
114    }
115}
116
117/// Sparse primary key filter.
118#[derive(Clone)]
119pub struct SparsePrimaryKeyFilter {
120    inner: PrimaryKeyFilterInner,
121    codec: SparsePrimaryKeyCodec,
122    offsets_map: HashMap<ColumnId, usize>,
123}
124
125impl SparsePrimaryKeyFilter {
126    pub(crate) fn new(
127        metadata: RegionMetadataRef,
128        filters: Arc<Vec<SimpleFilterEvaluator>>,
129        codec: SparsePrimaryKeyCodec,
130    ) -> Self {
131        Self {
132            inner: PrimaryKeyFilterInner { metadata, filters },
133            codec,
134            offsets_map: HashMap::new(),
135        }
136    }
137}
138
139impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
140    fn matches(&mut self, pk: &[u8]) -> bool {
141        self.offsets_map.clear();
142        self.inner.evaluate_filters(|column_id, _| {
143            if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
144                self.codec.decode_value_at(pk, offset, column_id)
145            } else {
146                Ok(Value::Null)
147            }
148        })
149    }
150}
151
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Value of the `pod` and `container` tags in the test row.
    const POD: &str = "greptime-frontend-6989d9899-22222";
    /// A pod name that does not appear in the test row.
    const OTHER_POD: &str = "greptime-frontend-6989d9899-22223";
    /// Value of the `namespace` tag in the test row.
    const NAMESPACE: &str = "greptime-cluster";

    /// Builds a region with tags `pod` (1), `namespace` (2) and `container` (3), a float field
    /// and a nanosecond timestamp; the three tags form the primary key.
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    /// Tag values of the row every test encodes, keyed by column id.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String(POD)),
            (2, ValueRef::String(NAMESPACE)),
            (3, ValueRef::String(POD)),
        ]
    }

    /// Builds a `column_name = 'value'` equality filter.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name(column_name))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(row.into_iter().map(|(_, v)| v), &mut pk)
            .unwrap();
        pk
    }

    /// Encodes the test row with the sparse codec and reports whether `filters` match it.
    fn sparse_matches(filters: Vec<SimpleFilterEvaluator>) -> bool {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, Arc::new(filters), codec);
        filter.matches(&pk)
    }

    /// Encodes the test row with the dense codec and reports whether `filters` match it.
    fn dense_matches(filters: Vec<SimpleFilterEvaluator>) -> bool {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, Arc::new(filters), codec);
        filter.matches(&pk)
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        assert!(sparse_matches(vec![create_filter("pod", POD)]));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        assert!(!sparse_matches(vec![create_filter("pod", OTHER_POD)]));
    }

    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        assert!(sparse_matches(vec![create_filter("non-exist-label", POD)]));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        assert!(dense_matches(vec![create_filter("pod", POD)]));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        assert!(!dense_matches(vec![create_filter("pod", OTHER_POD)]));
    }

    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        assert!(dense_matches(vec![create_filter("non-exist-label", POD)]));
    }

    #[test]
    fn test_sparse_primary_key_filter_requires_all_filters() {
        assert!(sparse_matches(vec![
            create_filter("pod", POD),
            create_filter("namespace", NAMESPACE),
        ]));
        assert!(!sparse_matches(vec![
            create_filter("pod", POD),
            create_filter("namespace", "other-cluster"),
        ]));
        assert!(!sparse_matches(vec![
            create_filter("namespace", "other-cluster"),
            create_filter("pod", POD),
        ]));
    }

    #[test]
    fn test_dense_primary_key_filter_requires_all_filters() {
        assert!(dense_matches(vec![
            create_filter("pod", POD),
            create_filter("namespace", NAMESPACE),
        ]));
        assert!(!dense_matches(vec![
            create_filter("pod", POD),
            create_filter("namespace", "other-cluster"),
        ]));
        assert!(!dense_matches(vec![
            create_filter("namespace", "other-cluster"),
            create_filter("pod", POD),
        ]));
    }

    #[test]
    fn test_primary_key_filter_ignores_non_tag_columns() {
        // `greptime_value` is a field column, so the filter must not reject the key.
        assert!(sparse_matches(vec![create_filter("greptime_value", "x")]));
        assert!(dense_matches(vec![create_filter("greptime_value", "x")]));
    }

    #[test]
    fn test_primary_key_filter_without_filters() {
        assert!(sparse_matches(vec![]));
        assert!(dense_matches(vec![]));
    }
}