Skip to main content

mito_codec/
primary_key_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::SemanticType;
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use datatypes::data_type::ConcreteDataType;
20use datatypes::value::Value;
21use memcomparable::Serializer;
22use snafu::ResultExt;
23use store_api::metadata::RegionMetadataRef;
24use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
25use store_api::storage::ColumnId;
26
27use crate::error::{EvaluateFilterSnafu, Result};
28use crate::row_converter::{
29    DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparseOffsetsCache, SparsePrimaryKeyCodec,
30};
31
32/// Returns true if this is a partition column for metrics in the memtable.
33pub fn is_partition_column(name: &str) -> bool {
34    name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
35}
36
/// State shared by the dense and sparse primary key filters: the filter evaluators
/// handed in by the caller plus the subset of them compiled against the region metadata.
#[derive(Clone)]
struct PrimaryKeyFilterInner {
    /// Filter evaluators as supplied to `new`; `CompiledPrimaryKeyFilter::filter_idx`
    /// indexes into this vector.
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// One entry per filter that references a tag column found in the primary key.
    /// Filters that do not qualify are left out and therefore never evaluated here.
    compiled_filters: Vec<CompiledPrimaryKeyFilter>,
}
42
43impl PrimaryKeyFilterInner {
44    fn new(
45        metadata: RegionMetadataRef,
46        filters: Arc<Vec<SimpleFilterEvaluator>>,
47        skip_partition_column: bool,
48    ) -> Self {
49        let compiled_filters = Self::compile_filters(&metadata, &filters, skip_partition_column);
50        Self {
51            filters,
52            compiled_filters,
53        }
54    }
55
56    fn compile_filters(
57        metadata: &RegionMetadataRef,
58        filters: &[SimpleFilterEvaluator],
59        skip_partition_column: bool,
60    ) -> Vec<CompiledPrimaryKeyFilter> {
61        if filters.is_empty() || metadata.primary_key.is_empty() {
62            return Vec::new();
63        }
64
65        let mut compiled_filters = Vec::with_capacity(filters.len());
66        for (filter_idx, filter) in filters.iter().enumerate() {
67            if skip_partition_column && is_partition_column(filter.column_name()) {
68                continue;
69            }
70
71            let Some(column) = metadata.column_by_name(filter.column_name()) else {
72                continue;
73            };
74            // Ignore filters that are not referencing primary key tag columns.
75            if column.semantic_type != SemanticType::Tag {
76                continue;
77            }
78
79            // Safety: A tag column is always in primary key.
80            let Some(pk_index) = metadata.primary_key_index(column.column_id) else {
81                continue;
82            };
83
84            let data_type = column.column_schema.data_type.clone();
85            let fast_path = CompiledFastPath::try_new(filter, &data_type);
86
87            compiled_filters.push(CompiledPrimaryKeyFilter {
88                filter_idx,
89                column_id: column.column_id,
90                pk_index,
91                data_type,
92                fast_path,
93            });
94        }
95
96        compiled_filters
97    }
98
99    fn evaluate_filters<'a>(
100        &self,
101        accessor: &mut impl PrimaryKeyValueAccessor<'a>,
102    ) -> Result<bool> {
103        if self.compiled_filters.is_empty() {
104            return Ok(true);
105        }
106
107        for compiled in &self.compiled_filters {
108            let filter = &self.filters[compiled.filter_idx];
109
110            let passed = if let Some(fast_path) = &compiled.fast_path {
111                let encoded_value = accessor.encoded_value(compiled)?;
112                fast_path.matches(encoded_value)
113            } else {
114                let value = accessor.decode_value(compiled)?;
115
116                // Safety: arrow schema and datatypes are constructed from the same source.
117                let scalar_value = value.try_to_scalar_value(&compiled.data_type).unwrap();
118                filter
119                    .evaluate_scalar(&scalar_value)
120                    .context(EvaluateFilterSnafu)?
121            };
122
123            if !passed {
124                return Ok(false);
125            }
126        }
127
128        Ok(true)
129    }
130}
131
/// A filter resolved against the region metadata so it can be evaluated on an
/// encoded primary key without further name lookups.
#[derive(Clone)]
struct CompiledPrimaryKeyFilter {
    /// Index of the originating filter in `PrimaryKeyFilterInner::filters`.
    filter_idx: usize,
    /// Id of the tag column the filter refers to; the sparse accessor looks values up by it.
    column_id: ColumnId,
    /// Index of the column within the primary key; the dense accessor looks values up by it.
    pk_index: usize,
    /// Data type of the column, used to turn a decoded value into a scalar for evaluation.
    data_type: ConcreteDataType,
    /// Byte-level comparison for this filter, if one could be built.
    /// `None` means the value is decoded and the original filter is evaluated instead.
    fast_path: Option<CompiledFastPath>,
}
140
/// A filter precompiled into a comparison on encoded bytes.
///
/// Each variant stores the filter literal(s) serialized with the column's `SortField`,
/// so they can be compared against the encoded column value as raw byte slices.
#[derive(Clone)]
enum CompiledFastPath {
    /// `column = literal`.
    Eq(Vec<u8>),
    /// `column != literal`.
    NotEq(Vec<u8>),
    /// `column < literal`.
    Lt(Vec<u8>),
    /// `column <= literal`.
    LtEq(Vec<u8>),
    /// `column > literal`.
    Gt(Vec<u8>),
    /// `column >= literal`.
    GtEq(Vec<u8>),
    /// `column` equals any of the literals (built from an OR-chain of equalities).
    InList(Vec<Vec<u8>>),
}
151
152impl CompiledFastPath {
153    fn try_new(filter: &SimpleFilterEvaluator, data_type: &ConcreteDataType) -> Option<Self> {
154        let field = SortField::new(data_type.clone());
155        // Safety guard: memcomparable defines a total order for floats that doesn't necessarily
156        // match DataFusion/Arrow float comparison semantics (e.g. `-0.0 == 0.0`).
157        if field.encode_data_type().is_float() {
158            return None;
159        }
160
161        let encoded = |value: &Value| -> Option<Vec<u8>> {
162            let mut buf = Vec::new();
163            let mut serializer = Serializer::new(&mut buf);
164            field
165                .serialize(&mut serializer, &value.as_value_ref())
166                .ok()?;
167            Some(buf)
168        };
169
170        if filter.is_eq() {
171            let value = filter.literal_value()?;
172            Some(Self::Eq(encoded(&value)?))
173        } else if filter.is_not_eq() {
174            let value = filter.literal_value()?;
175            Some(Self::NotEq(encoded(&value)?))
176        } else if filter.is_lt() {
177            let value = filter.literal_value()?;
178            Some(Self::Lt(encoded(&value)?))
179        } else if filter.is_lt_eq() {
180            let value = filter.literal_value()?;
181            Some(Self::LtEq(encoded(&value)?))
182        } else if filter.is_gt() {
183            let value = filter.literal_value()?;
184            Some(Self::Gt(encoded(&value)?))
185        } else if filter.is_gt_eq() {
186            let value = filter.literal_value()?;
187            Some(Self::GtEq(encoded(&value)?))
188        } else if filter.is_or_eq_chain() {
189            let values = filter.literal_list_values()?;
190            let mut list = Vec::with_capacity(values.len());
191            for value in values {
192                let bytes = encoded(&value)?;
193                // `NULL` never matches in equality comparisons (`NULL = x` is NULL).
194                if bytes.first() == Some(&0) {
195                    continue;
196                }
197                list.push(bytes);
198            }
199            Some(Self::InList(list))
200        } else {
201            None
202        }
203    }
204
205    fn matches(&self, encoded_value: Option<&[u8]>) -> bool {
206        let Some(encoded_value) = encoded_value else {
207            return false;
208        };
209
210        // `NULL` never matches in comparisons (`NULL = x` and `NULL != x` are NULL).
211        if encoded_value.first() == Some(&0) {
212            return false;
213        }
214
215        match self {
216            CompiledFastPath::Eq(encoded_literal) => {
217                encoded_literal.first() != Some(&0) && encoded_value == &encoded_literal[..]
218            }
219            CompiledFastPath::NotEq(encoded_literal) => {
220                encoded_literal.first() != Some(&0) && encoded_value != &encoded_literal[..]
221            }
222            CompiledFastPath::Lt(encoded_literal) => {
223                encoded_literal.first() != Some(&0) && encoded_value < &encoded_literal[..]
224            }
225            CompiledFastPath::LtEq(encoded_literal) => {
226                encoded_literal.first() != Some(&0) && encoded_value <= &encoded_literal[..]
227            }
228            CompiledFastPath::Gt(encoded_literal) => {
229                encoded_literal.first() != Some(&0) && encoded_value > &encoded_literal[..]
230            }
231            CompiledFastPath::GtEq(encoded_literal) => {
232                encoded_literal.first() != Some(&0) && encoded_value >= &encoded_literal[..]
233            }
234            CompiledFastPath::InList(encoded_literals) => {
235                encoded_literals.iter().any(|lit| encoded_value == &lit[..])
236            }
237        }
238    }
239}
240
/// Reads a single column out of an encoded primary key.
///
/// This lets the filter evaluation code stay independent of how the key is encoded;
/// `'a` is the lifetime of the encoded key bytes.
trait PrimaryKeyValueAccessor<'a> {
    /// Returns the encoded bytes of the column targeted by `filter`, borrowed from the key.
    /// `None` is treated by the fast path as "no match".
    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>>;
    /// Decodes the column targeted by `filter` into an owned [`Value`].
    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value>;
}
245
/// Dense primary key filter.
///
/// Evaluates filters on primary keys encoded with [`DensePrimaryKeyCodec`], locating
/// each column by its index within the primary key.
#[derive(Clone)]
pub struct DensePrimaryKeyFilter {
    /// Filters and their compiled form.
    inner: PrimaryKeyFilterInner,
    /// Codec used to locate and decode values inside the key.
    codec: DensePrimaryKeyCodec,
    /// Scratch buffer handed to the codec on every lookup; cleared at the start of
    /// each `matches` call.
    offsets_buf: Vec<usize>,
}
253
254impl DensePrimaryKeyFilter {
255    pub(crate) fn new(
256        metadata: RegionMetadataRef,
257        filters: Arc<Vec<SimpleFilterEvaluator>>,
258        codec: DensePrimaryKeyCodec,
259        skip_partition_column: bool,
260    ) -> Self {
261        Self {
262            inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column),
263            codec,
264            offsets_buf: Vec::new(),
265        }
266    }
267}
268
269impl PrimaryKeyFilter for DensePrimaryKeyFilter {
270    fn matches(&mut self, pk: &[u8]) -> Result<bool> {
271        self.offsets_buf.clear();
272        let mut accessor = DensePrimaryKeyValueAccessor {
273            pk,
274            codec: &self.codec,
275            offsets_buf: &mut self.offsets_buf,
276        };
277        self.inner.evaluate_filters(&mut accessor)
278    }
279}
280
/// Accessor over one dense-encoded primary key.
///
/// `'a` is the lifetime of the key bytes; `'b` is the lifetime of the codec and the
/// scratch buffer borrowed from the owning filter.
struct DensePrimaryKeyValueAccessor<'a, 'b> {
    /// The encoded primary key being inspected.
    pk: &'a [u8],
    /// Codec used to locate and decode values in `pk`.
    codec: &'b DensePrimaryKeyCodec,
    /// Scratch buffer passed to the codec on each lookup.
    offsets_buf: &'b mut Vec<usize>,
}
286
287impl<'a> PrimaryKeyValueAccessor<'a> for DensePrimaryKeyValueAccessor<'a, '_> {
288    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
289        self.codec
290            .encoded_value_at(self.pk, filter.pk_index, self.offsets_buf)
291            .map(Some)
292    }
293
294    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
295        self.codec
296            .decode_value_at(self.pk, filter.pk_index, self.offsets_buf)
297    }
298}
299
/// Sparse primary key filter.
///
/// Evaluates filters on primary keys encoded with [`SparsePrimaryKeyCodec`], locating
/// each column by its column id.
#[derive(Clone)]
pub struct SparsePrimaryKeyFilter {
    /// Filters and their compiled form.
    inner: PrimaryKeyFilterInner,
    /// Codec used to locate and decode values inside the key.
    codec: SparsePrimaryKeyCodec,
    /// Offsets cache handed to the codec on every lookup; cleared at the start of
    /// each `matches` call.
    offsets_cache: SparseOffsetsCache,
}
307
308impl SparsePrimaryKeyFilter {
309    pub(crate) fn new(
310        metadata: RegionMetadataRef,
311        filters: Arc<Vec<SimpleFilterEvaluator>>,
312        codec: SparsePrimaryKeyCodec,
313        skip_partition_column: bool,
314    ) -> Self {
315        Self {
316            inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column),
317            codec,
318            offsets_cache: SparseOffsetsCache::new(),
319        }
320    }
321}
322
323impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
324    fn matches(&mut self, pk: &[u8]) -> Result<bool> {
325        self.offsets_cache.clear();
326        let mut accessor = SparsePrimaryKeyValueAccessor {
327            pk,
328            codec: &self.codec,
329            offsets_cache: &mut self.offsets_cache,
330        };
331        self.inner.evaluate_filters(&mut accessor)
332    }
333}
334
/// Accessor over one sparse-encoded primary key.
///
/// `'a` is the lifetime of the key bytes; `'b` is the lifetime of the codec and the
/// offsets cache borrowed from the owning filter.
struct SparsePrimaryKeyValueAccessor<'a, 'b> {
    /// The encoded primary key being inspected.
    pk: &'a [u8],
    /// Codec used to locate and decode values in `pk`.
    codec: &'b SparsePrimaryKeyCodec,
    /// Offsets cache passed to the codec on each lookup.
    offsets_cache: &'b mut SparseOffsetsCache,
}
340
341impl<'a> PrimaryKeyValueAccessor<'a> for SparsePrimaryKeyValueAccessor<'a, '_> {
342    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
343        self.codec
344            .encoded_value_for_column(self.pk, self.offsets_cache, filter.column_id)
345    }
346
347    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
348        if let Some(offset) = self
349            .codec
350            .has_column(self.pk, self.offsets_cache, filter.column_id)
351        {
352            self.codec
353                .decode_value_at(self.pk, offset, filter.column_id)
354        } else {
355            Ok(Value::Null)
356        }
357    }
358}
359
// Unit tests for the dense and sparse primary key filters.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, ValueRef};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Builds region metadata with three nullable string tags (`pod`, `namespace`,
    /// `container`; column ids 1..=3), a float64 field and a nanosecond timestamp.
    /// The primary key is `[1, 2, 3]`.
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    /// Builds region metadata whose primary key starts with the partition column
    /// (`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`, uint32, column id 10) followed by a
    /// nullable string tag `tag` (column id 1).
    fn setup_partitioned_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 10,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![10, 1]);
        Arc::new(builder.build().unwrap())
    }

    /// Returns tag values for column ids 1, 2 and 3 of `setup_metadata`.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
        ]
    }

    /// Builds a `column_name = value` filter.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        create_filter_with_op(column_name, Operator::Eq, value)
    }

    /// Builds a `column_name <op> value` filter from a binary expression.
    fn create_filter_with_op<T: Literal>(
        column_name: &str,
        op: Operator,
        value: T,
    ) -> SimpleFilterEvaluator {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name(column_name))),
            op,
            right: Box::new(value.lit()),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    /// Encodes `row` as a sparse primary key, writing the internal `table_id` and
    /// `tsid` parts first.
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        table_id: u32,
        tsid: u64,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_internal(table_id, tsid, &mut pk).unwrap();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    /// Encodes the values of `row` (column ids are dropped) as a dense primary key.
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(row.into_iter().map(|(_, v)| v), &mut pk)
            .unwrap();
        pk
    }

    // An equality filter with the stored `pod` value matches a sparse key.
    #[test]
    fn test_sparse_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
        assert!(filter.matches(&pk).unwrap());
    }

    // An equality filter with a different `pod` value rejects a sparse key.
    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
        assert!(!filter.matches(&pk).unwrap());
    }

    // A filter on a column that is absent from the metadata is not compiled,
    // so the sparse key still matches.
    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
        assert!(filter.matches(&pk).unwrap());
    }

    // An equality filter with the stored `pod` value matches a dense key.
    #[test]
    fn test_dense_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
        assert!(filter.matches(&pk).unwrap());
    }

    // An equality filter with a different `pod` value rejects a dense key.
    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
        assert!(!filter.matches(&pk).unwrap());
    }

    // A filter on a column that is absent from the metadata is not compiled,
    // so the dense key still matches.
    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
        assert!(filter.matches(&pk).unwrap());
    }

    // Ordering operators on `pod` against a dense key; the stored value ends in `22222`.
    #[test]
    fn test_dense_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);

        // (operator, literal, expected match result)
        let cases = [
            (Operator::Gt, "greptime-frontend-6989d9899-22221", true),
            (Operator::GtEq, "greptime-frontend-6989d9899-22222", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22223", true),
            (Operator::LtEq, "greptime-frontend-6989d9899-22221", false),
        ];

        for (op, value, expected) in cases {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter =
                DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false);
            assert_eq!(expected, filter.matches(&pk).unwrap());
        }
    }

    // Ordering operators on `pod` against a sparse key; the stored value ends in `22222`.
    #[test]
    fn test_sparse_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);

        // (operator, literal, expected match result)
        let cases = [
            (Operator::Gt, "greptime-frontend-6989d9899-22221", true),
            (Operator::GtEq, "greptime-frontend-6989d9899-22222", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22223", true),
            (Operator::LtEq, "greptime-frontend-6989d9899-22221", false),
        ];

        for (op, value, expected) in cases {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter =
                SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false);
            assert_eq!(expected, filter.matches(&pk).unwrap());
        }
    }

    // Float tag columns bypass the byte-comparison fast path: a key holding `-0.0`
    // must match the filter `f = 0.0`.
    #[test]
    fn test_dense_primary_key_filter_float_eq_fallback() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let metadata = Arc::new(builder.build().unwrap());

        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec([ValueRef::Float64(OrderedFloat(-0.0))].into_iter(), &mut pk)
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);

        assert!(filter.matches(&pk).unwrap());
    }

    // With `skip_partition_column = false`, a filter on the partition column is
    // evaluated; here it equals the stored table id (42), so the key matches.
    #[test]
    fn test_dense_primary_key_filter_matches_partition_column_by_default() {
        let metadata = setup_partitioned_metadata();
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(
                [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(),
                &mut pk,
            )
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op(
            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
            Operator::Eq,
            42_u32,
        )]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);

        assert!(filter.matches(&pk).unwrap());
    }

    // With `skip_partition_column = true`, the partition column filter is ignored,
    // so the key matches even though the literal (7) differs from the stored id (42).
    #[test]
    fn test_dense_primary_key_filter_can_skip_partition_column() {
        let metadata = setup_partitioned_metadata();
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(
                [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(),
                &mut pk,
            )
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op(
            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
            Operator::Eq,
            7_u32,
        )]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, true);

        assert!(filter.matches(&pk).unwrap());
    }
}