Skip to main content

common_meta/key/
topic_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use serde::{Deserialize, Serialize};
19use snafu::{OptionExt, ResultExt, ensure};
20
21use crate::ensure_values;
22use crate::error::{self, DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result, UnexpectedSnafu};
23use crate::key::txn_helper::TxnOpGetResponseSet;
24use crate::key::{
25    DeserializedValueWithBytes, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX,
26    LEGACY_TOPIC_KEY_PREFIX, MetadataKey, MetadataValue,
27};
28use crate::kv_backend::KvBackendRef;
29use crate::kv_backend::txn::{Txn, TxnOp};
30use crate::rpc::KeyValue;
31use crate::rpc::store::{BatchGetRequest, BatchPutRequest, RangeRequest};
32
/// Metadata key that records a single Kafka topic name.
///
/// The key borrows the topic name; its stored form is produced by the `Display`
/// implementation as `{KAFKA_TOPIC_KEY_PREFIX}/{topic}`.
#[derive(Clone, PartialEq, Debug)]
pub struct TopicNameKey<'a> {
    /// The topic name this key refers to.
    pub topic: &'a str,
}
37
38/// The value associated with a topic name key.
39///
40/// The `pruned_entry_id` is the highest entry id that has been pruned from the remote WAL.
41/// When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimal available entry id).
42#[derive(Debug, Serialize, Deserialize, Default, Clone)]
43pub struct TopicNameValue {
44    pub pruned_entry_id: u64,
45}
46
47impl TopicNameValue {
48    pub fn new(pruned_entry_id: u64) -> Self {
49        Self { pruned_entry_id }
50    }
51}
52
53impl MetadataValue for TopicNameValue {
54    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
55        let value = serde_json::from_slice::<TopicNameValue>(raw_value).context(DecodeJsonSnafu)?;
56        Ok(value)
57    }
58
59    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
60        let raw_value = serde_json::to_vec(self).context(DecodeJsonSnafu)?;
61        Ok(raw_value)
62    }
63}
64
65impl<'a> TopicNameKey<'a> {
66    pub fn new(topic: &'a str) -> Self {
67        Self { topic }
68    }
69
70    pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String {
71        format!("{}_{}", prefix, id)
72    }
73
74    pub fn range_start_key() -> String {
75        KAFKA_TOPIC_KEY_PREFIX.to_string()
76    }
77}
78
79impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> {
80    fn to_bytes(&self) -> Vec<u8> {
81        self.to_string().into_bytes()
82    }
83
84    fn from_bytes(bytes: &'a [u8]) -> Result<TopicNameKey<'a>> {
85        let key = std::str::from_utf8(bytes).map_err(|e| {
86            InvalidMetadataSnafu {
87                err_msg: format!(
88                    "TopicNameKey '{}' is not a valid UTF8 string: {e}",
89                    String::from_utf8_lossy(bytes)
90                ),
91            }
92            .build()
93        })?;
94        TopicNameKey::try_from(key)
95    }
96}
97
98impl Display for TopicNameKey<'_> {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic)
101    }
102}
103
104impl<'a> TryFrom<&'a str> for TopicNameKey<'a> {
105    type Error = Error;
106
107    fn try_from(value: &'a str) -> Result<TopicNameKey<'a>> {
108        let captures = KAFKA_TOPIC_KEY_PATTERN
109            .captures(value)
110            .context(InvalidMetadataSnafu {
111                err_msg: format!("Invalid topic name key: {}", value),
112            })?;
113
114        // Safety: pass the regex check above
115        Ok(TopicNameKey {
116            topic: captures.get(1).unwrap().as_str(),
117        })
118    }
119}
120
121/// Convert a key-value pair to a topic name.
122fn topic_decoder(kv: &KeyValue) -> Result<String> {
123    let key = TopicNameKey::from_bytes(&kv.key)?;
124    Ok(key.topic.to_string())
125}
126
127pub struct TopicNameManager {
128    kv_backend: KvBackendRef,
129}
130
131impl TopicNameManager {
132    pub fn new(kv_backend: KvBackendRef) -> Self {
133        Self { kv_backend }
134    }
135
136    /// Update the topics in legacy format to the new format.
137    pub async fn update_legacy_topics(&self) -> Result<()> {
138        if let Some(kv) = self
139            .kv_backend
140            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
141            .await?
142        {
143            let topics =
144                serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
145            let mut reqs = Vec::with_capacity(topics.len() + 1);
146            for topic in topics {
147                let topic_name_key = TopicNameKey::new(&topic);
148                let topic_name_value = TopicNameValue::new(0);
149                let put_req = TxnOp::Put(
150                    topic_name_key.to_bytes(),
151                    topic_name_value.try_as_raw_value()?,
152                );
153                reqs.push(put_req);
154            }
155            let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
156            reqs.push(delete_req);
157            let txn = Txn::new().and_then(reqs);
158            self.kv_backend.txn(txn).await?;
159        }
160        Ok(())
161    }
162
163    /// Range query for topics. Only the keys are returned.
164    /// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys.
165    pub async fn range(&self) -> Result<Vec<String>> {
166        let prefix = TopicNameKey::range_start_key();
167        let raw_prefix = prefix.as_bytes();
168        let req = RangeRequest::new().with_prefix(raw_prefix);
169        let resp = self.kv_backend.range(req).await?;
170        resp.kvs
171            .iter()
172            .map(topic_decoder)
173            .collect::<Result<Vec<String>>>()
174    }
175
176    /// Put topics into kvbackend. The value is set to 0 by default.
177    pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
178        let mut kvs = Vec::with_capacity(topic_name_keys.len());
179        let topic_name_value = TopicNameValue::new(0);
180        for topic_name_key in &topic_name_keys {
181            let kv = KeyValue {
182                key: topic_name_key.to_bytes(),
183                value: topic_name_value.clone().try_as_raw_value()?,
184            };
185            kvs.push(kv);
186        }
187        let req = BatchPutRequest {
188            kvs,
189            prev_kv: false,
190        };
191        self.kv_backend.batch_put(req).await?;
192        Ok(())
193    }
194
195    /// Get value for a specific topic.
196    pub async fn get(
197        &self,
198        topic: &str,
199    ) -> Result<Option<DeserializedValueWithBytes<TopicNameValue>>> {
200        let key = TopicNameKey::new(topic);
201        let raw_key = key.to_bytes();
202        self.kv_backend
203            .get(&raw_key)
204            .await?
205            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
206            .transpose()
207    }
208
209    /// Batch get values for specific topics.
210    pub async fn batch_get(
211        &self,
212        topics: Vec<TopicNameKey<'_>>,
213    ) -> Result<HashMap<String, TopicNameValue>> {
214        let raw_keys = topics.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
215        let req = BatchGetRequest { keys: raw_keys };
216        let resp = self.kv_backend.batch_get(req).await?;
217
218        resp.kvs
219            .into_iter()
220            .map(|kv| {
221                let key = TopicNameKey::from_bytes(&kv.key)?;
222                let value = TopicNameValue::try_from_raw_value(&kv.value)?;
223                Ok((key.topic.to_string(), value))
224            })
225            .collect::<Result<HashMap<_, _>>>()
226    }
227
228    /// Update the topic name key and value in the kv backend.
229    pub async fn update(
230        &self,
231        topic: &str,
232        pruned_entry_id: u64,
233        prev: Option<DeserializedValueWithBytes<TopicNameValue>>,
234    ) -> Result<()> {
235        let key = TopicNameKey::new(topic);
236        let raw_key = key.to_bytes();
237        let value = TopicNameValue::new(pruned_entry_id);
238        let new_raw_value = value.try_as_raw_value()?;
239        let raw_value = prev.map(|v| v.get_raw_bytes()).unwrap_or_default();
240
241        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value.clone());
242        let mut r = self.kv_backend.txn(txn).await?;
243
244        if !r.succeeded {
245            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
246            let raw_value = TxnOpGetResponseSet::filter(raw_key)(&mut set)
247                .context(UnexpectedSnafu {
248                    err_msg: "Reads the empty topic name value in comparing operation while updating TopicNameValue",
249                })?;
250
251            let op_name = "updating TopicNameValue";
252            ensure_values!(raw_value, new_raw_value, op_name);
253        }
254        Ok(())
255    }
256}
257
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::KvBackend;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::rpc::store::PutRequest;

    /// End-to-end exercise of `TopicNameManager` against an in-memory kv backend.
    ///
    /// The test covers, in order:
    /// - `batch_put` + `range`;
    /// - legacy-format migration;
    /// - `get` and compare-and-put `update` (success, idempotent retry, conflicting retry);
    /// - `batch_get` with a missing topic.
    #[tokio::test]
    async fn test_topic_name_key_manager() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicNameManager::new(kv_backend.clone());

        // Topic names themselves embed the key prefix; they are sorted because the `range`
        // result is compared against this list as-is (assumes `range` returns keys in
        // lexicographic order — the sort here is what makes the comparison stable).
        let mut all_topics = (0..16)
            .map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i))
            .collect::<Vec<_>>();
        all_topics.sort();
        let topic_name_keys = all_topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();

        manager.batch_put(topic_name_keys.clone()).await.unwrap();

        let topics = manager.range().await.unwrap();
        assert_eq!(topics, all_topics);

        // Write the same topics in the legacy format (a JSON array under one key) and migrate.
        kv_backend
            .put(PutRequest {
                key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
                value: serde_json::to_vec(&all_topics).unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();
        manager.update_legacy_topics().await.unwrap();
        // The migration must delete the legacy key and leave the per-topic keys in place.
        let res = kv_backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(res.is_none());
        let topics = manager.range().await.unwrap();
        assert_eq!(topics, all_topics);

        for topic in &topics {
            // Freshly migrated topics start at pruned_entry_id 0.
            let value = manager.get(topic).await.unwrap().unwrap();
            assert_eq!(value.pruned_entry_id, 0);
            manager.update(topic, 1, Some(value.clone())).await.unwrap();
            let new_value = manager.get(topic).await.unwrap().unwrap();
            assert_eq!(new_value.pruned_entry_id, 1);
            // Update twice, nothing changed: `value` is now stale so the CAS fails, but the
            // stored value already equals the new one, so `update` still returns Ok.
            manager.update(topic, 1, Some(value.clone())).await.unwrap();
            let new_value = manager.get(topic).await.unwrap().unwrap();
            assert_eq!(new_value.pruned_entry_id, 1);
            // Bad cas, emit error: stale `prev` and a different target (3 vs stored 1).
            let err = manager.update(topic, 3, Some(value)).await.unwrap_err();
            assert_matches!(err, error::Error::Unexpected { .. });
        }

        // `batch_get` silently skips topics that have no key ("missing-topic").
        let batch_topics = topics
            .iter()
            .take(2)
            .map(|topic| TopicNameKey::new(topic))
            .chain(std::iter::once(TopicNameKey::new("missing-topic")))
            .collect::<Vec<_>>();
        let values = manager.batch_get(batch_topics).await.unwrap();
        assert_eq!(values.len(), 2);
        for topic in topics.iter().take(2) {
            assert_eq!(values.get(topic).unwrap().pruned_entry_id, 1);
        }
    }
}