Skip to main content

mito_codec/
primary_key_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::SemanticType;
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use datatypes::data_type::ConcreteDataType;
20use datatypes::value::Value;
21use memcomparable::Serializer;
22use snafu::ResultExt;
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::ColumnId;
25
26use crate::error::{EvaluateFilterSnafu, Result};
27use crate::row_converter::{
28    DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparseOffsetsCache, SparsePrimaryKeyCodec,
29};
30
31#[derive(Clone)]
32struct PrimaryKeyFilterInner {
33    filters: Arc<Vec<SimpleFilterEvaluator>>,
34    compiled_filters: Vec<CompiledPrimaryKeyFilter>,
35}
36
37impl PrimaryKeyFilterInner {
38    fn new(metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>) -> Self {
39        let compiled_filters = Self::compile_filters(&metadata, &filters);
40        Self {
41            filters,
42            compiled_filters,
43        }
44    }
45
46    fn compile_filters(
47        metadata: &RegionMetadataRef,
48        filters: &[SimpleFilterEvaluator],
49    ) -> Vec<CompiledPrimaryKeyFilter> {
50        if filters.is_empty() || metadata.primary_key.is_empty() {
51            return Vec::new();
52        }
53
54        let mut compiled_filters = Vec::with_capacity(filters.len());
55        for (filter_idx, filter) in filters.iter().enumerate() {
56            let Some(column) = metadata.column_by_name(filter.column_name()) else {
57                continue;
58            };
59            // Ignore filters that are not referencing primary key tag columns.
60            if column.semantic_type != SemanticType::Tag {
61                continue;
62            }
63
64            // Safety: A tag column is always in primary key.
65            let Some(pk_index) = metadata.primary_key_index(column.column_id) else {
66                continue;
67            };
68
69            let data_type = column.column_schema.data_type.clone();
70            let fast_path = CompiledFastPath::try_new(filter, &data_type);
71
72            compiled_filters.push(CompiledPrimaryKeyFilter {
73                filter_idx,
74                column_id: column.column_id,
75                pk_index,
76                data_type,
77                fast_path,
78            });
79        }
80
81        compiled_filters
82    }
83
84    fn evaluate_filters<'a>(
85        &self,
86        accessor: &mut impl PrimaryKeyValueAccessor<'a>,
87    ) -> Result<bool> {
88        if self.compiled_filters.is_empty() {
89            return Ok(true);
90        }
91
92        for compiled in &self.compiled_filters {
93            let filter = &self.filters[compiled.filter_idx];
94
95            let passed = if let Some(fast_path) = &compiled.fast_path {
96                let encoded_value = accessor.encoded_value(compiled)?;
97                fast_path.matches(encoded_value)
98            } else {
99                let value = accessor.decode_value(compiled)?;
100
101                // Safety: arrow schema and datatypes are constructed from the same source.
102                let scalar_value = value.try_to_scalar_value(&compiled.data_type).unwrap();
103                filter
104                    .evaluate_scalar(&scalar_value)
105                    .context(EvaluateFilterSnafu)?
106            };
107
108            if !passed {
109                return Ok(false);
110            }
111        }
112
113        Ok(true)
114    }
115}
116
/// A filter that has been resolved against a tag column of the primary key.
///
/// Built by `PrimaryKeyFilterInner::compile_filters` so that the column lookup
/// happens once instead of on every evaluated key.
#[derive(Clone)]
struct CompiledPrimaryKeyFilter {
    /// Position of the originating evaluator in the filter list it was compiled from.
    filter_idx: usize,
    /// Id of the tag column the filter references.
    column_id: ColumnId,
    /// Index of the column as returned by the metadata's `primary_key_index`.
    pk_index: usize,
    /// Data type taken from the column schema; used to convert decoded values
    /// into scalar values on the fallback path.
    data_type: ConcreteDataType,
    /// Byte-level comparison against the encoded value, when one could be
    /// built; `None` means the value is decoded and the original filter is
    /// evaluated instead.
    fast_path: Option<CompiledFastPath>,
}
125
126#[derive(Clone)]
127enum CompiledFastPath {
128    Eq(Vec<u8>),
129    NotEq(Vec<u8>),
130    Lt(Vec<u8>),
131    LtEq(Vec<u8>),
132    Gt(Vec<u8>),
133    GtEq(Vec<u8>),
134    InList(Vec<Vec<u8>>),
135}
136
137impl CompiledFastPath {
138    fn try_new(filter: &SimpleFilterEvaluator, data_type: &ConcreteDataType) -> Option<Self> {
139        let field = SortField::new(data_type.clone());
140        // Safety guard: memcomparable defines a total order for floats that doesn't necessarily
141        // match DataFusion/Arrow float comparison semantics (e.g. `-0.0 == 0.0`).
142        if field.encode_data_type().is_float() {
143            return None;
144        }
145
146        let encoded = |value: &Value| -> Option<Vec<u8>> {
147            let mut buf = Vec::new();
148            let mut serializer = Serializer::new(&mut buf);
149            field
150                .serialize(&mut serializer, &value.as_value_ref())
151                .ok()?;
152            Some(buf)
153        };
154
155        if filter.is_eq() {
156            let value = filter.literal_value()?;
157            Some(Self::Eq(encoded(&value)?))
158        } else if filter.is_not_eq() {
159            let value = filter.literal_value()?;
160            Some(Self::NotEq(encoded(&value)?))
161        } else if filter.is_lt() {
162            let value = filter.literal_value()?;
163            Some(Self::Lt(encoded(&value)?))
164        } else if filter.is_lt_eq() {
165            let value = filter.literal_value()?;
166            Some(Self::LtEq(encoded(&value)?))
167        } else if filter.is_gt() {
168            let value = filter.literal_value()?;
169            Some(Self::Gt(encoded(&value)?))
170        } else if filter.is_gt_eq() {
171            let value = filter.literal_value()?;
172            Some(Self::GtEq(encoded(&value)?))
173        } else if filter.is_or_eq_chain() {
174            let values = filter.literal_list_values()?;
175            let mut list = Vec::with_capacity(values.len());
176            for value in values {
177                let bytes = encoded(&value)?;
178                // `NULL` never matches in equality comparisons (`NULL = x` is NULL).
179                if bytes.first() == Some(&0) {
180                    continue;
181                }
182                list.push(bytes);
183            }
184            Some(Self::InList(list))
185        } else {
186            None
187        }
188    }
189
190    fn matches(&self, encoded_value: Option<&[u8]>) -> bool {
191        let Some(encoded_value) = encoded_value else {
192            return false;
193        };
194
195        // `NULL` never matches in comparisons (`NULL = x` and `NULL != x` are NULL).
196        if encoded_value.first() == Some(&0) {
197            return false;
198        }
199
200        match self {
201            CompiledFastPath::Eq(encoded_literal) => {
202                encoded_literal.first() != Some(&0) && encoded_value == &encoded_literal[..]
203            }
204            CompiledFastPath::NotEq(encoded_literal) => {
205                encoded_literal.first() != Some(&0) && encoded_value != &encoded_literal[..]
206            }
207            CompiledFastPath::Lt(encoded_literal) => {
208                encoded_literal.first() != Some(&0) && encoded_value < &encoded_literal[..]
209            }
210            CompiledFastPath::LtEq(encoded_literal) => {
211                encoded_literal.first() != Some(&0) && encoded_value <= &encoded_literal[..]
212            }
213            CompiledFastPath::Gt(encoded_literal) => {
214                encoded_literal.first() != Some(&0) && encoded_value > &encoded_literal[..]
215            }
216            CompiledFastPath::GtEq(encoded_literal) => {
217                encoded_literal.first() != Some(&0) && encoded_value >= &encoded_literal[..]
218            }
219            CompiledFastPath::InList(encoded_literals) => {
220                encoded_literals.iter().any(|lit| encoded_value == &lit[..])
221            }
222        }
223    }
224}
225
/// Access to the value of a single primary key column inside one encoded key.
///
/// `'a` is the lifetime of the encoded key bytes that returned slices borrow from.
/// Methods take `&mut self` because the implementations in this file keep
/// mutable offset buffers/caches.
trait PrimaryKeyValueAccessor<'a> {
    /// Returns the still-encoded bytes of the column referenced by `filter`,
    /// or `Ok(None)` when the implementation has no bytes for that column.
    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>>;
    /// Decodes and returns the value of the column referenced by `filter`.
    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value>;
}
230
231/// Dense primary key filter.
232#[derive(Clone)]
233pub struct DensePrimaryKeyFilter {
234    inner: PrimaryKeyFilterInner,
235    codec: DensePrimaryKeyCodec,
236    offsets_buf: Vec<usize>,
237}
238
239impl DensePrimaryKeyFilter {
240    pub(crate) fn new(
241        metadata: RegionMetadataRef,
242        filters: Arc<Vec<SimpleFilterEvaluator>>,
243        codec: DensePrimaryKeyCodec,
244    ) -> Self {
245        Self {
246            inner: PrimaryKeyFilterInner::new(metadata, filters),
247            codec,
248            offsets_buf: Vec::new(),
249        }
250    }
251}
252
253impl PrimaryKeyFilter for DensePrimaryKeyFilter {
254    fn matches(&mut self, pk: &[u8]) -> Result<bool> {
255        self.offsets_buf.clear();
256        let mut accessor = DensePrimaryKeyValueAccessor {
257            pk,
258            codec: &self.codec,
259            offsets_buf: &mut self.offsets_buf,
260        };
261        self.inner.evaluate_filters(&mut accessor)
262    }
263}
264
265struct DensePrimaryKeyValueAccessor<'a, 'b> {
266    pk: &'a [u8],
267    codec: &'b DensePrimaryKeyCodec,
268    offsets_buf: &'b mut Vec<usize>,
269}
270
271impl<'a> PrimaryKeyValueAccessor<'a> for DensePrimaryKeyValueAccessor<'a, '_> {
272    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
273        self.codec
274            .encoded_value_at(self.pk, filter.pk_index, self.offsets_buf)
275            .map(Some)
276    }
277
278    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
279        self.codec
280            .decode_value_at(self.pk, filter.pk_index, self.offsets_buf)
281    }
282}
283
284/// Sparse primary key filter.
285#[derive(Clone)]
286pub struct SparsePrimaryKeyFilter {
287    inner: PrimaryKeyFilterInner,
288    codec: SparsePrimaryKeyCodec,
289    offsets_cache: SparseOffsetsCache,
290}
291
292impl SparsePrimaryKeyFilter {
293    pub(crate) fn new(
294        metadata: RegionMetadataRef,
295        filters: Arc<Vec<SimpleFilterEvaluator>>,
296        codec: SparsePrimaryKeyCodec,
297    ) -> Self {
298        Self {
299            inner: PrimaryKeyFilterInner::new(metadata, filters),
300            codec,
301            offsets_cache: SparseOffsetsCache::new(),
302        }
303    }
304}
305
306impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
307    fn matches(&mut self, pk: &[u8]) -> Result<bool> {
308        self.offsets_cache.clear();
309        let mut accessor = SparsePrimaryKeyValueAccessor {
310            pk,
311            codec: &self.codec,
312            offsets_cache: &mut self.offsets_cache,
313        };
314        self.inner.evaluate_filters(&mut accessor)
315    }
316}
317
318struct SparsePrimaryKeyValueAccessor<'a, 'b> {
319    pk: &'a [u8],
320    codec: &'b SparsePrimaryKeyCodec,
321    offsets_cache: &'b mut SparseOffsetsCache,
322}
323
324impl<'a> PrimaryKeyValueAccessor<'a> for SparsePrimaryKeyValueAccessor<'a, '_> {
325    fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
326        self.codec
327            .encoded_value_for_column(self.pk, self.offsets_cache, filter.column_id)
328    }
329
330    fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
331        if let Some(offset) = self
332            .codec
333            .has_column(self.pk, self.offsets_cache, filter.column_id)
334        {
335            self.codec
336                .decode_value_at(self.pk, offset, filter.column_id)
337        } else {
338            Ok(Value::Null)
339        }
340    }
341}
342
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, ValueRef};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Pod name stored in the test row.
    const POD: &str = "greptime-frontend-6989d9899-22222";
    /// A pod name that sorts right before [`POD`].
    const POD_BEFORE: &str = "greptime-frontend-6989d9899-22221";
    /// A pod name that sorts right after [`POD`].
    const POD_AFTER: &str = "greptime-frontend-6989d9899-22223";

    /// Builds the metadata of a tag column.
    fn tag_column(
        name: &str,
        data_type: ConcreteDataType,
        nullable: bool,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, nullable),
            semantic_type: SemanticType::Tag,
            column_id,
        }
    }

    /// Builds the metadata of the nanosecond time index column.
    fn time_index_column(column_id: ColumnId) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id,
        }
    }

    /// Region with three nullable string tags (`pod`, `namespace`,
    /// `container`), one float field and a time index.
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (column_id, name) in [(1, "pod"), (2, "namespace"), (3, "container")] {
            builder.push_column_metadata(tag_column(
                name,
                ConcreteDataType::string_datatype(),
                true,
                column_id,
            ));
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(time_index_column(5))
            .primary_key(vec![1, 2, 3]);
        Arc::new(builder.build().unwrap())
    }

    /// Region whose primary key starts with the table id column followed by
    /// one nullable string tag.
    fn setup_partitioned_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(tag_column(
                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                ConcreteDataType::uint32_datatype(),
                false,
                10,
            ))
            .push_column_metadata(tag_column(
                "tag",
                ConcreteDataType::string_datatype(),
                true,
                1,
            ))
            .push_column_metadata(time_index_column(2))
            .primary_key(vec![10, 1]);
        Arc::new(builder.build().unwrap())
    }

    /// Tag values of the single row used by most tests.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String(POD)),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String(POD)),
        ]
    }

    /// `column_name = value` filter.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        create_filter_with_op(column_name, Operator::Eq, value)
    }

    /// `column_name <op> value` filter.
    fn create_filter_with_op<T: Literal>(
        column_name: &str,
        op: Operator,
        value: T,
    ) -> SimpleFilterEvaluator {
        let left = Box::new(Expr::Column(Column::from_name(column_name)));
        let right = Box::new(value.lit());
        let expr = Expr::BinaryExpr(BinaryExpr { left, op, right });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    /// Encodes `row` as a sparse primary key with the given internal columns.
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        table_id: u32,
        tsid: u64,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_internal(table_id, tsid, &mut pk).unwrap();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    /// Encodes the values of `row` as a dense primary key.
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        let values = row.into_iter().map(|(_, v)| v);
        codec.encode_to_vec(values, &mut pk).unwrap();
        pk
    }

    /// Evaluates `column_name = value` on the sparse encoding of the test row.
    fn sparse_eq_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        filter.matches(&pk).unwrap()
    }

    /// Evaluates `column_name = value` on the dense encoding of the test row.
    fn dense_eq_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        filter.matches(&pk).unwrap()
    }

    /// Ordering comparisons on `pod` and whether the test row should pass.
    fn order_op_cases() -> [(Operator, &'static str, bool); 4] {
        [
            (Operator::Gt, POD_BEFORE, true),
            (Operator::GtEq, POD, true),
            (Operator::Lt, POD_AFTER, true),
            (Operator::LtEq, POD_BEFORE, false),
        ]
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        assert!(sparse_eq_matches("pod", POD));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        assert!(!sparse_eq_matches("pod", POD_AFTER));
    }

    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        // A filter on an unknown column is ignored, so the key passes.
        assert!(sparse_eq_matches("non-exist-label", POD));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        assert!(dense_eq_matches("pod", POD));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        assert!(!dense_eq_matches("pod", POD_AFTER));
    }

    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        // A filter on an unknown column is ignored, so the key passes.
        assert!(dense_eq_matches("non-exist-label", POD));
    }

    #[test]
    fn test_dense_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);

        for (op, value, expected) in order_op_cases() {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk).unwrap());
        }
    }

    #[test]
    fn test_sparse_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);

        for (op, value, expected) in order_op_cases() {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk).unwrap());
        }
    }

    #[test]
    fn test_dense_primary_key_filter_float_eq_fallback() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(tag_column(
                "f",
                ConcreteDataType::float64_datatype(),
                true,
                1,
            ))
            .push_column_metadata(time_index_column(2))
            .primary_key(vec![1]);
        let metadata = Arc::new(builder.build().unwrap());

        // The stored key is `-0.0` while the filter literal is `0.0`.
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec([ValueRef::Float64(OrderedFloat(-0.0))].into_iter(), &mut pk)
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);

        assert!(filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_dense_primary_key_filter_matches_partition_column_by_default() {
        let metadata = setup_partitioned_metadata();
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        let values = [ValueRef::UInt32(42), ValueRef::String("host-a")];
        codec.encode_to_vec(values.into_iter(), &mut pk).unwrap();

        let filters = Arc::new(vec![create_filter_with_op(
            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
            Operator::Eq,
            42_u32,
        )]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);

        assert!(filter.matches(&pk).unwrap());
    }
}