common_meta/key/
topic_name.rs1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use serde::{Deserialize, Serialize};
19use snafu::{OptionExt, ResultExt, ensure};
20
21use crate::ensure_values;
22use crate::error::{self, DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result, UnexpectedSnafu};
23use crate::key::txn_helper::TxnOpGetResponseSet;
24use crate::key::{
25 DeserializedValueWithBytes, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX,
26 LEGACY_TOPIC_KEY_PREFIX, MetadataKey, MetadataValue,
27};
28use crate::kv_backend::KvBackendRef;
29use crate::kv_backend::txn::{Txn, TxnOp};
30use crate::rpc::KeyValue;
31use crate::rpc::store::{BatchGetRequest, BatchPutRequest, RangeRequest};
32
33#[derive(Debug, Clone, PartialEq)]
34pub struct TopicNameKey<'a> {
35 pub topic: &'a str,
36}
37
38#[derive(Debug, Serialize, Deserialize, Default, Clone)]
43pub struct TopicNameValue {
44 pub pruned_entry_id: u64,
45}
46
47impl TopicNameValue {
48 pub fn new(pruned_entry_id: u64) -> Self {
49 Self { pruned_entry_id }
50 }
51}
52
53impl MetadataValue for TopicNameValue {
54 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
55 let value = serde_json::from_slice::<TopicNameValue>(raw_value).context(DecodeJsonSnafu)?;
56 Ok(value)
57 }
58
59 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
60 let raw_value = serde_json::to_vec(self).context(DecodeJsonSnafu)?;
61 Ok(raw_value)
62 }
63}
64
65impl<'a> TopicNameKey<'a> {
66 pub fn new(topic: &'a str) -> Self {
67 Self { topic }
68 }
69
70 pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String {
71 format!("{}_{}", prefix, id)
72 }
73
74 pub fn range_start_key() -> String {
75 KAFKA_TOPIC_KEY_PREFIX.to_string()
76 }
77}
78
79impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> {
80 fn to_bytes(&self) -> Vec<u8> {
81 self.to_string().into_bytes()
82 }
83
84 fn from_bytes(bytes: &'a [u8]) -> Result<TopicNameKey<'a>> {
85 let key = std::str::from_utf8(bytes).map_err(|e| {
86 InvalidMetadataSnafu {
87 err_msg: format!(
88 "TopicNameKey '{}' is not a valid UTF8 string: {e}",
89 String::from_utf8_lossy(bytes)
90 ),
91 }
92 .build()
93 })?;
94 TopicNameKey::try_from(key)
95 }
96}
97
98impl Display for TopicNameKey<'_> {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic)
101 }
102}
103
104impl<'a> TryFrom<&'a str> for TopicNameKey<'a> {
105 type Error = Error;
106
107 fn try_from(value: &'a str) -> Result<TopicNameKey<'a>> {
108 let captures = KAFKA_TOPIC_KEY_PATTERN
109 .captures(value)
110 .context(InvalidMetadataSnafu {
111 err_msg: format!("Invalid topic name key: {}", value),
112 })?;
113
114 Ok(TopicNameKey {
116 topic: captures.get(1).unwrap().as_str(),
117 })
118 }
119}
120
121fn topic_decoder(kv: &KeyValue) -> Result<String> {
123 let key = TopicNameKey::from_bytes(&kv.key)?;
124 Ok(key.topic.to_string())
125}
126
127pub struct TopicNameManager {
128 kv_backend: KvBackendRef,
129}
130
131impl TopicNameManager {
132 pub fn new(kv_backend: KvBackendRef) -> Self {
133 Self { kv_backend }
134 }
135
136 pub async fn update_legacy_topics(&self) -> Result<()> {
138 if let Some(kv) = self
139 .kv_backend
140 .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
141 .await?
142 {
143 let topics =
144 serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
145 let mut reqs = Vec::with_capacity(topics.len() + 1);
146 for topic in topics {
147 let topic_name_key = TopicNameKey::new(&topic);
148 let topic_name_value = TopicNameValue::new(0);
149 let put_req = TxnOp::Put(
150 topic_name_key.to_bytes(),
151 topic_name_value.try_as_raw_value()?,
152 );
153 reqs.push(put_req);
154 }
155 let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
156 reqs.push(delete_req);
157 let txn = Txn::new().and_then(reqs);
158 self.kv_backend.txn(txn).await?;
159 }
160 Ok(())
161 }
162
163 pub async fn range(&self) -> Result<Vec<String>> {
166 let prefix = TopicNameKey::range_start_key();
167 let raw_prefix = prefix.as_bytes();
168 let req = RangeRequest::new().with_prefix(raw_prefix);
169 let resp = self.kv_backend.range(req).await?;
170 resp.kvs
171 .iter()
172 .map(topic_decoder)
173 .collect::<Result<Vec<String>>>()
174 }
175
176 pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
178 let mut kvs = Vec::with_capacity(topic_name_keys.len());
179 let topic_name_value = TopicNameValue::new(0);
180 for topic_name_key in &topic_name_keys {
181 let kv = KeyValue {
182 key: topic_name_key.to_bytes(),
183 value: topic_name_value.clone().try_as_raw_value()?,
184 };
185 kvs.push(kv);
186 }
187 let req = BatchPutRequest {
188 kvs,
189 prev_kv: false,
190 };
191 self.kv_backend.batch_put(req).await?;
192 Ok(())
193 }
194
195 pub async fn get(
197 &self,
198 topic: &str,
199 ) -> Result<Option<DeserializedValueWithBytes<TopicNameValue>>> {
200 let key = TopicNameKey::new(topic);
201 let raw_key = key.to_bytes();
202 self.kv_backend
203 .get(&raw_key)
204 .await?
205 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
206 .transpose()
207 }
208
209 pub async fn batch_get(
211 &self,
212 topics: Vec<TopicNameKey<'_>>,
213 ) -> Result<HashMap<String, TopicNameValue>> {
214 let raw_keys = topics.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
215 let req = BatchGetRequest { keys: raw_keys };
216 let resp = self.kv_backend.batch_get(req).await?;
217
218 resp.kvs
219 .into_iter()
220 .map(|kv| {
221 let key = TopicNameKey::from_bytes(&kv.key)?;
222 let value = TopicNameValue::try_from_raw_value(&kv.value)?;
223 Ok((key.topic.to_string(), value))
224 })
225 .collect::<Result<HashMap<_, _>>>()
226 }
227
228 pub async fn update(
230 &self,
231 topic: &str,
232 pruned_entry_id: u64,
233 prev: Option<DeserializedValueWithBytes<TopicNameValue>>,
234 ) -> Result<()> {
235 let key = TopicNameKey::new(topic);
236 let raw_key = key.to_bytes();
237 let value = TopicNameValue::new(pruned_entry_id);
238 let new_raw_value = value.try_as_raw_value()?;
239 let raw_value = prev.map(|v| v.get_raw_bytes()).unwrap_or_default();
240
241 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value.clone());
242 let mut r = self.kv_backend.txn(txn).await?;
243
244 if !r.succeeded {
245 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
246 let raw_value = TxnOpGetResponseSet::filter(raw_key)(&mut set)
247 .context(UnexpectedSnafu {
248 err_msg: "Reads the empty topic name value in comparing operation while updating TopicNameValue",
249 })?;
250
251 let op_name = "updating TopicNameValue";
252 ensure_values!(raw_value, new_raw_value, op_name);
253 }
254 Ok(())
255 }
256}
257
258#[cfg(test)]
259mod tests {
260 use std::assert_matches;
261 use std::sync::Arc;
262
263 use super::*;
264 use crate::kv_backend::KvBackend;
265 use crate::kv_backend::memory::MemoryKvBackend;
266 use crate::rpc::store::PutRequest;
267
268 #[tokio::test]
269 async fn test_topic_name_key_manager() {
270 let kv_backend = Arc::new(MemoryKvBackend::default());
271 let manager = TopicNameManager::new(kv_backend.clone());
272
273 let mut all_topics = (0..16)
274 .map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i))
275 .collect::<Vec<_>>();
276 all_topics.sort();
277 let topic_name_keys = all_topics
278 .iter()
279 .map(|topic| TopicNameKey::new(topic))
280 .collect::<Vec<_>>();
281
282 manager.batch_put(topic_name_keys.clone()).await.unwrap();
283
284 let topics = manager.range().await.unwrap();
285 assert_eq!(topics, all_topics);
286
287 kv_backend
288 .put(PutRequest {
289 key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
290 value: serde_json::to_vec(&all_topics).unwrap(),
291 prev_kv: false,
292 })
293 .await
294 .unwrap();
295 manager.update_legacy_topics().await.unwrap();
296 let res = kv_backend
297 .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
298 .await
299 .unwrap();
300 assert!(res.is_none());
301 let topics = manager.range().await.unwrap();
302 assert_eq!(topics, all_topics);
303
304 for topic in &topics {
305 let value = manager.get(topic).await.unwrap().unwrap();
306 assert_eq!(value.pruned_entry_id, 0);
307 manager.update(topic, 1, Some(value.clone())).await.unwrap();
308 let new_value = manager.get(topic).await.unwrap().unwrap();
309 assert_eq!(new_value.pruned_entry_id, 1);
310 manager.update(topic, 1, Some(value.clone())).await.unwrap();
312 let new_value = manager.get(topic).await.unwrap().unwrap();
313 assert_eq!(new_value.pruned_entry_id, 1);
314 let err = manager.update(topic, 3, Some(value)).await.unwrap_err();
316 assert_matches!(err, error::Error::Unexpected { .. });
317 }
318
319 let batch_topics = topics
320 .iter()
321 .take(2)
322 .map(|topic| TopicNameKey::new(topic))
323 .chain(std::iter::once(TopicNameKey::new("missing-topic")))
324 .collect::<Vec<_>>();
325 let values = manager.batch_get(batch_topics).await.unwrap();
326 assert_eq!(values.len(), 2);
327 for topic in topics.iter().take(2) {
328 assert_eq!(values.get(topic).unwrap().pruned_entry_id, 1);
329 }
330 }
331}