1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::BufMut;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::prelude::ConcreteDataType;
21use datatypes::value::{Value, ValueRef};
22use memcomparable::{Deserializer, Serializer};
23use serde::{Deserialize, Serialize};
24use snafu::ResultExt;
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28use store_api::storage::consts::ReservedColumnId;
29
30use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
31use crate::key_values::KeyValue;
32use crate::primary_key_filter::SparsePrimaryKeyFilter;
33use crate::row_converter::dense::SortField;
34use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
35
36#[derive(Clone, Debug)]
40pub struct SparsePrimaryKeyCodec {
41 inner: Arc<SparsePrimaryKeyCodecInner>,
42}
43
44#[derive(Debug)]
45struct SparsePrimaryKeyCodecInner {
46 table_id_field: SortField,
48 tsid_field: SortField,
50 label_field: SortField,
52 columns: Option<HashSet<ColumnId>>,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
62pub struct SparseValues {
63 values: HashMap<ColumnId, Value>,
64}
65
66impl SparseValues {
67 pub fn new(values: HashMap<ColumnId, Value>) -> Self {
69 Self { values }
70 }
71
72 pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
74 self.values.get(&column_id).unwrap_or(&Value::Null)
75 }
76
77 pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
79 self.values.get(column_id)
80 }
81
82 pub fn insert(&mut self, column_id: ColumnId, value: Value) {
84 self.values.insert(column_id, value);
85 }
86}
87
88pub const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
90pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
92pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
94
95impl SparsePrimaryKeyCodec {
96 pub fn from_columns(columns_ids: impl Iterator<Item = ColumnId>) -> Self {
98 let columns = columns_ids.collect();
99 Self {
100 inner: Arc::new(SparsePrimaryKeyCodecInner {
101 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
102 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
103 label_field: SortField::new(ConcreteDataType::string_datatype()),
104 columns: Some(columns),
105 }),
106 }
107 }
108
109 pub fn new(region_metadata: &RegionMetadataRef) -> Self {
111 Self::from_columns(region_metadata.primary_key_columns().map(|c| c.column_id))
112 }
113
114 pub fn schemaless() -> Self {
118 Self {
119 inner: Arc::new(SparsePrimaryKeyCodecInner {
120 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
121 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
122 label_field: SortField::new(ConcreteDataType::string_datatype()),
123 columns: None,
124 }),
125 }
126 }
127
128 pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
130 Self {
131 inner: Arc::new(SparsePrimaryKeyCodecInner {
132 columns: Some(fields.iter().map(|f| f.0).collect()),
133 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
134 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
135 label_field: SortField::new(ConcreteDataType::string_datatype()),
136 }),
137 }
138 }
139
140 fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
142 if let Some(columns) = &self.inner.columns
144 && !columns.contains(&column_id)
145 {
146 return None;
147 }
148
149 match column_id {
150 RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
151 RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
152 _ => Some(&self.inner.label_field),
153 }
154 }
155
156 pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
158 where
159 I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
160 {
161 let mut serializer = Serializer::new(buffer);
162 for (column_id, value) in row {
163 if value.is_null() {
164 continue;
165 }
166
167 if let Some(field) = self.get_field(column_id) {
168 column_id
169 .serialize(&mut serializer)
170 .context(SerializeFieldSnafu)?;
171 field.serialize(&mut serializer, &value)?;
172 } else {
173 common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
175 }
176 }
177 Ok(())
178 }
179
180 pub fn encode_raw_tag_value<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
181 where
182 I: Iterator<Item = (ColumnId, &'a [u8])>,
183 {
184 for (tag_column_id, tag_value) in row {
185 let value_len = tag_value.len();
186 buffer.reserve(6 + value_len / 8 * 9);
187 buffer.put_u32(tag_column_id);
188 buffer.put_u8(1);
189 buffer.put_u8(!tag_value.is_empty() as u8);
190
191 let mut len = 0;
194 let num_chucks = value_len / 8;
195 let remainder = value_len % 8;
196
197 for idx in 0..num_chucks {
198 buffer.extend_from_slice(&tag_value[idx * 8..idx * 8 + 8]);
199 len += 8;
200 let extra = if len == value_len { 8 } else { 9 };
204 buffer.put_u8(extra);
205 }
206
207 if remainder != 0 {
208 buffer.extend_from_slice(&tag_value[len..value_len]);
209 buffer.put_bytes(0, 8 - remainder);
210 buffer.put_u8(remainder as u8);
211 }
212 }
213 Ok(())
214 }
215
216 pub fn encode_internal(&self, table_id: u32, tsid: u64, buffer: &mut Vec<u8>) -> Result<()> {
218 buffer.reserve_exact(22);
219 buffer.put_u32(RESERVED_COLUMN_ID_TABLE_ID);
220 buffer.put_u8(1);
221 buffer.put_u32(table_id);
222 buffer.put_u32(RESERVED_COLUMN_ID_TSID);
223 buffer.put_u8(1);
224 buffer.put_u64(tsid);
225 Ok(())
226 }
227
228 fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
230 let mut deserializer = Deserializer::new(bytes);
231 let mut values = SparseValues::new(HashMap::new());
232
233 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
234 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
235 values.insert(column_id, value);
236
237 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
238 let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
239 values.insert(column_id, value);
240 while deserializer.has_remaining() {
241 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
242 let value = self.inner.label_field.deserialize(&mut deserializer)?;
243 values.insert(column_id, value);
244 }
245
246 Ok(values)
247 }
248
249 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
251 let mut deserializer = Deserializer::new(bytes);
252 deserializer.advance(COLUMN_ID_ENCODE_SIZE);
254 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
255 Ok(Some(value))
256 }
257
258 pub fn has_column(
260 &self,
261 pk: &[u8],
262 offsets_map: &mut HashMap<u32, usize>,
263 column_id: ColumnId,
264 ) -> Option<usize> {
265 if offsets_map.is_empty() {
266 let mut deserializer = Deserializer::new(pk);
267 let mut offset = 0;
268 while deserializer.has_remaining() {
269 let column_id = u32::deserialize(&mut deserializer).unwrap();
270 offset += 4;
271 offsets_map.insert(column_id, offset);
272 let Some(field) = self.get_field(column_id) else {
273 break;
274 };
275
276 let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
277 offset += skip;
278 }
279
280 offsets_map.get(&column_id).copied()
281 } else {
282 offsets_map.get(&column_id).copied()
283 }
284 }
285
286 pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
288 let mut deserializer = Deserializer::new(pk);
289 deserializer.advance(offset);
290 let field = self.get_field(column_id).unwrap();
292 field.deserialize(&mut deserializer)
293 }
294}
295
296impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
297 fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
298 UnsupportedOperationSnafu {
299 err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
300 }
301 .fail()
302 }
303
304 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
305 self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
306 }
307
308 fn encode_value_refs(
309 &self,
310 values: &[(ColumnId, ValueRef)],
311 buffer: &mut Vec<u8>,
312 ) -> Result<()> {
313 self.encode_to_vec(values.iter().map(|v| (v.0, v.1.clone())), buffer)
314 }
315
316 fn estimated_size(&self) -> Option<usize> {
317 None
318 }
319
320 fn num_fields(&self) -> Option<usize> {
321 None
322 }
323
324 fn encoding(&self) -> PrimaryKeyEncoding {
325 PrimaryKeyEncoding::Sparse
326 }
327
328 fn primary_key_filter(
329 &self,
330 metadata: &RegionMetadataRef,
331 filters: Arc<Vec<SimpleFilterEvaluator>>,
332 ) -> Box<dyn PrimaryKeyFilter> {
333 Box::new(SparsePrimaryKeyFilter::new(
334 metadata.clone(),
335 filters,
336 self.clone(),
337 ))
338 }
339
340 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
341 Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
342 }
343
344 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
345 self.decode_leftmost(bytes)
346 }
347}
348
349pub struct FieldWithId {
351 pub field: SortField,
352 pub column_id: ColumnId,
353}
354
355pub struct SparseEncoder {
357 fields: Vec<FieldWithId>,
358}
359
360impl SparseEncoder {
361 pub fn new(fields: Vec<FieldWithId>) -> Self {
362 Self { fields }
363 }
364
365 pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
366 where
367 I: Iterator<Item = ValueRef<'a>>,
368 {
369 let mut serializer = Serializer::new(buffer);
370 for (value, field) in row.zip(self.fields.iter()) {
371 if !value.is_null() {
372 field
373 .column_id
374 .serialize(&mut serializer)
375 .context(SerializeFieldSnafu)?;
376 field.field.serialize(&mut serializer, &value)?;
377 }
378 }
379 Ok(())
380 }
381}
382
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::{
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
    };
    use store_api::storage::{ColumnId, RegionId};

    use super::*;

    /// Builds region metadata whose primary key is the two reserved columns followed by
    /// five string tags (ids 1..=5); columns 6 (field) and 7 (timestamp) are not part of it.
    fn test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME,
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_name",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_ip",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 5,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 6,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 7,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                2,
                3,
                4,
                5,
            ]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    #[test]
    fn test_sparse_value_new_and_get_or_null() {
        let mut values = HashMap::new();
        values.insert(1, Value::Int32(42));
        let sparse_value = SparseValues::new(values);

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
    }

    #[test]
    fn test_sparse_value_insert() {
        let mut sparse_value = SparseValues::new(HashMap::new());
        sparse_value.insert(1, Value::Int32(42));

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
    }

    /// A full row: the primary key columns plus the field (6) and timestamp (7) columns,
    /// which the codec is expected to skip.
    fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
            (
                RESERVED_COLUMN_ID_TSID,
                ValueRef::UInt64(123843349035232323),
            ),
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (2, ValueRef::String("greptime-cluster")),
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
            (5, ValueRef::String("10.10.10.10")),
            (6, ValueRef::Float64(OrderedFloat(1.0))),
            (
                7,
                ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
            ),
        ]
    }

    /// The hand-written encoders must produce exactly the bytes of the serializer path.
    #[test]
    fn test_encode_by_short_cuts() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let internal_columns = [
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(1024)),
            (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(42)),
        ];
        let tags = [
            (1, "greptime-frontend-6989d9899-22222"),
            (2, "greptime-cluster"),
            (3, "greptime-frontend-6989d9899-22222"),
            (4, "greptime-frontend-6989d9899-22222"),
            (5, "10.10.10.10"),
        ];
        codec
            .encode_to_vec(internal_columns.into_iter(), &mut buffer)
            .unwrap();
        codec
            .encode_to_vec(
                tags.iter()
                    .map(|(col_id, tag_value)| (*col_id, ValueRef::String(tag_value))),
                &mut buffer,
            )
            .unwrap();

        let mut buffer_by_raw_encoding = Vec::new();
        codec
            .encode_internal(1024, 42, &mut buffer_by_raw_encoding)
            .unwrap();
        let tags: Vec<_> = tags
            .into_iter()
            .map(|(col_id, tag_value)| (col_id, tag_value.as_bytes()))
            .collect();
        codec
            .encode_raw_tag_value(
                tags.iter().map(|(c, b)| (*c, *b)),
                &mut buffer_by_raw_encoding,
            )
            .unwrap();
        assert_eq!(buffer, buffer_by_raw_encoding);
    }

    #[test]
    fn test_encode_to_vec() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();

        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let sparse_value = codec.decode_sparse(&buffer).unwrap();
        assert_eq!(
            sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
            &Value::UInt32(42)
        );
        assert_eq!(
            sparse_value.get_or_null(RESERVED_COLUMN_ID_TSID),
            &Value::UInt64(123843349035232323)
        );
        assert_eq!(
            sparse_value.get_or_null(1),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(2),
            &Value::String("greptime-cluster".into())
        );
        assert_eq!(
            sparse_value.get_or_null(3),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(4),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(5),
            &Value::String("10.10.10.10".into())
        );
        // Columns outside the primary key are dropped by the encoder.
        assert!(sparse_value.get(&6).is_none());
        assert!(sparse_value.get(&7).is_none());
    }

    #[test]
    fn test_decode_leftmost() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let result = codec.decode_leftmost(&buffer).unwrap().unwrap();
        assert_eq!(result, Value::UInt32(42));
    }

    #[test]
    fn test_has_column() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        let mut offsets_map = HashMap::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec.has_column(&buffer, &mut offsets_map, column_id);
            assert!(offset.is_some());
        }

        // Column 6 is a field column and never makes it into the encoded key.
        let offset = codec.has_column(&buffer, &mut offsets_map, 6);
        assert!(offset.is_none());
    }

    #[test]
    fn test_decode_value_at() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        let row = test_row();
        let mut offsets_map = HashMap::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec
                .has_column(&buffer, &mut offsets_map, column_id)
                .unwrap();
            let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
            let expected_value = row
                .iter()
                .find(|(id, _)| *id == column_id)
                .unwrap()
                .1
                .clone();
            assert_eq!(value.as_value_ref(), expected_value);
        }
    }
}
691}