mito2/memtable/partition_tree/
primary_key_filter.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::value::Value;
21use store_api::metadata::RegionMetadataRef;
22use store_api::storage::ColumnId;
23
24use crate::error::Result;
25use crate::memtable::partition_tree::partition::Partition;
26use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
27
28#[derive(Clone)]
29struct PrimaryKeyFilterInner {
30 metadata: RegionMetadataRef,
31 filters: Arc<Vec<SimpleFilterEvaluator>>,
32}
33
34impl PrimaryKeyFilterInner {
35 fn evaluate_filters(
36 &self,
37 mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
38 ) -> bool {
39 if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
40 return true;
41 }
42
43 let mut result = true;
44 for filter in self.filters.iter() {
45 if Partition::is_partition_column(filter.column_name()) {
46 continue;
47 }
48 let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
49 continue;
50 };
51 if column.semantic_type != SemanticType::Tag {
53 continue;
54 }
55
56 let value = match decode_fn(column.column_id, &self.metadata) {
57 Ok(v) => v,
58 Err(e) => {
59 common_telemetry::error!(e; "Failed to decode primary key");
60 return true;
61 }
62 };
63
64 let scalar_value = value
69 .try_to_scalar_value(&column.column_schema.data_type)
70 .unwrap();
71 result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
72 }
73
74 result
75 }
76}
77
78#[derive(Clone)]
80pub struct DensePrimaryKeyFilter {
81 inner: PrimaryKeyFilterInner,
82 codec: DensePrimaryKeyCodec,
83 offsets_buf: Vec<usize>,
84}
85
86impl DensePrimaryKeyFilter {
87 pub(crate) fn new(
88 metadata: RegionMetadataRef,
89 filters: Arc<Vec<SimpleFilterEvaluator>>,
90 codec: DensePrimaryKeyCodec,
91 ) -> Self {
92 Self {
93 inner: PrimaryKeyFilterInner { metadata, filters },
94 codec,
95 offsets_buf: Vec::new(),
96 }
97 }
98}
99
100impl PrimaryKeyFilter for DensePrimaryKeyFilter {
101 fn matches(&mut self, pk: &[u8]) -> bool {
102 self.offsets_buf.clear();
103 self.inner.evaluate_filters(|column_id, metadata| {
104 let index = metadata.primary_key_index(column_id).unwrap();
107 self.codec.decode_value_at(pk, index, &mut self.offsets_buf)
108 })
109 }
110}
111
112#[derive(Clone)]
114pub struct SparsePrimaryKeyFilter {
115 inner: PrimaryKeyFilterInner,
116 codec: SparsePrimaryKeyCodec,
117 offsets_map: HashMap<ColumnId, usize>,
118}
119
120impl SparsePrimaryKeyFilter {
121 pub(crate) fn new(
122 metadata: RegionMetadataRef,
123 filters: Arc<Vec<SimpleFilterEvaluator>>,
124 codec: SparsePrimaryKeyCodec,
125 ) -> Self {
126 Self {
127 inner: PrimaryKeyFilterInner { metadata, filters },
128 codec,
129 offsets_map: HashMap::new(),
130 }
131 }
132}
133
134impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
135 fn matches(&mut self, pk: &[u8]) -> bool {
136 self.offsets_map.clear();
137 self.inner.evaluate_filters(|column_id, _| {
138 if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
139 self.codec.decode_value_at(pk, offset, column_id)
140 } else {
141 Ok(Value::Null)
142 }
143 })
144 }
145}
146
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion::logical_expr::BinaryExpr;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{Expr, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Value stored in the `pod` and `container` tags of the test row.
    const POD_VALUE: &str = "greptime-frontend-6989d9899-22222";
    /// A value that differs from every tag value in the test row.
    const OTHER_POD_VALUE: &str = "greptime-frontend-6989d9899-22223";
    /// A column name that is not part of the test region metadata.
    const UNKNOWN_COLUMN: &str = "non-exist-label";

    /// Builds region metadata with three nullable string tags (`pod`, `namespace`,
    /// `container`), one float field and a nanosecond timestamp; the primary key
    /// is made of the three tags.
    fn setup_metadata() -> RegionMetadataRef {
        /// Describes a nullable string tag column.
        fn string_tag(name: &str, column_id: ColumnId) -> ColumnMetadata {
            ColumnMetadata {
                column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id,
            }
        }

        let value_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "greptime_value",
                ConcreteDataType::float64_datatype(),
                false,
            ),
            semantic_type: SemanticType::Field,
            column_id: 4,
        };
        let timestamp_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "greptime_timestamp",
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 5,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(string_tag("pod", 1))
            .push_column_metadata(string_tag("namespace", 2))
            .push_column_metadata(string_tag("container", 3))
            .push_column_metadata(value_column)
            .push_column_metadata(timestamp_column)
            .primary_key(vec![1, 2, 3]);
        Arc::new(builder.build().unwrap())
    }

    /// Returns the tag values of the single row used by every test.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        let pod = (1, ValueRef::String(POD_VALUE));
        let namespace = (2, ValueRef::String("greptime-cluster"));
        let container = (3, ValueRef::String(POD_VALUE));
        vec![pod, namespace, container]
    }

    /// Builds the filter `column_name = value` over a UTF-8 literal.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        let column = Expr::Column(Column::from_name(column_name));
        let literal = Expr::Literal(ScalarValue::Utf8(Some(value.to_string())));
        let equality = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(column),
            op: Operator::Eq,
            right: Box::new(literal),
        });
        SimpleFilterEvaluator::try_new(&equality).unwrap()
    }

    /// Encodes `row` with the sparse primary key codec.
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let mut encoded = Vec::new();
        SparsePrimaryKeyCodec::new(metadata)
            .encode_to_vec(row.into_iter(), &mut encoded)
            .unwrap();
        encoded
    }

    /// Encodes the values of `row` (column ids dropped) with the dense primary key codec.
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let values = row.into_iter().map(|(_, v)| v);
        let mut encoded = Vec::new();
        DensePrimaryKeyCodec::new(metadata)
            .encode_to_vec(values, &mut encoded)
            .unwrap();
        encoded
    }

    /// Runs a sparse filter for `column_name = value` against the encoded test row.
    fn sparse_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        SparsePrimaryKeyFilter::new(metadata, filters, codec).matches(&pk)
    }

    /// Runs a dense filter for `column_name = value` against the encoded test row.
    fn dense_matches(column_name: &str, value: &str) -> bool {
        let metadata = setup_metadata();
        let filters = Arc::new(vec![create_filter(column_name, value)]);
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        DensePrimaryKeyFilter::new(metadata, filters, codec).matches(&pk)
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        assert!(sparse_matches("pod", POD_VALUE));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        assert!(!sparse_matches("pod", OTHER_POD_VALUE));
    }

    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        assert!(sparse_matches(UNKNOWN_COLUMN, POD_VALUE));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        assert!(dense_matches("pod", POD_VALUE));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        assert!(!dense_matches("pod", OTHER_POD_VALUE));
    }

    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        assert!(dense_matches(UNKNOWN_COLUMN, POD_VALUE));
    }
}