common_meta/key/
topic_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Display};
16
17use serde::{Deserialize, Serialize};
18use snafu::{ensure, OptionExt, ResultExt};
19
20use crate::ensure_values;
21use crate::error::{self, DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result, UnexpectedSnafu};
22use crate::key::txn_helper::TxnOpGetResponseSet;
23use crate::key::{
24    DeserializedValueWithBytes, MetadataKey, MetadataValue, KAFKA_TOPIC_KEY_PATTERN,
25    KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
26};
27use crate::kv_backend::txn::{Txn, TxnOp};
28use crate::kv_backend::KvBackendRef;
29use crate::rpc::store::{BatchPutRequest, RangeRequest};
30use crate::rpc::KeyValue;
31
/// Key that identifies a single topic in the kv backend.
///
/// It borrows the topic name; its textual form (see the `Display` impl) is
/// `{KAFKA_TOPIC_KEY_PREFIX}/{topic}`.
#[derive(Clone, Debug, PartialEq)]
pub struct TopicNameKey<'a> {
    /// The topic name this key refers to.
    pub topic: &'a str,
}
36
37/// The value associated with a topic name key.
38///
39/// The `pruned_entry_id` is the highest entry id that has been pruned from the remote WAL.
40/// When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimal available entry id).
41#[derive(Debug, Serialize, Deserialize, Default, Clone)]
42pub struct TopicNameValue {
43    pub pruned_entry_id: u64,
44}
45
46impl TopicNameValue {
47    pub fn new(pruned_entry_id: u64) -> Self {
48        Self { pruned_entry_id }
49    }
50}
51
52impl MetadataValue for TopicNameValue {
53    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
54        let value = serde_json::from_slice::<TopicNameValue>(raw_value).context(DecodeJsonSnafu)?;
55        Ok(value)
56    }
57
58    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
59        let raw_value = serde_json::to_vec(self).context(DecodeJsonSnafu)?;
60        Ok(raw_value)
61    }
62}
63
64impl<'a> TopicNameKey<'a> {
65    pub fn new(topic: &'a str) -> Self {
66        Self { topic }
67    }
68
69    pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String {
70        format!("{}_{}", prefix, id)
71    }
72
73    pub fn range_start_key() -> String {
74        KAFKA_TOPIC_KEY_PREFIX.to_string()
75    }
76}
77
78impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> {
79    fn to_bytes(&self) -> Vec<u8> {
80        self.to_string().into_bytes()
81    }
82
83    fn from_bytes(bytes: &'a [u8]) -> Result<TopicNameKey<'a>> {
84        let key = std::str::from_utf8(bytes).map_err(|e| {
85            InvalidMetadataSnafu {
86                err_msg: format!(
87                    "TopicNameKey '{}' is not a valid UTF8 string: {e}",
88                    String::from_utf8_lossy(bytes)
89                ),
90            }
91            .build()
92        })?;
93        TopicNameKey::try_from(key)
94    }
95}
96
97impl Display for TopicNameKey<'_> {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic)
100    }
101}
102
103impl<'a> TryFrom<&'a str> for TopicNameKey<'a> {
104    type Error = Error;
105
106    fn try_from(value: &'a str) -> Result<TopicNameKey<'a>> {
107        let captures = KAFKA_TOPIC_KEY_PATTERN
108            .captures(value)
109            .context(InvalidMetadataSnafu {
110                err_msg: format!("Invalid topic name key: {}", value),
111            })?;
112
113        // Safety: pass the regex check above
114        Ok(TopicNameKey {
115            topic: captures.get(1).unwrap().as_str(),
116        })
117    }
118}
119
120/// Convert a key-value pair to a topic name.
121fn topic_decoder(kv: &KeyValue) -> Result<String> {
122    let key = TopicNameKey::from_bytes(&kv.key)?;
123    Ok(key.topic.to_string())
124}
125
126pub struct TopicNameManager {
127    kv_backend: KvBackendRef,
128}
129
130impl TopicNameManager {
131    pub fn new(kv_backend: KvBackendRef) -> Self {
132        Self { kv_backend }
133    }
134
135    /// Update the topics in legacy format to the new format.
136    pub async fn update_legacy_topics(&self) -> Result<()> {
137        if let Some(kv) = self
138            .kv_backend
139            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
140            .await?
141        {
142            let topics =
143                serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
144            let mut reqs = Vec::with_capacity(topics.len() + 1);
145            for topic in topics {
146                let topic_name_key = TopicNameKey::new(&topic);
147                let topic_name_value = TopicNameValue::new(0);
148                let put_req = TxnOp::Put(
149                    topic_name_key.to_bytes(),
150                    topic_name_value.try_as_raw_value()?,
151                );
152                reqs.push(put_req);
153            }
154            let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
155            reqs.push(delete_req);
156            let txn = Txn::new().and_then(reqs);
157            self.kv_backend.txn(txn).await?;
158        }
159        Ok(())
160    }
161
162    /// Range query for topics. Only the keys are returned.
163    /// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys.
164    pub async fn range(&self) -> Result<Vec<String>> {
165        let prefix = TopicNameKey::range_start_key();
166        let raw_prefix = prefix.as_bytes();
167        let req = RangeRequest::new().with_prefix(raw_prefix);
168        let resp = self.kv_backend.range(req).await?;
169        resp.kvs
170            .iter()
171            .map(topic_decoder)
172            .collect::<Result<Vec<String>>>()
173    }
174
175    /// Put topics into kvbackend. The value is set to 0 by default.
176    pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
177        let mut kvs = Vec::with_capacity(topic_name_keys.len());
178        let topic_name_value = TopicNameValue::new(0);
179        for topic_name_key in &topic_name_keys {
180            let kv = KeyValue {
181                key: topic_name_key.to_bytes(),
182                value: topic_name_value.clone().try_as_raw_value()?,
183            };
184            kvs.push(kv);
185        }
186        let req = BatchPutRequest {
187            kvs,
188            prev_kv: false,
189        };
190        self.kv_backend.batch_put(req).await?;
191        Ok(())
192    }
193
194    /// Get value for a specific topic.
195    pub async fn get(
196        &self,
197        topic: &str,
198    ) -> Result<Option<DeserializedValueWithBytes<TopicNameValue>>> {
199        let key = TopicNameKey::new(topic);
200        let raw_key = key.to_bytes();
201        self.kv_backend
202            .get(&raw_key)
203            .await?
204            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
205            .transpose()
206    }
207
208    /// Update the topic name key and value in the kv backend.
209    pub async fn update(
210        &self,
211        topic: &str,
212        pruned_entry_id: u64,
213        prev: Option<DeserializedValueWithBytes<TopicNameValue>>,
214    ) -> Result<()> {
215        let key = TopicNameKey::new(topic);
216        let raw_key = key.to_bytes();
217        let value = TopicNameValue::new(pruned_entry_id);
218        let new_raw_value = value.try_as_raw_value()?;
219        let raw_value = prev.map(|v| v.get_raw_bytes()).unwrap_or_default();
220
221        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value.clone());
222        let mut r = self.kv_backend.txn(txn).await?;
223
224        if !r.succeeded {
225            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
226            let raw_value = TxnOpGetResponseSet::filter(raw_key)(&mut set)
227                .context(UnexpectedSnafu {
228                    err_msg: "Reads the empty topic name value in comparing operation while updating TopicNameValue",
229                })?;
230
231            let op_name = "updating TopicNameValue";
232            ensure_values!(raw_value, new_raw_value, op_name);
233        }
234        Ok(())
235    }
236}
237
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Covers batch creation, range listing, migration from the legacy single-key format,
    /// and compare-and-put updates of topic name values.
    #[tokio::test]
    async fn test_topic_name_key_manager() {
        let backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicNameManager::new(backend.clone());

        // Sorted so the list matches the byte order a range scan yields.
        let mut expected: Vec<String> = Vec::with_capacity(16);
        for i in 0..16 {
            expected.push(format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i));
        }
        expected.sort();
        let keys: Vec<TopicNameKey<'_>> = expected
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect();

        manager.batch_put(keys).await.unwrap();
        assert_eq!(manager.range().await.unwrap(), expected);

        // Seed the legacy key, migrate, and check that the legacy key is gone.
        let legacy = PutRequest {
            key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
            value: serde_json::to_vec(&expected).unwrap(),
            prev_kv: false,
        };
        backend.put(legacy).await.unwrap();
        manager.update_legacy_topics().await.unwrap();
        let leftover = backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(leftover.is_none());

        let listed = manager.range().await.unwrap();
        assert_eq!(listed, expected);

        for topic in listed.iter() {
            let original = manager.get(topic).await.unwrap().unwrap();
            assert_eq!(original.pruned_entry_id, 0);

            // The first update matches the stored value and succeeds.
            manager.update(topic, 1, Some(original.clone())).await.unwrap();
            let current = manager.get(topic).await.unwrap().unwrap();
            assert_eq!(current.pruned_entry_id, 1);

            // A stale `prev` with the same target value is tolerated.
            manager.update(topic, 1, Some(original.clone())).await.unwrap();
            let current = manager.get(topic).await.unwrap().unwrap();
            assert_eq!(current.pruned_entry_id, 1);

            // A stale `prev` with a different target value is a failed CAS.
            let err = manager.update(topic, 3, Some(original)).await.unwrap_err();
            assert_matches!(err, error::Error::Unexpected { .. });
        }
    }
}