common_meta/wal_options_allocator/
topic_manager.rs1use std::collections::HashSet;
16
17use crate::error::Result;
18use crate::key::topic_name::{TopicNameKey, TopicNameManager};
19use crate::kv_backend::KvBackendRef;
20
21pub struct KafkaTopicManager {
27 topic_name_manager: TopicNameManager,
28}
29
30impl KafkaTopicManager {
31 pub fn new(kv_backend: KvBackendRef) -> Self {
32 Self {
33 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
34 }
35 }
36
37 async fn restore_topics(&self) -> Result<Vec<String>> {
38 self.topic_name_manager.update_legacy_topics().await?;
39 let topics = self.topic_name_manager.range().await?;
40 Ok(topics)
41 }
42
43 pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
45 let existing_topics = self.restore_topics().await?;
46 let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
47 let mut topics_to_create = Vec::with_capacity(all_topics.len());
48 for topic in all_topics {
49 if !existing_topic_set.contains(topic) {
50 topics_to_create.push(topic.to_string());
51 }
52 }
53 Ok(topics_to_create)
54 }
55
56 pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
58 self.topic_name_manager
59 .batch_put(
60 topics
61 .iter()
62 .map(|topic| TopicNameKey::new(topic))
63 .collect(),
64 )
65 .await?;
66 Ok(())
67 }
68}
69
70#[cfg(test)]
71impl KafkaTopicManager {
72 pub async fn list_topics(&self) -> Result<Vec<String>> {
74 self.topic_name_manager.range().await
75 }
76}
77
78#[cfg(test)]
79mod tests {
80 use std::sync::Arc;
81
82 use super::*;
83 use crate::key::LEGACY_TOPIC_KEY_PREFIX;
84 use crate::kv_backend::memory::MemoryKvBackend;
85 use crate::rpc::store::PutRequest;
86
87 #[tokio::test]
88 async fn test_restore_legacy_persisted_topics() {
89 let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
90 let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend.clone());
91
92 let all_topics = (0..16)
93 .map(|i| format!("greptimedb_wal_topic_{}", i))
94 .collect::<Vec<_>>();
95
96 let mut topics_to_be_created = topic_kvbackend_manager
98 .unprepare_topics(&all_topics)
99 .await
100 .unwrap();
101 topics_to_be_created.sort();
102 let mut expected = all_topics.clone();
103 expected.sort();
104 assert_eq!(expected, topics_to_be_created);
105
106 let topics = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]";
108 let put_req = PutRequest {
109 key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
110 value: topics.as_bytes().to_vec(),
111 prev_kv: true,
112 };
113 let res = kv_backend.put(put_req).await.unwrap();
114 assert!(res.prev_kv.is_none());
115
116 let topics_to_be_created = topic_kvbackend_manager
117 .unprepare_topics(&all_topics)
118 .await
119 .unwrap();
120 assert!(topics_to_be_created.is_empty());
121
122 let legacy_topics = kv_backend
124 .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
125 .await
126 .unwrap();
127 assert!(legacy_topics.is_none());
128
129 let mut restored_topics = topic_kvbackend_manager.restore_topics().await.unwrap();
131 restored_topics.sort();
132 let mut expected = all_topics.clone();
133 expected.sort();
134 assert_eq!(expected, restored_topics);
135 }
136
137 #[tokio::test]
139 async fn test_restore_persisted_topics() {
140 let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
141 let topic_name_prefix = "greptimedb_wal_topic";
142 let num_topics = 16;
143
144 let all_topics = (0..num_topics)
145 .map(|i| format!("{}_{}", topic_name_prefix, i))
146 .collect::<Vec<_>>();
147
148 let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
150
151 let mut topics_to_be_created = topic_kvbackend_manager
152 .unprepare_topics(&all_topics)
153 .await
154 .unwrap();
155 topics_to_be_created.sort();
156 let mut expected = all_topics.clone();
157 expected.sort();
158 assert_eq!(expected, topics_to_be_created);
159
160 topic_kvbackend_manager
162 .persist_prepared_topics(&all_topics)
163 .await
164 .unwrap();
165 let topics_to_be_created = topic_kvbackend_manager
166 .unprepare_topics(&all_topics)
167 .await
168 .unwrap();
169 assert!(topics_to_be_created.is_empty());
170 }
171}