log_store/kafka/index/
encoder.rs1use std::collections::{BTreeSet, HashMap};
16use std::sync::Mutex;
17
18use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21use store_api::logstore::provider::KafkaProvider;
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::kafka::index::collector::RegionIndexes;
26
27impl From<&RegionIndexes> for DeltaEncodedRegionIndexes {
31 fn from(value: &RegionIndexes) -> Self {
32 let mut regions = HashMap::with_capacity(value.regions.len());
33 for (region_id, indexes) in value.regions.iter() {
34 let indexes = indexes.iter().copied().deltas().collect();
35 regions.insert(*region_id, indexes);
36 }
37 Self {
38 regions,
39 last_index: value.latest_entry_id,
40 }
41 }
42}
43
44#[derive(Debug, Default, Serialize, Deserialize)]
46pub struct DeltaEncodedRegionIndexes {
47 regions: HashMap<RegionId, Vec<u64>>,
48 last_index: u64,
49}
50
51impl DeltaEncodedRegionIndexes {
52 pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
54 self.regions
55 .get(®ion_id)
56 .map(|delta| delta.iter().copied().original().collect::<BTreeSet<_>>())
57 }
58
59 pub(crate) fn last_index(&self) -> u64 {
61 self.last_index
62 }
63}
64
65pub trait IndexEncoder: Send + Sync {
66 fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
67
68 fn finish(&self) -> Result<Vec<u8>>;
69}
70
71#[derive(Debug, Default, Serialize, Deserialize)]
73pub(crate) struct DatanodeWalIndexes(HashMap<String, DeltaEncodedRegionIndexes>);
74
75impl DatanodeWalIndexes {
76 fn insert(&mut self, topic: String, region_index: &RegionIndexes) {
77 self.0.insert(topic, region_index.into());
78 }
79
80 fn encode(&mut self) -> Result<Vec<u8>> {
81 let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu);
82 self.0.clear();
83 value
84 }
85
86 pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
87 serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
88 }
89
90 pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> {
92 self.0.get(&provider.topic)
93 }
94}
95
96#[derive(Debug, Default)]
98pub(crate) struct JsonIndexEncoder {
99 buf: Mutex<DatanodeWalIndexes>,
100}
101
102impl IndexEncoder for JsonIndexEncoder {
103 fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) {
104 self.buf
105 .lock()
106 .unwrap()
107 .insert(provider.topic.to_string(), region_index);
108 }
109
110 fn finish(&self) -> Result<Vec<u8>> {
111 let mut buf = self.buf.lock().unwrap();
112 buf.encode()
113 }
114}
115
#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder};
    use crate::kafka::index::collector::RegionIndexes;

    /// Encodes indexes for two topics, decodes the resulting bytes, and checks
    /// that every region's entry ids and each topic's last index round-trip.
    #[test]
    fn test_json_index_encoder() {
        let encoder = JsonIndexEncoder::default();

        let topic_1 = KafkaProvider::new("my_topic_1".to_string());
        let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]);
        let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]);
        encoder.encode(
            &topic_1,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), region_1_indexes.clone()),
                    (RegionId::new(1, 2), region_2_indexes.clone()),
                ]),
                latest_entry_id: 1024,
            },
        );

        let topic_2 = KafkaProvider::new("my_topic_2".to_string());
        let topic_2_region_1_indexes = BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]);
        let topic_2_region_2_indexes = BTreeSet::from([1512u64]);
        encoder.encode(
            &topic_2,
            &RegionIndexes {
                regions: HashMap::from([
                    (RegionId::new(1, 1), topic_2_region_1_indexes.clone()),
                    (RegionId::new(1, 2), topic_2_region_2_indexes.clone()),
                ]),
                latest_entry_id: 2048,
            },
        );

        let bytes = encoder.finish().unwrap();
        let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap();

        // Topic 1: both regions and the last index survive the round trip.
        let topic_1_index = datanode_index.provider(&topic_1).unwrap();
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 1)).unwrap(),
            region_1_indexes,
        );
        assert_eq!(
            topic_1_index.region(RegionId::new(1, 2)).unwrap(),
            region_2_indexes,
        );
        assert_eq!(topic_1_index.last_index(), 1024);
        // A region that was never encoded is reported as absent.
        assert!(topic_1_index.region(RegionId::new(1, 3)).is_none());

        // Topic 2: covers larger first values and a single-element set.
        let topic_2_index = datanode_index.provider(&topic_2).unwrap();
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 1)).unwrap(),
            topic_2_region_1_indexes,
        );
        assert_eq!(
            topic_2_index.region(RegionId::new(1, 2)).unwrap(),
            topic_2_region_2_indexes,
        );
        assert_eq!(topic_2_index.last_index(), 2048);

        // A topic that was never encoded is reported as absent.
        assert!(
            datanode_index
                .provider(&KafkaProvider::new("my_topic_3".to_string()))
                .is_none()
        );
    }
}