log_store/kafka/index/
encoder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Mutex;
17
18use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use store_api::logstore::provider::KafkaProvider;
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::kafka::index::collector::RegionIndexes;
26
27/// Converts a [`RegionIndexes`] instance into a [`DeltaEncodedRegionIndexes`].
28///
29/// This conversion encodes the index values using delta encoding to reduce storage space.
30impl From<&RegionIndexes> for DeltaEncodedRegionIndexes {
31    fn from(value: &RegionIndexes) -> Self {
32        let mut regions = HashMap::with_capacity(value.regions.len());
33        for (region_id, indexes) in value.regions.iter() {
34            let indexes = indexes.iter().copied().deltas().collect();
35            regions.insert(*region_id, indexes);
36        }
37        Self {
38            regions,
39            last_index: value.latest_entry_id,
40        }
41    }
42}
43
/// Represents the delta-encoded version of region indexes for efficient storage.
///
/// Built from a [`RegionIndexes`] via its `From` impl and (de)serialized with serde.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct DeltaEncodedRegionIndexes {
    /// Per-region entry ids in delta-encoded form (produced by `DeltaEncoderExt::deltas`);
    /// use `region` to recover the original values.
    regions: HashMap<RegionId, Vec<u64>>,
    /// The latest entry id, copied from `RegionIndexes::latest_entry_id` when converted.
    last_index: u64,
}
50
51impl DeltaEncodedRegionIndexes {
52    /// Retrieves the original (decoded) index values for a given region.
53    pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
54        let decoded = self
55            .regions
56            .get(&region_id)
57            .map(|delta| delta.iter().copied().original().collect::<BTreeSet<_>>());
58
59        decoded
60    }
61
62    /// Retrieves the last index.
63    pub(crate) fn last_index(&self) -> u64 {
64        self.last_index
65    }
66}
67
/// Encodes per-provider [`RegionIndexes`] into a single byte payload.
///
/// Both methods take `&self`, so implementations that buffer state between calls need
/// interior mutability (e.g. a `Mutex`).
pub trait IndexEncoder: Send + Sync {
    /// Records the region indexes collected for `provider`.
    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);

    /// Produces the encoded bytes for the indexes recorded so far.
    ///
    /// NOTE(review): whether the recorded state is reset afterwards is up to the
    /// implementation; `JsonIndexEncoder` clears its buffer — confirm for other implementors.
    fn finish(&self) -> Result<Vec<u8>>;
}
73
/// [`DatanodeWalIndexes`] structure holds the WAL indexes for a datanode.
///
/// The map is keyed by Kafka topic name (entries are looked up by `KafkaProvider::topic`),
/// and each value holds that topic's delta-encoded region indexes.
#[derive(Debug, Default, Serialize, Deserialize)]
pub(crate) struct DatanodeWalIndexes(HashMap<String, DeltaEncodedRegionIndexes>);
77
78impl DatanodeWalIndexes {
79    fn insert(&mut self, topic: String, region_index: &RegionIndexes) {
80        self.0.insert(topic, region_index.into());
81    }
82
83    fn encode(&mut self) -> Result<Vec<u8>> {
84        let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu);
85        self.0.clear();
86        value
87    }
88
89    pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
90        serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
91    }
92
93    /// Retrieves the delta encoded region indexes for a given `provider`.
94    pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> {
95        self.0.get(&provider.topic)
96    }
97}
98
/// [`JsonIndexEncoder`] encodes [`RegionIndexes`] values into JSON format.
///
/// Indexes passed to `encode` are buffered (one entry per topic) until `finish`
/// serializes them and clears the buffer.
#[derive(Debug, Default)]
pub(crate) struct JsonIndexEncoder {
    /// Pending per-topic indexes; wrapped in a `Mutex` because the [`IndexEncoder`]
    /// methods take `&self`.
    buf: Mutex<DatanodeWalIndexes>,
}
104
105impl IndexEncoder for JsonIndexEncoder {
106    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) {
107        self.buf
108            .lock()
109            .unwrap()
110            .insert(provider.topic.to_string(), region_index);
111    }
112
113    fn finish(&self) -> Result<Vec<u8>> {
114        let mut buf = self.buf.lock().unwrap();
115        buf.encode()
116    }
117}
118
#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder};
    use crate::kafka::index::collector::RegionIndexes;

    #[test]
    fn test_json_index_encoder() {
        let encoder = JsonIndexEncoder::default();
        let topic_1 = KafkaProvider::new("my_topic_1".to_string());
        let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]);
        let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]);
        encoder.encode(
            &topic_1,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), region_1_indexes.clone()),
                    (RegionId::new(1, 2), region_2_indexes.clone()),
                ]),
                latest_entry_id: 1024,
            },
        );
        let topic_2 = KafkaProvider::new("my_topic_2".to_string());
        let topic_2_region_1_indexes = BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]);
        let topic_2_region_2_indexes = BTreeSet::from([1512u64]);
        encoder.encode(
            &topic_2,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), topic_2_region_1_indexes.clone()),
                    (RegionId::new(1, 2), topic_2_region_2_indexes.clone()),
                ]),
                latest_entry_id: 2048,
            },
        );

        let bytes = encoder.finish().unwrap();
        let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap();

        // Topic 1 round-trips its regions and latest entry id.
        let topic_1_index = datanode_index.provider(&topic_1).unwrap();
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 1)).unwrap(),
            region_1_indexes,
        );
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 2)).unwrap(),
            region_2_indexes,
        );
        assert!(topic_1_index.region(RegionId::new(1, 3)).is_none());
        assert_eq!(topic_1_index.last_index(), 1024);

        // Topic 2 is encoded independently of topic 1.
        let topic_2_index = datanode_index.provider(&topic_2).unwrap();
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 1)).unwrap(),
            topic_2_region_1_indexes,
        );
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 2)).unwrap(),
            topic_2_region_2_indexes,
        );
        assert_eq!(topic_2_index.last_index(), 2048);

        assert!(datanode_index
            .provider(&KafkaProvider::new("my_topic_3".to_string()))
            .is_none());
    }

    #[test]
    fn test_json_index_encoder_finish_resets_buffer() {
        let encoder = JsonIndexEncoder::default();
        let topic = KafkaProvider::new("my_topic".to_string());
        encoder.encode(
            &topic,
            &RegionIndexes {
                regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1u64, 3]))]),
                latest_entry_id: 3,
            },
        );

        let first = DatanodeWalIndexes::decode(&encoder.finish().unwrap()).unwrap();
        assert!(first.provider(&topic).is_some());

        // `finish` drains the buffer, so a second call encodes an empty index set.
        let second = DatanodeWalIndexes::decode(&encoder.finish().unwrap()).unwrap();
        assert!(second.provider(&topic).is_none());
    }
}