log_store/kafka/index/encoder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{BTreeSet, HashMap};
16use std::sync::Mutex;
17
18use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use store_api::logstore::provider::KafkaProvider;
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::kafka::index::collector::RegionIndexes;
26
27/// Converts a [`RegionIndexes`] instance into a [`DeltaEncodedRegionIndexes`].
28///
29/// This conversion encodes the index values using delta encoding to reduce storage space.
30impl From<&RegionIndexes> for DeltaEncodedRegionIndexes {
31    fn from(value: &RegionIndexes) -> Self {
32        let mut regions = HashMap::with_capacity(value.regions.len());
33        for (region_id, indexes) in value.regions.iter() {
34            let indexes = indexes.iter().copied().deltas().collect();
35            regions.insert(*region_id, indexes);
36        }
37        Self {
38            regions,
39            last_index: value.latest_entry_id,
40        }
41    }
42}
43
44/// Represents the delta-encoded version of region indexes for efficient storage.
45#[derive(Debug, Default, Serialize, Deserialize)]
46pub struct DeltaEncodedRegionIndexes {
47    regions: HashMap<RegionId, Vec<u64>>,
48    last_index: u64,
49}
50
51impl DeltaEncodedRegionIndexes {
52    /// Retrieves the original (decoded) index values for a given region.
53    pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
54        self.regions
55            .get(&region_id)
56            .map(|delta| delta.iter().copied().original().collect::<BTreeSet<_>>())
57    }
58
59    /// Retrieves the last index.
60    pub(crate) fn last_index(&self) -> u64 {
61        self.last_index
62    }
63}
64
65pub trait IndexEncoder: Send + Sync {
66    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
67
68    fn finish(&self) -> Result<Vec<u8>>;
69}
70
71/// [`DatanodeWalIndexes`] structure holds the WAL indexes for a datanode.
72#[derive(Debug, Default, Serialize, Deserialize)]
73pub(crate) struct DatanodeWalIndexes(HashMap<String, DeltaEncodedRegionIndexes>);
74
75impl DatanodeWalIndexes {
76    fn insert(&mut self, topic: String, region_index: &RegionIndexes) {
77        self.0.insert(topic, region_index.into());
78    }
79
80    fn encode(&mut self) -> Result<Vec<u8>> {
81        let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu);
82        self.0.clear();
83        value
84    }
85
86    pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
87        serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
88    }
89
90    /// Retrieves the delta encoded region indexes for a given `provider`.
91    pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> {
92        self.0.get(&provider.topic)
93    }
94}
95
96/// [`JsonIndexEncoder`] encodes the [`RegionIndexes`]s into JSON format.
97#[derive(Debug, Default)]
98pub(crate) struct JsonIndexEncoder {
99    buf: Mutex<DatanodeWalIndexes>,
100}
101
102impl IndexEncoder for JsonIndexEncoder {
103    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) {
104        self.buf
105            .lock()
106            .unwrap()
107            .insert(provider.topic.to_string(), region_index);
108    }
109
110    fn finish(&self) -> Result<Vec<u8>> {
111        let mut buf = self.buf.lock().unwrap();
112        buf.encode()
113    }
114}
115
#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder};
    use crate::kafka::index::collector::RegionIndexes;

    /// Round-trips two topics through the encoder. Checks every region, the
    /// `last_index` of each topic, lookups for missing regions and topics, and that
    /// `finish` drains the buffer.
    #[test]
    fn test_json_index_encoder() {
        let encoder = JsonIndexEncoder::default();
        let topic_1 = KafkaProvider::new("my_topic_1".to_string());
        let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]);
        let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]);
        encoder.encode(
            &topic_1,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), region_1_indexes.clone()),
                    (RegionId::new(1, 2), region_2_indexes.clone()),
                ]),
                latest_entry_id: 1024,
            },
        );
        let topic_2 = KafkaProvider::new("my_topic_2".to_string());
        let topic_2_region_1_indexes = BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]);
        let topic_2_region_2_indexes = BTreeSet::from([1512u64]);
        encoder.encode(
            &topic_2,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), topic_2_region_1_indexes.clone()),
                    (RegionId::new(1, 2), topic_2_region_2_indexes.clone()),
                ]),
                latest_entry_id: 2048,
            },
        );

        let bytes = encoder.finish().unwrap();
        let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap();

        let topic_1_index = datanode_index.provider(&topic_1).unwrap();
        assert_eq!(topic_1_index.last_index(), 1024);
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 1)).unwrap(),
            region_1_indexes,
        );
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 2)).unwrap(),
            region_2_indexes,
        );
        assert!(topic_1_index.region(RegionId::new(1, 3)).is_none());

        let topic_2_index = datanode_index.provider(&topic_2).unwrap();
        assert_eq!(topic_2_index.last_index(), 2048);
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 1)).unwrap(),
            topic_2_region_1_indexes,
        );
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 2)).unwrap(),
            topic_2_region_2_indexes,
        );

        assert!(datanode_index
            .provider(&KafkaProvider::new("my_topic_3".to_string()))
            .is_none());

        // `finish` drains the buffer, so a second call encodes no topics at all.
        let bytes = encoder.finish().unwrap();
        let drained = DatanodeWalIndexes::decode(&bytes).unwrap();
        assert!(drained.provider(&topic_1).is_none());
        assert!(drained.provider(&topic_2).is_none());
    }

    /// Encoding the same topic twice keeps only the most recent indexes.
    #[test]
    fn test_json_index_encoder_replaces_topic_entry() {
        let encoder = JsonIndexEncoder::default();
        let topic = KafkaProvider::new("my_topic".to_string());
        encoder.encode(
            &topic,
            &RegionIndexes {
                regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1u64, 2]))]),
                latest_entry_id: 2,
            },
        );
        encoder.encode(
            &topic,
            &RegionIndexes {
                regions: HashMap::from([(RegionId::new(1, 2), BTreeSet::from([7u64]))]),
                latest_entry_id: 7,
            },
        );

        let bytes = encoder.finish().unwrap();
        let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap();
        let index = datanode_index.provider(&topic).unwrap();
        assert_eq!(index.last_index(), 7);
        assert!(index.region(RegionId::new(1, 1)).is_none());
        assert_eq!(
            index.region(RegionId::new(1, 2)).unwrap(),
            BTreeSet::from([7u64]),
        );
    }
}