log_store/kafka/index/
encoder.rs1use std::collections::{BTreeSet, HashMap};
16use std::sync::Mutex;
17
18use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use store_api::logstore::provider::KafkaProvider;
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::kafka::index::collector::RegionIndexes;
26
27impl From<&RegionIndexes> for DeltaEncodedRegionIndexes {
31 fn from(value: &RegionIndexes) -> Self {
32 let mut regions = HashMap::with_capacity(value.regions.len());
33 for (region_id, indexes) in value.regions.iter() {
34 let indexes = indexes.iter().copied().deltas().collect();
35 regions.insert(*region_id, indexes);
36 }
37 Self {
38 regions,
39 last_index: value.latest_entry_id,
40 }
41 }
42}
43
44#[derive(Debug, Default, Serialize, Deserialize)]
46pub struct DeltaEncodedRegionIndexes {
47 regions: HashMap<RegionId, Vec<u64>>,
48 last_index: u64,
49}
50
51impl DeltaEncodedRegionIndexes {
52 pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
54 let decoded = self
55 .regions
56 .get(®ion_id)
57 .map(|delta| delta.iter().copied().original().collect::<BTreeSet<_>>());
58
59 decoded
60 }
61
62 pub(crate) fn last_index(&self) -> u64 {
64 self.last_index
65 }
66}
67
68pub trait IndexEncoder: Send + Sync {
69 fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
70
71 fn finish(&self) -> Result<Vec<u8>>;
72}
73
74#[derive(Debug, Default, Serialize, Deserialize)]
76pub(crate) struct DatanodeWalIndexes(HashMap<String, DeltaEncodedRegionIndexes>);
77
78impl DatanodeWalIndexes {
79 fn insert(&mut self, topic: String, region_index: &RegionIndexes) {
80 self.0.insert(topic, region_index.into());
81 }
82
83 fn encode(&mut self) -> Result<Vec<u8>> {
84 let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu);
85 self.0.clear();
86 value
87 }
88
89 pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
90 serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
91 }
92
93 pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> {
95 self.0.get(&provider.topic)
96 }
97}
98
99#[derive(Debug, Default)]
101pub(crate) struct JsonIndexEncoder {
102 buf: Mutex<DatanodeWalIndexes>,
103}
104
105impl IndexEncoder for JsonIndexEncoder {
106 fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) {
107 self.buf
108 .lock()
109 .unwrap()
110 .insert(provider.topic.to_string(), region_index);
111 }
112
113 fn finish(&self) -> Result<Vec<u8>> {
114 let mut buf = self.buf.lock().unwrap();
115 buf.encode()
116 }
117}
118
#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder};
    use crate::kafka::index::collector::RegionIndexes;

    /// Round-trips two topics through `JsonIndexEncoder` and checks that the
    /// decoded indexes, the `last_index` values, and the lookups for unknown
    /// keys all behave as expected.
    #[test]
    fn test_json_index_encoder() {
        let encoder = JsonIndexEncoder::default();

        let topic_1 = KafkaProvider::new("my_topic_1".to_string());
        let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]);
        let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]);
        encoder.encode(
            &topic_1,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), region_1_indexes.clone()),
                    (RegionId::new(1, 2), region_2_indexes.clone()),
                ]),
                latest_entry_id: 1024,
            },
        );

        let topic_2 = KafkaProvider::new("my_topic_2".to_string());
        let topic_2_region_1_indexes = BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]);
        let topic_2_region_2_indexes = BTreeSet::from([1512u64]);
        encoder.encode(
            &topic_2,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), topic_2_region_1_indexes.clone()),
                    (RegionId::new(1, 2), topic_2_region_2_indexes.clone()),
                ]),
                latest_entry_id: 2048,
            },
        );

        let bytes = encoder.finish().unwrap();
        let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap();

        // First topic: both regions and the last index survive the round trip.
        let topic_1_index = datanode_index.provider(&topic_1).unwrap();
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 1)).unwrap(),
            region_1_indexes,
        );
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 2)).unwrap(),
            region_2_indexes,
        );
        assert_eq!(topic_1_index.last_index(), 1024);
        // A region that was never encoded has no index.
        assert!(topic_1_index.region(RegionId::new(1, 3)).is_none());

        // Second topic: same checks.
        let topic_2_index = datanode_index.provider(&topic_2).unwrap();
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 1)).unwrap(),
            topic_2_region_1_indexes,
        );
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 2)).unwrap(),
            topic_2_region_2_indexes,
        );
        assert_eq!(topic_2_index.last_index(), 2048);

        // A topic that was never encoded is absent.
        assert!(datanode_index
            .provider(&KafkaProvider::new("my_topic_3".to_string()))
            .is_none());

        // `finish` drains the buffer, so a second call encodes nothing.
        let drained = DatanodeWalIndexes::decode(&encoder.finish().unwrap()).unwrap();
        assert!(drained.provider(&topic_1).is_none());
        assert!(drained.provider(&topic_2).is_none());
    }
}