mito_codec/row_converter/
dense.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Buf;
18use common_base::bytes::Bytes;
19use common_decimal::Decimal128;
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use common_time::time::Time;
22use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
23use datatypes::data_type::ConcreteDataType;
24use datatypes::prelude::Value;
25use datatypes::types::IntervalType;
26use datatypes::value::ValueRef;
27use memcomparable::{Deserializer, Serializer};
28use paste::paste;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metadata::{RegionMetadata, RegionMetadataRef};
33use store_api::storage::ColumnId;
34
35use crate::error::{
36    self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
37};
38use crate::key_values::KeyValue;
39use crate::primary_key_filter::DensePrimaryKeyFilter;
40use crate::row_converter::{
41    CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
42};
43
44/// Field to serialize and deserialize value in memcomparable format.
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct SortField {
47    data_type: ConcreteDataType,
48}
49
50impl SortField {
51    pub fn new(data_type: ConcreteDataType) -> Self {
52        Self { data_type }
53    }
54
55    /// Returns the data type of the field.
56    pub fn data_type(&self) -> &ConcreteDataType {
57        &self.data_type
58    }
59
60    /// Returns the physical data type to encode of the field.
61    ///
62    /// For example, a dictionary field will be encoded as its value type.
63    pub fn encode_data_type(&self) -> &ConcreteDataType {
64        match &self.data_type {
65            ConcreteDataType::Dictionary(dict_type) => dict_type.value_type(),
66            _ => &self.data_type,
67        }
68    }
69
70    pub fn estimated_size(&self) -> usize {
71        Self::estimated_size_by_type(self.encode_data_type())
72    }
73
74    fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize {
75        match data_type {
76            ConcreteDataType::Boolean(_) => 2,
77            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
78            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
79            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
80            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
81            ConcreteDataType::Float32(_) => 5,
82            ConcreteDataType::Float64(_) => 9,
83            ConcreteDataType::Binary(_)
84            | ConcreteDataType::Json(_)
85            | ConcreteDataType::Vector(_) => 11,
86            ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
87            ConcreteDataType::Date(_) => 5,
88            ConcreteDataType::Timestamp(_) => 10,
89            ConcreteDataType::Time(_) => 10,
90            ConcreteDataType::Duration(_) => 10,
91            ConcreteDataType::Interval(_) => 18,
92            ConcreteDataType::Decimal128(_) => 19,
93            ConcreteDataType::Null(_)
94            | ConcreteDataType::List(_)
95            | ConcreteDataType::Struct(_)
96            | ConcreteDataType::Dictionary(_) => 0,
97        }
98    }
99
100    /// Serialize a value to the serializer.
101    pub fn serialize(
102        &self,
103        serializer: &mut Serializer<&mut Vec<u8>>,
104        value: &ValueRef,
105    ) -> Result<()> {
106        Self::serialize_by_type(self.encode_data_type(), serializer, value)
107    }
108
109    fn serialize_by_type(
110        data_type: &ConcreteDataType,
111        serializer: &mut Serializer<&mut Vec<u8>>,
112        value: &ValueRef,
113    ) -> Result<()> {
114        macro_rules! cast_value_and_serialize {
115            (
116                $data_type: ident;
117                $serializer: ident;
118                $(
119                    $ty: ident, $f: ident
120                ),*
121            ) => {
122                match $data_type {
123                $(
124                    ConcreteDataType::$ty(_) => {
125                        paste!{
126                            value
127                            .[<try_into_ $f>]()
128                            .context(FieldTypeMismatchSnafu)?
129                            .serialize($serializer)
130                            .context(SerializeFieldSnafu)?;
131                        }
132                    }
133                )*
134                    ConcreteDataType::Timestamp(_) => {
135                        let timestamp = value.try_into_timestamp().context(FieldTypeMismatchSnafu)?;
136                        timestamp
137                            .map(|t|t.value())
138                            .serialize($serializer)
139                            .context(SerializeFieldSnafu)?;
140                    }
141                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
142                        let interval = value.try_into_interval_year_month().context(FieldTypeMismatchSnafu)?;
143                        interval.map(|i| i.to_i32())
144                            .serialize($serializer)
145                            .context(SerializeFieldSnafu)?;
146                    }
147                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
148                        let interval = value.try_into_interval_day_time().context(FieldTypeMismatchSnafu)?;
149                        interval.map(|i| i.to_i64())
150                            .serialize($serializer)
151                            .context(SerializeFieldSnafu)?;
152                    }
153                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
154                        let interval = value.try_into_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
155                        interval.map(|i| i.to_i128())
156                            .serialize($serializer)
157                            .context(SerializeFieldSnafu)?;
158                    }
159                    ConcreteDataType::List(_) |
160                    ConcreteDataType::Struct(_) |
161                    ConcreteDataType::Dictionary(_) |
162                    ConcreteDataType::Null(_) => {
163                        return error::NotSupportedFieldSnafu {
164                            data_type: $data_type.clone()
165                        }.fail()
166                    }
167                }
168            };
169        }
170        cast_value_and_serialize!(data_type; serializer;
171            Boolean, boolean,
172            Binary, binary,
173            Int8, i8,
174            UInt8, u8,
175            Int16, i16,
176            UInt16, u16,
177            Int32, i32,
178            UInt32, u32,
179            Int64, i64,
180            UInt64, u64,
181            Float32, f32,
182            Float64, f64,
183            String, string,
184            Date, date,
185            Time, time,
186            Duration, duration,
187            Decimal128, decimal128,
188            Json, binary,
189            Vector, binary
190        );
191
192        Ok(())
193    }
194
195    /// Deserialize a value from the deserializer.
196    pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
197        Self::deserialize_by_type(self.encode_data_type(), deserializer)
198    }
199
200    fn deserialize_by_type<B: Buf>(
201        data_type: &ConcreteDataType,
202        deserializer: &mut Deserializer<B>,
203    ) -> Result<Value> {
204        macro_rules! deserialize_and_build_value {
205            (
206                $data_type: ident;
207                $serializer: ident;
208                $(
209                    $ty: ident, $f: ident
210                ),*
211            ) => {
212
213                match $data_type {
214                    $(
215                        ConcreteDataType::$ty(_) => {
216                            Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
217                        }
218                    )*
219                    ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
220                        Option::<Vec<u8>>::deserialize(deserializer)
221                            .context(error::DeserializeFieldSnafu)?
222                            .map(Bytes::from),
223                    )),
224                    ConcreteDataType::Timestamp(ty) => {
225                        let timestamp = Option::<i64>::deserialize(deserializer)
226                            .context(error::DeserializeFieldSnafu)?
227                            .map(|t|ty.create_timestamp(t));
228                        Ok(Value::from(timestamp))
229                    }
230                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
231                        let interval = Option::<i32>::deserialize(deserializer)
232                            .context(error::DeserializeFieldSnafu)?
233                            .map(IntervalYearMonth::from_i32);
234                        Ok(Value::from(interval))
235                    }
236                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
237                        let interval = Option::<i64>::deserialize(deserializer)
238                            .context(error::DeserializeFieldSnafu)?
239                            .map(IntervalDayTime::from_i64);
240                        Ok(Value::from(interval))
241                    }
242                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
243                        let interval = Option::<i128>::deserialize(deserializer)
244                            .context(error::DeserializeFieldSnafu)?
245                            .map(IntervalMonthDayNano::from_i128);
246                        Ok(Value::from(interval))
247                    }
248                    ConcreteDataType::List(l) => NotSupportedFieldSnafu {
249                        data_type: ConcreteDataType::List(l.clone()),
250                    }
251                    .fail(),
252                    ConcreteDataType::Struct(f) => NotSupportedFieldSnafu {
253                        data_type: ConcreteDataType::Struct(f.clone()),
254                    }
255                    .fail(),
256                    ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
257                        data_type: ConcreteDataType::Dictionary(d.clone()),
258                    }
259                    .fail(),
260                    ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
261                        data_type: ConcreteDataType::Null(n.clone()),
262                    }
263                    .fail(),
264                }
265            };
266        }
267        deserialize_and_build_value!(data_type; deserializer;
268            Boolean, bool,
269            Int8, i8,
270            Int16, i16,
271            Int32, i32,
272            Int64, i64,
273            UInt8, u8,
274            UInt16, u16,
275            UInt32, u32,
276            UInt64, u64,
277            Float32, f32,
278            Float64, f64,
279            String, String,
280            Date, Date,
281            Time, Time,
282            Duration, Duration,
283            Decimal128, Decimal128
284        )
285    }
286
287    /// Skip deserializing this field, returns the length of it.
288    pub(crate) fn skip_deserialize(
289        &self,
290        bytes: &[u8],
291        deserializer: &mut Deserializer<&[u8]>,
292    ) -> Result<usize> {
293        let pos = deserializer.position();
294        if bytes[pos] == 0 {
295            deserializer.advance(1);
296            return Ok(1);
297        }
298
299        Self::skip_deserialize_by_type(self.encode_data_type(), bytes, deserializer)
300    }
301
302    fn skip_deserialize_by_type(
303        data_type: &ConcreteDataType,
304        bytes: &[u8],
305        deserializer: &mut Deserializer<&[u8]>,
306    ) -> Result<usize> {
307        let to_skip = match data_type {
308            ConcreteDataType::Boolean(_) => 2,
309            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
310            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
311            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
312            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
313            ConcreteDataType::Float32(_) => 5,
314            ConcreteDataType::Float64(_) => 9,
315            ConcreteDataType::Binary(_)
316            | ConcreteDataType::Json(_)
317            | ConcreteDataType::Vector(_) => {
318                // Now the encoder encode binary as a list of bytes so we can't use
319                // skip bytes.
320                let pos_before = deserializer.position();
321                let mut current = pos_before + 1;
322                while bytes[current] == 1 {
323                    current += 2;
324                }
325                let to_skip = current - pos_before + 1;
326                deserializer.advance(to_skip);
327                return Ok(to_skip);
328            }
329            ConcreteDataType::String(_) => {
330                let pos_before = deserializer.position();
331                deserializer.advance(1);
332                deserializer
333                    .skip_bytes()
334                    .context(error::DeserializeFieldSnafu)?;
335                return Ok(deserializer.position() - pos_before);
336            }
337            ConcreteDataType::Date(_) => 5,
338            ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
339            ConcreteDataType::Time(_) => 10,     // i64 and 1 byte time unit
340            ConcreteDataType::Duration(_) => 10,
341            ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
342            ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
343            ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
344            ConcreteDataType::Decimal128(_) => 19,
345            ConcreteDataType::Null(_)
346            | ConcreteDataType::List(_)
347            | ConcreteDataType::Struct(_)
348            | ConcreteDataType::Dictionary(_) => 0,
349        };
350        deserializer.advance(to_skip);
351        Ok(to_skip)
352    }
353}
354
355impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
356    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
357    where
358        I: Iterator<Item = ValueRef<'a>>,
359    {
360        self.encode_dense(row, buffer)
361    }
362}
363
364/// A memory-comparable row [`Value`] encoder/decoder.
365#[derive(Clone, Debug)]
366pub struct DensePrimaryKeyCodec {
367    /// Primary key fields.
368    ordered_primary_key_columns: Arc<Vec<(ColumnId, SortField)>>,
369}
370
371impl DensePrimaryKeyCodec {
372    pub fn new(metadata: &RegionMetadata) -> Self {
373        let ordered_primary_key_columns = metadata
374            .primary_key_columns()
375            .map(|c| {
376                (
377                    c.column_id,
378                    SortField::new(c.column_schema.data_type.clone()),
379                )
380            })
381            .collect::<Vec<_>>();
382
383        Self::with_fields(ordered_primary_key_columns)
384    }
385
386    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
387        Self {
388            ordered_primary_key_columns: Arc::new(fields),
389        }
390    }
391
392    fn encode_dense<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
393    where
394        I: Iterator<Item = ValueRef<'a>>,
395    {
396        let mut serializer = Serializer::new(buffer);
397        for (idx, value) in row.enumerate() {
398            self.field_at(idx).serialize(&mut serializer, &value)?;
399        }
400        Ok(())
401    }
402
403    /// Decode primary key values from bytes.
404    pub fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<(ColumnId, Value)>> {
405        let mut deserializer = Deserializer::new(bytes);
406        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
407        for (column_id, field) in self.ordered_primary_key_columns.iter() {
408            let value = field.deserialize(&mut deserializer)?;
409            values.push((*column_id, value));
410        }
411        Ok(values)
412    }
413
414    /// Decode primary key values from bytes without column id.
415    pub fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Result<Vec<Value>> {
416        let mut deserializer = Deserializer::new(bytes);
417        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
418        for (_, field) in self.ordered_primary_key_columns.iter() {
419            let value = field.deserialize(&mut deserializer)?;
420            values.push(value);
421        }
422        Ok(values)
423    }
424
425    /// Returns the field at `pos`.
426    ///
427    /// # Panics
428    /// Panics if `pos` is out of bounds.
429    fn field_at(&self, pos: usize) -> &SortField {
430        &self.ordered_primary_key_columns[pos].1
431    }
432
433    /// Decode value at `pos` in `bytes`.
434    ///
435    /// The i-th element in offsets buffer is how many bytes to skip in order to read value at `pos`.
436    pub fn decode_value_at(
437        &self,
438        bytes: &[u8],
439        pos: usize,
440        offsets_buf: &mut Vec<usize>,
441    ) -> Result<Value> {
442        let mut deserializer = Deserializer::new(bytes);
443        if pos < offsets_buf.len() {
444            // We computed the offset before.
445            let to_skip = offsets_buf[pos];
446            deserializer.advance(to_skip);
447            return self.field_at(pos).deserialize(&mut deserializer);
448        }
449
450        if offsets_buf.is_empty() {
451            let mut offset = 0;
452            // Skip values before `pos`.
453            for i in 0..pos {
454                // Offset to skip before reading value i.
455                offsets_buf.push(offset);
456                let skip = self
457                    .field_at(i)
458                    .skip_deserialize(bytes, &mut deserializer)?;
459                offset += skip;
460            }
461            // Offset to skip before reading this value.
462            offsets_buf.push(offset);
463        } else {
464            // Offsets are not enough.
465            let value_start = offsets_buf.len() - 1;
466            // Advances to decode value at `value_start`.
467            let mut offset = offsets_buf[value_start];
468            deserializer.advance(offset);
469            for i in value_start..pos {
470                // Skip value i.
471                let skip = self
472                    .field_at(i)
473                    .skip_deserialize(bytes, &mut deserializer)?;
474                // Offset for the value at i + 1.
475                offset += skip;
476                offsets_buf.push(offset);
477            }
478        }
479
480        self.field_at(pos).deserialize(&mut deserializer)
481    }
482
483    pub fn estimated_size(&self) -> usize {
484        self.ordered_primary_key_columns
485            .iter()
486            .map(|(_, f)| f.estimated_size())
487            .sum()
488    }
489
490    pub fn num_fields(&self) -> usize {
491        self.ordered_primary_key_columns.len()
492    }
493}
494
495impl PrimaryKeyCodec for DensePrimaryKeyCodec {
496    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
497        self.encode_dense(key_value.primary_keys(), buffer)
498    }
499
500    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
501        self.encode_dense(values.iter().map(|(_, v)| v.as_value_ref()), buffer)
502    }
503
504    fn encode_value_refs(
505        &self,
506        values: &[(ColumnId, ValueRef)],
507        buffer: &mut Vec<u8>,
508    ) -> Result<()> {
509        let iter = values.iter().map(|(_, v)| v.clone());
510        self.encode_dense(iter, buffer)
511    }
512
513    fn estimated_size(&self) -> Option<usize> {
514        Some(self.estimated_size())
515    }
516
517    fn num_fields(&self) -> Option<usize> {
518        Some(self.num_fields())
519    }
520
521    fn encoding(&self) -> PrimaryKeyEncoding {
522        PrimaryKeyEncoding::Dense
523    }
524
525    fn primary_key_filter(
526        &self,
527        metadata: &RegionMetadataRef,
528        filters: Arc<Vec<SimpleFilterEvaluator>>,
529    ) -> Box<dyn PrimaryKeyFilter> {
530        Box::new(DensePrimaryKeyFilter::new(
531            metadata.clone(),
532            filters,
533            self.clone(),
534        ))
535    }
536
537    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
538        Ok(CompositeValues::Dense(self.decode_dense(bytes)?))
539    }
540
541    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
542        // TODO(weny, yinwen): avoid decoding the whole primary key.
543        let mut values = self.decode_dense(bytes)?;
544        Ok(values.pop().map(|(_, v)| v))
545    }
546}
547
#[cfg(test)]
mod tests {
    use common_base::bytes::StringBytes;
    use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
    use datatypes::value::Value;

    use super::*;

    /// Column types shared by the three-column rows in `test_encode_multiple_rows`.
    fn string_int64_boolean_types() -> [ConcreteDataType; 3] {
        [
            ConcreteDataType::string_datatype(),
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::boolean_datatype(),
        ]
    }

    /// Shorthand for a dictionary type with the given key and value types.
    fn dictionary_of(key_type: ConcreteDataType, value_type: ConcreteDataType) -> ConcreteDataType {
        ConcreteDataType::dictionary_datatype(key_type, value_type)
    }

    /// Encodes `row` with a codec built from `data_types` and checks that both
    /// the full decode and the positional decode return the original values.
    fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
        let fields: Vec<_> = data_types
            .iter()
            .cloned()
            .map(|data_type| (0, SortField::new(data_type)))
            .collect();
        let codec = DensePrimaryKeyCodec::with_fields(fields);

        let refs: Vec<_> = row.iter().map(|v| v.as_value_ref()).collect();
        let encoded = codec.encode(refs.iter().cloned()).unwrap();

        // Full decode.
        let all_values = codec.decode(&encoded).unwrap().into_dense();
        assert_eq!(all_values, row);

        // Positional decode. Iter two times to test offsets buffer: the first
        // pass fills it, the second pass reads from it.
        let mut offsets = Vec::new();
        for _ in 0..2 {
            let values: Vec<_> = (0..data_types.len())
                .map(|i| codec.decode_value_at(&encoded, i, &mut offsets).unwrap())
                .collect();
            assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
            assert_eq!(values, row);
        }
    }

    #[test]
    fn test_memcmp() {
        let codec = DensePrimaryKeyCodec::with_fields(vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int64_datatype())),
        ]);
        let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
        let refs: Vec<_> = values.iter().map(|v| v.as_value_ref()).collect();
        let encoded = codec.encode(refs.iter().cloned()).unwrap();

        let decoded = codec.decode(&encoded).unwrap().into_dense();
        assert_eq!(values.as_slice(), decoded.as_slice());
    }

    #[test]
    fn test_memcmp_timestamp() {
        let types = [
            ConcreteDataType::timestamp_millisecond_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let row = vec![
            Value::Timestamp(Timestamp::new_millisecond(42)),
            Value::Int64(43),
        ];
        check_encode_and_decode(&types, row);
    }

    #[test]
    fn test_memcmp_duration() {
        let types = [
            ConcreteDataType::duration_millisecond_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let row = vec![
            Value::Duration(Duration::new_millisecond(44)),
            Value::Int64(45),
        ];
        check_encode_and_decode(&types, row);
    }

    #[test]
    fn test_memcmp_binary() {
        let types = [
            ConcreteDataType::binary_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let row = vec![
            Value::Binary(Bytes::from("hello".as_bytes())),
            Value::Int64(43),
        ];
        check_encode_and_decode(&types, row);
    }

    #[test]
    fn test_memcmp_string() {
        let string_only = [ConcreteDataType::string_datatype()];

        // Non-empty, null, empty and a second non-empty string, in that order.
        check_encode_and_decode(&string_only, vec![Value::String(StringBytes::from("hello"))]);
        check_encode_and_decode(&string_only, vec![Value::Null]);
        check_encode_and_decode(&string_only, vec![Value::String("".into())]);
        check_encode_and_decode(&string_only, vec![Value::String("world".into())]);
    }

    #[test]
    fn test_encode_null() {
        let types = [
            ConcreteDataType::string_datatype(),
            ConcreteDataType::int32_datatype(),
        ];
        let row = vec![Value::String(StringBytes::from("abcd")), Value::Null];
        check_encode_and_decode(&types, row);
    }

    #[test]
    fn test_memcmp_dictionary() {
        // Test Dictionary<i32, string>
        check_encode_and_decode(
            &[dictionary_of(
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::string_datatype(),
            )],
            vec![Value::String("hello".into())],
        );

        // Test Dictionary<i32, i64>
        check_encode_and_decode(
            &[dictionary_of(
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::int64_datatype(),
            )],
            vec![Value::Int64(42)],
        );

        // Test Dictionary with null value
        check_encode_and_decode(
            &[dictionary_of(
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::string_datatype(),
            )],
            vec![Value::Null],
        );

        // Test multiple Dictionary columns
        let types = [
            dictionary_of(
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::string_datatype(),
            ),
            dictionary_of(
                ConcreteDataType::int16_datatype(),
                ConcreteDataType::int64_datatype(),
            ),
        ];
        check_encode_and_decode(
            &types,
            vec![Value::String("world".into()), Value::Int64(123)],
        );
    }

    #[test]
    fn test_encode_multiple_rows() {
        check_encode_and_decode(
            &string_int64_boolean_types(),
            vec![
                Value::String("hello".into()),
                Value::Int64(42),
                Value::Boolean(false),
            ],
        );

        check_encode_and_decode(
            &string_int64_boolean_types(),
            vec![
                Value::String("world".into()),
                Value::Int64(43),
                Value::Boolean(true),
            ],
        );

        check_encode_and_decode(
            &string_int64_boolean_types(),
            vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
        );

        // All types.
        let all_types = [
            ConcreteDataType::boolean_datatype(),
            ConcreteDataType::int8_datatype(),
            ConcreteDataType::uint8_datatype(),
            ConcreteDataType::int16_datatype(),
            ConcreteDataType::uint16_datatype(),
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::uint32_datatype(),
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::uint64_datatype(),
            ConcreteDataType::float32_datatype(),
            ConcreteDataType::float64_datatype(),
            ConcreteDataType::binary_datatype(),
            ConcreteDataType::string_datatype(),
            ConcreteDataType::date_datatype(),
            ConcreteDataType::timestamp_millisecond_datatype(),
            ConcreteDataType::time_millisecond_datatype(),
            ConcreteDataType::duration_millisecond_datatype(),
            ConcreteDataType::interval_year_month_datatype(),
            ConcreteDataType::interval_day_time_datatype(),
            ConcreteDataType::interval_month_day_nano_datatype(),
            ConcreteDataType::decimal128_default_datatype(),
            ConcreteDataType::vector_datatype(3),
            dictionary_of(
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::string_datatype(),
            ),
        ];
        let all_values = vec![
            Value::Boolean(true),
            Value::Int8(8),
            Value::UInt8(8),
            Value::Int16(16),
            Value::UInt16(16),
            Value::Int32(32),
            Value::UInt32(32),
            Value::Int64(64),
            Value::UInt64(64),
            Value::Float32(1.0.into()),
            Value::Float64(1.0.into()),
            Value::Binary(b"hello"[..].into()),
            Value::String("world".into()),
            Value::Date(Date::new(10)),
            Value::Timestamp(Timestamp::new_millisecond(12)),
            Value::Time(Time::new_millisecond(13)),
            Value::Duration(Duration::new_millisecond(14)),
            Value::IntervalYearMonth(IntervalYearMonth::new(1)),
            Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
            Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
            Value::Decimal128(Decimal128::from(16)),
            Value::Binary(Bytes::from(vec![0; 12])),
            Value::String("dict_value".into()),
        ];
        check_encode_and_decode(&all_types, all_values);
    }
}