mito_codec/
primary_key_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::value::Value;
21use store_api::metadata::RegionMetadataRef;
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::storage::ColumnId;
24
25use crate::error::Result;
26use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
27
28/// Returns true if this is a partition column for metrics in the memtable.
29pub fn is_partition_column(name: &str) -> bool {
30    name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
31}
32
33#[derive(Clone)]
34struct PrimaryKeyFilterInner {
35    metadata: RegionMetadataRef,
36    filters: Arc<Vec<SimpleFilterEvaluator>>,
37}
38
39impl PrimaryKeyFilterInner {
40    fn evaluate_filters(
41        &self,
42        mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
43    ) -> bool {
44        if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
45            return true;
46        }
47
48        let mut result = true;
49        for filter in self.filters.iter() {
50            if is_partition_column(filter.column_name()) {
51                continue;
52            }
53            let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
54                continue;
55            };
56            // ignore filters that are not referencing primary key columns
57            if column.semantic_type != SemanticType::Tag {
58                continue;
59            }
60
61            let value = match decode_fn(column.column_id, &self.metadata) {
62                Ok(v) => v,
63                Err(e) => {
64                    common_telemetry::error!(e; "Failed to decode primary key");
65                    return true;
66                }
67            };
68
69            // TODO(yingwen): `evaluate_scalar()` creates temporary arrays to compare scalars. We
70            // can compare the bytes directly without allocation and matching types as we use
71            // comparable encoding.
72            // Safety: arrow schema and datatypes are constructed from the same source.
73            let scalar_value = value
74                .try_to_scalar_value(&column.column_schema.data_type)
75                .unwrap();
76            result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
77        }
78
79        result
80    }
81}
82
83/// Dense primary key filter.
84#[derive(Clone)]
85pub struct DensePrimaryKeyFilter {
86    inner: PrimaryKeyFilterInner,
87    codec: DensePrimaryKeyCodec,
88    offsets_buf: Vec<usize>,
89}
90
91impl DensePrimaryKeyFilter {
92    pub(crate) fn new(
93        metadata: RegionMetadataRef,
94        filters: Arc<Vec<SimpleFilterEvaluator>>,
95        codec: DensePrimaryKeyCodec,
96    ) -> Self {
97        Self {
98            inner: PrimaryKeyFilterInner { metadata, filters },
99            codec,
100            offsets_buf: Vec::new(),
101        }
102    }
103}
104
105impl PrimaryKeyFilter for DensePrimaryKeyFilter {
106    fn matches(&mut self, pk: &[u8]) -> bool {
107        self.offsets_buf.clear();
108        self.inner.evaluate_filters(|column_id, metadata| {
109            // index of tag column in primary key
110            // Safety: A tag column is always in primary key.
111            let index = metadata.primary_key_index(column_id).unwrap();
112            self.codec.decode_value_at(pk, index, &mut self.offsets_buf)
113        })
114    }
115}
116
117/// Sparse primary key filter.
118#[derive(Clone)]
119pub struct SparsePrimaryKeyFilter {
120    inner: PrimaryKeyFilterInner,
121    codec: SparsePrimaryKeyCodec,
122    offsets_map: HashMap<ColumnId, usize>,
123}
124
125impl SparsePrimaryKeyFilter {
126    pub(crate) fn new(
127        metadata: RegionMetadataRef,
128        filters: Arc<Vec<SimpleFilterEvaluator>>,
129        codec: SparsePrimaryKeyCodec,
130    ) -> Self {
131        Self {
132            inner: PrimaryKeyFilterInner { metadata, filters },
133            codec,
134            offsets_map: HashMap::new(),
135        }
136    }
137}
138
139impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
140    fn matches(&mut self, pk: &[u8]) -> bool {
141        self.offsets_map.clear();
142        self.inner.evaluate_filters(|column_id, _| {
143            if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
144                self.codec.decode_value_at(pk, offset, column_id)
145            } else {
146                Ok(Value::Null)
147            }
148        })
149    }
150}
151
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Builds the metadata of a nullable string tag column.
    fn string_tag(name: &str, column_id: ColumnId) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id,
        }
    }

    /// Region with three string tags (`pod`, `namespace`, `container`), one
    /// float field and a nanosecond timestamp; the tags form the primary key.
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (column_id, name) in [(1, "pod"), (2, "namespace"), (3, "container")] {
            builder.push_column_metadata(string_tag(name, column_id));
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        Arc::new(builder.build().unwrap())
    }

    /// Tag values of the single row used by every test, keyed by column id.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        const POD: &str = "greptime-frontend-6989d9899-22222";
        vec![
            (1, ValueRef::String(POD)),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String(POD)),
        ]
    }

    /// Builds the evaluator for the expression `column_name = value`.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        let left = Box::new(Expr::Column(Column::from_name(column_name)));
        let right = Box::new(value.lit());
        let expr = Expr::BinaryExpr(BinaryExpr {
            left,
            op: Operator::Eq,
            right,
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    /// Encodes `row` with the sparse codec.
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let mut encoded = Vec::new();
        SparsePrimaryKeyCodec::new(metadata)
            .encode_to_vec(row.into_iter(), &mut encoded)
            .unwrap();
        encoded
    }

    /// Encodes the values of `row` (column ids dropped) with the dense codec.
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let values = row.into_iter().map(|(_, value)| value);
        let mut encoded = Vec::new();
        DensePrimaryKeyCodec::new(metadata)
            .encode_to_vec(values, &mut encoded)
            .unwrap();
        encoded
    }

    /// Runs the filter `column_name = value` against the sparse-encoded test row.
    fn sparse_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        filter.matches(&pk)
    }

    /// Runs the filter `column_name = value` against the dense-encoded test row.
    fn dense_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        filter.matches(&pk)
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        assert!(sparse_matches("pod", "greptime-frontend-6989d9899-22222"));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        assert!(!sparse_matches("pod", "greptime-frontend-6989d9899-22223"));
    }

    // The column is not part of the region metadata, so the filter is skipped
    // and the key is kept.
    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        assert!(sparse_matches(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222"
        ));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        assert!(dense_matches("pod", "greptime-frontend-6989d9899-22222"));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        assert!(!dense_matches("pod", "greptime-frontend-6989d9899-22223"));
    }

    // The column is not part of the region metadata, so the filter is skipped
    // and the key is kept.
    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        assert!(dense_matches(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222"
        ));
    }
}