1use std::sync::Arc;
16
17use bytes::Buf;
18use common_base::bytes::Bytes;
19use common_decimal::Decimal128;
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use common_time::time::Time;
22use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
23use datatypes::data_type::ConcreteDataType;
24use datatypes::prelude::Value;
25use datatypes::types::IntervalType;
26use datatypes::value::ValueRef;
27use memcomparable::{Deserializer, Serializer};
28use paste::paste;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metadata::{RegionMetadata, RegionMetadataRef};
33use store_api::storage::ColumnId;
34
35use crate::error::{
36 self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
37};
38use crate::key_values::KeyValue;
39use crate::primary_key_filter::DensePrimaryKeyFilter;
40use crate::row_converter::{
41 CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
42};
43
44#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct SortField {
47 data_type: ConcreteDataType,
48}
49
impl SortField {
    /// Creates a sort field that encodes values of `data_type`.
    pub fn new(data_type: ConcreteDataType) -> Self {
        Self { data_type }
    }

    /// Returns the data type of the field.
    pub fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }

    /// Returns a rough estimate, in bytes, of the encoded size of one value of this field.
    ///
    /// For most fixed-width types the figure appears to be one leading tag byte plus the
    /// payload width (e.g. 5 for `Int32`). Variable-length types (`Binary`, `Json`, `Vector`,
    /// `String`) report a fixed guess of 11. Types that [`SortField::serialize`] rejects
    /// (`Null`, `List`, `Dictionary`) report 0.
    pub fn estimated_size(&self) -> usize {
        match &self.data_type {
            ConcreteDataType::Boolean(_) => 2,
            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
            ConcreteDataType::Float32(_) => 5,
            ConcreteDataType::Float64(_) => 9,
            ConcreteDataType::Binary(_)
            | ConcreteDataType::Json(_)
            | ConcreteDataType::Vector(_) => 11,
            ConcreteDataType::String(_) => 11,
            ConcreteDataType::Date(_) => 5,
            ConcreteDataType::Timestamp(_) => 10,
            ConcreteDataType::Time(_) => 10,
            ConcreteDataType::Duration(_) => 10,
            // Upper bound over all interval kinds.
            ConcreteDataType::Interval(_) => 18,
            ConcreteDataType::Decimal128(_) => 19,
            ConcreteDataType::Null(_)
            | ConcreteDataType::List(_)
            | ConcreteDataType::Dictionary(_) => 0,
        }
    }

    /// Serializes `value` into `serializer` according to this field's data type.
    ///
    /// Each value is read through the matching `ValueRef::as_*` accessor as an `Option`, so a
    /// null value is serialized as `None`. `Json` and `Vector` fields are written through the
    /// binary accessor. Timestamps are written as their raw `i64` value, and intervals as
    /// their `i32`/`i64`/`i128` representation.
    ///
    /// # Errors
    /// - `FieldTypeMismatch` if `value` cannot be viewed as this field's type.
    /// - `SerializeField` if the underlying serializer fails.
    /// - `NotSupportedField` for `List`, `Dictionary` and `Null` data types.
    pub fn serialize(
        &self,
        serializer: &mut Serializer<&mut Vec<u8>>,
        value: &ValueRef,
    ) -> Result<()> {
        // Expands into a `match` over the data type. Every `$ty, $f` pair becomes an arm
        // that calls `value.as_$f()` and serializes the resulting `Option`. Types whose
        // encoding needs a conversion first are handled by the hand-written arms below.
        macro_rules! cast_value_and_serialize {
            (
                $self: ident;
                $serializer: ident;
                $(
                    $ty: ident, $f: ident
                ),*
            ) => {
                match &$self.data_type {
                $(
                    ConcreteDataType::$ty(_) => {
                        paste!{
                            value
                            .[<as_ $f>]()
                            .context(FieldTypeMismatchSnafu)?
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                        }
                    }
                )*
                    ConcreteDataType::Timestamp(_) => {
                        // Only the numeric value is stored; `deserialize` rebuilds the
                        // timestamp from the field's type.
                        let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?;
                        timestamp
                            .map(|t|t.value())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
                        let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?;
                        interval.map(|i| i.to_i32())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
                        let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?;
                        interval.map(|i| i.to_i64())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
                        let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
                        interval.map(|i| i.to_i128())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::List(_) |
                    ConcreteDataType::Dictionary(_) |
                    ConcreteDataType::Null(_) => {
                        return error::NotSupportedFieldSnafu {
                            data_type: $self.data_type.clone()
                        }.fail()
                    }
                }
            };
        }
        cast_value_and_serialize!(self; serializer;
            Boolean, boolean,
            Binary, binary,
            Int8, i8,
            UInt8, u8,
            Int16, i16,
            UInt16, u16,
            Int32, i32,
            UInt32, u32,
            Int64, i64,
            UInt64, u64,
            Float32, f32,
            Float64, f64,
            String, string,
            Date, date,
            Time, time,
            Duration, duration,
            Decimal128, decimal128,
            Json, binary,
            Vector, binary
        );

        Ok(())
    }

    /// Decodes one value of this field's type from `deserializer`.
    ///
    /// A `None` in the stream decodes to a null `Value`. `Binary`, `Json` and `Vector` fields
    /// share one decoding path and all come back as a binary `Value`.
    ///
    /// # Errors
    /// - `DeserializeField` if the bytes cannot be decoded.
    /// - `NotSupportedField` for `List`, `Dictionary` and `Null` data types.
    pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
        // Mirrors `cast_value_and_serialize!`: each `$ty, $f` pair becomes an arm that
        // deserializes an `Option<$f>` and converts it into a `Value`.
        // NOTE: the `$serializer` parameter is unused; the expansion refers to the
        // `deserializer` argument of this function directly.
        macro_rules! deserialize_and_build_value {
            (
                $self: ident;
                $serializer: ident;
                $(
                    $ty: ident, $f: ident
                ),*
            ) => {

                match &$self.data_type {
                    $(
                        ConcreteDataType::$ty(_) => {
                            Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
                        }
                    )*
                    ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
                        Option::<Vec<u8>>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(Bytes::from),
                    )),
                    ConcreteDataType::Timestamp(ty) => {
                        // The stored i64 is turned back into a timestamp of this field's type.
                        let timestamp = Option::<i64>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(|t|ty.create_timestamp(t));
                        Ok(Value::from(timestamp))
                    }
                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
                        let interval = Option::<i32>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(IntervalYearMonth::from_i32);
                        Ok(Value::from(interval))
                    }
                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
                        let interval = Option::<i64>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(IntervalDayTime::from_i64);
                        Ok(Value::from(interval))
                    }
                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
                        let interval = Option::<i128>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(IntervalMonthDayNano::from_i128);
                        Ok(Value::from(interval))
                    }
                    ConcreteDataType::List(l) => NotSupportedFieldSnafu {
                        data_type: ConcreteDataType::List(l.clone()),
                    }
                    .fail(),
                    ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
                        data_type: ConcreteDataType::Dictionary(d.clone()),
                    }
                    .fail(),
                    ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
                        data_type: ConcreteDataType::Null(n.clone()),
                    }
                    .fail(),
                }
            };
        }
        deserialize_and_build_value!(self; deserializer;
            Boolean, bool,
            Int8, i8,
            Int16, i16,
            Int32, i32,
            Int64, i64,
            UInt8, u8,
            UInt16, u16,
            UInt32, u32,
            UInt64, u64,
            Float32, f32,
            Float64, f64,
            String, String,
            Date, Date,
            Time, Time,
            Duration, Duration,
            Decimal128, Decimal128
        )
    }

    /// Advances `deserializer` past the current value of this field without decoding it.
    /// Returns the number of bytes skipped.
    ///
    /// `bytes` is inspected at `deserializer.position()`, so it is assumed to be the same
    /// buffer the deserializer reads from.
    ///
    /// # Errors
    /// - `DeserializeField` if a string value cannot be skipped.
    ///
    /// # Panics
    /// Panics if the deserializer is already at the end of `bytes`, or if a binary-like value
    /// is truncated, because `bytes` is indexed directly.
    pub(crate) fn skip_deserialize(
        &self,
        bytes: &[u8],
        deserializer: &mut Deserializer<&[u8]>,
    ) -> Result<usize> {
        let pos = deserializer.position();
        // A leading 0 byte is an encoded `None` (null value), which occupies a single byte
        // regardless of the field type.
        if bytes[pos] == 0 {
            deserializer.advance(1);
            return Ok(1);
        }

        let to_skip = match &self.data_type {
            ConcreteDataType::Boolean(_) => 2,
            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
            ConcreteDataType::Float32(_) => 5,
            ConcreteDataType::Float64(_) => 9,
            ConcreteDataType::Binary(_)
            | ConcreteDataType::Json(_)
            | ConcreteDataType::Vector(_) => {
                // Binary-like values are serialized as a list of bytes rather than a byte
                // string, so `skip_bytes` cannot be used. After the leading tag byte, each
                // element is a `1` marker followed by one data byte. Walk those pairs, then
                // count the final non-`1` terminator byte as well.
                let pos_before = deserializer.position();
                let mut current = pos_before + 1;
                while bytes[current] == 1 {
                    current += 2;
                }
                let to_skip = current - pos_before + 1;
                deserializer.advance(to_skip);
                return Ok(to_skip);
            }
            ConcreteDataType::String(_) => {
                // Skip the tag byte, then let the deserializer skip the encoded string body.
                let pos_before = deserializer.position();
                deserializer.advance(1);
                deserializer
                    .skip_bytes()
                    .context(error::DeserializeFieldSnafu)?;
                return Ok(deserializer.position() - pos_before);
            }
            ConcreteDataType::Date(_) => 5,
            // `serialize` writes timestamps as `Option<i64>`: a tag byte plus 8 bytes.
            ConcreteDataType::Timestamp(_) => 9,
            // Presumably an i64 value plus a 1-byte time unit — TODO confirm against `Time`'s
            // serde representation.
            ConcreteDataType::Time(_) => 10,
            ConcreteDataType::Duration(_) => 10,
            // Intervals are written as i32 / i64 / i128 (see `serialize`), plus a tag byte.
            ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
            ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
            ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
            ConcreteDataType::Decimal128(_) => 19,
            ConcreteDataType::Null(_)
            | ConcreteDataType::List(_)
            | ConcreteDataType::Dictionary(_) => 0,
        };
        deserializer.advance(to_skip);
        Ok(to_skip)
    }
}
310
311impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
312 fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
313 where
314 I: Iterator<Item = ValueRef<'a>>,
315 {
316 self.encode_dense(row, buffer)
317 }
318}
319
320#[derive(Clone, Debug)]
322pub struct DensePrimaryKeyCodec {
323 ordered_primary_key_columns: Arc<Vec<(ColumnId, SortField)>>,
325}
326
327impl DensePrimaryKeyCodec {
328 pub fn new(metadata: &RegionMetadata) -> Self {
329 let ordered_primary_key_columns = metadata
330 .primary_key_columns()
331 .map(|c| {
332 (
333 c.column_id,
334 SortField::new(c.column_schema.data_type.clone()),
335 )
336 })
337 .collect::<Vec<_>>();
338
339 Self::with_fields(ordered_primary_key_columns)
340 }
341
342 pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
343 Self {
344 ordered_primary_key_columns: Arc::new(fields),
345 }
346 }
347
348 fn encode_dense<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
349 where
350 I: Iterator<Item = ValueRef<'a>>,
351 {
352 let mut serializer = Serializer::new(buffer);
353 for (idx, value) in row.enumerate() {
354 self.field_at(idx).serialize(&mut serializer, &value)?;
355 }
356 Ok(())
357 }
358
359 pub fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<(ColumnId, Value)>> {
361 let mut deserializer = Deserializer::new(bytes);
362 let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
363 for (column_id, field) in self.ordered_primary_key_columns.iter() {
364 let value = field.deserialize(&mut deserializer)?;
365 values.push((*column_id, value));
366 }
367 Ok(values)
368 }
369
370 pub fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Result<Vec<Value>> {
372 let mut deserializer = Deserializer::new(bytes);
373 let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
374 for (_, field) in self.ordered_primary_key_columns.iter() {
375 let value = field.deserialize(&mut deserializer)?;
376 values.push(value);
377 }
378 Ok(values)
379 }
380
381 fn field_at(&self, pos: usize) -> &SortField {
386 &self.ordered_primary_key_columns[pos].1
387 }
388
389 pub fn decode_value_at(
393 &self,
394 bytes: &[u8],
395 pos: usize,
396 offsets_buf: &mut Vec<usize>,
397 ) -> Result<Value> {
398 let mut deserializer = Deserializer::new(bytes);
399 if pos < offsets_buf.len() {
400 let to_skip = offsets_buf[pos];
402 deserializer.advance(to_skip);
403 return self.field_at(pos).deserialize(&mut deserializer);
404 }
405
406 if offsets_buf.is_empty() {
407 let mut offset = 0;
408 for i in 0..pos {
410 offsets_buf.push(offset);
412 let skip = self
413 .field_at(i)
414 .skip_deserialize(bytes, &mut deserializer)?;
415 offset += skip;
416 }
417 offsets_buf.push(offset);
419 } else {
420 let value_start = offsets_buf.len() - 1;
422 let mut offset = offsets_buf[value_start];
424 deserializer.advance(offset);
425 for i in value_start..pos {
426 let skip = self
428 .field_at(i)
429 .skip_deserialize(bytes, &mut deserializer)?;
430 offset += skip;
432 offsets_buf.push(offset);
433 }
434 }
435
436 self.field_at(pos).deserialize(&mut deserializer)
437 }
438
439 pub fn estimated_size(&self) -> usize {
440 self.ordered_primary_key_columns
441 .iter()
442 .map(|(_, f)| f.estimated_size())
443 .sum()
444 }
445
446 pub fn num_fields(&self) -> usize {
447 self.ordered_primary_key_columns.len()
448 }
449}
450
451impl PrimaryKeyCodec for DensePrimaryKeyCodec {
452 fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
453 self.encode_dense(key_value.primary_keys(), buffer)
454 }
455
456 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
457 self.encode_dense(values.iter().map(|(_, v)| v.as_value_ref()), buffer)
458 }
459
460 fn encode_value_refs(
461 &self,
462 values: &[(ColumnId, ValueRef)],
463 buffer: &mut Vec<u8>,
464 ) -> Result<()> {
465 let iter = values.iter().map(|(_, v)| *v);
466 self.encode_dense(iter, buffer)
467 }
468
469 fn estimated_size(&self) -> Option<usize> {
470 Some(self.estimated_size())
471 }
472
473 fn num_fields(&self) -> Option<usize> {
474 Some(self.num_fields())
475 }
476
477 fn encoding(&self) -> PrimaryKeyEncoding {
478 PrimaryKeyEncoding::Dense
479 }
480
481 fn primary_key_filter(
482 &self,
483 metadata: &RegionMetadataRef,
484 filters: Arc<Vec<SimpleFilterEvaluator>>,
485 ) -> Box<dyn PrimaryKeyFilter> {
486 Box::new(DensePrimaryKeyFilter::new(
487 metadata.clone(),
488 filters,
489 self.clone(),
490 ))
491 }
492
493 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
494 Ok(CompositeValues::Dense(self.decode_dense(bytes)?))
495 }
496
497 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
498 let mut values = self.decode_dense(bytes)?;
500 Ok(values.pop().map(|(_, v)| v))
501 }
502}
503
#[cfg(test)]
mod tests {
    use common_base::bytes::StringBytes;
    use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
    use datatypes::value::Value;

    use super::*;

    /// Encodes `row` with a codec built from `data_types`. Checks that `decode`,
    /// `decode_dense_without_column_id` and `decode_value_at` (with both a fresh and a
    /// reused offsets buffer) all give back `row`.
    fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
        let encoder = DensePrimaryKeyCodec::with_fields(
            data_types
                .iter()
                .map(|t| (0, SortField::new(t.clone())))
                .collect::<Vec<_>>(),
        );

        let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();

        let result = encoder.encode(value_ref.iter().cloned()).unwrap();
        let decoded = encoder.decode(&result).unwrap().into_dense();
        assert_eq!(decoded, row);

        // The id-less decoding path must agree with the full decode.
        let decoded = encoder.decode_dense_without_column_id(&result).unwrap();
        assert_eq!(decoded, row);

        let mut decoded = Vec::new();
        let mut offsets = Vec::new();
        // Iterate twice: the first pass fills `offsets`, the second reuses the cached offsets.
        for _ in 0..2 {
            decoded.clear();
            for i in 0..data_types.len() {
                let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
                decoded.push(value);
            }
            assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
            assert_eq!(decoded, row);
        }
    }

    #[test]
    fn test_memcmp() {
        let encoder = DensePrimaryKeyCodec::with_fields(vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int64_datatype())),
        ]);
        let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
        let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
        let result = encoder.encode(value_ref.iter().cloned()).unwrap();

        let decoded = encoder.decode(&result).unwrap().into_dense();
        assert_eq!(&values, &decoded as &[Value]);

        // `decode_dense` pairs each value with the column id it was registered under.
        let with_ids = encoder.decode_dense(&result).unwrap();
        let expected: Vec<(ColumnId, Value)> =
            vec![(0, values[0].clone()), (1, values[1].clone())];
        assert_eq!(with_ids, expected);
    }

    #[test]
    fn test_memcmp_timestamp() {
        check_encode_and_decode(
            &[
                ConcreteDataType::timestamp_millisecond_datatype(),
                ConcreteDataType::int64_datatype(),
            ],
            vec![
                Value::Timestamp(Timestamp::new_millisecond(42)),
                Value::Int64(43),
            ],
        );
    }

    #[test]
    fn test_memcmp_duration() {
        check_encode_and_decode(
            &[
                ConcreteDataType::duration_millisecond_datatype(),
                ConcreteDataType::int64_datatype(),
            ],
            vec![
                Value::Duration(Duration::new_millisecond(44)),
                Value::Int64(45),
            ],
        )
    }

    #[test]
    fn test_memcmp_binary() {
        check_encode_and_decode(
            &[
                ConcreteDataType::binary_datatype(),
                ConcreteDataType::int64_datatype(),
            ],
            vec![
                Value::Binary(Bytes::from("hello".as_bytes())),
                Value::Int64(43),
            ],
        );
    }

    #[test]
    fn test_memcmp_string() {
        check_encode_and_decode(
            &[ConcreteDataType::string_datatype()],
            vec![Value::String(StringBytes::from("hello"))],
        );

        check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]);

        check_encode_and_decode(
            &[ConcreteDataType::string_datatype()],
            vec![Value::String("".into())],
        );
        check_encode_and_decode(
            &[ConcreteDataType::string_datatype()],
            vec![Value::String("world".into())],
        );
    }

    #[test]
    fn test_encode_null() {
        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int32_datatype(),
            ],
            vec![Value::String(StringBytes::from("abcd")), Value::Null],
        )
    }

    #[test]
    fn test_encode_multiple_rows() {
        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::boolean_datatype(),
            ],
            vec![
                Value::String("hello".into()),
                Value::Int64(42),
                Value::Boolean(false),
            ],
        );

        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::boolean_datatype(),
            ],
            vec![
                Value::String("world".into()),
                Value::Int64(43),
                Value::Boolean(true),
            ],
        );

        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::boolean_datatype(),
            ],
            vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
        );

        // A single row covering every supported data type.
        check_encode_and_decode(
            &[
                ConcreteDataType::boolean_datatype(),
                ConcreteDataType::int8_datatype(),
                ConcreteDataType::uint8_datatype(),
                ConcreteDataType::int16_datatype(),
                ConcreteDataType::uint16_datatype(),
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::uint32_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::uint64_datatype(),
                ConcreteDataType::float32_datatype(),
                ConcreteDataType::float64_datatype(),
                ConcreteDataType::binary_datatype(),
                ConcreteDataType::string_datatype(),
                ConcreteDataType::date_datatype(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                ConcreteDataType::time_millisecond_datatype(),
                ConcreteDataType::duration_millisecond_datatype(),
                ConcreteDataType::interval_year_month_datatype(),
                ConcreteDataType::interval_day_time_datatype(),
                ConcreteDataType::interval_month_day_nano_datatype(),
                ConcreteDataType::decimal128_default_datatype(),
                ConcreteDataType::vector_datatype(3),
            ],
            vec![
                Value::Boolean(true),
                Value::Int8(8),
                Value::UInt8(8),
                Value::Int16(16),
                Value::UInt16(16),
                Value::Int32(32),
                Value::UInt32(32),
                Value::Int64(64),
                Value::UInt64(64),
                Value::Float32(1.0.into()),
                Value::Float64(1.0.into()),
                Value::Binary(b"hello"[..].into()),
                Value::String("world".into()),
                Value::Date(Date::new(10)),
                Value::Timestamp(Timestamp::new_millisecond(12)),
                Value::Time(Time::new_millisecond(13)),
                Value::Duration(Duration::new_millisecond(14)),
                Value::IntervalYearMonth(IntervalYearMonth::new(1)),
                Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
                Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
                Value::Decimal128(Decimal128::from(16)),
                Value::Binary(Bytes::from(vec![0; 12])),
            ],
        );
    }
}