mito2/
row_converter.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod dense;
mod sparse;
use std::fmt::Debug;
use std::sync::Arc;

use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::memtable::key_values::KeyValue;

/// Extension trait for codecs that serialize a row of values into bytes.
pub trait PrimaryKeyCodecExt {
    /// Encodes a row into a freshly allocated byte vector.
    ///
    /// This is a convenience wrapper that creates an empty buffer and
    /// forwards to [`PrimaryKeyCodecExt::encode_to_vec`].
    ///
    /// # Note
    /// The number of values yielded by `row` must match the number of fields.
    fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        let mut encoded = Vec::new();
        // Hand the filled buffer back only when encoding succeeded.
        self.encode_to_vec(row, &mut encoded).map(|()| encoded)
    }

    /// Encodes a row into the caller-provided `buffer`.
    ///
    /// # Note
    /// The number of values yielded by `row` must match the number of fields.
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>;
}

/// A predicate over encoded primary keys.
///
/// Implementations must be `Send + Sync`.
pub trait PrimaryKeyFilter: Send + Sync {
    /// Returns true if the primary key matches the filter.
    ///
    /// `pk` is the encoded primary key bytes. The receiver is `&mut self`,
    /// so implementations are allowed to update internal state while
    /// evaluating (NOTE(review): presumably reusable decode buffers — confirm
    /// against the implementations).
    fn matches(&mut self, pk: &[u8]) -> bool;
}

/// Composite values decoded from primary key bytes.
///
/// There is one variant per primary key representation handled in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
    /// Values stored as a list of `(column id, value)` pairs.
    Dense(Vec<(ColumnId, Value)>),
    /// Values stored in a [`SparseValues`] container.
    Sparse(SparseValues),
}

impl CompositeValues {
    /// Extends the composite values with clones of the given
    /// `(column id, value)` pairs.
    ///
    /// - `Dense`: the pairs are appended to the end of the vector, in the
    ///   order given.
    /// - `Sparse`: each pair is handed to `SparseValues::insert`, keyed by its
    ///   column id.
    pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
        match self {
            // `extend_from_slice` clones every element and reserves capacity
            // once, instead of growing the vector push by push.
            CompositeValues::Dense(dense_values) => dense_values.extend_from_slice(values),
            CompositeValues::Sparse(sparse_values) => {
                for (column_id, value) in values {
                    sparse_values.insert(*column_id, value.clone());
                }
            }
        }
    }
}

#[cfg(test)]
impl CompositeValues {
    /// Unwraps the inner [`SparseValues`].
    ///
    /// # Panics
    /// Panics if `self` is not [`CompositeValues::Sparse`].
    pub fn into_sparse(self) -> SparseValues {
        if let CompositeValues::Sparse(sparse) = self {
            sparse
        } else {
            panic!("CompositeValues is not sparse")
        }
    }

    /// Unwraps the dense values, dropping the column ids and keeping the
    /// values in their stored order.
    ///
    /// # Panics
    /// Panics if `self` is not [`CompositeValues::Dense`].
    pub fn into_dense(self) -> Vec<Value> {
        if let CompositeValues::Dense(pairs) = self {
            pairs.into_iter().map(|pair| pair.1).collect()
        } else {
            panic!("CompositeValues is not dense")
        }
    }
}

/// Encoder/decoder for primary keys.
///
/// Implementations must be `Send + Sync + Debug`. The trait is used as a
/// trait object (`Arc<dyn PrimaryKeyCodec>`) by the builder functions in this
/// module.
pub trait PrimaryKeyCodec: Send + Sync + Debug {
    /// Encodes a key value to bytes, writing the output into `buffer`.
    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;

    /// Encodes owned `(column id, value)` pairs to bytes, writing the output
    /// into `buffer`.
    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;

    /// Encodes borrowed `(column id, value ref)` pairs to bytes, writing the
    /// output into `buffer`.
    ///
    /// Same as [`PrimaryKeyCodec::encode_values`] but takes [`ValueRef`]s
    /// instead of owned [`Value`]s.
    fn encode_value_refs(
        &self,
        values: &[(ColumnId, ValueRef)],
        buffer: &mut Vec<u8>,
    ) -> Result<()>;

    /// Returns the number of fields in the primary key.
    ///
    /// NOTE(review): `None` presumably means the codec has no fixed field
    /// count — confirm against the implementations.
    fn num_fields(&self) -> Option<usize>;

    /// Returns a primary key filter built from the region `metadata` and the
    /// given `filters`.
    fn primary_key_filter(
        &self,
        metadata: &RegionMetadataRef,
        filters: Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Box<dyn PrimaryKeyFilter>;

    /// Returns the estimated size of the primary key.
    ///
    /// The default implementation returns `None`, i.e. no estimate.
    fn estimated_size(&self) -> Option<usize> {
        None
    }

    /// Returns the encoding type of the primary key.
    fn encoding(&self) -> PrimaryKeyEncoding;

    /// Decodes the primary key from the given bytes.
    ///
    /// Returns a [`CompositeValues`] that follows the primary key ordering.
    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;

    /// Decodes the leftmost value from bytes.
    ///
    /// Returns `Ok(None)` when no leftmost value is available
    /// (NOTE(review): the exact condition is implementation-defined — confirm).
    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}

/// Builds a primary key codec from region metadata.
///
/// Each primary key column is turned into a `(column id, SortField)` pair
/// based on the column's data type, and the codec variant is selected by the
/// region's `primary_key_encoding`.
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
    let sort_fields = region_metadata.primary_key_columns().map(|column| {
        let data_type = column.column_schema.data_type.clone();
        (column.column_id, SortField::new(data_type))
    });
    let encoding = region_metadata.primary_key_encoding;
    build_primary_key_codec_with_fields(encoding, sort_fields)
}

/// Builds a primary key codec for the given `encoding` from an iterator of
/// `(column id, SortField)` pairs.
///
/// The pairs are collected and passed to `DensePrimaryKeyCodec::with_fields`
/// or `SparsePrimaryKeyCodec::with_fields`, depending on `encoding`.
pub fn build_primary_key_codec_with_fields(
    encoding: PrimaryKeyEncoding,
    fields: impl Iterator<Item = (ColumnId, SortField)>,
) -> Arc<dyn PrimaryKeyCodec> {
    match encoding {
        PrimaryKeyEncoding::Dense => {
            let dense_codec = DensePrimaryKeyCodec::with_fields(fields.collect());
            Arc::new(dense_codec)
        }
        PrimaryKeyEncoding::Sparse => {
            let sparse_codec = SparsePrimaryKeyCodec::with_fields(fields.collect());
            Arc::new(sparse_codec)
        }
    }
}