common_meta/wal_options_allocator/
topic_manager.rs1use std::collections::HashSet;
16
17use crate::error::Result;
18use crate::key::topic_name::{TopicNameKey, TopicNameManager};
19use crate::kv_backend::KvBackendRef;
20
21pub struct KafkaTopicManager {
27 topic_name_manager: TopicNameManager,
28}
29
30impl KafkaTopicManager {
31 pub fn new(kv_backend: KvBackendRef) -> Self {
32 Self {
33 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
34 }
35 }
36
37 async fn restore_topics(&self) -> Result<Vec<String>> {
38 self.topic_name_manager.update_legacy_topics().await?;
39 let topics = self.topic_name_manager.range().await?;
40 Ok(topics)
41 }
42
43 pub async fn get_topics_to_create<'a>(
45 &self,
46 all_topics: &'a [String],
47 ) -> Result<Vec<&'a String>> {
48 let existing_topics = self.restore_topics().await?;
49 let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
50 let mut topics_to_create = Vec::with_capacity(all_topics.len());
51 for topic in all_topics {
52 if !existing_topic_set.contains(topic) {
53 topics_to_create.push(topic);
54 }
55 }
56 Ok(topics_to_create)
57 }
58
59 pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
61 self.topic_name_manager
62 .batch_put(
63 topics
64 .iter()
65 .map(|topic| TopicNameKey::new(topic))
66 .collect(),
67 )
68 .await?;
69 Ok(())
70 }
71}
72
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::key::LEGACY_TOPIC_KEY_PREFIX;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::rpc::store::PutRequest;

    /// Builds a fresh, empty in-memory backend for a test.
    fn new_memory_backend() -> KvBackendRef {
        Arc::new(MemoryKvBackend::new()) as KvBackendRef
    }

    /// Returns `items` in ascending order so comparisons ignore ordering.
    fn sorted<T: Ord>(mut items: Vec<T>) -> Vec<T> {
        items.sort();
        items
    }

    /// Topics written under the legacy key must be picked up, and the legacy
    /// key must be gone afterwards.
    #[tokio::test]
    async fn test_restore_legacy_persisted_topics() {
        let kv_backend = new_memory_backend();
        let manager = KafkaTopicManager::new(kv_backend.clone());

        let all_topics: Vec<String> = (0..16)
            .map(|i| format!("greptimedb_wal_topic_{}", i))
            .collect();

        // Empty backend: every topic is reported as missing.
        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        let every_topic: Vec<&String> = all_topics.iter().collect();
        assert_eq!(sorted(every_topic), sorted(missing));

        // Store all sixteen topics as one JSON array under the legacy key.
        let topics = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]";
        let legacy_put = PutRequest {
            key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
            value: topics.as_bytes().to_vec(),
            prev_kv: true,
        };
        let put_response = kv_backend.put(legacy_put).await.unwrap();
        assert!(put_response.prev_kv.is_none());

        // With the legacy entry present, nothing is left to create.
        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        assert!(missing.is_empty());

        // The legacy key itself no longer exists.
        let legacy_entry = kv_backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(legacy_entry.is_none());

        // Restoring yields exactly the original topic names.
        let restored = manager.restore_topics().await.unwrap();
        assert_eq!(sorted(all_topics.clone()), sorted(restored));
    }

    /// Topics written through `persist_topics` are no longer reported as missing.
    #[tokio::test]
    async fn test_restore_persisted_topics() {
        let kv_backend = new_memory_backend();
        let topic_name_prefix = "greptimedb_wal_topic";
        let num_topics = 16;

        let all_topics: Vec<String> = (0..num_topics)
            .map(|i| format!("{}_{}", topic_name_prefix, i))
            .collect();

        let manager = KafkaTopicManager::new(kv_backend);

        // Empty backend: every topic is reported as missing.
        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        let every_topic: Vec<&String> = all_topics.iter().collect();
        assert_eq!(sorted(every_topic), sorted(missing));

        manager.persist_topics(&all_topics).await.unwrap();

        // After persisting, nothing is left to create.
        let missing = manager.get_topics_to_create(&all_topics).await.unwrap();
        assert!(missing.is_empty());
    }
}