common_meta/key/
topic_name.rs1use std::fmt::{self, Display};
16
17use serde::{Deserialize, Serialize};
18use snafu::{ensure, OptionExt, ResultExt};
19
20use crate::ensure_values;
21use crate::error::{self, DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result, UnexpectedSnafu};
22use crate::key::txn_helper::TxnOpGetResponseSet;
23use crate::key::{
24 DeserializedValueWithBytes, MetadataKey, MetadataValue, KAFKA_TOPIC_KEY_PATTERN,
25 KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
26};
27use crate::kv_backend::txn::{Txn, TxnOp};
28use crate::kv_backend::KvBackendRef;
29use crate::rpc::store::{BatchPutRequest, RangeRequest};
30use crate::rpc::KeyValue;
31
32#[derive(Debug, Clone, PartialEq)]
33pub struct TopicNameKey<'a> {
34 pub topic: &'a str,
35}
36
37#[derive(Debug, Serialize, Deserialize, Default, Clone)]
42pub struct TopicNameValue {
43 pub pruned_entry_id: u64,
44}
45
46impl TopicNameValue {
47 pub fn new(pruned_entry_id: u64) -> Self {
48 Self { pruned_entry_id }
49 }
50}
51
52impl MetadataValue for TopicNameValue {
53 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
54 let value = serde_json::from_slice::<TopicNameValue>(raw_value).context(DecodeJsonSnafu)?;
55 Ok(value)
56 }
57
58 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
59 let raw_value = serde_json::to_vec(self).context(DecodeJsonSnafu)?;
60 Ok(raw_value)
61 }
62}
63
64impl<'a> TopicNameKey<'a> {
65 pub fn new(topic: &'a str) -> Self {
66 Self { topic }
67 }
68
69 pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String {
70 format!("{}_{}", prefix, id)
71 }
72
73 pub fn range_start_key() -> String {
74 KAFKA_TOPIC_KEY_PREFIX.to_string()
75 }
76}
77
78impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> {
79 fn to_bytes(&self) -> Vec<u8> {
80 self.to_string().into_bytes()
81 }
82
83 fn from_bytes(bytes: &'a [u8]) -> Result<TopicNameKey<'a>> {
84 let key = std::str::from_utf8(bytes).map_err(|e| {
85 InvalidMetadataSnafu {
86 err_msg: format!(
87 "TopicNameKey '{}' is not a valid UTF8 string: {e}",
88 String::from_utf8_lossy(bytes)
89 ),
90 }
91 .build()
92 })?;
93 TopicNameKey::try_from(key)
94 }
95}
96
97impl Display for TopicNameKey<'_> {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic)
100 }
101}
102
103impl<'a> TryFrom<&'a str> for TopicNameKey<'a> {
104 type Error = Error;
105
106 fn try_from(value: &'a str) -> Result<TopicNameKey<'a>> {
107 let captures = KAFKA_TOPIC_KEY_PATTERN
108 .captures(value)
109 .context(InvalidMetadataSnafu {
110 err_msg: format!("Invalid topic name key: {}", value),
111 })?;
112
113 Ok(TopicNameKey {
115 topic: captures.get(1).unwrap().as_str(),
116 })
117 }
118}
119
120fn topic_decoder(kv: &KeyValue) -> Result<String> {
122 let key = TopicNameKey::from_bytes(&kv.key)?;
123 Ok(key.topic.to_string())
124}
125
126pub struct TopicNameManager {
127 kv_backend: KvBackendRef,
128}
129
130impl TopicNameManager {
131 pub fn new(kv_backend: KvBackendRef) -> Self {
132 Self { kv_backend }
133 }
134
135 pub async fn update_legacy_topics(&self) -> Result<()> {
137 if let Some(kv) = self
138 .kv_backend
139 .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
140 .await?
141 {
142 let topics =
143 serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
144 let mut reqs = Vec::with_capacity(topics.len() + 1);
145 for topic in topics {
146 let topic_name_key = TopicNameKey::new(&topic);
147 let topic_name_value = TopicNameValue::new(0);
148 let put_req = TxnOp::Put(
149 topic_name_key.to_bytes(),
150 topic_name_value.try_as_raw_value()?,
151 );
152 reqs.push(put_req);
153 }
154 let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
155 reqs.push(delete_req);
156 let txn = Txn::new().and_then(reqs);
157 self.kv_backend.txn(txn).await?;
158 }
159 Ok(())
160 }
161
162 pub async fn range(&self) -> Result<Vec<String>> {
165 let prefix = TopicNameKey::range_start_key();
166 let raw_prefix = prefix.as_bytes();
167 let req = RangeRequest::new().with_prefix(raw_prefix);
168 let resp = self.kv_backend.range(req).await?;
169 resp.kvs
170 .iter()
171 .map(topic_decoder)
172 .collect::<Result<Vec<String>>>()
173 }
174
175 pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
177 let mut kvs = Vec::with_capacity(topic_name_keys.len());
178 let topic_name_value = TopicNameValue::new(0);
179 for topic_name_key in &topic_name_keys {
180 let kv = KeyValue {
181 key: topic_name_key.to_bytes(),
182 value: topic_name_value.clone().try_as_raw_value()?,
183 };
184 kvs.push(kv);
185 }
186 let req = BatchPutRequest {
187 kvs,
188 prev_kv: false,
189 };
190 self.kv_backend.batch_put(req).await?;
191 Ok(())
192 }
193
194 pub async fn get(
196 &self,
197 topic: &str,
198 ) -> Result<Option<DeserializedValueWithBytes<TopicNameValue>>> {
199 let key = TopicNameKey::new(topic);
200 let raw_key = key.to_bytes();
201 self.kv_backend
202 .get(&raw_key)
203 .await?
204 .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
205 .transpose()
206 }
207
208 pub async fn update(
210 &self,
211 topic: &str,
212 pruned_entry_id: u64,
213 prev: Option<DeserializedValueWithBytes<TopicNameValue>>,
214 ) -> Result<()> {
215 let key = TopicNameKey::new(topic);
216 let raw_key = key.to_bytes();
217 let value = TopicNameValue::new(pruned_entry_id);
218 let new_raw_value = value.try_as_raw_value()?;
219 let raw_value = prev.map(|v| v.get_raw_bytes()).unwrap_or_default();
220
221 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value.clone());
222 let mut r = self.kv_backend.txn(txn).await?;
223
224 if !r.succeeded {
225 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
226 let raw_value = TxnOpGetResponseSet::filter(raw_key)(&mut set)
227 .context(UnexpectedSnafu {
228 err_msg: "Reads the empty topic name value in comparing operation while updating TopicNameValue",
229 })?;
230
231 let op_name = "updating TopicNameValue";
232 ensure_values!(raw_value, new_raw_value, op_name);
233 }
234 Ok(())
235 }
236}
237
238#[cfg(test)]
239mod tests {
240 use std::assert_matches::assert_matches;
241 use std::sync::Arc;
242
243 use super::*;
244 use crate::kv_backend::memory::MemoryKvBackend;
245 use crate::kv_backend::KvBackend;
246 use crate::rpc::store::PutRequest;
247
248 #[tokio::test]
249 async fn test_topic_name_key_manager() {
250 let kv_backend = Arc::new(MemoryKvBackend::default());
251 let manager = TopicNameManager::new(kv_backend.clone());
252
253 let mut all_topics = (0..16)
254 .map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i))
255 .collect::<Vec<_>>();
256 all_topics.sort();
257 let topic_name_keys = all_topics
258 .iter()
259 .map(|topic| TopicNameKey::new(topic))
260 .collect::<Vec<_>>();
261
262 manager.batch_put(topic_name_keys.clone()).await.unwrap();
263
264 let topics = manager.range().await.unwrap();
265 assert_eq!(topics, all_topics);
266
267 kv_backend
268 .put(PutRequest {
269 key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
270 value: serde_json::to_vec(&all_topics).unwrap(),
271 prev_kv: false,
272 })
273 .await
274 .unwrap();
275 manager.update_legacy_topics().await.unwrap();
276 let res = kv_backend
277 .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
278 .await
279 .unwrap();
280 assert!(res.is_none());
281 let topics = manager.range().await.unwrap();
282 assert_eq!(topics, all_topics);
283
284 for topic in &topics {
285 let value = manager.get(topic).await.unwrap().unwrap();
286 assert_eq!(value.pruned_entry_id, 0);
287 manager.update(topic, 1, Some(value.clone())).await.unwrap();
288 let new_value = manager.get(topic).await.unwrap().unwrap();
289 assert_eq!(new_value.pruned_entry_id, 1);
290 manager.update(topic, 1, Some(value.clone())).await.unwrap();
292 let new_value = manager.get(topic).await.unwrap().unwrap();
293 assert_eq!(new_value.pruned_entry_id, 1);
294 let err = manager.update(topic, 3, Some(value)).await.unwrap_err();
296 assert_matches!(err, error::Error::Unexpected { .. });
297 }
298 }
299}