1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::value::Value;
21use store_api::metadata::RegionMetadataRef;
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::storage::ColumnId;
24
25use crate::error::Result;
26use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
27
28pub fn is_partition_column(name: &str) -> bool {
30 name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
31}
32
33#[derive(Clone)]
34struct PrimaryKeyFilterInner {
35 metadata: RegionMetadataRef,
36 filters: Arc<Vec<SimpleFilterEvaluator>>,
37}
38
39impl PrimaryKeyFilterInner {
40 fn evaluate_filters(
41 &self,
42 mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
43 ) -> bool {
44 if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
45 return true;
46 }
47
48 let mut result = true;
49 for filter in self.filters.iter() {
50 if is_partition_column(filter.column_name()) {
51 continue;
52 }
53 let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
54 continue;
55 };
56 if column.semantic_type != SemanticType::Tag {
58 continue;
59 }
60
61 let value = match decode_fn(column.column_id, &self.metadata) {
62 Ok(v) => v,
63 Err(e) => {
64 common_telemetry::error!(e; "Failed to decode primary key");
65 return true;
66 }
67 };
68
69 let scalar_value = value
74 .try_to_scalar_value(&column.column_schema.data_type)
75 .unwrap();
76 result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
77 }
78
79 result
80 }
81}
82
83#[derive(Clone)]
85pub struct DensePrimaryKeyFilter {
86 inner: PrimaryKeyFilterInner,
87 codec: DensePrimaryKeyCodec,
88 offsets_buf: Vec<usize>,
89}
90
91impl DensePrimaryKeyFilter {
92 pub(crate) fn new(
93 metadata: RegionMetadataRef,
94 filters: Arc<Vec<SimpleFilterEvaluator>>,
95 codec: DensePrimaryKeyCodec,
96 ) -> Self {
97 Self {
98 inner: PrimaryKeyFilterInner { metadata, filters },
99 codec,
100 offsets_buf: Vec::new(),
101 }
102 }
103}
104
105impl PrimaryKeyFilter for DensePrimaryKeyFilter {
106 fn matches(&mut self, pk: &[u8]) -> bool {
107 self.offsets_buf.clear();
108 self.inner.evaluate_filters(|column_id, metadata| {
109 let index = metadata.primary_key_index(column_id).unwrap();
112 self.codec.decode_value_at(pk, index, &mut self.offsets_buf)
113 })
114 }
115}
116
117#[derive(Clone)]
119pub struct SparsePrimaryKeyFilter {
120 inner: PrimaryKeyFilterInner,
121 codec: SparsePrimaryKeyCodec,
122 offsets_map: HashMap<ColumnId, usize>,
123}
124
125impl SparsePrimaryKeyFilter {
126 pub(crate) fn new(
127 metadata: RegionMetadataRef,
128 filters: Arc<Vec<SimpleFilterEvaluator>>,
129 codec: SparsePrimaryKeyCodec,
130 ) -> Self {
131 Self {
132 inner: PrimaryKeyFilterInner { metadata, filters },
133 codec,
134 offsets_map: HashMap::new(),
135 }
136 }
137}
138
139impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
140 fn matches(&mut self, pk: &[u8]) -> bool {
141 self.offsets_map.clear();
142 self.inner.evaluate_filters(|column_id, _| {
143 if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
144 self.codec.decode_value_at(pk, offset, column_id)
145 } else {
146 Ok(Value::Null)
147 }
148 })
149 }
150}
151
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datafusion_common::Column;
    use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{ColumnId, RegionId};

    use super::*;
    use crate::row_converter::PrimaryKeyCodecExt;

    /// Value of the `pod` and `container` tags in the row built by `create_test_row`.
    const TEST_POD: &str = "greptime-frontend-6989d9899-22222";
    /// Value of the `namespace` tag in the row built by `create_test_row`.
    const TEST_NAMESPACE: &str = "greptime-cluster";
    /// A `pod` value that differs from `TEST_POD`.
    const OTHER_POD: &str = "greptime-frontend-6989d9899-22223";

    /// Builds region metadata whose primary key is the tags `pod` (1), `namespace` (2)
    /// and `container` (3), plus a float field (4) and a nanosecond timestamp (5).
    fn setup_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 2, 3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    /// Returns one row of tag values, paired with their column ids, in primary key order.
    fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (1, ValueRef::String(TEST_POD)),
            (2, ValueRef::String(TEST_NAMESPACE)),
            (3, ValueRef::String(TEST_POD)),
        ]
    }

    /// Builds a `column_name = value` filter.
    fn create_filter(column_name: &str, value: &str) -> SimpleFilterEvaluator {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name(column_name))),
            op: Operator::Eq,
            right: Box::new(value.lit()),
        });
        SimpleFilterEvaluator::try_new(&expr).unwrap()
    }

    /// Encodes `row` with the sparse codec.
    fn encode_sparse_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = SparsePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
        pk
    }

    /// Encodes the values of `row` with the dense codec (column ids are dropped).
    fn encode_dense_pk(
        metadata: &RegionMetadataRef,
        row: Vec<(ColumnId, ValueRef<'static>)>,
    ) -> Vec<u8> {
        let codec = DensePrimaryKeyCodec::new(metadata);
        let mut pk = Vec::new();
        codec
            .encode_to_vec(row.into_iter().map(|(_, v)| v), &mut pk)
            .unwrap();
        pk
    }

    /// Encodes the test row with the sparse codec and reports whether `filters` match it.
    fn sparse_matches(filters: Vec<SimpleFilterEvaluator>) -> bool {
        let metadata = setup_metadata();
        let pk = encode_sparse_pk(&metadata, create_test_row());
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut filter = SparsePrimaryKeyFilter::new(metadata, Arc::new(filters), codec);
        filter.matches(&pk)
    }

    /// Encodes the test row with the dense codec and reports whether `filters` match it.
    fn dense_matches(filters: Vec<SimpleFilterEvaluator>) -> bool {
        let metadata = setup_metadata();
        let pk = encode_dense_pk(&metadata, create_test_row());
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let mut filter = DensePrimaryKeyFilter::new(metadata, Arc::new(filters), codec);
        filter.matches(&pk)
    }

    #[test]
    fn test_sparse_primary_key_filter_matches() {
        assert!(sparse_matches(vec![create_filter("pod", TEST_POD)]));
    }

    #[test]
    fn test_sparse_primary_key_filter_not_matches() {
        assert!(!sparse_matches(vec![create_filter("pod", OTHER_POD)]));
    }

    #[test]
    fn test_sparse_primary_key_filter_matches_with_null() {
        assert!(sparse_matches(vec![create_filter("non-exist-label", TEST_POD)]));
    }

    #[test]
    fn test_dense_primary_key_filter_matches() {
        assert!(dense_matches(vec![create_filter("pod", TEST_POD)]));
    }

    #[test]
    fn test_dense_primary_key_filter_not_matches() {
        assert!(!dense_matches(vec![create_filter("pod", OTHER_POD)]));
    }

    #[test]
    fn test_dense_primary_key_filter_matches_with_null() {
        assert!(dense_matches(vec![create_filter("non-exist-label", TEST_POD)]));
    }

    #[test]
    fn test_primary_key_filter_empty_filters_match() {
        assert!(sparse_matches(Vec::new()));
        assert!(dense_matches(Vec::new()));
    }

    #[test]
    fn test_primary_key_filter_all_filters_must_match() {
        let all_match = || {
            vec![
                create_filter("namespace", TEST_NAMESPACE),
                create_filter("pod", TEST_POD),
            ]
        };
        assert!(sparse_matches(all_match()));
        assert!(dense_matches(all_match()));

        // The first filter rejects the key while the second one would accept it.
        let one_mismatch = || {
            vec![
                create_filter("pod", OTHER_POD),
                create_filter("namespace", TEST_NAMESPACE),
            ]
        };
        assert!(!sparse_matches(one_mismatch()));
        assert!(!dense_matches(one_mismatch()));
    }

    #[test]
    fn test_primary_key_filter_ignores_non_tag_columns() {
        // `greptime_value` is a field column, so the filter is not evaluated on the key.
        let field_filter = || vec![create_filter(greptime_value(), OTHER_POD)];
        assert!(sparse_matches(field_filter()));
        assert!(dense_matches(field_filter()));
    }

    #[test]
    fn test_is_partition_column() {
        assert!(is_partition_column(DATA_SCHEMA_TABLE_ID_COLUMN_NAME));
        assert!(!is_partition_column("pod"));
    }
}