1use std::sync::Arc;
16
17use bytes::Buf;
18use common_base::bytes::Bytes;
19use common_decimal::Decimal128;
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use common_time::time::Time;
22use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
23use datatypes::data_type::ConcreteDataType;
24use datatypes::prelude::Value;
25use datatypes::types::IntervalType;
26use datatypes::value::ValueRef;
27use memcomparable::{Deserializer, Serializer};
28use paste::paste;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metadata::{RegionMetadata, RegionMetadataRef};
33use store_api::storage::ColumnId;
34
35use crate::error::{
36 self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
37};
38use crate::memtable::key_values::KeyValue;
39use crate::memtable::partition_tree::DensePrimaryKeyFilter;
40use crate::row_converter::{
41 CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
42};
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct SortField {
46 pub(crate) data_type: ConcreteDataType,
47}
48
49impl SortField {
50 pub fn new(data_type: ConcreteDataType) -> Self {
51 Self { data_type }
52 }
53
54 pub fn estimated_size(&self) -> usize {
55 match &self.data_type {
56 ConcreteDataType::Boolean(_) => 2,
57 ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
58 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
59 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
60 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
61 ConcreteDataType::Float32(_) => 5,
62 ConcreteDataType::Float64(_) => 9,
63 ConcreteDataType::Binary(_)
64 | ConcreteDataType::Json(_)
65 | ConcreteDataType::Vector(_) => 11,
66 ConcreteDataType::String(_) => 11, ConcreteDataType::Date(_) => 5,
68 ConcreteDataType::Timestamp(_) => 10,
69 ConcreteDataType::Time(_) => 10,
70 ConcreteDataType::Duration(_) => 10,
71 ConcreteDataType::Interval(_) => 18,
72 ConcreteDataType::Decimal128(_) => 19,
73 ConcreteDataType::Null(_)
74 | ConcreteDataType::List(_)
75 | ConcreteDataType::Dictionary(_) => 0,
76 }
77 }
78}
79
impl SortField {
    /// Serializes `value` into `serializer` using the memcomparable encoding of this
    /// field's data type.
    ///
    /// Values are written as `Option<_>`: the timestamp / interval arms `map` over the
    /// accessor result, and `deserialize` reads `Option<_>` back for every type.
    ///
    /// # Errors
    /// - `FieldTypeMismatch` if `value` cannot be read through the `as_*` accessor
    ///   matching this field's data type.
    /// - `SerializeField` if the memcomparable serializer fails.
    /// - `NotSupportedField` for `List`, `Dictionary` and `Null` data types.
    pub(crate) fn serialize(
        &self,
        serializer: &mut Serializer<&mut Vec<u8>>,
        value: &ValueRef,
    ) -> Result<()> {
        // Expands to a `match` on the data type. Each `($ty, $f)` pair becomes an arm
        // `ConcreteDataType::$ty(_)` that reads the value via `value.as_$f()` and
        // serializes it unchanged; types needing a conversion are matched explicitly.
        macro_rules! cast_value_and_serialize {
            (
                $self: ident;
                $serializer: ident;
                $(
                    $ty: ident, $f: ident
                ),*
            ) => {
                match &$self.data_type {
                $(
                    ConcreteDataType::$ty(_) => {
                        paste!{
                            value
                            .[<as_ $f>]()
                            .context(FieldTypeMismatchSnafu)?
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                        }
                    }
                )*
                // Only the raw `value()` is written; `deserialize` rebuilds the
                // timestamp from the column type via `create_timestamp`.
                ConcreteDataType::Timestamp(_) => {
                    let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?;
                    timestamp
                        .map(|t|t.value())
                        .serialize($serializer)
                        .context(SerializeFieldSnafu)?;
                }
                // Intervals are written as their packed integer form
                // (`to_i32` / `to_i64` / `to_i128` respectively).
                ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
                    let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?;
                    interval.map(|i| i.to_i32())
                        .serialize($serializer)
                        .context(SerializeFieldSnafu)?;
                }
                ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
                    let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?;
                    interval.map(|i| i.to_i64())
                        .serialize($serializer)
                        .context(SerializeFieldSnafu)?;
                }
                ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
                    let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
                    interval.map(|i| i.to_i128())
                        .serialize($serializer)
                        .context(SerializeFieldSnafu)?;
                }
                // These types have no key encoding.
                ConcreteDataType::List(_) |
                ConcreteDataType::Dictionary(_) |
                ConcreteDataType::Null(_) => {
                    return error::NotSupportedFieldSnafu {
                        data_type: $self.data_type.clone()
                    }.fail()
                }
                }
            };
        }
        // Json and Vector reuse the binary accessor, so they share Binary's encoding.
        cast_value_and_serialize!(self; serializer;
            Boolean, boolean,
            Binary, binary,
            Int8, i8,
            UInt8, u8,
            Int16, i16,
            UInt16, u16,
            Int32, i32,
            UInt32, u32,
            Int64, i64,
            UInt64, u64,
            Float32, f32,
            Float64, f64,
            String, string,
            Date, date,
            Time, time,
            Duration, duration,
            Decimal128, decimal128,
            Json, binary,
            Vector, binary
        );

        Ok(())
    }

    /// Reads one value of this field's data type from `deserializer`, mirroring
    /// `serialize`.
    ///
    /// Every type is read as `Option<_>`. The round-trip tests in this file expect an
    /// encoded null to come back as `Value::Null`.
    ///
    /// # Errors
    /// - `DeserializeField` if the memcomparable deserializer fails.
    /// - `NotSupportedField` for `List`, `Dictionary` and `Null` data types.
    pub(crate) fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
        // Expands to a `match`: each `($ty, $f)` pair becomes an arm that reads an
        // `Option<$f>` and converts it with `Value::from`. The `$serializer` parameter
        // is bound but unused; the arms refer to the `deserializer` argument directly.
        macro_rules! deserialize_and_build_value {
            (
                $self: ident;
                $serializer: ident;
                $(
                    $ty: ident, $f: ident
                ),*
            ) => {

                match &$self.data_type {
                    $(
                        ConcreteDataType::$ty(_) => {
                            Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
                        }
                    )*
                    // Binary-like types are read back as raw bytes.
                    ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
                        Option::<Vec<u8>>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(Bytes::from),
                    )),
                    // The stored i64 is turned back into a timestamp using the unit of
                    // this column's timestamp type.
                    ConcreteDataType::Timestamp(ty) => {
                        let timestamp = Option::<i64>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(|t|ty.create_timestamp(t));
                        Ok(Value::from(timestamp))
                    }
                    // Intervals are read from their packed integer form.
                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
                        let interval = Option::<i32>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(IntervalYearMonth::from_i32);
                        Ok(Value::from(interval))
                    }
                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
                        let interval = Option::<i64>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(IntervalDayTime::from_i64);
                        Ok(Value::from(interval))
                    }
                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
                        let interval = Option::<i128>::deserialize(deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(IntervalMonthDayNano::from_i128);
                        Ok(Value::from(interval))
                    }
                    ConcreteDataType::List(l) => NotSupportedFieldSnafu {
                        data_type: ConcreteDataType::List(l.clone()),
                    }
                    .fail(),
                    ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
                        data_type: ConcreteDataType::Dictionary(d.clone()),
                    }
                    .fail(),
                    ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
                        data_type: ConcreteDataType::Null(n.clone()),
                    }
                    .fail(),
                }
            };
        }
        deserialize_and_build_value!(self; deserializer;
            Boolean, bool,
            Int8, i8,
            Int16, i16,
            Int32, i32,
            Int64, i64,
            UInt8, u8,
            UInt16, u16,
            UInt32, u32,
            UInt64, u64,
            Float32, f32,
            Float64, f64,
            String, String,
            Date, Date,
            Time, Time,
            Duration, Duration,
            Decimal128, Decimal128
        )
    }

    /// Advances `deserializer` past this field's encoded value without decoding it and
    /// returns the number of bytes skipped.
    ///
    /// `bytes` is expected to be the buffer `deserializer` reads from: marker bytes are
    /// peeked by indexing `bytes` at `deserializer.position()`.
    ///
    /// The byte counts below must stay in sync with what `serialize` writes.
    ///
    /// # Errors
    /// Returns `DeserializeField` if skipping a string fails.
    ///
    /// # Panics
    /// Indexes `bytes` directly, so it panics if the buffer ends before the value
    /// does (e.g. a truncated key).
    pub(crate) fn skip_deserialize(
        &self,
        bytes: &[u8],
        deserializer: &mut Deserializer<&[u8]>,
    ) -> Result<usize> {
        let pos = deserializer.position();
        // A leading 0 marks an absent (null) value of any type: a single byte.
        if bytes[pos] == 0 {
            deserializer.advance(1);
            return Ok(1);
        }

        // Fixed-width types: presumably 1 tag byte plus the payload width
        // (e.g. Int32 => 1 + 4).
        let to_skip = match &self.data_type {
            ConcreteDataType::Boolean(_) => 2,
            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
            ConcreteDataType::Float32(_) => 5,
            ConcreteDataType::Float64(_) => 9,
            ConcreteDataType::Binary(_)
            | ConcreteDataType::Json(_)
            | ConcreteDataType::Vector(_) => {
                // Walk (marker `1`, byte) pairs after the tag until the `0` terminator.
                // NOTE(review): this assumes the binary payload is serialized as a
                // sequence of individual bytes rather than a memcomparable byte string,
                // which is why `skip_bytes` is not used here — confirm before changing
                // the serializer.
                let pos_before = deserializer.position();
                let mut current = pos_before + 1;
                while bytes[current] == 1 {
                    current += 2;
                }
                // Include the terminator byte.
                let to_skip = current - pos_before + 1;
                deserializer.advance(to_skip);
                return Ok(to_skip);
            }
            ConcreteDataType::String(_) => {
                // Step over the 1-byte tag, then let the deserializer skip the string.
                let pos_before = deserializer.position();
                deserializer.advance(1);
                deserializer
                    .skip_bytes()
                    .context(error::DeserializeFieldSnafu)?;
                return Ok(deserializer.position() - pos_before);
            }
            ConcreteDataType::Date(_) => 5,
            // Timestamp is written as `Option<i64>` by `serialize`.
            ConcreteDataType::Timestamp(_) => 9,
            // Presumably an i64 value plus a 1-byte time unit.
            ConcreteDataType::Time(_) => 10,
            ConcreteDataType::Duration(_) => 10,
            // Intervals are `Option<i32>`, `Option<i64>` and `Option<i128>` respectively.
            ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
            ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
            ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
            // Presumably a 16-byte value plus precision and scale bytes.
            ConcreteDataType::Decimal128(_) => 19,
            // `serialize` rejects these types, so there is nothing to skip.
            ConcreteDataType::Null(_)
            | ConcreteDataType::List(_)
            | ConcreteDataType::Dictionary(_) => 0,
        };
        deserializer.advance(to_skip);
        Ok(to_skip)
    }
}
304
305impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
306 fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
307 where
308 I: Iterator<Item = ValueRef<'a>>,
309 {
310 self.encode_dense(row, buffer)
311 }
312}
313
314#[derive(Clone, Debug)]
316pub struct DensePrimaryKeyCodec {
317 ordered_primary_key_columns: Arc<Vec<(ColumnId, SortField)>>,
319}
320
321impl DensePrimaryKeyCodec {
322 pub fn new(metadata: &RegionMetadata) -> Self {
323 let ordered_primary_key_columns = metadata
324 .primary_key_columns()
325 .map(|c| {
326 (
327 c.column_id,
328 SortField::new(c.column_schema.data_type.clone()),
329 )
330 })
331 .collect::<Vec<_>>();
332
333 Self::with_fields(ordered_primary_key_columns)
334 }
335
336 pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
337 Self {
338 ordered_primary_key_columns: Arc::new(fields),
339 }
340 }
341
342 fn encode_dense<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
343 where
344 I: Iterator<Item = ValueRef<'a>>,
345 {
346 let mut serializer = Serializer::new(buffer);
347 for (idx, value) in row.enumerate() {
348 self.field_at(idx).serialize(&mut serializer, &value)?;
349 }
350 Ok(())
351 }
352
353 pub fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<(ColumnId, Value)>> {
355 let mut deserializer = Deserializer::new(bytes);
356 let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
357 for (column_id, field) in self.ordered_primary_key_columns.iter() {
358 let value = field.deserialize(&mut deserializer)?;
359 values.push((*column_id, value));
360 }
361 Ok(values)
362 }
363
364 pub fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Result<Vec<Value>> {
366 let mut deserializer = Deserializer::new(bytes);
367 let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
368 for (_, field) in self.ordered_primary_key_columns.iter() {
369 let value = field.deserialize(&mut deserializer)?;
370 values.push(value);
371 }
372 Ok(values)
373 }
374
375 fn field_at(&self, pos: usize) -> &SortField {
380 &self.ordered_primary_key_columns[pos].1
381 }
382
383 pub fn decode_value_at(
387 &self,
388 bytes: &[u8],
389 pos: usize,
390 offsets_buf: &mut Vec<usize>,
391 ) -> Result<Value> {
392 let mut deserializer = Deserializer::new(bytes);
393 if pos < offsets_buf.len() {
394 let to_skip = offsets_buf[pos];
396 deserializer.advance(to_skip);
397 return self.field_at(pos).deserialize(&mut deserializer);
398 }
399
400 if offsets_buf.is_empty() {
401 let mut offset = 0;
402 for i in 0..pos {
404 offsets_buf.push(offset);
406 let skip = self
407 .field_at(i)
408 .skip_deserialize(bytes, &mut deserializer)?;
409 offset += skip;
410 }
411 offsets_buf.push(offset);
413 } else {
414 let value_start = offsets_buf.len() - 1;
416 let mut offset = offsets_buf[value_start];
418 deserializer.advance(offset);
419 for i in value_start..pos {
420 let skip = self
422 .field_at(i)
423 .skip_deserialize(bytes, &mut deserializer)?;
424 offset += skip;
426 offsets_buf.push(offset);
427 }
428 }
429
430 self.field_at(pos).deserialize(&mut deserializer)
431 }
432
433 pub fn estimated_size(&self) -> usize {
434 self.ordered_primary_key_columns
435 .iter()
436 .map(|(_, f)| f.estimated_size())
437 .sum()
438 }
439
440 pub fn num_fields(&self) -> usize {
441 self.ordered_primary_key_columns.len()
442 }
443}
444
445impl PrimaryKeyCodec for DensePrimaryKeyCodec {
446 fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
447 self.encode_dense(key_value.primary_keys(), buffer)
448 }
449
450 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
451 self.encode_dense(values.iter().map(|(_, v)| v.as_value_ref()), buffer)
452 }
453
454 fn encode_value_refs(
455 &self,
456 values: &[(ColumnId, ValueRef)],
457 buffer: &mut Vec<u8>,
458 ) -> Result<()> {
459 let iter = values.iter().map(|(_, v)| *v);
460 self.encode_dense(iter, buffer)
461 }
462
463 fn estimated_size(&self) -> Option<usize> {
464 Some(self.estimated_size())
465 }
466
467 fn num_fields(&self) -> Option<usize> {
468 Some(self.num_fields())
469 }
470
471 fn encoding(&self) -> PrimaryKeyEncoding {
472 PrimaryKeyEncoding::Dense
473 }
474
475 fn primary_key_filter(
476 &self,
477 metadata: &RegionMetadataRef,
478 filters: Arc<Vec<SimpleFilterEvaluator>>,
479 ) -> Box<dyn PrimaryKeyFilter> {
480 Box::new(DensePrimaryKeyFilter::new(
481 metadata.clone(),
482 filters,
483 self.clone(),
484 ))
485 }
486
487 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
488 Ok(CompositeValues::Dense(self.decode_dense(bytes)?))
489 }
490
491 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
492 let mut values = self.decode_dense(bytes)?;
494 Ok(values.pop().map(|(_, v)| v))
495 }
496}
497
#[cfg(test)]
mod tests {
    use common_base::bytes::StringBytes;
    use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
    use datatypes::value::Value;

    use super::*;

    /// Encodes `row` with a codec built from `data_types`, then checks that both
    /// `decode` and `decode_value_at` return the original values.
    ///
    /// `decode_value_at` runs twice with the same offsets buffer. The first pass fills
    /// the offset cache by skipping values; the second reads through cached offsets.
    fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
        let encoder = DensePrimaryKeyCodec::with_fields(
            data_types
                .iter()
                .map(|t| (0, SortField::new(t.clone())))
                .collect::<Vec<_>>(),
        );

        let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();

        let result = encoder.encode(value_ref.iter().cloned()).unwrap();
        let decoded = encoder.decode(&result).unwrap().into_dense();
        assert_eq!(decoded, row);
        let mut decoded = Vec::new();
        let mut offsets = Vec::new();
        // Iterate twice to exercise both the cold and the cached offset paths.
        for _ in 0..2 {
            decoded.clear();
            for i in 0..data_types.len() {
                let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
                decoded.push(value);
            }
            assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
            assert_eq!(decoded, row);
        }
    }

    /// A broad set of data types the dense codec can encode (Json is not included).
    fn all_supported_data_types() -> Vec<ConcreteDataType> {
        vec![
            ConcreteDataType::boolean_datatype(),
            ConcreteDataType::int8_datatype(),
            ConcreteDataType::uint8_datatype(),
            ConcreteDataType::int16_datatype(),
            ConcreteDataType::uint16_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::uint32_datatype(),
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::uint64_datatype(),
            ConcreteDataType::float32_datatype(),
            ConcreteDataType::float64_datatype(),
            ConcreteDataType::binary_datatype(),
            ConcreteDataType::string_datatype(),
            ConcreteDataType::date_datatype(),
            ConcreteDataType::timestamp_millisecond_datatype(),
            ConcreteDataType::time_millisecond_datatype(),
            ConcreteDataType::duration_millisecond_datatype(),
            ConcreteDataType::interval_year_month_datatype(),
            ConcreteDataType::interval_day_time_datatype(),
            ConcreteDataType::interval_month_day_nano_datatype(),
            ConcreteDataType::decimal128_default_datatype(),
            ConcreteDataType::vector_datatype(3),
        ]
    }

    /// One non-null value per entry of `all_supported_data_types`, in the same order.
    fn all_supported_values() -> Vec<Value> {
        vec![
            Value::Boolean(true),
            Value::Int8(8),
            Value::UInt8(8),
            Value::Int16(16),
            Value::UInt16(16),
            Value::Int32(32),
            Value::UInt32(32),
            Value::Int64(64),
            Value::UInt64(64),
            Value::Float32(1.0.into()),
            Value::Float64(1.0.into()),
            Value::Binary(b"hello"[..].into()),
            Value::String("world".into()),
            Value::Date(Date::new(10)),
            Value::Timestamp(Timestamp::new_millisecond(12)),
            Value::Time(Time::new_millisecond(13)),
            Value::Duration(Duration::new_millisecond(14)),
            Value::IntervalYearMonth(IntervalYearMonth::new(1)),
            Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
            Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
            Value::Decimal128(Decimal128::from(16)),
            // A 3-dimensional vector is 3 * 4 bytes.
            Value::Binary(Bytes::from(vec![0; 12])),
        ]
    }

    #[test]
    fn test_memcmp() {
        let encoder = DensePrimaryKeyCodec::with_fields(vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int64_datatype())),
        ]);
        let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
        let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
        let result = encoder.encode(value_ref.iter().cloned()).unwrap();

        let decoded = encoder.decode(&result).unwrap().into_dense();
        assert_eq!(&values, &decoded as &[Value]);
    }

    #[test]
    fn test_memcmp_timestamp() {
        check_encode_and_decode(
            &[
                ConcreteDataType::timestamp_millisecond_datatype(),
                ConcreteDataType::int64_datatype(),
            ],
            vec![
                Value::Timestamp(Timestamp::new_millisecond(42)),
                Value::Int64(43),
            ],
        );
    }

    #[test]
    fn test_memcmp_duration() {
        check_encode_and_decode(
            &[
                ConcreteDataType::duration_millisecond_datatype(),
                ConcreteDataType::int64_datatype(),
            ],
            vec![
                Value::Duration(Duration::new_millisecond(44)),
                Value::Int64(45),
            ],
        )
    }

    #[test]
    fn test_memcmp_binary() {
        check_encode_and_decode(
            &[
                ConcreteDataType::binary_datatype(),
                ConcreteDataType::int64_datatype(),
            ],
            vec![
                Value::Binary(Bytes::from("hello".as_bytes())),
                Value::Int64(43),
            ],
        );
    }

    #[test]
    fn test_memcmp_string() {
        check_encode_and_decode(
            &[ConcreteDataType::string_datatype()],
            vec![Value::String(StringBytes::from("hello"))],
        );

        check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]);

        check_encode_and_decode(
            &[ConcreteDataType::string_datatype()],
            vec![Value::String("".into())],
        );
        check_encode_and_decode(
            &[ConcreteDataType::string_datatype()],
            vec![Value::String("world".into())],
        );
    }

    #[test]
    fn test_encode_null() {
        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int32_datatype(),
            ],
            vec![Value::String(StringBytes::from("abcd")), Value::Null],
        )
    }

    #[test]
    fn test_encode_multiple_rows() {
        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::boolean_datatype(),
            ],
            vec![
                Value::String("hello".into()),
                Value::Int64(42),
                Value::Boolean(false),
            ],
        );

        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::boolean_datatype(),
            ],
            vec![
                Value::String("world".into()),
                Value::Int64(43),
                Value::Boolean(true),
            ],
        );

        check_encode_and_decode(
            &[
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::boolean_datatype(),
            ],
            vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
        );

        // Every supported type with a non-null value.
        check_encode_and_decode(&all_supported_data_types(), all_supported_values());
    }

    #[test]
    fn test_encode_all_nulls() {
        // Every supported type must round-trip a null, and `decode_value_at` must skip
        // the 1-byte null marker of each of them.
        let data_types = all_supported_data_types();
        let row = vec![Value::Null; data_types.len()];
        check_encode_and_decode(&data_types, row);
    }

    #[test]
    fn test_encode_interleaved_nulls() {
        // Alternate nulls and non-null values in both phases, so that skipping is
        // exercised across every null/non-null boundary for every type.
        let data_types = all_supported_data_types();
        for null_parity in 0..2 {
            let row = all_supported_values()
                .into_iter()
                .enumerate()
                .map(|(i, v)| if i % 2 == null_parity { Value::Null } else { v })
                .collect::<Vec<_>>();
            check_encode_and_decode(&data_types, row);
        }
    }
}