mito_codec/row_converter/
dense.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Buf;
18use common_base::bytes::Bytes;
19use common_decimal::Decimal128;
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use common_time::time::Time;
22use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
23use datatypes::data_type::ConcreteDataType;
24use datatypes::prelude::Value;
25use datatypes::types::IntervalType;
26use datatypes::value::ValueRef;
27use memcomparable::{Deserializer, Serializer};
28use paste::paste;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metadata::{RegionMetadata, RegionMetadataRef};
33use store_api::storage::ColumnId;
34
35use crate::error::{
36    self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
37};
38use crate::key_values::KeyValue;
39use crate::primary_key_filter::DensePrimaryKeyFilter;
40use crate::row_converter::{
41    CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
42};
43
/// Field to serialize and deserialize value in memcomparable format.
///
/// A field whose type is a dictionary is encoded and decoded using the
/// dictionary's value type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Data type of the values handled by this field.
    data_type: ConcreteDataType,
}
49
50impl SortField {
51    pub fn new(data_type: ConcreteDataType) -> Self {
52        Self { data_type }
53    }
54
    /// Returns the data type of the field.
    ///
    /// This is the type the field was created with; a dictionary type is
    /// returned as-is, not unwrapped to its value type.
    pub fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
59
60    pub fn estimated_size(&self) -> usize {
61        match &self.data_type {
62            ConcreteDataType::Dictionary(dict_type) => {
63                Self::estimated_size_by_type(dict_type.value_type())
64            }
65            data_type => Self::estimated_size_by_type(data_type),
66        }
67    }
68
69    fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize {
70        match data_type {
71            ConcreteDataType::Boolean(_) => 2,
72            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
73            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
74            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
75            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
76            ConcreteDataType::Float32(_) => 5,
77            ConcreteDataType::Float64(_) => 9,
78            ConcreteDataType::Binary(_)
79            | ConcreteDataType::Json(_)
80            | ConcreteDataType::Vector(_) => 11,
81            ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
82            ConcreteDataType::Date(_) => 5,
83            ConcreteDataType::Timestamp(_) => 10,
84            ConcreteDataType::Time(_) => 10,
85            ConcreteDataType::Duration(_) => 10,
86            ConcreteDataType::Interval(_) => 18,
87            ConcreteDataType::Decimal128(_) => 19,
88            ConcreteDataType::Null(_)
89            | ConcreteDataType::List(_)
90            | ConcreteDataType::Struct(_)
91            | ConcreteDataType::Dictionary(_) => 0,
92        }
93    }
94
95    /// Serialize a value to the serializer.
96    pub fn serialize(
97        &self,
98        serializer: &mut Serializer<&mut Vec<u8>>,
99        value: &ValueRef,
100    ) -> Result<()> {
101        match self.data_type() {
102            ConcreteDataType::Dictionary(dict_type) => {
103                Self::serialize_by_type(dict_type.value_type(), serializer, value)
104            }
105            data_type => Self::serialize_by_type(data_type, serializer, value),
106        }
107    }
108
    /// Serializes `value` into `serializer` as a value of `data_type`.
    ///
    /// # Errors
    /// - Fails with the `FieldTypeMismatchSnafu` context if `value` cannot be
    ///   viewed as the type required by `data_type`.
    /// - Fails with the `SerializeFieldSnafu` context if the serializer fails.
    /// - Fails with a not-supported-field error for list, struct, dictionary
    ///   and null types.
    fn serialize_by_type(
        data_type: &ConcreteDataType,
        serializer: &mut Serializer<&mut Vec<u8>>,
        value: &ValueRef,
    ) -> Result<()> {
        // Generates one match arm per `(variant, accessor suffix)` pair: the arm
        // calls `value.as_<suffix>()` and serializes the result. Types that need
        // a conversion before serialization are handled by the explicit arms.
        macro_rules! cast_value_and_serialize {
            (
                $data_type: ident;
                $serializer: ident;
                $(
                    $ty: ident, $f: ident
                ),*
            ) => {
                match $data_type {
                $(
                    ConcreteDataType::$ty(_) => {
                        paste!{
                            value
                            .[<as_ $f>]()
                            .context(FieldTypeMismatchSnafu)?
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                        }
                    }
                )*
                    ConcreteDataType::Timestamp(_) => {
                        // Only the raw timestamp value is written; the unit is
                        // not part of the encoded bytes.
                        let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?;
                        timestamp
                            .map(|t|t.value())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
                        let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?;
                        interval.map(|i| i.to_i32())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
                        let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?;
                        interval.map(|i| i.to_i64())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
                        let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
                        interval.map(|i| i.to_i128())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::List(_) |
                    ConcreteDataType::Struct(_) |
                    ConcreteDataType::Dictionary(_) |
                    ConcreteDataType::Null(_) => {
                        return error::NotSupportedFieldSnafu {
                            data_type: $data_type.clone()
                        }.fail()
                    }
                }
            };
        }
        // Json and Vector values are accessed through `as_binary`, like Binary.
        cast_value_and_serialize!(data_type; serializer;
            Boolean, boolean,
            Binary, binary,
            Int8, i8,
            UInt8, u8,
            Int16, i16,
            UInt16, u16,
            Int32, i32,
            UInt32, u32,
            Int64, i64,
            UInt64, u64,
            Float32, f32,
            Float64, f64,
            String, string,
            Date, date,
            Time, time,
            Duration, duration,
            Decimal128, decimal128,
            Json, binary,
            Vector, binary
        );

        Ok(())
    }
194
195    /// Deserialize a value from the deserializer.
196    pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
197        match &self.data_type {
198            ConcreteDataType::Dictionary(dict_type) => {
199                Self::deserialize_by_type(dict_type.value_type(), deserializer)
200            }
201            data_type => Self::deserialize_by_type(data_type, deserializer),
202        }
203    }
204
205    fn deserialize_by_type<B: Buf>(
206        data_type: &ConcreteDataType,
207        deserializer: &mut Deserializer<B>,
208    ) -> Result<Value> {
209        macro_rules! deserialize_and_build_value {
210            (
211                $data_type: ident;
212                $serializer: ident;
213                $(
214                    $ty: ident, $f: ident
215                ),*
216            ) => {
217
218                match $data_type {
219                    $(
220                        ConcreteDataType::$ty(_) => {
221                            Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
222                        }
223                    )*
224                    ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
225                        Option::<Vec<u8>>::deserialize(deserializer)
226                            .context(error::DeserializeFieldSnafu)?
227                            .map(Bytes::from),
228                    )),
229                    ConcreteDataType::Timestamp(ty) => {
230                        let timestamp = Option::<i64>::deserialize(deserializer)
231                            .context(error::DeserializeFieldSnafu)?
232                            .map(|t|ty.create_timestamp(t));
233                        Ok(Value::from(timestamp))
234                    }
235                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
236                        let interval = Option::<i32>::deserialize(deserializer)
237                            .context(error::DeserializeFieldSnafu)?
238                            .map(IntervalYearMonth::from_i32);
239                        Ok(Value::from(interval))
240                    }
241                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
242                        let interval = Option::<i64>::deserialize(deserializer)
243                            .context(error::DeserializeFieldSnafu)?
244                            .map(IntervalDayTime::from_i64);
245                        Ok(Value::from(interval))
246                    }
247                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
248                        let interval = Option::<i128>::deserialize(deserializer)
249                            .context(error::DeserializeFieldSnafu)?
250                            .map(IntervalMonthDayNano::from_i128);
251                        Ok(Value::from(interval))
252                    }
253                    ConcreteDataType::List(l) => NotSupportedFieldSnafu {
254                        data_type: ConcreteDataType::List(l.clone()),
255                    }
256                    .fail(),
257                    ConcreteDataType::Struct(f) => NotSupportedFieldSnafu {
258                        data_type: ConcreteDataType::Struct(f.clone()),
259                    }
260                    .fail(),
261                    ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
262                        data_type: ConcreteDataType::Dictionary(d.clone()),
263                    }
264                    .fail(),
265                    ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
266                        data_type: ConcreteDataType::Null(n.clone()),
267                    }
268                    .fail(),
269                }
270            };
271        }
272        deserialize_and_build_value!(data_type; deserializer;
273            Boolean, bool,
274            Int8, i8,
275            Int16, i16,
276            Int32, i32,
277            Int64, i64,
278            UInt8, u8,
279            UInt16, u16,
280            UInt32, u32,
281            UInt64, u64,
282            Float32, f32,
283            Float64, f64,
284            String, String,
285            Date, Date,
286            Time, Time,
287            Duration, Duration,
288            Decimal128, Decimal128
289        )
290    }
291
292    /// Skip deserializing this field, returns the length of it.
293    pub(crate) fn skip_deserialize(
294        &self,
295        bytes: &[u8],
296        deserializer: &mut Deserializer<&[u8]>,
297    ) -> Result<usize> {
298        let pos = deserializer.position();
299        if bytes[pos] == 0 {
300            deserializer.advance(1);
301            return Ok(1);
302        }
303
304        match &self.data_type {
305            ConcreteDataType::Dictionary(dict_type) => {
306                Self::skip_deserialize_by_type(dict_type.value_type(), bytes, deserializer)
307            }
308            data_type => Self::skip_deserialize_by_type(data_type, bytes, deserializer),
309        }
310    }
311
312    fn skip_deserialize_by_type(
313        data_type: &ConcreteDataType,
314        bytes: &[u8],
315        deserializer: &mut Deserializer<&[u8]>,
316    ) -> Result<usize> {
317        let to_skip = match data_type {
318            ConcreteDataType::Boolean(_) => 2,
319            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
320            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
321            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
322            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
323            ConcreteDataType::Float32(_) => 5,
324            ConcreteDataType::Float64(_) => 9,
325            ConcreteDataType::Binary(_)
326            | ConcreteDataType::Json(_)
327            | ConcreteDataType::Vector(_) => {
328                // Now the encoder encode binary as a list of bytes so we can't use
329                // skip bytes.
330                let pos_before = deserializer.position();
331                let mut current = pos_before + 1;
332                while bytes[current] == 1 {
333                    current += 2;
334                }
335                let to_skip = current - pos_before + 1;
336                deserializer.advance(to_skip);
337                return Ok(to_skip);
338            }
339            ConcreteDataType::String(_) => {
340                let pos_before = deserializer.position();
341                deserializer.advance(1);
342                deserializer
343                    .skip_bytes()
344                    .context(error::DeserializeFieldSnafu)?;
345                return Ok(deserializer.position() - pos_before);
346            }
347            ConcreteDataType::Date(_) => 5,
348            ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
349            ConcreteDataType::Time(_) => 10,     // i64 and 1 byte time unit
350            ConcreteDataType::Duration(_) => 10,
351            ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
352            ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
353            ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
354            ConcreteDataType::Decimal128(_) => 19,
355            ConcreteDataType::Null(_)
356            | ConcreteDataType::List(_)
357            | ConcreteDataType::Struct(_)
358            | ConcreteDataType::Dictionary(_) => 0,
359        };
360        deserializer.advance(to_skip);
361        Ok(to_skip)
362    }
363}
364
impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
    /// Encodes the values of `row` in dense format into `buffer`.
    ///
    /// Delegates to [`DensePrimaryKeyCodec::encode_dense`], which serializes the
    /// i-th value of `row` with the i-th primary key field.
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        self.encode_dense(row, buffer)
    }
}
373
/// A memory-comparable row [`Value`] encoder/decoder.
///
/// Values are encoded one after another in the order of the primary key
/// fields; column ids are not written to the encoded bytes.
#[derive(Clone, Debug)]
pub struct DensePrimaryKeyCodec {
    /// Primary key fields.
    ///
    /// Each entry pairs a column id with the [`SortField`] used to encode and
    /// decode that column, in encoding order.
    ordered_primary_key_columns: Arc<Vec<(ColumnId, SortField)>>,
}
380
381impl DensePrimaryKeyCodec {
382    pub fn new(metadata: &RegionMetadata) -> Self {
383        let ordered_primary_key_columns = metadata
384            .primary_key_columns()
385            .map(|c| {
386                (
387                    c.column_id,
388                    SortField::new(c.column_schema.data_type.clone()),
389                )
390            })
391            .collect::<Vec<_>>();
392
393        Self::with_fields(ordered_primary_key_columns)
394    }
395
396    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
397        Self {
398            ordered_primary_key_columns: Arc::new(fields),
399        }
400    }
401
402    fn encode_dense<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
403    where
404        I: Iterator<Item = ValueRef<'a>>,
405    {
406        let mut serializer = Serializer::new(buffer);
407        for (idx, value) in row.enumerate() {
408            self.field_at(idx).serialize(&mut serializer, &value)?;
409        }
410        Ok(())
411    }
412
413    /// Decode primary key values from bytes.
414    pub fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<(ColumnId, Value)>> {
415        let mut deserializer = Deserializer::new(bytes);
416        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
417        for (column_id, field) in self.ordered_primary_key_columns.iter() {
418            let value = field.deserialize(&mut deserializer)?;
419            values.push((*column_id, value));
420        }
421        Ok(values)
422    }
423
424    /// Decode primary key values from bytes without column id.
425    pub fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Result<Vec<Value>> {
426        let mut deserializer = Deserializer::new(bytes);
427        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
428        for (_, field) in self.ordered_primary_key_columns.iter() {
429            let value = field.deserialize(&mut deserializer)?;
430            values.push(value);
431        }
432        Ok(values)
433    }
434
435    /// Returns the field at `pos`.
436    ///
437    /// # Panics
438    /// Panics if `pos` is out of bounds.
439    fn field_at(&self, pos: usize) -> &SortField {
440        &self.ordered_primary_key_columns[pos].1
441    }
442
443    /// Decode value at `pos` in `bytes`.
444    ///
445    /// The i-th element in offsets buffer is how many bytes to skip in order to read value at `pos`.
446    pub fn decode_value_at(
447        &self,
448        bytes: &[u8],
449        pos: usize,
450        offsets_buf: &mut Vec<usize>,
451    ) -> Result<Value> {
452        let mut deserializer = Deserializer::new(bytes);
453        if pos < offsets_buf.len() {
454            // We computed the offset before.
455            let to_skip = offsets_buf[pos];
456            deserializer.advance(to_skip);
457            return self.field_at(pos).deserialize(&mut deserializer);
458        }
459
460        if offsets_buf.is_empty() {
461            let mut offset = 0;
462            // Skip values before `pos`.
463            for i in 0..pos {
464                // Offset to skip before reading value i.
465                offsets_buf.push(offset);
466                let skip = self
467                    .field_at(i)
468                    .skip_deserialize(bytes, &mut deserializer)?;
469                offset += skip;
470            }
471            // Offset to skip before reading this value.
472            offsets_buf.push(offset);
473        } else {
474            // Offsets are not enough.
475            let value_start = offsets_buf.len() - 1;
476            // Advances to decode value at `value_start`.
477            let mut offset = offsets_buf[value_start];
478            deserializer.advance(offset);
479            for i in value_start..pos {
480                // Skip value i.
481                let skip = self
482                    .field_at(i)
483                    .skip_deserialize(bytes, &mut deserializer)?;
484                // Offset for the value at i + 1.
485                offset += skip;
486                offsets_buf.push(offset);
487            }
488        }
489
490        self.field_at(pos).deserialize(&mut deserializer)
491    }
492
493    pub fn estimated_size(&self) -> usize {
494        self.ordered_primary_key_columns
495            .iter()
496            .map(|(_, f)| f.estimated_size())
497            .sum()
498    }
499
    /// Returns the number of primary key fields of this codec.
    pub fn num_fields(&self) -> usize {
        self.ordered_primary_key_columns.len()
    }
503}
504
505impl PrimaryKeyCodec for DensePrimaryKeyCodec {
506    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
507        self.encode_dense(key_value.primary_keys(), buffer)
508    }
509
510    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
511        self.encode_dense(values.iter().map(|(_, v)| v.as_value_ref()), buffer)
512    }
513
514    fn encode_value_refs(
515        &self,
516        values: &[(ColumnId, ValueRef)],
517        buffer: &mut Vec<u8>,
518    ) -> Result<()> {
519        let iter = values.iter().map(|(_, v)| *v);
520        self.encode_dense(iter, buffer)
521    }
522
523    fn estimated_size(&self) -> Option<usize> {
524        Some(self.estimated_size())
525    }
526
527    fn num_fields(&self) -> Option<usize> {
528        Some(self.num_fields())
529    }
530
531    fn encoding(&self) -> PrimaryKeyEncoding {
532        PrimaryKeyEncoding::Dense
533    }
534
535    fn primary_key_filter(
536        &self,
537        metadata: &RegionMetadataRef,
538        filters: Arc<Vec<SimpleFilterEvaluator>>,
539    ) -> Box<dyn PrimaryKeyFilter> {
540        Box::new(DensePrimaryKeyFilter::new(
541            metadata.clone(),
542            filters,
543            self.clone(),
544        ))
545    }
546
547    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
548        Ok(CompositeValues::Dense(self.decode_dense(bytes)?))
549    }
550
551    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
552        // TODO(weny, yinwen): avoid decoding the whole primary key.
553        let mut values = self.decode_dense(bytes)?;
554        Ok(values.pop().map(|(_, v)| v))
555    }
556}
557
#[cfg(test)]
mod tests {
    use common_base::bytes::StringBytes;
    use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
    use datatypes::value::Value;

    use super::*;

    /// Encodes `row` with a codec built from `data_types` and checks that both
    /// the full decode and the per-position decode give back `row`.
    fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
        let fields: Vec<_> = data_types
            .iter()
            .map(|data_type| (0, SortField::new(data_type.clone())))
            .collect();
        let codec = DensePrimaryKeyCodec::with_fields(fields);

        let value_refs: Vec<_> = row.iter().map(|value| value.as_value_ref()).collect();
        let encoded = codec.encode(value_refs.iter().copied()).unwrap();

        let all_values = codec.decode(&encoded).unwrap().into_dense();
        assert_eq!(all_values, row);

        let mut values_at = Vec::new();
        let mut offsets = Vec::new();
        // Iter two times to test offsets buffer.
        for _ in 0..2 {
            values_at.clear();
            for pos in 0..data_types.len() {
                let value = codec
                    .decode_value_at(&encoded, pos, &mut offsets)
                    .unwrap();
                values_at.push(value);
            }
            assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
            assert_eq!(values_at, row);
        }
    }

    #[test]
    fn test_memcmp() {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int64_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);

        let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
        let value_refs: Vec<_> = values.iter().map(|value| value.as_value_ref()).collect();
        let encoded = codec.encode(value_refs.iter().copied()).unwrap();

        let decoded = codec.decode(&encoded).unwrap().into_dense();
        assert_eq!(&values, &decoded as &[Value]);
    }

    #[test]
    fn test_memcmp_timestamp() {
        let types = [
            ConcreteDataType::timestamp_millisecond_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let row = vec![
            Value::Timestamp(Timestamp::new_millisecond(42)),
            Value::Int64(43),
        ];
        check_encode_and_decode(&types, row);
    }

    #[test]
    fn test_memcmp_duration() {
        let types = [
            ConcreteDataType::duration_millisecond_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let row = vec![
            Value::Duration(Duration::new_millisecond(44)),
            Value::Int64(45),
        ];
        check_encode_and_decode(&types, row)
    }

    #[test]
    fn test_memcmp_binary() {
        let types = [
            ConcreteDataType::binary_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let row = vec![
            Value::Binary(Bytes::from("hello".as_bytes())),
            Value::Int64(43),
        ];
        check_encode_and_decode(&types, row);
    }

    #[test]
    fn test_memcmp_string() {
        let string_only = [ConcreteDataType::string_datatype()];

        check_encode_and_decode(
            &string_only,
            vec![Value::String(StringBytes::from("hello"))],
        );
        check_encode_and_decode(&string_only, vec![Value::Null]);
        check_encode_and_decode(&string_only, vec![Value::String("".into())]);
        check_encode_and_decode(&string_only, vec![Value::String("world".into())]);
    }

    #[test]
    fn test_encode_null() {
        let types = [
            ConcreteDataType::string_datatype(),
            ConcreteDataType::int32_datatype(),
        ];
        let row = vec![Value::String(StringBytes::from("abcd")), Value::Null];
        check_encode_and_decode(&types, row)
    }

    #[test]
    fn test_memcmp_dictionary() {
        // Test Dictionary<i32, string>
        let string_dict = [ConcreteDataType::dictionary_datatype(
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::string_datatype(),
        )];
        check_encode_and_decode(&string_dict, vec![Value::String("hello".into())]);

        // Test Dictionary<i32, i64>
        let int_dict = [ConcreteDataType::dictionary_datatype(
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::int64_datatype(),
        )];
        check_encode_and_decode(&int_dict, vec![Value::Int64(42)]);

        // Test Dictionary with null value
        check_encode_and_decode(&string_dict, vec![Value::Null]);

        // Test multiple Dictionary columns
        let two_dicts = [
            ConcreteDataType::dictionary_datatype(
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::string_datatype(),
            ),
            ConcreteDataType::dictionary_datatype(
                ConcreteDataType::int16_datatype(),
                ConcreteDataType::int64_datatype(),
            ),
        ];
        check_encode_and_decode(
            &two_dicts,
            vec![Value::String("world".into()), Value::Int64(123)],
        );
    }

    #[test]
    fn test_encode_multiple_rows() {
        let string_int_bool = [
            ConcreteDataType::string_datatype(),
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::boolean_datatype(),
        ];

        check_encode_and_decode(
            &string_int_bool,
            vec![
                Value::String("hello".into()),
                Value::Int64(42),
                Value::Boolean(false),
            ],
        );

        check_encode_and_decode(
            &string_int_bool,
            vec![
                Value::String("world".into()),
                Value::Int64(43),
                Value::Boolean(true),
            ],
        );

        check_encode_and_decode(
            &string_int_bool,
            vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
        );

        // All types.
        let all_types = [
            ConcreteDataType::boolean_datatype(),
            ConcreteDataType::int8_datatype(),
            ConcreteDataType::uint8_datatype(),
            ConcreteDataType::int16_datatype(),
            ConcreteDataType::uint16_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::uint32_datatype(),
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::uint64_datatype(),
            ConcreteDataType::float32_datatype(),
            ConcreteDataType::float64_datatype(),
            ConcreteDataType::binary_datatype(),
            ConcreteDataType::string_datatype(),
            ConcreteDataType::date_datatype(),
            ConcreteDataType::timestamp_millisecond_datatype(),
            ConcreteDataType::time_millisecond_datatype(),
            ConcreteDataType::duration_millisecond_datatype(),
            ConcreteDataType::interval_year_month_datatype(),
            ConcreteDataType::interval_day_time_datatype(),
            ConcreteDataType::interval_month_day_nano_datatype(),
            ConcreteDataType::decimal128_default_datatype(),
            ConcreteDataType::vector_datatype(3),
            ConcreteDataType::dictionary_datatype(
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::string_datatype(),
            ),
        ];
        let all_values = vec![
            Value::Boolean(true),
            Value::Int8(8),
            Value::UInt8(8),
            Value::Int16(16),
            Value::UInt16(16),
            Value::Int32(32),
            Value::UInt32(32),
            Value::Int64(64),
            Value::UInt64(64),
            Value::Float32(1.0.into()),
            Value::Float64(1.0.into()),
            Value::Binary(b"hello"[..].into()),
            Value::String("world".into()),
            Value::Date(Date::new(10)),
            Value::Timestamp(Timestamp::new_millisecond(12)),
            Value::Time(Time::new_millisecond(13)),
            Value::Duration(Duration::new_millisecond(14)),
            Value::IntervalYearMonth(IntervalYearMonth::new(1)),
            Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
            Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
            Value::Decimal128(Decimal128::from(16)),
            Value::Binary(Bytes::from(vec![0; 12])),
            Value::String("dict_value".into()),
        ];
        check_encode_and_decode(&all_types, all_values);
    }
}