1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use datatypes::prelude::ConcreteDataType;
20use datatypes::value::{Value, ValueRef};
21use memcomparable::{Deserializer, Serializer};
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::consts::ReservedColumnId;
27use store_api::storage::ColumnId;
28
29use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
30use crate::memtable::key_values::KeyValue;
31use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
32use crate::row_converter::dense::SortField;
33use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
34
/// Primary key codec for the sparse encoding.
///
/// A key is encoded as a sequence of `(column id, value)` pairs (see
/// `SparsePrimaryKeyCodec::encode_to_vec`). All state lives behind an [`Arc`], so cloning the
/// codec only bumps a reference count.
#[derive(Clone, Debug)]
pub struct SparsePrimaryKeyCodec {
    inner: Arc<SparsePrimaryKeyCodecInner>,
}
42
/// Shared state of a [`SparsePrimaryKeyCodec`].
#[derive(Debug)]
struct SparsePrimaryKeyCodecInner {
    /// Field used for the reserved table id column (constructed as `u32`).
    table_id_field: SortField,
    /// Field used for the reserved tsid column (constructed as `u64`).
    tsid_field: SortField,
    /// Field used for every other (label) column (constructed as `string`).
    label_field: SortField,
    /// Whitelist of column ids the codec accepts.
    /// `None` means "schemaless": every column id is accepted.
    columns: Option<HashSet<ColumnId>>,
}
56
/// Decoded values of a sparse primary key, keyed by column id.
///
/// Columns absent from the map are treated as null by `SparseValues::get_or_null`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SparseValues {
    values: HashMap<ColumnId, Value>,
}
64
65impl SparseValues {
66 pub fn new(values: HashMap<ColumnId, Value>) -> Self {
68 Self { values }
69 }
70
71 pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
73 self.values.get(&column_id).unwrap_or(&Value::Null)
74 }
75
76 pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
78 self.values.get(column_id)
79 }
80
81 pub fn insert(&mut self, column_id: ColumnId, value: Value) {
83 self.values.insert(column_id, value);
84 }
85}
86
/// Column id reserved for the tsid column; its value is encoded with the `u64` tsid field.
const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
/// Column id reserved for the table id column; its value is encoded with the `u32` table id field.
const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
/// Number of bytes an encoded column id occupies at the start of each `(column id, value)` pair.
pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
93
94impl SparsePrimaryKeyCodec {
95 pub fn new(region_metadata: &RegionMetadataRef) -> Self {
97 Self {
98 inner: Arc::new(SparsePrimaryKeyCodecInner {
99 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
100 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
101 label_field: SortField::new(ConcreteDataType::string_datatype()),
102 columns: Some(
103 region_metadata
104 .primary_key_columns()
105 .map(|c| c.column_id)
106 .collect(),
107 ),
108 }),
109 }
110 }
111
112 pub fn schemaless() -> Self {
116 Self {
117 inner: Arc::new(SparsePrimaryKeyCodecInner {
118 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
119 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
120 label_field: SortField::new(ConcreteDataType::string_datatype()),
121 columns: None,
122 }),
123 }
124 }
125
126 pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
127 Self {
128 inner: Arc::new(SparsePrimaryKeyCodecInner {
129 columns: Some(fields.iter().map(|f| f.0).collect()),
130 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
131 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
132 label_field: SortField::new(ConcreteDataType::string_datatype()),
133 }),
134 }
135 }
136
137 fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
139 if let Some(columns) = &self.inner.columns {
141 if !columns.contains(&column_id) {
142 return None;
143 }
144 }
145
146 match column_id {
147 RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
148 RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
149 _ => Some(&self.inner.label_field),
150 }
151 }
152
153 pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
155 where
156 I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
157 {
158 let mut serializer = Serializer::new(buffer);
159 for (column_id, value) in row {
160 if value.is_null() {
161 continue;
162 }
163
164 if let Some(field) = self.get_field(column_id) {
165 column_id
166 .serialize(&mut serializer)
167 .context(SerializeFieldSnafu)?;
168 field.serialize(&mut serializer, &value)?;
169 } else {
170 common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
172 }
173 }
174 Ok(())
175 }
176
177 fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
179 let mut deserializer = Deserializer::new(bytes);
180 let mut values = SparseValues::new(HashMap::new());
181
182 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
183 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
184 values.insert(column_id, value);
185
186 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
187 let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
188 values.insert(column_id, value);
189 while deserializer.has_remaining() {
190 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
191 let value = self.inner.label_field.deserialize(&mut deserializer)?;
192 values.insert(column_id, value);
193 }
194
195 Ok(values)
196 }
197
198 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
200 let mut deserializer = Deserializer::new(bytes);
201 deserializer.advance(COLUMN_ID_ENCODE_SIZE);
203 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
204 Ok(Some(value))
205 }
206
207 pub(crate) fn has_column(
209 &self,
210 pk: &[u8],
211 offsets_map: &mut HashMap<u32, usize>,
212 column_id: ColumnId,
213 ) -> Option<usize> {
214 if offsets_map.is_empty() {
215 let mut deserializer = Deserializer::new(pk);
216 let mut offset = 0;
217 while deserializer.has_remaining() {
218 let column_id = u32::deserialize(&mut deserializer).unwrap();
219 offset += 4;
220 offsets_map.insert(column_id, offset);
221 let Some(field) = self.get_field(column_id) else {
222 break;
223 };
224
225 let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
226 offset += skip;
227 }
228
229 offsets_map.get(&column_id).copied()
230 } else {
231 offsets_map.get(&column_id).copied()
232 }
233 }
234
235 pub(crate) fn decode_value_at(
237 &self,
238 pk: &[u8],
239 offset: usize,
240 column_id: ColumnId,
241 ) -> Result<Value> {
242 let mut deserializer = Deserializer::new(pk);
243 deserializer.advance(offset);
244 let field = self.get_field(column_id).unwrap();
246 field.deserialize(&mut deserializer)
247 }
248}
249
250impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
251 fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
252 UnsupportedOperationSnafu {
253 err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
254 }
255 .fail()
256 }
257
258 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
259 self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
260 }
261
262 fn encode_value_refs(
263 &self,
264 values: &[(ColumnId, ValueRef)],
265 buffer: &mut Vec<u8>,
266 ) -> Result<()> {
267 self.encode_to_vec(values.iter().map(|v| (v.0, v.1)), buffer)
268 }
269
270 fn estimated_size(&self) -> Option<usize> {
271 None
272 }
273
274 fn num_fields(&self) -> Option<usize> {
275 None
276 }
277
278 fn encoding(&self) -> PrimaryKeyEncoding {
279 PrimaryKeyEncoding::Sparse
280 }
281
282 fn primary_key_filter(
283 &self,
284 metadata: &RegionMetadataRef,
285 filters: Arc<Vec<SimpleFilterEvaluator>>,
286 ) -> Box<dyn PrimaryKeyFilter> {
287 Box::new(SparsePrimaryKeyFilter::new(
288 metadata.clone(),
289 filters,
290 self.clone(),
291 ))
292 }
293
294 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
295 Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
296 }
297
298 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
299 self.decode_leftmost(bytes)
300 }
301}
302
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::{
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
    };
    use store_api::storage::{ColumnId, RegionId};

    use super::*;

    /// Region metadata for the tests.
    ///
    /// The primary key is (table id, tsid, 1..=5), where columns 1..=5 are string tags.
    /// Column 6 is a float field and column 7 the time index; neither is in the primary key.
    fn test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME,
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_name",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_ip",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 5,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 6,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 7,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                2,
                3,
                4,
                5,
            ]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    #[test]
    fn test_sparse_value_new_and_get_or_null() {
        let mut values = HashMap::new();
        values.insert(1, Value::Int32(42));
        let sparse_value = SparseValues::new(values);

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
        // Missing columns read as null.
        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
    }

    #[test]
    fn test_sparse_value_insert() {
        let mut sparse_value = SparseValues::new(HashMap::new());
        sparse_value.insert(1, Value::Int32(42));

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
    }

    /// A full row in primary-key order (table id, tsid, tags), followed by the non-key
    /// columns 6 and 7. The codec is expected to skip columns 6 and 7.
    fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
            (
                RESERVED_COLUMN_ID_TSID,
                ValueRef::UInt64(123843349035232323),
            ),
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (5, ValueRef::String("10.10.10.10")),
            (6, ValueRef::Float64(OrderedFloat(1.0))),
            (
                7,
                ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
            ),
        ]
    }

    #[test]
    fn test_encode_to_vec() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();

        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let sparse_value = codec.decode_sparse(&buffer).unwrap();
        assert_eq!(
            sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
            &Value::UInt32(42)
        );
        assert_eq!(
            sparse_value.get_or_null(1),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(2),
            &Value::String("greptime-cluster".into())
        );
        assert_eq!(
            sparse_value.get_or_null(3),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(4),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(5),
            &Value::String("10.10.10.10".into())
        );
    }

    /// `with_fields` restricts encoding to the listed column ids; other columns are dropped.
    #[test]
    fn test_with_fields_skips_unlisted_columns() {
        let codec = SparsePrimaryKeyCodec::with_fields(vec![
            (
                RESERVED_COLUMN_ID_TABLE_ID,
                SortField::new(ConcreteDataType::uint32_datatype()),
            ),
            (
                RESERVED_COLUMN_ID_TSID,
                SortField::new(ConcreteDataType::uint64_datatype()),
            ),
            (1, SortField::new(ConcreteDataType::string_datatype())),
        ]);
        let mut buffer = Vec::new();
        codec
            .encode_to_vec(test_row().into_iter(), &mut buffer)
            .unwrap();
        let sparse_value = codec.decode_sparse(&buffer).unwrap();
        assert_eq!(
            sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
            &Value::UInt32(42)
        );
        assert_eq!(
            sparse_value.get_or_null(1),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
    }

    #[test]
    fn test_decode_leftmost() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let result = codec.decode_leftmost(&buffer).unwrap().unwrap();
        assert_eq!(result, Value::UInt32(42));
    }

    #[test]
    fn test_has_column() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        // The same cache map is reused for every lookup on this one key.
        let mut offsets_map = HashMap::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec.has_column(&buffer, &mut offsets_map, column_id);
            assert!(offset.is_some());
        }

        // Column 6 is not part of the primary key, so it was never encoded.
        let offset = codec.has_column(&buffer, &mut offsets_map, 6);
        assert!(offset.is_none());
    }

    #[test]
    fn test_decode_value_at() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        let row = test_row();
        let mut offsets_map = HashMap::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec
                .has_column(&buffer, &mut offsets_map, column_id)
                .unwrap();
            let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
            let expected_value = row.iter().find(|(id, _)| *id == column_id).unwrap().1;
            assert_eq!(value.as_value_ref(), expected_value);
        }
    }
}