mito_codec/
primary_key_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::value::Value;
22use memcomparable::Serializer;
23use store_api::metadata::RegionMetadataRef;
24use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
25use store_api::storage::ColumnId;
26
27use crate::error::Result;
28use crate::row_converter::{
29    DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparsePrimaryKeyCodec,
30};
31
32/// Returns true if this is a partition column for metrics in the memtable.
33pub fn is_partition_column(name: &str) -> bool {
34    name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
35}
36
37#[derive(Clone)]
38struct PrimaryKeyFilterInner {
39    filters: Arc<Vec<SimpleFilterEvaluator>>,
40    compiled_filters: Vec<CompiledPrimaryKeyFilter>,
41}
42
43impl PrimaryKeyFilterInner {
44    fn new(metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>) -> Self {
45        let compiled_filters = Self::compile_filters(&metadata, &filters);
46        Self {
47            filters,
48            compiled_filters,
49        }
50    }
51
52    fn compile_filters(
53        metadata: &RegionMetadataRef,
54        filters: &[SimpleFilterEvaluator],
55    ) -> Vec<CompiledPrimaryKeyFilter> {
56        if filters.is_empty() || metadata.primary_key.is_empty() {
57            return Vec::new();
58        }
59
60        let mut compiled_filters = Vec::with_capacity(filters.len());
61        for (filter_idx, filter) in filters.iter().enumerate() {
62            if is_partition_column(filter.column_name()) {
63                continue;
64            }
65
66            let Some(column) = metadata.column_by_name(filter.column_name()) else {
67                continue;
68            };
69            // Ignore filters that are not referencing primary key tag columns.
70            if column.semantic_type != SemanticType::Tag {
71                continue;
72            }
73
74            // Safety: A tag column is always in primary key.
75            let Some(pk_index) = metadata.primary_key_index(column.column_id) else {
76                continue;
77            };
78
79            let data_type = column.column_schema.data_type.clone();
80            let fast_path = CompiledFastPath::try_new(filter, &data_type);
81
82            compiled_filters.push(CompiledPrimaryKeyFilter {
83                filter_idx,
84                column_id: column.column_id,
85                pk_index,
86                data_type,
87                fast_path,
88            });
89        }
90
91        compiled_filters
92    }
93
94    fn evaluate_filters<'a>(&self, accessor: &mut impl PrimaryKeyValueAccessor<'a>) -> bool {
95        if self.compiled_filters.is_empty() {
96            return true;
97        }
98
99        for compiled in &self.compiled_filters {
100            let filter = &self.filters[compiled.filter_idx];
101
102            let passed = if let Some(fast_path) = &compiled.fast_path {
103                let encoded_value = match accessor.encoded_value(compiled) {
104                    Ok(v) => v,
105                    Err(e) => {
106                        common_telemetry::error!(e; "Failed to decode primary key");
107                        return true;
108                    }
109                };
110                fast_path.matches(encoded_value)
111            } else {
112                let value = match accessor.decode_value(compiled) {
113                    Ok(v) => v,
114                    Err(e) => {
115                        common_telemetry::error!(e; "Failed to decode primary key");
116                        return true;
117                    }
118                };
119
120                // Safety: arrow schema and datatypes are constructed from the same source.
121                let scalar_value = value.try_to_scalar_value(&compiled.data_type).unwrap();
122                filter.evaluate_scalar(&scalar_value).unwrap_or(true)
123            };
124
125            if !passed {
126                return false;
127            }
128        }
129
130        true
131    }
132}
133
/// A filter on a primary key tag column, resolved once against the region metadata.
#[derive(Clone)]
struct CompiledPrimaryKeyFilter {
    /// Index of the originating filter in `PrimaryKeyFilterInner::filters`.
    filter_idx: usize,
    /// Id of the referenced tag column (the sparse accessor looks values up by column id).
    column_id: ColumnId,
    /// Position of the column within the primary key (the dense accessor reads by position).
    pk_index: usize,
    /// Column data type; used to turn a decoded value into a scalar on the slow path.
    data_type: ConcreteDataType,
    /// Byte-level evaluator, or `None` when the filter must be evaluated on decoded values.
    fast_path: Option<CompiledFastPath>,
}
142
143#[derive(Clone)]
144enum CompiledFastPath {
145    Eq(Vec<u8>),
146    NotEq(Vec<u8>),
147    Lt(Vec<u8>),
148    LtEq(Vec<u8>),
149    Gt(Vec<u8>),
150    GtEq(Vec<u8>),
151    InList(Vec<Vec<u8>>),
152}
153
154impl CompiledFastPath {
155    fn try_new(filter: &SimpleFilterEvaluator, data_type: &ConcreteDataType) -> Option<Self> {
156        let field = SortField::new(data_type.clone());
157        // Safety guard: memcomparable defines a total order for floats that doesn't necessarily
158        // match DataFusion/Arrow float comparison semantics (e.g. `-0.0 == 0.0`).
159        if field.encode_data_type().is_float() {
160            return None;
161        }
162
163        let encoded = |value: &Value| -> Option<Vec<u8>> {
164            let mut buf = Vec::new();
165            let mut serializer = Serializer::new(&mut buf);
166            field
167                .serialize(&mut serializer, &value.as_value_ref())
168                .ok()?;
169            Some(buf)
170        };
171
172        if filter.is_eq() {
173            let value = filter.literal_value()?;
174            Some(Self::Eq(encoded(&value)?))
175        } else if filter.is_not_eq() {
176            let value = filter.literal_value()?;
177            Some(Self::NotEq(encoded(&value)?))
178        } else if filter.is_lt() {
179            let value = filter.literal_value()?;
180            Some(Self::Lt(encoded(&value)?))
181        } else if filter.is_lt_eq() {
182            let value = filter.literal_value()?;
183            Some(Self::LtEq(encoded(&value)?))
184        } else if filter.is_gt() {
185            let value = filter.literal_value()?;
186            Some(Self::Gt(encoded(&value)?))
187        } else if filter.is_gt_eq() {
188            let value = filter.literal_value()?;
189            Some(Self::GtEq(encoded(&value)?))
190        } else if filter.is_or_eq_chain() {
191            let values = filter.literal_list_values()?;
192            let mut list = Vec::with_capacity(values.len());
193            for value in values {
194                let bytes = encoded(&value)?;
195                // `NULL` never matches in equality comparisons (`NULL = x` is NULL).
196                if bytes.first() == Some(&0) {
197                    continue;
198                }
199                list.push(bytes);
200            }
201            Some(Self::InList(list))
202        } else {
203            None
204        }
205    }
206
207    fn matches(&self, encoded_value: Option<&[u8]>) -> bool {
208        let Some(encoded_value) = encoded_value else {
209            return false;
210        };
211
212        // `NULL` never matches in comparisons (`NULL = x` and `NULL != x` are NULL).
213        if encoded_value.first() == Some(&0) {
214            return false;
215        }
216
217        match self {
218            CompiledFastPath::Eq(encoded_literal) => {
219                encoded_literal.first() != Some(&0) && encoded_value == &encoded_literal[..]
220            }
221            CompiledFastPath::NotEq(encoded_literal) => {
222                encoded_literal.first() != Some(&0) && encoded_value != &encoded_literal[..]
223            }
224            CompiledFastPath::Lt(encoded_literal) => {
225                encoded_literal.first() != Some(&0) && encoded_value < &encoded_literal[..]
226            }
227            CompiledFastPath::LtEq(encoded_literal) => {
228                encoded_literal.first() != Some(&0) && encoded_value <= &encoded_literal[..]
229            }
230            CompiledFastPath::Gt(encoded_literal) => {
231                encoded_literal.first() != Some(&0) && encoded_value > &encoded_literal[..]
232            }
233            CompiledFastPath::GtEq(encoded_literal) => {
234                encoded_literal.first() != Some(&0) && encoded_value >= &encoded_literal[..]
235            }
236            CompiledFastPath::InList(encoded_literals) => {
237                encoded_literals.iter().any(|lit| encoded_value == &lit[..])
238            }
239        }
240    }
241}
242
/// Reads the value of a compiled filter's column out of a single encoded primary key.
///
/// There is one implementation per key encoding (dense and sparse). `'a` is the lifetime of the
/// primary key bytes, so encoded slices borrow directly from the key without copying.
trait PrimaryKeyValueAccessor<'a> {
    /// Returns the still-encoded bytes of the column value. The dense implementation always
    /// returns `Some`; the sparse one may return `None` (presumably when the key lacks the column).
    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>>;
    /// Decodes the column value from the key.
    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value>;
}
247
248/// Dense primary key filter.
249#[derive(Clone)]
250pub struct DensePrimaryKeyFilter {
251    inner: PrimaryKeyFilterInner,
252    codec: DensePrimaryKeyCodec,
253    offsets_buf: Vec<usize>,
254}
255
256impl DensePrimaryKeyFilter {
257    pub(crate) fn new(
258        metadata: RegionMetadataRef,
259        filters: Arc<Vec<SimpleFilterEvaluator>>,
260        codec: DensePrimaryKeyCodec,
261    ) -> Self {
262        Self {
263            inner: PrimaryKeyFilterInner::new(metadata, filters),
264            codec,
265            offsets_buf: Vec::new(),
266        }
267    }
268}
269
270impl PrimaryKeyFilter for DensePrimaryKeyFilter {
271    fn matches(&mut self, pk: &[u8]) -> bool {
272        self.offsets_buf.clear();
273        let mut accessor = DensePrimaryKeyValueAccessor {
274            pk,
275            codec: &self.codec,
276            offsets_buf: &mut self.offsets_buf,
277        };
278        self.inner.evaluate_filters(&mut accessor)
279    }
280}
281
282struct DensePrimaryKeyValueAccessor<'a, 'b> {
283    pk: &'a [u8],
284    codec: &'b DensePrimaryKeyCodec,
285    offsets_buf: &'b mut Vec<usize>,
286}
287
288impl<'a> PrimaryKeyValueAccessor<'a> for DensePrimaryKeyValueAccessor<'a, '_> {
289    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
290        self.codec
291            .encoded_value_at(self.pk, filter.pk_index, self.offsets_buf)
292            .map(Some)
293    }
294
295    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
296        self.codec
297            .decode_value_at(self.pk, filter.pk_index, self.offsets_buf)
298    }
299}
300
301/// Sparse primary key filter.
302#[derive(Clone)]
303pub struct SparsePrimaryKeyFilter {
304    inner: PrimaryKeyFilterInner,
305    codec: SparsePrimaryKeyCodec,
306    offsets_map: HashMap<ColumnId, usize>,
307}
308
309impl SparsePrimaryKeyFilter {
310    pub(crate) fn new(
311        metadata: RegionMetadataRef,
312        filters: Arc<Vec<SimpleFilterEvaluator>>,
313        codec: SparsePrimaryKeyCodec,
314    ) -> Self {
315        Self {
316            inner: PrimaryKeyFilterInner::new(metadata, filters),
317            codec,
318            offsets_map: HashMap::new(),
319        }
320    }
321}
322
323impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
324    fn matches(&mut self, pk: &[u8]) -> bool {
325        self.offsets_map.clear();
326        let mut accessor = SparsePrimaryKeyValueAccessor {
327            pk,
328            codec: &self.codec,
329            offsets_map: &mut self.offsets_map,
330        };
331        self.inner.evaluate_filters(&mut accessor)
332    }
333}
334
335struct SparsePrimaryKeyValueAccessor<'a, 'b> {
336    pk: &'a [u8],
337    codec: &'b SparsePrimaryKeyCodec,
338    offsets_map: &'b mut HashMap<ColumnId, usize>,
339}
340
341impl<'a> PrimaryKeyValueAccessor<'a> for SparsePrimaryKeyValueAccessor<'a, '_> {
342    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
343        self.codec
344            .encoded_value_for_column(self.pk, self.offsets_map, filter.column_id)
345    }
346
347    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
348        if let Some(offset) = self
349            .codec
350            .has_column(self.pk, self.offsets_map, filter.column_id)
351        {
352            self.codec
353                .decode_value_at(self.pk, offset, filter.column_id)
354        } else {
355            Ok(Value::Null)
356        }
357    }
358}
359
// Unit tests for the dense and sparse primary key filters.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, ValueRef};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    // Region with three string tags (`pod`, `namespace`, `container`; column ids 1-3) forming
    // the primary key, plus a float64 field and a nanosecond timestamp column.
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    // Tag values for one key, as (column id, value) pairs in primary key order.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
        ]
    }

    // Builds a `column_name = value` filter.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        create_filter_with_op(column_name, Operator::Eq, value)
    }

    // Builds a `column_name <op> value` filter; panics if `SimpleFilterEvaluator` rejects it.
    fn create_filter_with_op<T: Literal>(
        column_name: &str,
        op: Operator,
        value: T,
    ) -> SimpleFilterEvaluator {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name(column_name))),
            op,
            right: Box::new(value.lit()),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    // Encodes `row` with the sparse codec (values keyed by column id).
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    // Encodes `row` with the dense codec; column ids are dropped, so `row` must be in
    // primary key order.
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(row.into_iter().map(|(_, v)| v), &mut pk)
            .unwrap();
        pk
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(!filter.matches(&pk));
    }

    // The filter references a column that is not in the region metadata, so it is dropped at
    // compile time and every key matches.
    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(!filter.matches(&pk));
    }

    // The filter references a column that is not in the region metadata, so it is dropped at
    // compile time and every key matches.
    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    // Ordering operators on the fast path; the key's `pod` ends in "...22222".
    #[test]
    fn test_dense_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);

        let cases = [
            (Operator::Gt, "greptime-frontend-6989d9899-22221", true),
            (Operator::GtEq, "greptime-frontend-6989d9899-22222", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22223", true),
            (Operator::LtEq, "greptime-frontend-6989d9899-22221", false),
        ];

        for (op, value, expected) in cases {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk));
        }
    }

    // Same cases as the dense variant, against a sparse-encoded key.
    #[test]
    fn test_sparse_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);

        let cases = [
            (Operator::Gt, "greptime-frontend-6989d9899-22221", true),
            (Operator::GtEq, "greptime-frontend-6989d9899-22222", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22223", true),
            (Operator::LtEq, "greptime-frontend-6989d9899-22221", false),
        ];

        for (op, value, expected) in cases {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk));
        }
    }

    // Float tag columns get no byte-level fast path, so `-0.0 = 0.0` is decided by the slow
    // path's scalar evaluation, which this test expects to report a match.
    #[test]
    fn test_dense_primary_key_filter_float_eq_fallback() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let metadata = Arc::new(builder.build().unwrap());

        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec([ValueRef::Float64(OrderedFloat(-0.0))].into_iter(), &mut pk)
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);

        assert!(filter.matches(&pk));
    }
}