1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use datatypes::prelude::ConcreteDataType;
20use datatypes::value::{Value, ValueRef};
21use memcomparable::{Deserializer, Serializer};
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::consts::ReservedColumnId;
27use store_api::storage::ColumnId;
28
29use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
30use crate::key_values::KeyValue;
31use crate::primary_key_filter::SparsePrimaryKeyFilter;
32use crate::row_converter::dense::SortField;
33use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
34
35#[derive(Clone, Debug)]
39pub struct SparsePrimaryKeyCodec {
40 inner: Arc<SparsePrimaryKeyCodecInner>,
41}
42
43#[derive(Debug)]
44struct SparsePrimaryKeyCodecInner {
45 table_id_field: SortField,
47 tsid_field: SortField,
49 label_field: SortField,
51 columns: Option<HashSet<ColumnId>>,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct SparseValues {
62 values: HashMap<ColumnId, Value>,
63}
64
65impl SparseValues {
66 pub fn new(values: HashMap<ColumnId, Value>) -> Self {
68 Self { values }
69 }
70
71 pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
73 self.values.get(&column_id).unwrap_or(&Value::Null)
74 }
75
76 pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
78 self.values.get(column_id)
79 }
80
81 pub fn insert(&mut self, column_id: ColumnId, value: Value) {
83 self.values.insert(column_id, value);
84 }
85}
86
87const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
89const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
91pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
93
94impl SparsePrimaryKeyCodec {
95 pub fn new(region_metadata: &RegionMetadataRef) -> Self {
97 Self {
98 inner: Arc::new(SparsePrimaryKeyCodecInner {
99 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
100 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
101 label_field: SortField::new(ConcreteDataType::string_datatype()),
102 columns: Some(
103 region_metadata
104 .primary_key_columns()
105 .map(|c| c.column_id)
106 .collect(),
107 ),
108 }),
109 }
110 }
111
112 pub fn schemaless() -> Self {
116 Self {
117 inner: Arc::new(SparsePrimaryKeyCodecInner {
118 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
119 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
120 label_field: SortField::new(ConcreteDataType::string_datatype()),
121 columns: None,
122 }),
123 }
124 }
125
126 pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
127 Self {
128 inner: Arc::new(SparsePrimaryKeyCodecInner {
129 columns: Some(fields.iter().map(|f| f.0).collect()),
130 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
131 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
132 label_field: SortField::new(ConcreteDataType::string_datatype()),
133 }),
134 }
135 }
136
137 fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
139 if let Some(columns) = &self.inner.columns {
141 if !columns.contains(&column_id) {
142 return None;
143 }
144 }
145
146 match column_id {
147 RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
148 RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
149 _ => Some(&self.inner.label_field),
150 }
151 }
152
153 pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
155 where
156 I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
157 {
158 let mut serializer = Serializer::new(buffer);
159 for (column_id, value) in row {
160 if value.is_null() {
161 continue;
162 }
163
164 if let Some(field) = self.get_field(column_id) {
165 column_id
166 .serialize(&mut serializer)
167 .context(SerializeFieldSnafu)?;
168 field.serialize(&mut serializer, &value)?;
169 } else {
170 common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
172 }
173 }
174 Ok(())
175 }
176
177 fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
179 let mut deserializer = Deserializer::new(bytes);
180 let mut values = SparseValues::new(HashMap::new());
181
182 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
183 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
184 values.insert(column_id, value);
185
186 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
187 let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
188 values.insert(column_id, value);
189 while deserializer.has_remaining() {
190 let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
191 let value = self.inner.label_field.deserialize(&mut deserializer)?;
192 values.insert(column_id, value);
193 }
194
195 Ok(values)
196 }
197
198 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
200 let mut deserializer = Deserializer::new(bytes);
201 deserializer.advance(COLUMN_ID_ENCODE_SIZE);
203 let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
204 Ok(Some(value))
205 }
206
207 pub fn has_column(
209 &self,
210 pk: &[u8],
211 offsets_map: &mut HashMap<u32, usize>,
212 column_id: ColumnId,
213 ) -> Option<usize> {
214 if offsets_map.is_empty() {
215 let mut deserializer = Deserializer::new(pk);
216 let mut offset = 0;
217 while deserializer.has_remaining() {
218 let column_id = u32::deserialize(&mut deserializer).unwrap();
219 offset += 4;
220 offsets_map.insert(column_id, offset);
221 let Some(field) = self.get_field(column_id) else {
222 break;
223 };
224
225 let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
226 offset += skip;
227 }
228
229 offsets_map.get(&column_id).copied()
230 } else {
231 offsets_map.get(&column_id).copied()
232 }
233 }
234
235 pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
237 let mut deserializer = Deserializer::new(pk);
238 deserializer.advance(offset);
239 let field = self.get_field(column_id).unwrap();
241 field.deserialize(&mut deserializer)
242 }
243}
244
245impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
246 fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
247 UnsupportedOperationSnafu {
248 err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
249 }
250 .fail()
251 }
252
253 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
254 self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
255 }
256
257 fn encode_value_refs(
258 &self,
259 values: &[(ColumnId, ValueRef)],
260 buffer: &mut Vec<u8>,
261 ) -> Result<()> {
262 self.encode_to_vec(values.iter().map(|v| (v.0, v.1)), buffer)
263 }
264
265 fn estimated_size(&self) -> Option<usize> {
266 None
267 }
268
269 fn num_fields(&self) -> Option<usize> {
270 None
271 }
272
273 fn encoding(&self) -> PrimaryKeyEncoding {
274 PrimaryKeyEncoding::Sparse
275 }
276
277 fn primary_key_filter(
278 &self,
279 metadata: &RegionMetadataRef,
280 filters: Arc<Vec<SimpleFilterEvaluator>>,
281 ) -> Box<dyn PrimaryKeyFilter> {
282 Box::new(SparsePrimaryKeyFilter::new(
283 metadata.clone(),
284 filters,
285 self.clone(),
286 ))
287 }
288
289 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
290 Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
291 }
292
293 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
294 self.decode_leftmost(bytes)
295 }
296}
297
298pub struct FieldWithId {
300 pub field: SortField,
301 pub column_id: ColumnId,
302}
303
304pub struct SparseEncoder {
306 fields: Vec<FieldWithId>,
307}
308
309impl SparseEncoder {
310 pub fn new(fields: Vec<FieldWithId>) -> Self {
311 Self { fields }
312 }
313
314 pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
315 where
316 I: Iterator<Item = ValueRef<'a>>,
317 {
318 let mut serializer = Serializer::new(buffer);
319 for (value, field) in row.zip(self.fields.iter()) {
320 if !value.is_null() {
321 field
322 .column_id
323 .serialize(&mut serializer)
324 .context(SerializeFieldSnafu)?;
325 field.field.serialize(&mut serializer, &value)?;
326 }
327 }
328 Ok(())
329 }
330}
331
332#[cfg(test)]
333mod tests {
334 use std::sync::Arc;
335
336 use api::v1::SemanticType;
337 use common_time::timestamp::TimeUnit;
338 use common_time::Timestamp;
339 use datatypes::schema::ColumnSchema;
340 use datatypes::value::{OrderedFloat, Value};
341 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
342 use store_api::metric_engine_consts::{
343 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
344 };
345 use store_api::storage::{ColumnId, RegionId};
346
347 use super::*;
348
349 fn test_region_metadata() -> RegionMetadataRef {
350 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
351 builder
352 .push_column_metadata(ColumnMetadata {
353 column_schema: ColumnSchema::new(
354 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
355 ConcreteDataType::uint32_datatype(),
356 false,
357 ),
358 semantic_type: SemanticType::Tag,
359 column_id: ReservedColumnId::table_id(),
360 })
361 .push_column_metadata(ColumnMetadata {
362 column_schema: ColumnSchema::new(
363 DATA_SCHEMA_TSID_COLUMN_NAME,
364 ConcreteDataType::uint64_datatype(),
365 false,
366 ),
367 semantic_type: SemanticType::Tag,
368 column_id: ReservedColumnId::tsid(),
369 })
370 .push_column_metadata(ColumnMetadata {
371 column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
372 semantic_type: SemanticType::Tag,
373 column_id: 1,
374 })
375 .push_column_metadata(ColumnMetadata {
376 column_schema: ColumnSchema::new(
377 "namespace",
378 ConcreteDataType::string_datatype(),
379 true,
380 ),
381 semantic_type: SemanticType::Tag,
382 column_id: 2,
383 })
384 .push_column_metadata(ColumnMetadata {
385 column_schema: ColumnSchema::new(
386 "container",
387 ConcreteDataType::string_datatype(),
388 true,
389 ),
390 semantic_type: SemanticType::Tag,
391 column_id: 3,
392 })
393 .push_column_metadata(ColumnMetadata {
394 column_schema: ColumnSchema::new(
395 "pod_name",
396 ConcreteDataType::string_datatype(),
397 true,
398 ),
399 semantic_type: SemanticType::Tag,
400 column_id: 4,
401 })
402 .push_column_metadata(ColumnMetadata {
403 column_schema: ColumnSchema::new(
404 "pod_ip",
405 ConcreteDataType::string_datatype(),
406 true,
407 ),
408 semantic_type: SemanticType::Tag,
409 column_id: 5,
410 })
411 .push_column_metadata(ColumnMetadata {
412 column_schema: ColumnSchema::new(
413 "greptime_value",
414 ConcreteDataType::float64_datatype(),
415 false,
416 ),
417 semantic_type: SemanticType::Field,
418 column_id: 6,
419 })
420 .push_column_metadata(ColumnMetadata {
421 column_schema: ColumnSchema::new(
422 "greptime_timestamp",
423 ConcreteDataType::timestamp_nanosecond_datatype(),
424 false,
425 ),
426 semantic_type: SemanticType::Timestamp,
427 column_id: 7,
428 })
429 .primary_key(vec![
430 ReservedColumnId::table_id(),
431 ReservedColumnId::tsid(),
432 1,
433 2,
434 3,
435 4,
436 5,
437 ]);
438 let metadata = builder.build().unwrap();
439 Arc::new(metadata)
440 }
441
/// A stored column returns its value and a missing column returns null.
#[test]
fn test_sparse_value_new_and_get_or_null() {
    let sparse_value = SparseValues::new(HashMap::from([(1, Value::Int32(42))]));

    assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
    assert_eq!(sparse_value.get_or_null(2), &Value::Null);
}
451
/// A value inserted into an empty `SparseValues` can be read back.
#[test]
fn test_sparse_value_insert() {
    let mut sparse_value = SparseValues::new(HashMap::default());
    sparse_value.insert(1, Value::Int32(42));

    let stored = sparse_value.get_or_null(1);
    assert_eq!(stored, &Value::Int32(42));
}
459
460 fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
461 vec![
462 (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
463 (
464 RESERVED_COLUMN_ID_TSID,
465 ValueRef::UInt64(123843349035232323),
466 ),
467 (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
469 (2, ValueRef::String("greptime-cluster")),
471 (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
473 (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
475 (5, ValueRef::String("10.10.10.10")),
477 (6, ValueRef::Float64(OrderedFloat(1.0))),
479 (
481 7,
482 ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
483 ),
484 ]
485 }
486
/// Encodes the test row and checks that every primary key column decodes
/// back to the value that was encoded.
#[test]
fn test_encode_to_vec() {
    let region_metadata = test_region_metadata();
    let codec = SparsePrimaryKeyCodec::new(&region_metadata);
    let mut buffer = Vec::new();

    let row = test_row();
    codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
    assert!(!buffer.is_empty());

    let sparse_value = codec.decode_sparse(&buffer).unwrap();
    assert_eq!(
        sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
        &Value::UInt32(42)
    );
    assert_eq!(
        sparse_value.get_or_null(RESERVED_COLUMN_ID_TSID),
        &Value::UInt64(123843349035232323)
    );
    assert_eq!(
        sparse_value.get_or_null(1),
        &Value::String("greptime-frontend-6989d9899-22222".into())
    );
    assert_eq!(
        sparse_value.get_or_null(2),
        &Value::String("greptime-cluster".into())
    );
    assert_eq!(
        sparse_value.get_or_null(3),
        &Value::String("greptime-frontend-6989d9899-22222".into())
    );
    assert_eq!(
        sparse_value.get_or_null(4),
        &Value::String("greptime-frontend-6989d9899-22222".into())
    );
    assert_eq!(
        sparse_value.get_or_null(5),
        &Value::String("10.10.10.10".into())
    );
}
522
/// The leftmost value of an encoded test row is the table id.
#[test]
fn test_decode_leftmost() {
    let region_metadata = test_region_metadata();
    let codec = SparsePrimaryKeyCodec::new(&region_metadata);
    let mut buffer = Vec::new();
    let row = test_row();
    codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
    assert!(!buffer.is_empty());

    let result = codec.decode_leftmost(&buffer).unwrap().unwrap();
    assert_eq!(result, Value::UInt32(42));
}
534
/// Every primary key column of the encoded test row has an offset, while
/// the non-key field column (id 6) has none.
#[test]
fn test_has_column() {
    let region_metadata = test_region_metadata();
    let codec = SparsePrimaryKeyCodec::new(&region_metadata);
    let mut buffer = Vec::new();
    let row = test_row();
    codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
    assert!(!buffer.is_empty());

    let mut offsets_map = HashMap::new();
    for column_id in [
        RESERVED_COLUMN_ID_TABLE_ID,
        RESERVED_COLUMN_ID_TSID,
        1,
        2,
        3,
        4,
        5,
    ] {
        let offset = codec.has_column(&buffer, &mut offsets_map, column_id);
        assert!(offset.is_some());
    }

    // Column 6 is not part of the primary key, so it was never encoded.
    let offset = codec.has_column(&buffer, &mut offsets_map, 6);
    assert!(offset.is_none());
}
561
/// Decoding each primary key column at the offset reported by `has_column`
/// yields the value that was encoded for it.
#[test]
fn test_decode_value_at() {
    let region_metadata = test_region_metadata();
    let codec = SparsePrimaryKeyCodec::new(&region_metadata);
    let mut buffer = Vec::new();
    let row = test_row();
    codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
    assert!(!buffer.is_empty());

    // The first row was consumed by the encoder; build it again to compare.
    let row = test_row();
    let mut offsets_map = HashMap::new();
    for column_id in [
        RESERVED_COLUMN_ID_TABLE_ID,
        RESERVED_COLUMN_ID_TSID,
        1,
        2,
        3,
        4,
        5,
    ] {
        let offset = codec
            .has_column(&buffer, &mut offsets_map, column_id)
            .unwrap();
        let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
        let expected_value = row.iter().find(|(id, _)| *id == column_id).unwrap().1;
        assert_eq!(value.as_value_ref(), expected_value);
    }
}
590}