mito_codec/row_converter/
dense.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Buf;
18use common_base::bytes::Bytes;
19use common_decimal::Decimal128;
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use common_time::time::Time;
22use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
23use datatypes::data_type::ConcreteDataType;
24use datatypes::prelude::Value;
25use datatypes::types::IntervalType;
26use datatypes::value::ValueRef;
27use memcomparable::{Deserializer, Serializer};
28use paste::paste;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metadata::{RegionMetadata, RegionMetadataRef};
33use store_api::storage::ColumnId;
34
35use crate::error::{
36    self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
37};
38use crate::key_values::KeyValue;
39use crate::primary_key_filter::DensePrimaryKeyFilter;
40use crate::row_converter::{
41    CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
42};
43
/// Field to serialize and deserialize value in memcomparable format.
///
/// Wraps the logical [`ConcreteDataType`] of a single column. The type that is actually
/// used to encode values is returned by [`SortField::encode_data_type`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Logical data type of the field.
    data_type: ConcreteDataType,
}
49
50impl SortField {
51    pub fn new(data_type: ConcreteDataType) -> Self {
52        Self { data_type }
53    }
54
    /// Returns the data type of the field.
    ///
    /// This is the type the field was created with; see
    /// [`SortField::encode_data_type`] for the type used when encoding.
    pub fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
59
60    /// Returns the physical data type to encode of the field.
61    ///
62    /// For example, a dictionary field will be encoded as its value type.
63    pub fn encode_data_type(&self) -> &ConcreteDataType {
64        match &self.data_type {
65            ConcreteDataType::Dictionary(dict_type) => dict_type.value_type(),
66            _ => &self.data_type,
67        }
68    }
69
70    pub fn estimated_size(&self) -> usize {
71        Self::estimated_size_by_type(self.encode_data_type())
72    }
73
74    fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize {
75        match data_type {
76            ConcreteDataType::Boolean(_) => 2,
77            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
78            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
79            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
80            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
81            ConcreteDataType::Float32(_) => 5,
82            ConcreteDataType::Float64(_) => 9,
83            ConcreteDataType::Binary(_)
84            | ConcreteDataType::Json(_)
85            | ConcreteDataType::Vector(_) => 11,
86            ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
87            ConcreteDataType::Date(_) => 5,
88            ConcreteDataType::Timestamp(_) => 10,
89            ConcreteDataType::Time(_) => 10,
90            ConcreteDataType::Duration(_) => 10,
91            ConcreteDataType::Interval(_) => 18,
92            ConcreteDataType::Decimal128(_) => 19,
93            ConcreteDataType::Null(_)
94            | ConcreteDataType::List(_)
95            | ConcreteDataType::Struct(_)
96            | ConcreteDataType::Dictionary(_) => 0,
97        }
98    }
99
100    /// Serialize a value to the serializer.
101    pub fn serialize(
102        &self,
103        serializer: &mut Serializer<&mut Vec<u8>>,
104        value: &ValueRef,
105    ) -> Result<()> {
106        Self::serialize_by_type(self.encode_data_type(), serializer, value)
107    }
108
109    fn serialize_by_type(
110        data_type: &ConcreteDataType,
111        serializer: &mut Serializer<&mut Vec<u8>>,
112        value: &ValueRef,
113    ) -> Result<()> {
114        macro_rules! cast_value_and_serialize {
115            (
116                $data_type: ident;
117                $serializer: ident;
118                $(
119                    $ty: ident, $f: ident
120                ),*
121            ) => {
122                match $data_type {
123                $(
124                    ConcreteDataType::$ty(_) => {
125                        paste!{
126                            value
127                            .[<try_into_ $f>]()
128                            .context(FieldTypeMismatchSnafu)?
129                            .serialize($serializer)
130                            .context(SerializeFieldSnafu)?;
131                        }
132                    }
133                )*
134                    ConcreteDataType::Timestamp(_) => {
135                        let timestamp = value.try_into_timestamp().context(FieldTypeMismatchSnafu)?;
136                        timestamp
137                            .map(|t|t.value())
138                            .serialize($serializer)
139                            .context(SerializeFieldSnafu)?;
140                    }
141                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
142                        let interval = value.try_into_interval_year_month().context(FieldTypeMismatchSnafu)?;
143                        interval.map(|i| i.to_i32())
144                            .serialize($serializer)
145                            .context(SerializeFieldSnafu)?;
146                    }
147                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
148                        let interval = value.try_into_interval_day_time().context(FieldTypeMismatchSnafu)?;
149                        interval.map(|i| i.to_i64())
150                            .serialize($serializer)
151                            .context(SerializeFieldSnafu)?;
152                    }
153                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
154                        let interval = value.try_into_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
155                        interval.map(|i| i.to_i128())
156                            .serialize($serializer)
157                            .context(SerializeFieldSnafu)?;
158                    }
159                    ConcreteDataType::List(_) |
160                    ConcreteDataType::Struct(_) |
161                    ConcreteDataType::Dictionary(_) |
162                    ConcreteDataType::Null(_) => {
163                        return error::NotSupportedFieldSnafu {
164                            data_type: $data_type.clone()
165                        }.fail()
166                    }
167                }
168            };
169        }
170        cast_value_and_serialize!(data_type; serializer;
171            Boolean, boolean,
172            Binary, binary,
173            Int8, i8,
174            UInt8, u8,
175            Int16, i16,
176            UInt16, u16,
177            Int32, i32,
178            UInt32, u32,
179            Int64, i64,
180            UInt64, u64,
181            Float32, f32,
182            Float64, f64,
183            String, string,
184            Date, date,
185            Time, time,
186            Duration, duration,
187            Decimal128, decimal128,
188            Json, binary,
189            Vector, binary
190        );
191
192        Ok(())
193    }
194
195    /// Deserialize a value from the deserializer.
196    pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
197        Self::deserialize_by_type(self.encode_data_type(), deserializer)
198    }
199
200    fn deserialize_by_type<B: Buf>(
201        data_type: &ConcreteDataType,
202        deserializer: &mut Deserializer<B>,
203    ) -> Result<Value> {
204        macro_rules! deserialize_and_build_value {
205            (
206                $data_type: ident;
207                $serializer: ident;
208                $(
209                    $ty: ident, $f: ident
210                ),*
211            ) => {
212
213                match $data_type {
214                    $(
215                        ConcreteDataType::$ty(_) => {
216                            Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
217                        }
218                    )*
219                    ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
220                        Option::<Vec<u8>>::deserialize(deserializer)
221                            .context(error::DeserializeFieldSnafu)?
222                            .map(Bytes::from),
223                    )),
224                    ConcreteDataType::Timestamp(ty) => {
225                        let timestamp = Option::<i64>::deserialize(deserializer)
226                            .context(error::DeserializeFieldSnafu)?
227                            .map(|t|ty.create_timestamp(t));
228                        Ok(Value::from(timestamp))
229                    }
230                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
231                        let interval = Option::<i32>::deserialize(deserializer)
232                            .context(error::DeserializeFieldSnafu)?
233                            .map(IntervalYearMonth::from_i32);
234                        Ok(Value::from(interval))
235                    }
236                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
237                        let interval = Option::<i64>::deserialize(deserializer)
238                            .context(error::DeserializeFieldSnafu)?
239                            .map(IntervalDayTime::from_i64);
240                        Ok(Value::from(interval))
241                    }
242                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
243                        let interval = Option::<i128>::deserialize(deserializer)
244                            .context(error::DeserializeFieldSnafu)?
245                            .map(IntervalMonthDayNano::from_i128);
246                        Ok(Value::from(interval))
247                    }
248                    ConcreteDataType::List(l) => NotSupportedFieldSnafu {
249                        data_type: ConcreteDataType::List(l.clone()),
250                    }
251                    .fail(),
252                    ConcreteDataType::Struct(f) => NotSupportedFieldSnafu {
253                        data_type: ConcreteDataType::Struct(f.clone()),
254                    }
255                    .fail(),
256                    ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
257                        data_type: ConcreteDataType::Dictionary(d.clone()),
258                    }
259                    .fail(),
260                    ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
261                        data_type: ConcreteDataType::Null(n.clone()),
262                    }
263                    .fail(),
264                }
265            };
266        }
267        deserialize_and_build_value!(data_type; deserializer;
268            Boolean, bool,
269            Int8, i8,
270            Int16, i16,
271            Int32, i32,
272            Int64, i64,
273            UInt8, u8,
274            UInt16, u16,
275            UInt32, u32,
276            UInt64, u64,
277            Float32, f32,
278            Float64, f64,
279            String, String,
280            Date, Date,
281            Time, Time,
282            Duration, Duration,
283            Decimal128, Decimal128
284        )
285    }
286
287    /// Skip deserializing this field, returns the length of it.
288    pub(crate) fn skip_deserialize(
289        &self,
290        bytes: &[u8],
291        deserializer: &mut Deserializer<&[u8]>,
292    ) -> Result<usize> {
293        let pos = deserializer.position();
294        if bytes[pos] == 0 {
295            deserializer.advance(1);
296            return Ok(1);
297        }
298
299        Self::skip_deserialize_by_type(self.encode_data_type(), bytes, deserializer)
300    }
301
302    fn skip_deserialize_by_type(
303        data_type: &ConcreteDataType,
304        bytes: &[u8],
305        deserializer: &mut Deserializer<&[u8]>,
306    ) -> Result<usize> {
307        let to_skip = match data_type {
308            ConcreteDataType::Boolean(_) => 2,
309            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
310            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
311            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
312            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
313            ConcreteDataType::Float32(_) => 5,
314            ConcreteDataType::Float64(_) => 9,
315            ConcreteDataType::Binary(_)
316            | ConcreteDataType::Json(_)
317            | ConcreteDataType::Vector(_) => {
318                // Now the encoder encode binary as a list of bytes so we can't use
319                // skip bytes.
320                let pos_before = deserializer.position();
321                let mut current = pos_before + 1;
322                while bytes[current] == 1 {
323                    current += 2;
324                }
325                let to_skip = current - pos_before + 1;
326                deserializer.advance(to_skip);
327                return Ok(to_skip);
328            }
329            ConcreteDataType::String(_) => {
330                let pos_before = deserializer.position();
331                deserializer.advance(1);
332                deserializer
333                    .skip_bytes()
334                    .context(error::DeserializeFieldSnafu)?;
335                return Ok(deserializer.position() - pos_before);
336            }
337            ConcreteDataType::Date(_) => 5,
338            ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
339            ConcreteDataType::Time(_) => 10,     // i64 and 1 byte time unit
340            ConcreteDataType::Duration(_) => 10,
341            ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
342            ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
343            ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
344            ConcreteDataType::Decimal128(_) => 19,
345            ConcreteDataType::Null(_)
346            | ConcreteDataType::List(_)
347            | ConcreteDataType::Struct(_)
348            | ConcreteDataType::Dictionary(_) => 0,
349        };
350        deserializer.advance(to_skip);
351        Ok(to_skip)
352    }
353}
354
/// [`PrimaryKeyCodecExt`] implementation backed by the dense encoding of
/// [`DensePrimaryKeyCodec`].
impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
    /// Encodes the values yielded by `row` into `buffer`.
    ///
    /// Delegates to `encode_dense`, so the i-th value of `row` is serialized with the
    /// i-th field of the codec.
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        self.encode_dense(row, buffer)
    }
}
363
/// A memory-comparable row [`Value`] encoder/decoder.
///
/// Values are encoded one after another in the order of the primary key fields.
/// Cloning the codec is cheap because the fields are shared through an [`Arc`].
#[derive(Clone, Debug)]
pub struct DensePrimaryKeyCodec {
    /// Primary key fields.
    ///
    /// Each entry is the column id and the [`SortField`] used to encode the column.
    ordered_primary_key_columns: Arc<Vec<(ColumnId, SortField)>>,
}
370
371impl DensePrimaryKeyCodec {
372    pub fn new(metadata: &RegionMetadata) -> Self {
373        let ordered_primary_key_columns = metadata
374            .primary_key_columns()
375            .map(|c| {
376                (
377                    c.column_id,
378                    SortField::new(c.column_schema.data_type.clone()),
379                )
380            })
381            .collect::<Vec<_>>();
382
383        Self::with_fields(ordered_primary_key_columns)
384    }
385
386    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
387        Self {
388            ordered_primary_key_columns: Arc::new(fields),
389        }
390    }
391
392    fn encode_dense<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
393    where
394        I: Iterator<Item = ValueRef<'a>>,
395    {
396        let mut serializer = Serializer::new(buffer);
397        for (idx, value) in row.enumerate() {
398            self.field_at(idx).serialize(&mut serializer, &value)?;
399        }
400        Ok(())
401    }
402
403    /// Decode primary key values from bytes.
404    pub fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<(ColumnId, Value)>> {
405        let mut deserializer = Deserializer::new(bytes);
406        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
407        for (column_id, field) in self.ordered_primary_key_columns.iter() {
408            let value = field.deserialize(&mut deserializer)?;
409            values.push((*column_id, value));
410        }
411        Ok(values)
412    }
413
414    /// Decode primary key values from bytes without column id.
415    pub fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Result<Vec<Value>> {
416        let mut deserializer = Deserializer::new(bytes);
417        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
418        for (_, field) in self.ordered_primary_key_columns.iter() {
419            let value = field.deserialize(&mut deserializer)?;
420            values.push(value);
421        }
422        Ok(values)
423    }
424
425    /// Returns the field at `pos`.
426    ///
427    /// # Panics
428    /// Panics if `pos` is out of bounds.
429    fn field_at(&self, pos: usize) -> &SortField {
430        &self.ordered_primary_key_columns[pos].1
431    }
432
433    /// Advances `deserializer` to the start of value at `pos`.
434    ///
435    /// Returns the offset of the value at `pos` in `bytes`.
436    fn advance_to_value_at(
437        &self,
438        bytes: &[u8],
439        pos: usize,
440        offsets_buf: &mut Vec<usize>,
441        deserializer: &mut Deserializer<&[u8]>,
442    ) -> Result<usize> {
443        if pos < offsets_buf.len() {
444            // We computed the offset before.
445            let offset = offsets_buf[pos];
446            deserializer.advance(offset);
447            return Ok(offset);
448        }
449
450        if offsets_buf.is_empty() {
451            let mut offset = 0;
452            // Skip values before `pos`.
453            for i in 0..pos {
454                // Offset to skip before reading value i.
455                offsets_buf.push(offset);
456                let skip = self.field_at(i).skip_deserialize(bytes, deserializer)?;
457                offset += skip;
458            }
459            // Offset to skip before reading this value.
460            offsets_buf.push(offset);
461            Ok(offset)
462        } else {
463            // Offsets are not enough.
464            let value_start = offsets_buf.len() - 1;
465            // Advances to decode value at `value_start`.
466            let mut offset = offsets_buf[value_start];
467            deserializer.advance(offset);
468            for i in value_start..pos {
469                // Skip value i.
470                let skip = self.field_at(i).skip_deserialize(bytes, deserializer)?;
471                // Offset for the value at i + 1.
472                offset += skip;
473                offsets_buf.push(offset);
474            }
475            Ok(offset)
476        }
477    }
478
    /// Decode value at `pos` in `bytes`.
    ///
    /// The i-th element in offsets buffer is how many bytes to skip in order to read the
    /// i-th value. The buffer is filled lazily by this method, so callers can start with
    /// an empty buffer and reuse it for later calls with the same `bytes`.
    ///
    /// # Panics
    /// Panics if `pos` is out of bounds of the fields of this codec.
    pub fn decode_value_at(
        &self,
        bytes: &[u8],
        pos: usize,
        offsets_buf: &mut Vec<usize>,
    ) -> Result<Value> {
        let mut deserializer = Deserializer::new(bytes);
        // Moves the deserializer to the start of the value at `pos`.
        self.advance_to_value_at(bytes, pos, offsets_buf, &mut deserializer)?;

        self.field_at(pos).deserialize(&mut deserializer)
    }
493
494    /// Returns the encoded bytes at `pos` in `bytes`.
495    ///
496    /// The i-th element in offsets buffer is how many bytes to skip in order to read value at
497    /// `pos`.
498    pub fn encoded_value_at<'a>(
499        &self,
500        bytes: &'a [u8],
501        pos: usize,
502        offsets_buf: &mut Vec<usize>,
503    ) -> Result<&'a [u8]> {
504        let mut deserializer = Deserializer::new(bytes);
505        let offset = self.advance_to_value_at(bytes, pos, offsets_buf, &mut deserializer)?;
506
507        let len = self
508            .field_at(pos)
509            .skip_deserialize(bytes, &mut deserializer)?;
510        Ok(&bytes[offset..offset + len])
511    }
512
513    pub fn estimated_size(&self) -> usize {
514        self.ordered_primary_key_columns
515            .iter()
516            .map(|(_, f)| f.estimated_size())
517            .sum()
518    }
519
    /// Returns the number of primary key fields of this codec.
    pub fn num_fields(&self) -> usize {
        self.ordered_primary_key_columns.len()
    }
523}
524
525impl PrimaryKeyCodec for DensePrimaryKeyCodec {
526    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
527        self.encode_dense(key_value.primary_keys(), buffer)
528    }
529
530    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
531        self.encode_dense(values.iter().map(|(_, v)| v.as_value_ref()), buffer)
532    }
533
534    fn encode_value_refs(
535        &self,
536        values: &[(ColumnId, ValueRef)],
537        buffer: &mut Vec<u8>,
538    ) -> Result<()> {
539        let iter = values.iter().map(|(_, v)| v.clone());
540        self.encode_dense(iter, buffer)
541    }
542
543    fn estimated_size(&self) -> Option<usize> {
544        Some(self.estimated_size())
545    }
546
547    fn num_fields(&self) -> Option<usize> {
548        Some(self.num_fields())
549    }
550
551    fn encoding(&self) -> PrimaryKeyEncoding {
552        PrimaryKeyEncoding::Dense
553    }
554
555    fn primary_key_filter(
556        &self,
557        metadata: &RegionMetadataRef,
558        filters: Arc<Vec<SimpleFilterEvaluator>>,
559    ) -> Box<dyn PrimaryKeyFilter> {
560        Box::new(DensePrimaryKeyFilter::new(
561            metadata.clone(),
562            filters,
563            self.clone(),
564        ))
565    }
566
567    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
568        Ok(CompositeValues::Dense(self.decode_dense(bytes)?))
569    }
570
571    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
572        // TODO(weny, yinwen): avoid decoding the whole primary key.
573        let mut values = self.decode_dense(bytes)?;
574        Ok(values.pop().map(|(_, v)| v))
575    }
576}
577
578#[cfg(test)]
579mod tests {
580    use common_base::bytes::StringBytes;
581    use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
582    use datatypes::value::Value;
583
584    use super::*;
585
586    fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
587        let encoder = DensePrimaryKeyCodec::with_fields(
588            data_types
589                .iter()
590                .map(|t| (0, SortField::new(t.clone())))
591                .collect::<Vec<_>>(),
592        );
593
594        let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
595
596        let result = encoder.encode(value_ref.iter().cloned()).unwrap();
597        let decoded = encoder.decode(&result).unwrap().into_dense();
598        assert_eq!(decoded, row);
599        let mut decoded = Vec::new();
600        let mut offsets = Vec::new();
601        // Iter two times to test offsets buffer.
602        for _ in 0..2 {
603            decoded.clear();
604            for i in 0..data_types.len() {
605                let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
606                decoded.push(value);
607            }
608            assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
609            assert_eq!(decoded, row);
610        }
611    }
612
613    #[test]
614    fn test_memcmp() {
615        let encoder = DensePrimaryKeyCodec::with_fields(vec![
616            (0, SortField::new(ConcreteDataType::string_datatype())),
617            (1, SortField::new(ConcreteDataType::int64_datatype())),
618        ]);
619        let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
620        let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
621        let result = encoder.encode(value_ref.iter().cloned()).unwrap();
622
623        let decoded = encoder.decode(&result).unwrap().into_dense();
624        assert_eq!(&values, &decoded as &[Value]);
625    }
626
627    #[test]
628    fn test_memcmp_timestamp() {
629        check_encode_and_decode(
630            &[
631                ConcreteDataType::timestamp_millisecond_datatype(),
632                ConcreteDataType::int64_datatype(),
633            ],
634            vec![
635                Value::Timestamp(Timestamp::new_millisecond(42)),
636                Value::Int64(43),
637            ],
638        );
639    }
640
641    #[test]
642    fn test_memcmp_duration() {
643        check_encode_and_decode(
644            &[
645                ConcreteDataType::duration_millisecond_datatype(),
646                ConcreteDataType::int64_datatype(),
647            ],
648            vec![
649                Value::Duration(Duration::new_millisecond(44)),
650                Value::Int64(45),
651            ],
652        )
653    }
654
655    #[test]
656    fn test_memcmp_binary() {
657        check_encode_and_decode(
658            &[
659                ConcreteDataType::binary_datatype(),
660                ConcreteDataType::int64_datatype(),
661            ],
662            vec![
663                Value::Binary(Bytes::from("hello".as_bytes())),
664                Value::Int64(43),
665            ],
666        );
667    }
668
669    #[test]
670    fn test_memcmp_string() {
671        check_encode_and_decode(
672            &[ConcreteDataType::string_datatype()],
673            vec![Value::String(StringBytes::from("hello"))],
674        );
675
676        check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]);
677
678        check_encode_and_decode(
679            &[ConcreteDataType::string_datatype()],
680            vec![Value::String("".into())],
681        );
682        check_encode_and_decode(
683            &[ConcreteDataType::string_datatype()],
684            vec![Value::String("world".into())],
685        );
686    }
687
688    #[test]
689    fn test_encode_null() {
690        check_encode_and_decode(
691            &[
692                ConcreteDataType::string_datatype(),
693                ConcreteDataType::int32_datatype(),
694            ],
695            vec![Value::String(StringBytes::from("abcd")), Value::Null],
696        )
697    }
698
699    #[test]
700    fn test_encoded_value_at() {
701        let data_types = [
702            ConcreteDataType::string_datatype(),
703            ConcreteDataType::int32_datatype(),
704            ConcreteDataType::string_datatype(),
705        ];
706        let encoder = DensePrimaryKeyCodec::with_fields(
707            data_types
708                .iter()
709                .enumerate()
710                .map(|(idx, t)| (idx as ColumnId, SortField::new(t.clone())))
711                .collect::<Vec<_>>(),
712        );
713        let row = [Value::String("hello".into()), Value::Int32(42), Value::Null];
714        let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
715        let encoded_pk = encoder.encode(value_ref.iter().cloned()).unwrap();
716
717        let mut offsets = Vec::new();
718        let mut combined = Vec::new();
719        for (pos, value) in row.iter().enumerate() {
720            let encoded_value = encoder
721                .encoded_value_at(&encoded_pk, pos, &mut offsets)
722                .unwrap();
723            combined.extend_from_slice(encoded_value);
724
725            let field = SortField::new(data_types[pos].clone());
726            let mut expected = Vec::new();
727            let mut serializer = Serializer::new(&mut expected);
728            field
729                .serialize(&mut serializer, &value.as_value_ref())
730                .unwrap();
731            assert_eq!(encoded_value, expected.as_slice());
732        }
733        assert_eq!(combined, encoded_pk);
734        assert_eq!(offsets.len(), row.len());
735
736        // Verify the offsets buffer can be reused for random access.
737        for (pos, value) in row.iter().enumerate().rev() {
738            let encoded_value = encoder
739                .encoded_value_at(&encoded_pk, pos, &mut offsets)
740                .unwrap();
741
742            let field = SortField::new(data_types[pos].clone());
743            let mut expected = Vec::new();
744            let mut serializer = Serializer::new(&mut expected);
745            field
746                .serialize(&mut serializer, &value.as_value_ref())
747                .unwrap();
748            assert_eq!(encoded_value, expected.as_slice());
749        }
750    }
751
752    #[test]
753    fn test_memcmp_dictionary() {
754        // Test Dictionary<i32, string>
755        check_encode_and_decode(
756            &[ConcreteDataType::dictionary_datatype(
757                ConcreteDataType::int32_datatype(),
758                ConcreteDataType::string_datatype(),
759            )],
760            vec![Value::String("hello".into())],
761        );
762
763        // Test Dictionary<i32, i64>
764        check_encode_and_decode(
765            &[ConcreteDataType::dictionary_datatype(
766                ConcreteDataType::int32_datatype(),
767                ConcreteDataType::int64_datatype(),
768            )],
769            vec![Value::Int64(42)],
770        );
771
772        // Test Dictionary with null value
773        check_encode_and_decode(
774            &[ConcreteDataType::dictionary_datatype(
775                ConcreteDataType::int32_datatype(),
776                ConcreteDataType::string_datatype(),
777            )],
778            vec![Value::Null],
779        );
780
781        // Test multiple Dictionary columns
782        check_encode_and_decode(
783            &[
784                ConcreteDataType::dictionary_datatype(
785                    ConcreteDataType::int32_datatype(),
786                    ConcreteDataType::string_datatype(),
787                ),
788                ConcreteDataType::dictionary_datatype(
789                    ConcreteDataType::int16_datatype(),
790                    ConcreteDataType::int64_datatype(),
791                ),
792            ],
793            vec![Value::String("world".into()), Value::Int64(123)],
794        );
795    }
796
797    #[test]
798    fn test_encode_multiple_rows() {
799        check_encode_and_decode(
800            &[
801                ConcreteDataType::string_datatype(),
802                ConcreteDataType::int64_datatype(),
803                ConcreteDataType::boolean_datatype(),
804            ],
805            vec![
806                Value::String("hello".into()),
807                Value::Int64(42),
808                Value::Boolean(false),
809            ],
810        );
811
812        check_encode_and_decode(
813            &[
814                ConcreteDataType::string_datatype(),
815                ConcreteDataType::int64_datatype(),
816                ConcreteDataType::boolean_datatype(),
817            ],
818            vec![
819                Value::String("world".into()),
820                Value::Int64(43),
821                Value::Boolean(true),
822            ],
823        );
824
825        check_encode_and_decode(
826            &[
827                ConcreteDataType::string_datatype(),
828                ConcreteDataType::int64_datatype(),
829                ConcreteDataType::boolean_datatype(),
830            ],
831            vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
832        );
833
834        // All types.
835        check_encode_and_decode(
836            &[
837                ConcreteDataType::boolean_datatype(),
838                ConcreteDataType::int8_datatype(),
839                ConcreteDataType::uint8_datatype(),
840                ConcreteDataType::int16_datatype(),
841                ConcreteDataType::uint16_datatype(),
842                ConcreteDataType::int32_datatype(),
843                ConcreteDataType::uint32_datatype(),
844                ConcreteDataType::int64_datatype(),
845                ConcreteDataType::uint64_datatype(),
846                ConcreteDataType::float32_datatype(),
847                ConcreteDataType::float64_datatype(),
848                ConcreteDataType::binary_datatype(),
849                ConcreteDataType::string_datatype(),
850                ConcreteDataType::date_datatype(),
851                ConcreteDataType::timestamp_millisecond_datatype(),
852                ConcreteDataType::time_millisecond_datatype(),
853                ConcreteDataType::duration_millisecond_datatype(),
854                ConcreteDataType::interval_year_month_datatype(),
855                ConcreteDataType::interval_day_time_datatype(),
856                ConcreteDataType::interval_month_day_nano_datatype(),
857                ConcreteDataType::decimal128_default_datatype(),
858                ConcreteDataType::vector_datatype(3),
859                ConcreteDataType::dictionary_datatype(
860                    ConcreteDataType::int32_datatype(),
861                    ConcreteDataType::string_datatype(),
862                ),
863            ],
864            vec![
865                Value::Boolean(true),
866                Value::Int8(8),
867                Value::UInt8(8),
868                Value::Int16(16),
869                Value::UInt16(16),
870                Value::Int32(32),
871                Value::UInt32(32),
872                Value::Int64(64),
873                Value::UInt64(64),
874                Value::Float32(1.0.into()),
875                Value::Float64(1.0.into()),
876                Value::Binary(b"hello"[..].into()),
877                Value::String("world".into()),
878                Value::Date(Date::new(10)),
879                Value::Timestamp(Timestamp::new_millisecond(12)),
880                Value::Time(Time::new_millisecond(13)),
881                Value::Duration(Duration::new_millisecond(14)),
882                Value::IntervalYearMonth(IntervalYearMonth::new(1)),
883                Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
884                Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
885                Value::Decimal128(Decimal128::from(16)),
886                Value::Binary(Bytes::from(vec![0; 12])),
887                Value::String("dict_value".into()),
888            ],
889        );
890    }
891}