1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::value::Value;
21use store_api::metadata::RegionMetadataRef;
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::storage::ColumnId;
24
25use crate::error::Result;
26use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
27
28pub fn is_partition_column(name: &str) -> bool {
30 name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
31}
32
/// State shared by the dense and sparse primary key filters: the region metadata used to
/// resolve filter column names, plus the filters to evaluate against each key.
#[derive(Clone)]
struct PrimaryKeyFilterInner {
    /// Metadata of the region whose primary keys are being filtered.
    metadata: RegionMetadataRef,
    /// Filters evaluated against decoded primary key columns. Held in an `Arc` so that
    /// cloning a filter does not copy the filter list.
    filters: Arc<Vec<SimpleFilterEvaluator>>,
}
38
39impl PrimaryKeyFilterInner {
40 fn evaluate_filters(
41 &self,
42 mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
43 ) -> bool {
44 if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
45 return true;
46 }
47
48 let mut result = true;
49 for filter in self.filters.iter() {
50 if is_partition_column(filter.column_name()) {
51 continue;
52 }
53 let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
54 continue;
55 };
56 if column.semantic_type != SemanticType::Tag {
58 continue;
59 }
60
61 let value = match decode_fn(column.column_id, &self.metadata) {
62 Ok(v) => v,
63 Err(e) => {
64 common_telemetry::error!(e; "Failed to decode primary key");
65 return true;
66 }
67 };
68
69 let scalar_value = value
74 .try_to_scalar_value(&column.column_schema.data_type)
75 .unwrap();
76 result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
77 }
78
79 result
80 }
81}
82
/// Primary key filter for keys encoded with [`DensePrimaryKeyCodec`].
#[derive(Clone)]
pub struct DensePrimaryKeyFilter {
    /// Region metadata and the filters to evaluate.
    inner: PrimaryKeyFilterInner,
    /// Codec used to decode individual values out of a dense-encoded key.
    codec: DensePrimaryKeyCodec,
    /// Scratch buffer passed to `decode_value_at`; cleared at the start of every `matches`
    /// call so it never carries state between keys.
    offsets_buf: Vec<usize>,
}
90
91impl DensePrimaryKeyFilter {
92 pub(crate) fn new(
93 metadata: RegionMetadataRef,
94 filters: Arc<Vec<SimpleFilterEvaluator>>,
95 codec: DensePrimaryKeyCodec,
96 ) -> Self {
97 Self {
98 inner: PrimaryKeyFilterInner { metadata, filters },
99 codec,
100 offsets_buf: Vec::new(),
101 }
102 }
103}
104
105impl PrimaryKeyFilter for DensePrimaryKeyFilter {
106 fn matches(&mut self, pk: &[u8]) -> bool {
107 self.offsets_buf.clear();
108 self.inner.evaluate_filters(|column_id, metadata| {
109 let index = metadata.primary_key_index(column_id).unwrap();
112 self.codec.decode_value_at(pk, index, &mut self.offsets_buf)
113 })
114 }
115}
116
/// Primary key filter for keys encoded with [`SparsePrimaryKeyCodec`].
#[derive(Clone)]
pub struct SparsePrimaryKeyFilter {
    /// Region metadata and the filters to evaluate.
    inner: PrimaryKeyFilterInner,
    /// Codec used to locate and decode individual columns in a sparse-encoded key.
    codec: SparsePrimaryKeyCodec,
    /// Scratch map handed to `has_column` (presumably caching column offsets within the
    /// current key); cleared at the start of every `matches` call.
    offsets_map: HashMap<ColumnId, usize>,
}
124
125impl SparsePrimaryKeyFilter {
126 pub(crate) fn new(
127 metadata: RegionMetadataRef,
128 filters: Arc<Vec<SimpleFilterEvaluator>>,
129 codec: SparsePrimaryKeyCodec,
130 ) -> Self {
131 Self {
132 inner: PrimaryKeyFilterInner { metadata, filters },
133 codec,
134 offsets_map: HashMap::new(),
135 }
136 }
137}
138
139impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
140 fn matches(&mut self, pk: &[u8]) -> bool {
141 self.offsets_map.clear();
142 self.inner.evaluate_filters(|column_id, _| {
143 if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
144 self.codec.decode_value_at(pk, offset, column_id)
145 } else {
146 Ok(Value::Null)
147 }
148 })
149 }
150}
151
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Builds region metadata with three string tags (`pod`, `namespace`, `container`)
    /// forming the primary key, one float field and a nanosecond timestamp.
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    /// Primary key values (`pod`, `namespace`, `container`) of the row under test.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
        ]
    }

    /// Builds the filter `column_name = value` with a UTF-8 literal.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name(column_name))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(row.into_iter().map(|(_, v)| v), &mut pk)
            .unwrap();
        pk
    }

    /// Encodes the test row with the sparse codec and checks it against `filters`.
    fn sparse_matches(filters: Vec<SimpleFilterEvaluator>) -> bool {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, Arc::new(filters), codec);
        filter.matches(&pk)
    }

    /// Encodes the test row with the dense codec and checks it against `filters`.
    fn dense_matches(filters: Vec<SimpleFilterEvaluator>) -> bool {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, Arc::new(filters), codec);
        filter.matches(&pk)
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        assert!(sparse_matches(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        assert!(!sparse_matches(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]));
    }

    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        assert!(sparse_matches(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        assert!(dense_matches(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22222",
        )]));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        assert!(!dense_matches(vec![create_filter(
            "pod",
            "greptime-frontend-6989d9899-22223",
        )]));
    }

    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        assert!(dense_matches(vec![create_filter(
            "non-exist-label",
            "greptime-frontend-6989d9899-22222",
        )]));
    }

    #[test]
    fn test_primary_key_filter_without_filters() {
        assert!(sparse_matches(vec![]));
        assert!(dense_matches(vec![]));
    }

    #[test]
    fn test_primary_key_filter_ignores_non_tag_columns() {
        // `greptime_value` is a field column, so it cannot be checked against the key.
        let filters = || vec![create_filter("greptime_value", "not-a-number")];
        assert!(sparse_matches(filters()));
        assert!(dense_matches(filters()));
    }

    #[test]
    fn test_primary_key_filter_multiple_filters() {
        // Filters are ANDed: every tag filter must accept the key.
        let filters = |pod: &str, namespace: &str| {
            vec![create_filter("pod", pod), create_filter("namespace", namespace)]
        };
        let cases = [
            ("greptime-frontend-6989d9899-22222", "greptime-cluster", true),
            ("greptime-frontend-6989d9899-22222", "other-cluster", false),
            ("greptime-frontend-6989d9899-22223", "greptime-cluster", false),
            ("greptime-frontend-6989d9899-22223", "other-cluster", false),
        ];
        for (pod, namespace, expected) in cases {
            assert_eq!(expected, sparse_matches(filters(pod, namespace)));
            assert_eq!(expected, dense_matches(filters(pod, namespace)));
        }
    }
}