1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::value::Value;
22use memcomparable::Serializer;
23use store_api::metadata::RegionMetadataRef;
24use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
25use store_api::storage::ColumnId;
26
27use crate::error::Result;
28use crate::row_converter::{
29 DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparsePrimaryKeyCodec,
30};
31
32pub fn is_partition_column(name: &str) -> bool {
34 name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
35}
36
37#[derive(Clone)]
38struct PrimaryKeyFilterInner {
39 filters: Arc<Vec<SimpleFilterEvaluator>>,
40 compiled_filters: Vec<CompiledPrimaryKeyFilter>,
41}
42
43impl PrimaryKeyFilterInner {
44 fn new(metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>) -> Self {
45 let compiled_filters = Self::compile_filters(&metadata, &filters);
46 Self {
47 filters,
48 compiled_filters,
49 }
50 }
51
52 fn compile_filters(
53 metadata: &RegionMetadataRef,
54 filters: &[SimpleFilterEvaluator],
55 ) -> Vec<CompiledPrimaryKeyFilter> {
56 if filters.is_empty() || metadata.primary_key.is_empty() {
57 return Vec::new();
58 }
59
60 let mut compiled_filters = Vec::with_capacity(filters.len());
61 for (filter_idx, filter) in filters.iter().enumerate() {
62 if is_partition_column(filter.column_name()) {
63 continue;
64 }
65
66 let Some(column) = metadata.column_by_name(filter.column_name()) else {
67 continue;
68 };
69 if column.semantic_type != SemanticType::Tag {
71 continue;
72 }
73
74 let Some(pk_index) = metadata.primary_key_index(column.column_id) else {
76 continue;
77 };
78
79 let data_type = column.column_schema.data_type.clone();
80 let fast_path = CompiledFastPath::try_new(filter, &data_type);
81
82 compiled_filters.push(CompiledPrimaryKeyFilter {
83 filter_idx,
84 column_id: column.column_id,
85 pk_index,
86 data_type,
87 fast_path,
88 });
89 }
90
91 compiled_filters
92 }
93
94 fn evaluate_filters<'a>(&self, accessor: &mut impl PrimaryKeyValueAccessor<'a>) -> bool {
95 if self.compiled_filters.is_empty() {
96 return true;
97 }
98
99 for compiled in &self.compiled_filters {
100 let filter = &self.filters[compiled.filter_idx];
101
102 let passed = if let Some(fast_path) = &compiled.fast_path {
103 let encoded_value = match accessor.encoded_value(compiled) {
104 Ok(v) => v,
105 Err(e) => {
106 common_telemetry::error!(e; "Failed to decode primary key");
107 return true;
108 }
109 };
110 fast_path.matches(encoded_value)
111 } else {
112 let value = match accessor.decode_value(compiled) {
113 Ok(v) => v,
114 Err(e) => {
115 common_telemetry::error!(e; "Failed to decode primary key");
116 return true;
117 }
118 };
119
120 let scalar_value = value.try_to_scalar_value(&compiled.data_type).unwrap();
122 filter.evaluate_scalar(&scalar_value).unwrap_or(true)
123 };
124
125 if !passed {
126 return false;
127 }
128 }
129
130 true
131 }
132}
133
134#[derive(Clone)]
135struct CompiledPrimaryKeyFilter {
136 filter_idx: usize,
137 column_id: ColumnId,
138 pk_index: usize,
139 data_type: ConcreteDataType,
140 fast_path: Option<CompiledFastPath>,
141}
142
143#[derive(Clone)]
144enum CompiledFastPath {
145 Eq(Vec<u8>),
146 NotEq(Vec<u8>),
147 Lt(Vec<u8>),
148 LtEq(Vec<u8>),
149 Gt(Vec<u8>),
150 GtEq(Vec<u8>),
151 InList(Vec<Vec<u8>>),
152}
153
154impl CompiledFastPath {
155 fn try_new(filter: &SimpleFilterEvaluator, data_type: &ConcreteDataType) -> Option<Self> {
156 let field = SortField::new(data_type.clone());
157 if field.encode_data_type().is_float() {
160 return None;
161 }
162
163 let encoded = |value: &Value| -> Option<Vec<u8>> {
164 let mut buf = Vec::new();
165 let mut serializer = Serializer::new(&mut buf);
166 field
167 .serialize(&mut serializer, &value.as_value_ref())
168 .ok()?;
169 Some(buf)
170 };
171
172 if filter.is_eq() {
173 let value = filter.literal_value()?;
174 Some(Self::Eq(encoded(&value)?))
175 } else if filter.is_not_eq() {
176 let value = filter.literal_value()?;
177 Some(Self::NotEq(encoded(&value)?))
178 } else if filter.is_lt() {
179 let value = filter.literal_value()?;
180 Some(Self::Lt(encoded(&value)?))
181 } else if filter.is_lt_eq() {
182 let value = filter.literal_value()?;
183 Some(Self::LtEq(encoded(&value)?))
184 } else if filter.is_gt() {
185 let value = filter.literal_value()?;
186 Some(Self::Gt(encoded(&value)?))
187 } else if filter.is_gt_eq() {
188 let value = filter.literal_value()?;
189 Some(Self::GtEq(encoded(&value)?))
190 } else if filter.is_or_eq_chain() {
191 let values = filter.literal_list_values()?;
192 let mut list = Vec::with_capacity(values.len());
193 for value in values {
194 let bytes = encoded(&value)?;
195 if bytes.first() == Some(&0) {
197 continue;
198 }
199 list.push(bytes);
200 }
201 Some(Self::InList(list))
202 } else {
203 None
204 }
205 }
206
207 fn matches(&self, encoded_value: Option<&[u8]>) -> bool {
208 let Some(encoded_value) = encoded_value else {
209 return false;
210 };
211
212 if encoded_value.first() == Some(&0) {
214 return false;
215 }
216
217 match self {
218 CompiledFastPath::Eq(encoded_literal) => {
219 encoded_literal.first() != Some(&0) && encoded_value == &encoded_literal[..]
220 }
221 CompiledFastPath::NotEq(encoded_literal) => {
222 encoded_literal.first() != Some(&0) && encoded_value != &encoded_literal[..]
223 }
224 CompiledFastPath::Lt(encoded_literal) => {
225 encoded_literal.first() != Some(&0) && encoded_value < &encoded_literal[..]
226 }
227 CompiledFastPath::LtEq(encoded_literal) => {
228 encoded_literal.first() != Some(&0) && encoded_value <= &encoded_literal[..]
229 }
230 CompiledFastPath::Gt(encoded_literal) => {
231 encoded_literal.first() != Some(&0) && encoded_value > &encoded_literal[..]
232 }
233 CompiledFastPath::GtEq(encoded_literal) => {
234 encoded_literal.first() != Some(&0) && encoded_value >= &encoded_literal[..]
235 }
236 CompiledFastPath::InList(encoded_literals) => {
237 encoded_literals.iter().any(|lit| encoded_value == &lit[..])
238 }
239 }
240 }
241}
242
243trait PrimaryKeyValueAccessor<'a> {
244 fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>>;
245 fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value>;
246}
247
248#[derive(Clone)]
250pub struct DensePrimaryKeyFilter {
251 inner: PrimaryKeyFilterInner,
252 codec: DensePrimaryKeyCodec,
253 offsets_buf: Vec<usize>,
254}
255
256impl DensePrimaryKeyFilter {
257 pub(crate) fn new(
258 metadata: RegionMetadataRef,
259 filters: Arc<Vec<SimpleFilterEvaluator>>,
260 codec: DensePrimaryKeyCodec,
261 ) -> Self {
262 Self {
263 inner: PrimaryKeyFilterInner::new(metadata, filters),
264 codec,
265 offsets_buf: Vec::new(),
266 }
267 }
268}
269
270impl PrimaryKeyFilter for DensePrimaryKeyFilter {
271 fn matches(&mut self, pk: &[u8]) -> bool {
272 self.offsets_buf.clear();
273 let mut accessor = DensePrimaryKeyValueAccessor {
274 pk,
275 codec: &self.codec,
276 offsets_buf: &mut self.offsets_buf,
277 };
278 self.inner.evaluate_filters(&mut accessor)
279 }
280}
281
282struct DensePrimaryKeyValueAccessor<'a, 'b> {
283 pk: &'a [u8],
284 codec: &'b DensePrimaryKeyCodec,
285 offsets_buf: &'b mut Vec<usize>,
286}
287
288impl<'a> PrimaryKeyValueAccessor<'a> for DensePrimaryKeyValueAccessor<'a, '_> {
289 fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
290 self.codec
291 .encoded_value_at(self.pk, filter.pk_index, self.offsets_buf)
292 .map(Some)
293 }
294
295 fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
296 self.codec
297 .decode_value_at(self.pk, filter.pk_index, self.offsets_buf)
298 }
299}
300
301#[derive(Clone)]
303pub struct SparsePrimaryKeyFilter {
304 inner: PrimaryKeyFilterInner,
305 codec: SparsePrimaryKeyCodec,
306 offsets_map: HashMap<ColumnId, usize>,
307}
308
309impl SparsePrimaryKeyFilter {
310 pub(crate) fn new(
311 metadata: RegionMetadataRef,
312 filters: Arc<Vec<SimpleFilterEvaluator>>,
313 codec: SparsePrimaryKeyCodec,
314 ) -> Self {
315 Self {
316 inner: PrimaryKeyFilterInner::new(metadata, filters),
317 codec,
318 offsets_map: HashMap::new(),
319 }
320 }
321}
322
323impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
324 fn matches(&mut self, pk: &[u8]) -> bool {
325 self.offsets_map.clear();
326 let mut accessor = SparsePrimaryKeyValueAccessor {
327 pk,
328 codec: &self.codec,
329 offsets_map: &mut self.offsets_map,
330 };
331 self.inner.evaluate_filters(&mut accessor)
332 }
333}
334
335struct SparsePrimaryKeyValueAccessor<'a, 'b> {
336 pk: &'a [u8],
337 codec: &'b SparsePrimaryKeyCodec,
338 offsets_map: &'b mut HashMap<ColumnId, usize>,
339}
340
341impl<'a> PrimaryKeyValueAccessor<'a> for SparsePrimaryKeyValueAccessor<'a, '_> {
342 fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
343 self.codec
344 .encoded_value_for_column(self.pk, self.offsets_map, filter.column_id)
345 }
346
347 fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
348 if let Some(offset) = self
349 .codec
350 .has_column(self.pk, self.offsets_map, filter.column_id)
351 {
352 self.codec
353 .decode_value_at(self.pk, offset, filter.column_id)
354 } else {
355 Ok(Value::Null)
356 }
357 }
358}
359
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, ValueRef};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Region with string tags `pod` (1), `namespace` (2) and `container` (3) forming the
    /// primary key, plus a float64 field (4) and a nanosecond timestamp column (5).
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    /// Values for the three primary key tags of `setup_metadata`, keyed by column id.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
        ]
    }

    /// Builds a `column_name = value` filter.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        create_filter_with_op(column_name, Operator::Eq, value)
    }

    /// Builds a `column_name <op> value` filter with the column on the left-hand side.
    fn create_filter_with_op<T: Literal>(
        column_name: &str,
        op: Operator,
        value: T,
    ) -> SimpleFilterEvaluator {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name(column_name))),
            op,
            right: Box::new(value.lit()),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    /// Encodes `row` (column id/value pairs) with the sparse codec for `metadata`.
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    /// Encodes only the values of `row`, in order, with the dense codec for `metadata`.
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(row.into_iter().map(|(_, v)| v), &mut pk)
            .unwrap();
        pk
    }

    // An equality filter whose literal equals the key's `pod` value keeps the key.
    #[test]
    fn test_sparse_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    // An equality filter with a different literal rejects the key.
    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(!filter.matches(&pk));
    }

    // A filter on a column the region does not have is never compiled, so every key
    // passes.
    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    // Dense-codec counterpart of `test_sparse_primary_key_filter_matches`.
    #[test]
    fn test_dense_primary_key_filter_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    // Dense-codec counterpart of `test_sparse_primary_key_filter_not_matches`.
    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(!filter.matches(&pk));
    }

    // Dense-codec counterpart of `test_sparse_primary_key_filter_matches_with_null`.
    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
        assert!(filter.matches(&pk));
    }

    // Range operators on a string tag compile to byte comparisons; their results must
    // agree with ordinary string ordering.
    #[test]
    fn test_dense_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);

        // (operator, literal, expected result for pod = "...-22222")
        let cases = [
            (Operator::Gt, "greptime-frontend-6989d9899-22221", true),
            (Operator::GtEq, "greptime-frontend-6989d9899-22222", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22223", true),
            (Operator::LtEq, "greptime-frontend-6989d9899-22221", false),
        ];

        for (op, value, expected) in cases {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk));
        }
    }

    // Sparse-codec counterpart of `test_dense_primary_key_filter_order_ops`.
    #[test]
    fn test_sparse_primary_key_filter_order_ops() {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);

        // (operator, literal, expected result for pod = "...-22222")
        let cases = [
            (Operator::Gt, "greptime-frontend-6989d9899-22221", true),
            (Operator::GtEq, "greptime-frontend-6989d9899-22222", true),
            (Operator::Lt, "greptime-frontend-6989d9899-22223", true),
            (Operator::LtEq, "greptime-frontend-6989d9899-22221", false),
        ];

        for (op, value, expected) in cases {
            let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
            let mut filter = SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
            assert_eq!(expected, filter.matches(&pk));
        }
    }

    // Float tags skip the byte-comparison fast path, so the decode-and-evaluate path
    // must treat a stored -0.0 as equal to a 0.0 literal.
    #[test]
    fn test_dense_primary_key_filter_float_eq_fallback() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let metadata = Arc::new(builder.build().unwrap());

        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec([ValueRef::Float64(OrderedFloat(-0.0))].into_iter(), &mut pk)
            .unwrap();

        let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]);
        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);

        assert!(filter.matches(&pk));
    }
}