common_meta/wal_options_allocator/
topic_manager.rs1use std::collections::HashSet;
16
17use crate::error::Result;
18use crate::key::topic_name::{TopicNameKey, TopicNameManager};
19use crate::kv_backend::KvBackendRef;
20
21pub struct KafkaTopicManager {
27    topic_name_manager: TopicNameManager,
28}
29
30impl KafkaTopicManager {
31    pub fn new(kv_backend: KvBackendRef) -> Self {
32        Self {
33            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
34        }
35    }
36
37    async fn restore_topics(&self) -> Result<Vec<String>> {
38        self.topic_name_manager.update_legacy_topics().await?;
39        let topics = self.topic_name_manager.range().await?;
40        Ok(topics)
41    }
42
43    pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
45        let existing_topics = self.restore_topics().await?;
46        let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
47        let mut topics_to_create = Vec::with_capacity(all_topics.len());
48        for topic in all_topics {
49            if !existing_topic_set.contains(topic) {
50                topics_to_create.push(topic.clone());
51            }
52        }
53        Ok(topics_to_create)
54    }
55
56    pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
58        self.topic_name_manager
59            .batch_put(
60                topics
61                    .iter()
62                    .map(|topic| TopicNameKey::new(topic))
63                    .collect(),
64            )
65            .await?;
66        Ok(())
67    }
68}
69
70#[cfg(test)]
71impl KafkaTopicManager {
72    pub async fn list_topics(&self) -> Result<Vec<String>> {
74        self.topic_name_manager.range().await
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use std::sync::Arc;
81
82    use super::*;
83    use crate::key::LEGACY_TOPIC_KEY_PREFIX;
84    use crate::kv_backend::memory::MemoryKvBackend;
85    use crate::rpc::store::PutRequest;
86
87    #[tokio::test]
88    async fn test_restore_legacy_persisted_topics() {
89        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
90        let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend.clone());
91
92        let all_topics = (0..16)
93            .map(|i| format!("greptimedb_wal_topic_{}", i))
94            .collect::<Vec<_>>();
95
96        let mut topics_to_be_created = topic_kvbackend_manager
98            .unprepare_topics(&all_topics)
99            .await
100            .unwrap();
101        topics_to_be_created.sort();
102        let mut expected = all_topics.clone();
103        expected.sort();
104        assert_eq!(expected, topics_to_be_created);
105
106        let topics = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]";
108        let put_req = PutRequest {
109            key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
110            value: topics.as_bytes().to_vec(),
111            prev_kv: true,
112        };
113        let res = kv_backend.put(put_req).await.unwrap();
114        assert!(res.prev_kv.is_none());
115
116        let topics_to_be_created = topic_kvbackend_manager
117            .unprepare_topics(&all_topics)
118            .await
119            .unwrap();
120        assert!(topics_to_be_created.is_empty());
121
122        let legacy_topics = kv_backend
124            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
125            .await
126            .unwrap();
127        assert!(legacy_topics.is_none());
128
129        let mut restored_topics = topic_kvbackend_manager.restore_topics().await.unwrap();
131        restored_topics.sort();
132        let mut expected = all_topics.clone();
133        expected.sort();
134        assert_eq!(expected, restored_topics);
135    }
136
137    #[tokio::test]
139    async fn test_restore_persisted_topics() {
140        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
141        let topic_name_prefix = "greptimedb_wal_topic";
142        let num_topics = 16;
143
144        let all_topics = (0..num_topics)
145            .map(|i| format!("{}_{}", topic_name_prefix, i))
146            .collect::<Vec<_>>();
147
148        let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
150
151        let mut topics_to_be_created = topic_kvbackend_manager
152            .unprepare_topics(&all_topics)
153            .await
154            .unwrap();
155        topics_to_be_created.sort();
156        let mut expected = all_topics.clone();
157        expected.sort();
158        assert_eq!(expected, topics_to_be_created);
159
160        topic_kvbackend_manager
162            .persist_prepared_topics(&all_topics)
163            .await
164            .unwrap();
165        let topics_to_be_created = topic_kvbackend_manager
166            .unprepare_topics(&all_topics)
167            .await
168            .unwrap();
169        assert!(topics_to_be_created.is_empty());
170    }
171}