common_meta/wal_options_allocator/
topic_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use crate::error::Result;
18use crate::key::topic_name::{TopicNameKey, TopicNameManager};
19use crate::kv_backend::KvBackendRef;
20
21/// Manages topics in kvbackend.
22/// Responsible for:
23/// 1. Restores and persisting topics in kvbackend.
24/// 2. Clears topics in legacy format and restores them in the new format.
25/// 3. Stores and fetches topic-region mapping in kvbackend.
26pub struct KafkaTopicManager {
27    topic_name_manager: TopicNameManager,
28}
29
30impl KafkaTopicManager {
31    pub fn new(kv_backend: KvBackendRef) -> Self {
32        Self {
33            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
34        }
35    }
36
37    async fn restore_topics(&self) -> Result<Vec<String>> {
38        self.topic_name_manager.update_legacy_topics().await?;
39        let topics = self.topic_name_manager.range().await?;
40        Ok(topics)
41    }
42
43    /// Restores topics from the key-value backend. and returns the topics that are not stored in kvbackend.
44    pub async fn get_topics_to_create<'a>(
45        &self,
46        all_topics: &'a [String],
47    ) -> Result<Vec<&'a String>> {
48        let existing_topics = self.restore_topics().await?;
49        let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
50        let mut topics_to_create = Vec::with_capacity(all_topics.len());
51        for topic in all_topics {
52            if !existing_topic_set.contains(topic) {
53                topics_to_create.push(topic);
54            }
55        }
56        Ok(topics_to_create)
57    }
58
59    /// Persists topics into the key-value backend.
60    pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
61        self.topic_name_manager
62            .batch_put(
63                topics
64                    .iter()
65                    .map(|topic| TopicNameKey::new(topic))
66                    .collect(),
67            )
68            .await?;
69        Ok(())
70    }
71}
72
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::key::LEGACY_TOPIC_KEY_PREFIX;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::rpc::store::PutRequest;

    /// Size of the mock topic pool used by the tests below.
    const NUM_TOPICS: usize = 16;

    /// Builds the mock topic pool `greptimedb_wal_topic_0` ..= `greptimedb_wal_topic_15`.
    fn mock_topics() -> Vec<String> {
        (0..NUM_TOPICS)
            .map(|i| format!("greptimedb_wal_topic_{}", i))
            .collect()
    }

    /// Asserts that every topic in `all_topics` is reported as still to be created,
    /// ignoring order.
    async fn assert_all_topics_missing(manager: &KafkaTopicManager, all_topics: &[String]) {
        let mut missing = manager.get_topics_to_create(all_topics).await.unwrap();
        missing.sort();
        let mut expected = all_topics.iter().collect::<Vec<_>>();
        expected.sort();
        assert_eq!(expected, missing);
    }

    // Tests that topics stored in the legacy format are migrated, removed from the
    // legacy key, and can then be restored from the new format.
    #[tokio::test]
    async fn test_restore_legacy_persisted_topics() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let manager = KafkaTopicManager::new(kv_backend.clone());
        let all_topics = mock_topics();

        // No legacy topics.
        assert_all_topics_missing(&manager, &all_topics).await;

        // A topic pool with 16 topics stored in kvbackend in legacy format.
        let legacy_payload = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]";
        let put_resp = kv_backend
            .put(PutRequest {
                key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
                value: legacy_payload.as_bytes().to_vec(),
                prev_kv: true,
            })
            .await
            .unwrap();
        // Nothing was stored under the legacy key before this put.
        assert!(put_resp.prev_kv.is_none());

        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        assert!(missing.is_empty());

        // Legacy topics should be deleted after restoring.
        let legacy_entry = kv_backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(legacy_entry.is_none());

        // Then we can restore it from the new format.
        let mut restored = manager.restore_topics().await.unwrap();
        restored.sort();
        let mut expected = all_topics.clone();
        expected.sort();
        assert_eq!(expected, restored);
    }

    // Tests that topics can be successfully persisted into the kv backend and can be
    // successfully restored from the kv backend.
    #[tokio::test]
    async fn test_restore_persisted_topics() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let all_topics = mock_topics();

        // Constructs mock topics.
        let manager = KafkaTopicManager::new(kv_backend);

        // Nothing persisted yet, so every topic must be reported.
        assert_all_topics_missing(&manager, &all_topics).await;

        // Persists topics to kv backend.
        manager.persist_topics(&all_topics).await.unwrap();

        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        assert!(missing.is_empty());
    }
}