1use std::sync::Arc;
16
17use api::v1::SemanticType;
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use datatypes::data_type::ConcreteDataType;
20use datatypes::value::Value;
21use memcomparable::Serializer;
22use snafu::ResultExt;
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::ColumnId;
25
26use crate::error::{EvaluateFilterSnafu, Result};
27use crate::row_converter::{
28 DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparseOffsetsCache, SparsePrimaryKeyCodec,
29};
30
31#[derive(Clone)]
32struct PrimaryKeyFilterInner {
33 filters: Arc<Vec<SimpleFilterEvaluator>>,
34 compiled_filters: Vec<CompiledPrimaryKeyFilter>,
35}
36
37impl PrimaryKeyFilterInner {
38 fn new(metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>) -> Self {
39 let compiled_filters = Self::compile_filters(&metadata, &filters);
40 Self {
41 filters,
42 compiled_filters,
43 }
44 }
45
46 fn compile_filters(
47 metadata: &RegionMetadataRef,
48 filters: &[SimpleFilterEvaluator],
49 ) -> Vec<CompiledPrimaryKeyFilter> {
50 if filters.is_empty() || metadata.primary_key.is_empty() {
51 return Vec::new();
52 }
53
54 let mut compiled_filters = Vec::with_capacity(filters.len());
55 for (filter_idx, filter) in filters.iter().enumerate() {
56 let Some(column) = metadata.column_by_name(filter.column_name()) else {
57 continue;
58 };
59 if column.semantic_type != SemanticType::Tag {
61 continue;
62 }
63
64 let Some(pk_index) = metadata.primary_key_index(column.column_id) else {
66 continue;
67 };
68
69 let data_type = column.column_schema.data_type.clone();
70 let fast_path = CompiledFastPath::try_new(filter, &data_type);
71
72 compiled_filters.push(CompiledPrimaryKeyFilter {
73 filter_idx,
74 column_id: column.column_id,
75 pk_index,
76 data_type,
77 fast_path,
78 });
79 }
80
81 compiled_filters
82 }
83
84 fn evaluate_filters<'a>(
85 &self,
86 accessor: &mut impl PrimaryKeyValueAccessor<'a>,
87 ) -> Result<bool> {
88 if self.compiled_filters.is_empty() {
89 return Ok(true);
90 }
91
92 for compiled in &self.compiled_filters {
93 let filter = &self.filters[compiled.filter_idx];
94
95 let passed = if let Some(fast_path) = &compiled.fast_path {
96 let encoded_value = accessor.encoded_value(compiled)?;
97 fast_path.matches(encoded_value)
98 } else {
99 let value = accessor.decode_value(compiled)?;
100
101 let scalar_value = value.try_to_scalar_value(&compiled.data_type).unwrap();
103 filter
104 .evaluate_scalar(&scalar_value)
105 .context(EvaluateFilterSnafu)?
106 };
107
108 if !passed {
109 return Ok(false);
110 }
111 }
112
113 Ok(true)
114 }
115}
116
117#[derive(Clone)]
118struct CompiledPrimaryKeyFilter {
119 filter_idx: usize,
120 column_id: ColumnId,
121 pk_index: usize,
122 data_type: ConcreteDataType,
123 fast_path: Option<CompiledFastPath>,
124}
125
/// A filter operator paired with its literal(s), pre-serialized with the column's
/// `SortField` so that matching is a plain byte comparison.
#[derive(Clone)]
enum CompiledFastPath {
    /// `column = literal`.
    Eq(Vec<u8>),
    /// `column != literal`.
    NotEq(Vec<u8>),
    /// `column < literal`.
    Lt(Vec<u8>),
    /// `column <= literal`.
    LtEq(Vec<u8>),
    /// `column > literal`.
    Gt(Vec<u8>),
    /// `column >= literal`.
    GtEq(Vec<u8>),
    /// `column = a OR column = b ...`; NULL literals are dropped when this is built.
    InList(Vec<Vec<u8>>),
}
136
137impl CompiledFastPath {
138 fn try_new(filter: &SimpleFilterEvaluator, data_type: &ConcreteDataType) -> Option<Self> {
139 let field = SortField::new(data_type.clone());
140 if field.encode_data_type().is_float() {
143 return None;
144 }
145
146 let encoded = |value: &Value| -> Option<Vec<u8>> {
147 let mut buf = Vec::new();
148 let mut serializer = Serializer::new(&mut buf);
149 field
150 .serialize(&mut serializer, &value.as_value_ref())
151 .ok()?;
152 Some(buf)
153 };
154
155 if filter.is_eq() {
156 let value = filter.literal_value()?;
157 Some(Self::Eq(encoded(&value)?))
158 } else if filter.is_not_eq() {
159 let value = filter.literal_value()?;
160 Some(Self::NotEq(encoded(&value)?))
161 } else if filter.is_lt() {
162 let value = filter.literal_value()?;
163 Some(Self::Lt(encoded(&value)?))
164 } else if filter.is_lt_eq() {
165 let value = filter.literal_value()?;
166 Some(Self::LtEq(encoded(&value)?))
167 } else if filter.is_gt() {
168 let value = filter.literal_value()?;
169 Some(Self::Gt(encoded(&value)?))
170 } else if filter.is_gt_eq() {
171 let value = filter.literal_value()?;
172 Some(Self::GtEq(encoded(&value)?))
173 } else if filter.is_or_eq_chain() {
174 let values = filter.literal_list_values()?;
175 let mut list = Vec::with_capacity(values.len());
176 for value in values {
177 let bytes = encoded(&value)?;
178 if bytes.first() == Some(&0) {
180 continue;
181 }
182 list.push(bytes);
183 }
184 Some(Self::InList(list))
185 } else {
186 None
187 }
188 }
189
190 fn matches(&self, encoded_value: Option<&[u8]>) -> bool {
191 let Some(encoded_value) = encoded_value else {
192 return false;
193 };
194
195 if encoded_value.first() == Some(&0) {
197 return false;
198 }
199
200 match self {
201 CompiledFastPath::Eq(encoded_literal) => {
202 encoded_literal.first() != Some(&0) && encoded_value == &encoded_literal[..]
203 }
204 CompiledFastPath::NotEq(encoded_literal) => {
205 encoded_literal.first() != Some(&0) && encoded_value != &encoded_literal[..]
206 }
207 CompiledFastPath::Lt(encoded_literal) => {
208 encoded_literal.first() != Some(&0) && encoded_value < &encoded_literal[..]
209 }
210 CompiledFastPath::LtEq(encoded_literal) => {
211 encoded_literal.first() != Some(&0) && encoded_value <= &encoded_literal[..]
212 }
213 CompiledFastPath::Gt(encoded_literal) => {
214 encoded_literal.first() != Some(&0) && encoded_value > &encoded_literal[..]
215 }
216 CompiledFastPath::GtEq(encoded_literal) => {
217 encoded_literal.first() != Some(&0) && encoded_value >= &encoded_literal[..]
218 }
219 CompiledFastPath::InList(encoded_literals) => {
220 encoded_literals.iter().any(|lit| encoded_value == &lit[..])
221 }
222 }
223 }
224}
225
226trait PrimaryKeyValueAccessor<'a> {
227 fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>>;
228 fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value>;
229}
230
231#[derive(Clone)]
233pub struct DensePrimaryKeyFilter {
234 inner: PrimaryKeyFilterInner,
235 codec: DensePrimaryKeyCodec,
236 offsets_buf: Vec<usize>,
237}
238
239impl DensePrimaryKeyFilter {
240 pub(crate) fn new(
241 metadata: RegionMetadataRef,
242 filters: Arc<Vec<SimpleFilterEvaluator>>,
243 codec: DensePrimaryKeyCodec,
244 ) -> Self {
245 Self {
246 inner: PrimaryKeyFilterInner::new(metadata, filters),
247 codec,
248 offsets_buf: Vec::new(),
249 }
250 }
251}
252
253impl PrimaryKeyFilter for DensePrimaryKeyFilter {
254 fn matches(&mut self, pk: &[u8]) -> Result<bool> {
255 self.offsets_buf.clear();
256 let mut accessor = DensePrimaryKeyValueAccessor {
257 pk,
258 codec: &self.codec,
259 offsets_buf: &mut self.offsets_buf,
260 };
261 self.inner.evaluate_filters(&mut accessor)
262 }
263}
264
265struct DensePrimaryKeyValueAccessor<'a, 'b> {
266 pk: &'a [u8],
267 codec: &'b DensePrimaryKeyCodec,
268 offsets_buf: &'b mut Vec<usize>,
269}
270
271impl<'a> PrimaryKeyValueAccessor<'a> for DensePrimaryKeyValueAccessor<'a, '_> {
272 fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
273 self.codec
274 .encoded_value_at(self.pk, filter.pk_index, self.offsets_buf)
275 .map(Some)
276 }
277
278 fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
279 self.codec
280 .decode_value_at(self.pk, filter.pk_index, self.offsets_buf)
281 }
282}
283
284#[derive(Clone)]
286pub struct SparsePrimaryKeyFilter {
287 inner: PrimaryKeyFilterInner,
288 codec: SparsePrimaryKeyCodec,
289 offsets_cache: SparseOffsetsCache,
290}
291
292impl SparsePrimaryKeyFilter {
293 pub(crate) fn new(
294 metadata: RegionMetadataRef,
295 filters: Arc<Vec<SimpleFilterEvaluator>>,
296 codec: SparsePrimaryKeyCodec,
297 ) -> Self {
298 Self {
299 inner: PrimaryKeyFilterInner::new(metadata, filters),
300 codec,
301 offsets_cache: SparseOffsetsCache::new(),
302 }
303 }
304}
305
306impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
307 fn matches(&mut self, pk: &[u8]) -> Result<bool> {
308 self.offsets_cache.clear();
309 let mut accessor = SparsePrimaryKeyValueAccessor {
310 pk,
311 codec: &self.codec,
312 offsets_cache: &mut self.offsets_cache,
313 };
314 self.inner.evaluate_filters(&mut accessor)
315 }
316}
317
318struct SparsePrimaryKeyValueAccessor<'a, 'b> {
319 pk: &'a [u8],
320 codec: &'b SparsePrimaryKeyCodec,
321 offsets_cache: &'b mut SparseOffsetsCache,
322}
323
324impl<'a> PrimaryKeyValueAccessor<'a> for SparsePrimaryKeyValueAccessor<'a, '_> {
325 fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
326 self.codec
327 .encoded_value_for_column(self.pk, self.offsets_cache, filter.column_id)
328 }
329
330 fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
331 if let Some(offset) = self
332 .codec
333 .has_column(self.pk, self.offsets_cache, filter.column_id)
334 {
335 self.codec
336 .decode_value_at(self.pk, offset, filter.column_id)
337 } else {
338 Ok(Value::Null)
339 }
340 }
341}
342
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, ValueRef};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    fn setup_partitioned_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 10,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![10, 1]);
        Arc::new(builder.build().unwrap())
    }

    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
        ]
    }

    /// Comparison cases against the `pod` value of `create_test_row`. They cover both sides
    /// of each ordering boundary plus the `Eq`/`NotEq` fast paths.
    fn comparison_cases() -> Vec<(Operator, &'static str, bool)> {
        vec![
            (Operator::Eq, "greptime-frontend-6989d9899-22222", true),
            (Operator::NotEq, "greptime-frontend-6989d9899-22222", false),
            (Operator::NotEq, "greptime-frontend-6989d9899-22223", true),
            (Operator::Gt, "greptime-frontend-6989d9899-22221", true),
            (Operator::Gt, "greptime-frontend-6989d9899-22222", false),
            (Operator::GtEq, "greptime-frontend-6989d9899-22222", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22223", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22222", false),
            (Operator::LtEq, "greptime-frontend-6989d9899-22221", false),
        ]
    }

    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        create_filter_with_op(column_name, Operator::Eq, value)
    }

    fn create_filter_with_op<T: Literal>(
        column_name: &str,
        op: Operator,
        value: T,
    ) -> SimpleFilterEvaluator {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name(column_name))),
            op,
            right: Box::new(value.lit()),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        table_id: u32,
        tsid: u64,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_internal(table_id, tsid, &mut pk).unwrap();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(row.into_iter().map(|(_, v)| v), &mut pk)
            .unwrap();
        pk
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(!filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(!filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_dense_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);

        for (op, value, expected) in comparison_cases() {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk).unwrap(), "{op:?} {value}");
        }
    }

    #[test]
    fn test_sparse_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);

        for (op, value, expected) in comparison_cases() {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk).unwrap(), "{op:?} {value}");
        }
    }

    #[test]
    fn test_dense_primary_key_filter_null_tag_never_matches() {
        let metadata = setup_metadata();
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(
                [
                    ValueRef::Null,
                    ValueRef::String("greptime-cluster"),
                    ValueRef::String("greptime-frontend-6989d9899-22222"),
                ]
                .into_iter(),
                &mut pk,
            )
            .unwrap();

        // A NULL tag value satisfies no comparison, including `!=`.
        for op in [Operator::Eq, Operator::NotEq, Operator::Lt, Operator::Gt] {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, "greptime")]);
            let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert!(!filter.matches(&pk).unwrap(), "{op:?}");
        }
    }

    #[test]
    fn test_dense_primary_key_filter_float_eq_fallback() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let metadata = Arc::new(builder.build().unwrap());

        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec([ValueRef::Float64(OrderedFloat(-0.0))].into_iter(), &mut pk)
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);

        assert!(filter.matches(&pk).unwrap());
    }

    #[test]
    fn test_dense_primary_key_filter_matches_partition_column_by_default() {
        let metadata = setup_partitioned_metadata();
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(
                [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(),
                &mut pk,
            )
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op(
            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
            Operator::Eq,
            42_u32,
        )]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);

        assert!(filter.matches(&pk).unwrap());
    }
}