common_meta/wal_options_allocator/
topic_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use crate::error::Result;
18use crate::key::topic_name::{TopicNameKey, TopicNameManager};
19use crate::kv_backend::KvBackendRef;
20
/// Manages Kafka WAL topic names stored in the kv backend.
///
/// Responsible for:
/// 1. Restoring topics from, and persisting topics into, the kv backend.
/// 2. Clearing topics stored in the legacy format and re-storing them in the new format.
///
/// NOTE(review): this doc previously also listed "stores and fetches topic-region mapping
/// in kvbackend"; no such method is visible on this type in this file — confirm whether
/// that responsibility lives elsewhere.
pub struct KafkaTopicManager {
    /// Accessor used to migrate, list, and store topic names in the kv backend.
    topic_name_manager: TopicNameManager,
}
29
30impl KafkaTopicManager {
31    pub fn new(kv_backend: KvBackendRef) -> Self {
32        Self {
33            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
34        }
35    }
36
37    async fn restore_topics(&self) -> Result<Vec<String>> {
38        self.topic_name_manager.update_legacy_topics().await?;
39        let topics = self.topic_name_manager.range().await?;
40        Ok(topics)
41    }
42
43    /// Returns the topics that are not prepared.
44    pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
45        let existing_topics = self.restore_topics().await?;
46        let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
47        let mut topics_to_create = Vec::with_capacity(all_topics.len());
48        for topic in all_topics {
49            if !existing_topic_set.contains(topic) {
50                topics_to_create.push(topic.to_string());
51            }
52        }
53        Ok(topics_to_create)
54    }
55
56    /// Persists prepared topics into the key-value backend.
57    pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
58        self.topic_name_manager
59            .batch_put(
60                topics
61                    .iter()
62                    .map(|topic| TopicNameKey::new(topic))
63                    .collect(),
64            )
65            .await?;
66        Ok(())
67    }
68}
69
#[cfg(test)]
impl KafkaTopicManager {
    /// Test-only helper that returns every topic name currently stored in the
    /// key-value backend.
    pub async fn list_topics(&self) -> Result<Vec<String>> {
        let stored_topics = self.topic_name_manager.range().await?;
        Ok(stored_topics)
    }
}
77
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::key::LEGACY_TOPIC_KEY_PREFIX;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::rpc::store::PutRequest;

    /// Returns `topics` sorted, so comparisons do not depend on ordering.
    fn sorted(mut topics: Vec<String>) -> Vec<String> {
        topics.sort();
        topics
    }

    // Tests that topics stored in the legacy format are picked up, removed from the legacy
    // key, and can afterwards be restored from the new format.
    #[tokio::test]
    async fn test_restore_legacy_persisted_topics() {
        let backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let manager = KafkaTopicManager::new(backend.clone());

        let all_topics: Vec<String> = (0..16)
            .map(|i| format!("greptimedb_wal_topic_{}", i))
            .collect();

        // No legacy topics: every topic is reported as unprepared.
        let unprepared = manager.unprepare_topics(&all_topics).await.unwrap();
        assert_eq!(sorted(all_topics.clone()), sorted(unprepared));

        // A topic pool with 16 topics stored in kvbackend in legacy format.
        let legacy_value = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]";
        let put_response = backend
            .put(PutRequest {
                key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
                value: legacy_value.as_bytes().to_vec(),
                prev_kv: true,
            })
            .await
            .unwrap();
        assert!(put_response.prev_kv.is_none());

        // With the legacy topics in place, nothing is left to create.
        let unprepared = manager.unprepare_topics(&all_topics).await.unwrap();
        assert!(unprepared.is_empty());

        // Legacy topics should be deleted after restoring.
        let legacy_entry = backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(legacy_entry.is_none());

        // Then we can restore it from the new format.
        let restored = manager.restore_topics().await.unwrap();
        assert_eq!(sorted(all_topics.clone()), sorted(restored));
    }

    // Tests that topics can be successfully persisted into the kv backend and can be
    // successfully restored from the kv backend.
    #[tokio::test]
    async fn test_restore_persisted_topics() {
        let prefix = "greptimedb_wal_topic";
        let topic_count = 16;
        let all_topics: Vec<String> = (0..topic_count)
            .map(|i| format!("{}_{}", prefix, i))
            .collect();

        // Constructs the manager on top of an empty in-memory backend.
        let backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let manager = KafkaTopicManager::new(backend);

        // Before persisting, every topic is reported as unprepared.
        let unprepared = manager.unprepare_topics(&all_topics).await.unwrap();
        assert_eq!(sorted(all_topics.clone()), sorted(unprepared));

        // Persists topics to kv backend.
        manager.persist_prepared_topics(&all_topics).await.unwrap();

        // After persisting, nothing is left to create.
        let unprepared = manager.unprepare_topics(&all_topics).await.unwrap();
        assert!(unprepared.is_empty());
    }
}