1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::BufMut;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::prelude::ConcreteDataType;
21use datatypes::value::{Value, ValueRef};
22use memcomparable::{Deserializer, Serializer};
23use serde::{Deserialize, Serialize};
24use snafu::ResultExt;
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28use store_api::storage::consts::ReservedColumnId;
29
30use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
31use crate::key_values::KeyValue;
32use crate::primary_key_filter::SparsePrimaryKeyFilter;
33use crate::row_converter::dense::SortField;
34use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
35
/// Primary key codec for the sparse primary key encoding.
///
/// Cloning is cheap: all configuration lives behind a shared [`Arc`].
#[derive(Clone, Debug)]
pub struct SparsePrimaryKeyCodec {
    inner: Arc<SparsePrimaryKeyCodecInner>,
}

/// Shared configuration of [`SparsePrimaryKeyCodec`].
#[derive(Debug)]
struct SparsePrimaryKeyCodecInner {
    /// Sort field for the reserved table id column (built as `u32` by the constructors).
    table_id_field: SortField,
    /// Sort field for the reserved tsid column (built as `u64` by the constructors).
    tsid_field: SortField,
    /// Sort field shared by every other (label) column (built as string).
    label_field: SortField,
    /// Column ids the codec accepts; `None` means any column id is accepted.
    columns: Option<HashSet<ColumnId>>,
}
71
72#[derive(Debug, Clone, PartialEq, Eq, Default)]
77pub struct SparseValues {
78 values: Vec<(ColumnId, Value)>,
79}
80
81impl SparseValues {
82 pub fn new() -> Self {
84 Self { values: Vec::new() }
85 }
86
87 pub fn with_capacity(cap: usize) -> Self {
89 Self {
90 values: Vec::with_capacity(cap),
91 }
92 }
93
94 pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
96 for (id, value) in &self.values {
97 if *id == column_id {
98 return value;
99 }
100 }
101 &Value::Null
102 }
103
104 pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
106 for (id, value) in &self.values {
107 if id == column_id {
108 return Some(value);
109 }
110 }
111 None
112 }
113
114 pub fn insert(&mut self, column_id: ColumnId, value: Value) {
118 self.values.push((column_id, value));
119 }
120
121 pub fn iter(&self) -> impl Iterator<Item = (&ColumnId, &Value)> {
123 self.values.iter().map(|(id, value)| (id, value))
124 }
125}
126
127pub const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
129pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
131pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
133
134const TABLE_ID_VALUE_OFFSET: usize = COLUMN_ID_ENCODE_SIZE;
138const TSID_VALUE_OFFSET: usize = COLUMN_ID_ENCODE_SIZE + 5 + COLUMN_ID_ENCODE_SIZE;
140const TAGS_START_OFFSET: usize = COLUMN_ID_ENCODE_SIZE + 5 + COLUMN_ID_ENCODE_SIZE + 9;
142
143const SPARSE_OFFSETS_INLINE_CAP: usize = 32;
149
150#[derive(Debug, Clone)]
152pub struct SparseOffsetsCache {
153 inline: Vec<(ColumnId, usize)>,
156 overflow: HashMap<ColumnId, usize>,
158 cursor: usize,
160 finished: bool,
163}
164
165impl Default for SparseOffsetsCache {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171impl SparseOffsetsCache {
172 pub fn new() -> Self {
173 Self {
174 inline: Vec::new(),
175 overflow: HashMap::new(),
176 cursor: TAGS_START_OFFSET,
177 finished: false,
178 }
179 }
180
181 pub fn clear(&mut self) {
182 self.inline.clear();
183 self.overflow.clear();
184 self.cursor = TAGS_START_OFFSET;
185 self.finished = false;
186 }
187
188 fn get(&self, column_id: ColumnId) -> Option<usize> {
190 for entry in &self.inline {
191 if entry.0 == column_id {
192 return Some(entry.1);
193 }
194 }
195 self.overflow.get(&column_id).copied()
196 }
197
198 fn insert(&mut self, column_id: ColumnId, offset: usize) {
200 if self.inline.len() < SPARSE_OFFSETS_INLINE_CAP {
201 if self.inline.capacity() == 0 {
202 self.inline.reserve_exact(SPARSE_OFFSETS_INLINE_CAP);
203 }
204 self.inline.push((column_id, offset));
205 } else {
206 self.overflow.insert(column_id, offset);
207 }
208 }
209
210 #[cfg(test)]
211 fn contains(&self, column_id: ColumnId) -> bool {
212 self.get(column_id).is_some()
213 }
214}
215
216impl SparsePrimaryKeyCodec {
217 pub fn from_columns(columns_ids: impl Iterator<Item = ColumnId>) -> Self {
219 let columns = columns_ids.collect();
220 Self {
221 inner: Arc::new(SparsePrimaryKeyCodecInner {
222 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
223 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
224 label_field: SortField::new(ConcreteDataType::string_datatype()),
225 columns: Some(columns),
226 }),
227 }
228 }
229
230 pub fn new(region_metadata: &RegionMetadataRef) -> Self {
232 Self::from_columns(region_metadata.primary_key_columns().map(|c| c.column_id))
233 }
234
235 pub fn schemaless() -> Self {
239 Self {
240 inner: Arc::new(SparsePrimaryKeyCodecInner {
241 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
242 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
243 label_field: SortField::new(ConcreteDataType::string_datatype()),
244 columns: None,
245 }),
246 }
247 }
248
249 pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
251 Self {
252 inner: Arc::new(SparsePrimaryKeyCodecInner {
253 columns: Some(fields.iter().map(|f| f.0).collect()),
254 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
255 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
256 label_field: SortField::new(ConcreteDataType::string_datatype()),
257 }),
258 }
259 }
260
261 fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
263 if let Some(columns) = &self.inner.columns
265 && !columns.contains(&column_id)
266 {
267 return None;
268 }
269
270 match column_id {
271 RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
272 RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
273 _ => Some(&self.inner.label_field),
274 }
275 }
276
277 pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
279 where
280 I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
281 {
282 let mut serializer = Serializer::new(buffer);
283 for (column_id, value) in row {
284 if value.is_null() {
285 continue;
286 }
287
288 if let Some(field) = self.get_field(column_id) {
289 column_id
290 .serialize(&mut serializer)
291 .context(SerializeFieldSnafu)?;
292 field.serialize(&mut serializer, &value)?;
293 } else {
294 common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
296 }
297 }
298 Ok(())
299 }
300
301 pub fn encode_raw_tag_value<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
302 where
303 I: Iterator<Item = (ColumnId, &'a [u8])>,
304 {
305 for (tag_column_id, tag_value) in row {
306 let value_len = tag_value.len();
307 buffer.reserve(6 + value_len / 8 * 9);
308 buffer.put_u32(tag_column_id);
309 buffer.put_u8(1);
310 buffer.put_u8(!tag_value.is_empty() as u8);
311
312 let mut len = 0;
315 let num_chucks = value_len / 8;
316 let remainder = value_len % 8;
317
318 for idx in 0..num_chucks {
319 buffer.extend_from_slice(&tag_value[idx * 8..idx * 8 + 8]);
320 len += 8;
321 let extra = if len == value_len { 8 } else { 9 };
325 buffer.put_u8(extra);
326 }
327
328 if remainder != 0 {
329 buffer.extend_from_slice(&tag_value[len..value_len]);
330 buffer.put_bytes(0, 8 - remainder);
331 buffer.put_u8(remainder as u8);
332 }
333 }
334 Ok(())
335 }
336
337 pub fn encode_internal(&self, table_id: u32, tsid: u64, buffer: &mut Vec<u8>) -> Result<()> {
339 buffer.reserve_exact(22);
340 buffer.put_u32(RESERVED_COLUMN_ID_TABLE_ID);
341 buffer.put_u8(1);
342 buffer.put_u32(table_id);
343 buffer.put_u32(RESERVED_COLUMN_ID_TSID);
344 buffer.put_u8(1);
345 buffer.put_u64(tsid);
346 Ok(())
347 }
348
349 fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
351 let mut deserializer = Deserializer::new(bytes);
352 let mut values = SparseValues::with_capacity(16);
353
354 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
355 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
356 values.insert(column_id, value);
357
358 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
359 let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
360 values.insert(column_id, value);
361 while deserializer.has_remaining() {
362 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
363 let value = self.inner.label_field.deserialize(&mut deserializer)?;
364 values.insert(column_id, value);
365 }
366
367 Ok(values)
368 }
369
370 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
372 let mut deserializer = Deserializer::new(bytes);
373 deserializer.advance(COLUMN_ID_ENCODE_SIZE);
375 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
376 Ok(Some(value))
377 }
378
379 pub fn has_column(
389 &self,
390 pk: &[u8],
391 cache: &mut SparseOffsetsCache,
392 column_id: ColumnId,
393 ) -> Option<usize> {
394 match column_id {
403 RESERVED_COLUMN_ID_TABLE_ID => return Some(TABLE_ID_VALUE_OFFSET),
404 RESERVED_COLUMN_ID_TSID => return Some(TSID_VALUE_OFFSET),
405 _ => {}
406 }
407
408 if let Some(offset) = cache.get(column_id) {
409 return Some(offset);
410 }
411 if cache.finished {
412 return None;
413 }
414
415 let mut deserializer = Deserializer::new(pk);
416 deserializer.advance(cache.cursor);
417 let mut offset = cache.cursor;
418 while deserializer.has_remaining() {
419 let col = u32::deserialize(&mut deserializer).unwrap();
420 offset += COLUMN_ID_ENCODE_SIZE;
421 let value_offset = offset;
422 cache.insert(col, value_offset);
423 let Some(field) = self.get_field(col) else {
424 cache.finished = true;
425 cache.cursor = offset;
426 return None;
427 };
428
429 let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
430 offset += skip;
431 cache.cursor = offset;
432 if col == column_id {
433 return Some(value_offset);
434 }
435 }
436
437 cache.finished = true;
438 None
439 }
440
441 pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
443 let mut deserializer = Deserializer::new(pk);
444 deserializer.advance(offset);
445 let field = self.get_field(column_id).unwrap();
447 field.deserialize(&mut deserializer)
448 }
449
450 pub fn encoded_value_for_column<'a>(
454 &self,
455 pk: &'a [u8],
456 cache: &mut SparseOffsetsCache,
457 column_id: ColumnId,
458 ) -> Result<Option<&'a [u8]>> {
459 let Some(offset) = self.has_column(pk, cache, column_id) else {
460 return Ok(None);
461 };
462
463 let Some(field) = self.get_field(column_id) else {
464 return Ok(None);
465 };
466
467 let mut deserializer = Deserializer::new(pk);
468 deserializer.advance(offset);
469 let len = field.skip_deserialize(pk, &mut deserializer)?;
470 Ok(Some(&pk[offset..offset + len]))
471 }
472}
473
474impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
475 fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
476 UnsupportedOperationSnafu {
477 err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
478 }
479 .fail()
480 }
481
482 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
483 self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
484 }
485
486 fn encode_value_refs(
487 &self,
488 values: &[(ColumnId, ValueRef)],
489 buffer: &mut Vec<u8>,
490 ) -> Result<()> {
491 self.encode_to_vec(values.iter().map(|v| (v.0, v.1.clone())), buffer)
492 }
493
494 fn estimated_size(&self) -> Option<usize> {
495 None
496 }
497
498 fn num_fields(&self) -> Option<usize> {
499 None
500 }
501
502 fn encoding(&self) -> PrimaryKeyEncoding {
503 PrimaryKeyEncoding::Sparse
504 }
505
506 fn primary_key_filter(
507 &self,
508 metadata: &RegionMetadataRef,
509 filters: Arc<Vec<SimpleFilterEvaluator>>,
510 skip_partition_column: bool,
511 ) -> Box<dyn PrimaryKeyFilter> {
512 Box::new(SparsePrimaryKeyFilter::new(
513 metadata.clone(),
514 filters,
515 self.clone(),
516 skip_partition_column,
517 ))
518 }
519
520 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
521 Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
522 }
523
524 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
525 self.decode_leftmost(bytes)
526 }
527}
528
529pub struct FieldWithId {
531 pub field: SortField,
532 pub column_id: ColumnId,
533}
534
535pub struct SparseEncoder {
537 fields: Vec<FieldWithId>,
538}
539
540impl SparseEncoder {
541 pub fn new(fields: Vec<FieldWithId>) -> Self {
542 Self { fields }
543 }
544
545 pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
546 where
547 I: Iterator<Item = ValueRef<'a>>,
548 {
549 let mut serializer = Serializer::new(buffer);
550 for (value, field) in row.zip(self.fields.iter()) {
551 if !value.is_null() {
552 field
553 .column_id
554 .serialize(&mut serializer)
555 .context(SerializeFieldSnafu)?;
556 field.field.serialize(&mut serializer, &value)?;
557 }
558 }
559 Ok(())
560 }
561}
562
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::{
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
    };
    use store_api::storage::{ColumnId, RegionId};

    use super::*;

    /// Builds region metadata with the reserved table id / tsid tags, five
    /// string labels (column ids 1..=5), a float field (6) and a nanosecond
    /// timestamp (7).
    ///
    /// The primary key is table id, tsid, then labels 1..=5.
    fn test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME,
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_name",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_ip",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 5,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 6,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 7,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                2,
                3,
                4,
                5,
            ]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    #[test]
    fn test_sparse_value_new_and_get_or_null() {
        let mut sparse_value = SparseValues::new();
        sparse_value.insert(1, Value::Int32(42));

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
        // Missing columns read back as Null rather than failing.
        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
    }

    #[test]
    fn test_sparse_value_insert() {
        let mut sparse_value = SparseValues::new();
        sparse_value.insert(1, Value::Int32(42));

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
    }

    /// A row that covers every column of `test_region_metadata`.
    ///
    /// Columns 6 and 7 are not in the primary key, so a codec built from that
    /// metadata skips them when encoding.
    fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
            (
                RESERVED_COLUMN_ID_TSID,
                ValueRef::UInt64(123843349035232323),
            ),
            // pod
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // namespace
            (2, ValueRef::String("greptime-cluster")),
            // container
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // pod_name
            (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // pod_ip
            (5, ValueRef::String("10.10.10.10")),
            // field value (not part of the primary key)
            (6, ValueRef::Float64(OrderedFloat(1.0))),
            (
                // timestamp (not part of the primary key)
                7,
                ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
            ),
        ]
    }

    /// The raw fast paths (`encode_internal` + `encode_raw_tag_value`) must
    /// produce exactly the same bytes as the generic `encode_to_vec` path.
    #[test]
    fn test_encode_by_short_cuts() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let internal_columns = [
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(1024)),
            (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(42)),
        ];
        let tags = [
            (1, "greptime-frontend-6989d9899-22222"),
            (2, "greptime-cluster"),
            (3, "greptime-frontend-6989d9899-22222"),
            (4, "greptime-frontend-6989d9899-22222"),
            (5, "10.10.10.10"),
        ];
        codec
            .encode_to_vec(internal_columns.into_iter(), &mut buffer)
            .unwrap();
        codec
            .encode_to_vec(
                tags.iter()
                    .map(|(col_id, tag_value)| (*col_id, ValueRef::String(tag_value))),
                &mut buffer,
            )
            .unwrap();

        let mut buffer_by_raw_encoding = Vec::new();
        codec
            .encode_internal(1024, 42, &mut buffer_by_raw_encoding)
            .unwrap();
        let tags: Vec<_> = tags
            .into_iter()
            .map(|(col_id, tag_value)| (col_id, tag_value.as_bytes()))
            .collect();
        codec
            .encode_raw_tag_value(
                tags.iter().map(|(c, b)| (*c, *b)),
                &mut buffer_by_raw_encoding,
            )
            .unwrap();
        assert_eq!(buffer, buffer_by_raw_encoding);
    }

    /// Round-trips `test_row` through `encode_to_vec` / `decode_sparse`.
    #[test]
    fn test_encode_to_vec() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();

        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let sparse_value = codec.decode_sparse(&buffer).unwrap();
        assert_eq!(
            sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
            &Value::UInt32(42)
        );
        assert_eq!(
            sparse_value.get_or_null(1),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(2),
            &Value::String("greptime-cluster".into())
        );
        assert_eq!(
            sparse_value.get_or_null(3),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(4),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(5),
            &Value::String("10.10.10.10".into())
        );
    }

    /// `decode_leftmost` returns the table id only.
    #[test]
    fn test_decode_leftmost() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let result = codec.decode_leftmost(&buffer).unwrap().unwrap();
        assert_eq!(result, Value::UInt32(42));
    }

    /// Every primary key column is found.
    ///
    /// Column 6 is not found, because it was skipped while encoding.
    #[test]
    fn test_has_column() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        let mut offsets_map = SparseOffsetsCache::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec.has_column(&buffer, &mut offsets_map, column_id);
            assert!(offset.is_some());
        }

        let offset = codec.has_column(&buffer, &mut offsets_map, 6);
        assert!(offset.is_none());
    }

    /// `has_column` scans only as far as needed and resumes from the cursor.
    #[test]
    fn test_has_column_lazy_resume() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        codec
            .encode_to_vec(test_row().into_iter(), &mut buffer)
            .unwrap();

        let mut cache = SparseOffsetsCache::new();
        assert!(codec.has_column(&buffer, &mut cache, 1).is_some());
        // The scan stopped at column 1, so later columns are not cached yet.
        assert!(!cache.finished);
        assert!(cache.contains(1));
        assert!(!cache.contains(5));

        assert!(codec.has_column(&buffer, &mut cache, 5).is_some());
        // Resuming from the cursor reached and cached column 5.
        assert!(cache.contains(5));

        assert!(codec.has_column(&buffer, &mut cache, 2).is_some());
        // Column 2 was cached while scanning towards column 5.

        assert!(codec.has_column(&buffer, &mut cache, 999).is_none());
        // A missing column forces a scan to the end and marks the cache finished.
        assert!(cache.finished);
        assert!(codec.has_column(&buffer, &mut cache, 998).is_none());
        // A finished cache answers without rescanning.
    }

    /// Values decoded at the offsets returned by `has_column` match the row.
    #[test]
    fn test_decode_value_at() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        let row = test_row();
        let mut offsets_map = SparseOffsetsCache::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec
                .has_column(&buffer, &mut offsets_map, column_id)
                .unwrap();
            let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
            let expected_value = row
                .iter()
                .find(|(id, _)| *id == column_id)
                .unwrap()
                .1
                .clone();
            assert_eq!(value.as_value_ref(), expected_value);
        }
    }

    /// `encoded_value_for_column` returns exactly the bytes that serializing
    /// the value on its own would produce.
    ///
    /// It returns `None` for columns that are absent from the key.
    #[test]
    fn test_encoded_value_for_column() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec
            .encode_to_vec(row.clone().into_iter(), &mut buffer)
            .unwrap();
        assert!(!buffer.is_empty());

        let mut offsets_map = SparseOffsetsCache::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let encoded_value = codec
                .encoded_value_for_column(&buffer, &mut offsets_map, column_id)
                .unwrap()
                .unwrap();
            let expected_value = row
                .iter()
                .find(|(id, _)| *id == column_id)
                .unwrap()
                .1
                .clone();
            let data_type = match column_id {
                RESERVED_COLUMN_ID_TABLE_ID => ConcreteDataType::uint32_datatype(),
                RESERVED_COLUMN_ID_TSID => ConcreteDataType::uint64_datatype(),
                _ => ConcreteDataType::string_datatype(),
            };
            let field = SortField::new(data_type);
            let mut expected_encoded = Vec::new();
            let mut serializer = Serializer::new(&mut expected_encoded);
            field.serialize(&mut serializer, &expected_value).unwrap();
            assert_eq!(encoded_value, expected_encoded.as_slice());
        }

        for column_id in [6_u32, 7_u32, 999_u32] {
            let encoded_value = codec
                .encoded_value_for_column(&buffer, &mut offsets_map, column_id)
                .unwrap();
            assert!(encoded_value.is_none());
        }
    }
}