common_meta/wal_options_allocator/
topic_pool.rs1use std::fmt::{self, Formatter};
16use std::sync::Arc;
17
18use common_telemetry::info;
19use common_wal::config::kafka::MetasrvKafkaConfig;
20use common_wal::TopicSelectorType;
21use snafu::ensure;
22
23use crate::error::{InvalidNumTopicsSnafu, Result};
24use crate::kv_backend::KvBackendRef;
25use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef};
26use crate::wal_options_allocator::topic_creator::KafkaTopicCreator;
27use crate::wal_options_allocator::topic_manager::KafkaTopicManager;
28
29pub struct KafkaTopicPool {
35 pub(crate) topics: Vec<String>,
36 topic_manager: KafkaTopicManager,
38 topic_creator: KafkaTopicCreator,
40 pub(crate) selector: TopicSelectorRef,
41 auto_create_topics: bool,
42}
43
44impl fmt::Debug for KafkaTopicPool {
45 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
46 f.debug_struct("KafkaTopicPool")
47 .field("topics", &self.topics)
48 .field("auto_create_topics", &self.auto_create_topics)
49 .finish()
50 }
51}
52
53impl KafkaTopicPool {
54 pub fn new(
55 config: &MetasrvKafkaConfig,
56 kvbackend: KvBackendRef,
57 topic_creator: KafkaTopicCreator,
58 ) -> Self {
59 let num_topics = config.kafka_topic.num_topics;
60 let prefix = config.kafka_topic.topic_name_prefix.clone();
61 let topics = (0..num_topics)
62 .map(|i| format!("{}_{}", prefix, i))
63 .collect();
64
65 let selector = match config.kafka_topic.selector_type {
66 TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
67 };
68
69 let topic_manager = KafkaTopicManager::new(kvbackend);
70
71 Self {
72 topics,
73 topic_manager,
74 topic_creator,
75 selector: Arc::new(selector),
76 auto_create_topics: config.auto_create_topics,
77 }
78 }
79
80 pub async fn activate(&self) -> Result<()> {
89 let num_topics = self.topics.len();
90 ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
91
92 let unprepared_topics = self.topic_manager.unprepare_topics(&self.topics).await?;
93
94 if !unprepared_topics.is_empty() {
95 if self.auto_create_topics {
96 info!("Creating {} topics", unprepared_topics.len());
97 self.topic_creator.create_topics(&unprepared_topics).await?;
98 } else {
99 info!("Auto create topics is disabled, skipping topic creation.");
100 }
101 self.topic_creator
102 .prepare_topics(&unprepared_topics)
103 .await?;
104 self.topic_manager
105 .persist_prepared_topics(&unprepared_topics)
106 .await?;
107 }
108 info!("Activated topic pool with {} topics", self.topics.len());
109
110 Ok(())
111 }
112
113 pub fn select(&self) -> Result<&String> {
115 self.selector.select(&self.topics)
116 }
117
118 pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&String>> {
120 (0..num_topics)
121 .map(|_| self.selector.select(&self.topics))
122 .collect()
123 }
124}
125
126#[cfg(test)]
127impl KafkaTopicPool {
128 pub(crate) fn topic_manager(&self) -> &KafkaTopicManager {
129 &self.topic_manager
130 }
131
132 pub(crate) fn topic_creator(&self) -> &KafkaTopicCreator {
133 &self.topic_creator
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use std::assert_matches::assert_matches;
140
141 use common_wal::maybe_skip_kafka_integration_test;
142 use common_wal::test_util::get_kafka_endpoints;
143
144 use super::*;
145 use crate::error::Error;
146 use crate::test_util::test_kafka_topic_pool;
147 use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
148
149 #[tokio::test]
150 async fn test_pool_invalid_number_topics_err() {
151 common_telemetry::init_default_ut_logging();
152 maybe_skip_kafka_integration_test!();
153 let endpoints = get_kafka_endpoints();
154
155 let pool = test_kafka_topic_pool(endpoints.clone(), 0, false, None).await;
156 let err = pool.activate().await.unwrap_err();
157 assert_matches!(err, Error::InvalidNumTopics { .. });
158
159 let pool = test_kafka_topic_pool(endpoints, 0, true, None).await;
160 let err = pool.activate().await.unwrap_err();
161 assert_matches!(err, Error::InvalidNumTopics { .. });
162 }
163
164 #[tokio::test]
165 async fn test_pool_activate_unknown_topics_err() {
166 common_telemetry::init_default_ut_logging();
167 maybe_skip_kafka_integration_test!();
168 let pool =
169 test_kafka_topic_pool(get_kafka_endpoints(), 1, false, Some("unknown_topic")).await;
170 let err = pool.activate().await.unwrap_err();
171 assert_matches!(err, Error::KafkaPartitionClient { .. });
172 }
173
174 #[tokio::test]
175 async fn test_pool_activate() {
176 common_telemetry::init_default_ut_logging();
177 maybe_skip_kafka_integration_test!();
178 let pool =
179 test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some("pool_activate")).await;
180 let topic_creator = pool.topic_creator();
182 topic_creator.delete_topics(&pool.topics).await.unwrap();
183
184 let topic_manager = pool.topic_manager();
185 pool.activate().await.unwrap();
186 let topics = topic_manager.list_topics().await.unwrap();
187 assert_eq!(topics.len(), 2);
188 }
189
190 #[tokio::test]
191 async fn test_pool_activate_with_existing_topics() {
192 common_telemetry::init_default_ut_logging();
193 maybe_skip_kafka_integration_test!();
194 let prefix = "pool_activate_with_existing_topics";
195 let pool = test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some(prefix)).await;
196 let topic_creator = pool.topic_creator();
197 topic_creator.delete_topics(&pool.topics).await.unwrap();
198
199 let topic_manager = pool.topic_manager();
200 topic_manager
202 .persist_prepared_topics(&pool.topics[0..1])
203 .await
204 .unwrap();
205
206 pool.activate().await.unwrap();
207 let topics = topic_manager.list_topics().await.unwrap();
208 assert_eq!(topics.len(), 2);
209
210 let client = pool.topic_creator().client();
211 let topics = client
212 .list_topics()
213 .await
214 .unwrap()
215 .into_iter()
216 .filter(|t| t.name.starts_with(prefix))
217 .collect::<Vec<_>>();
218 assert_eq!(topics.len(), 1);
219 }
220
221 #[tokio::test]
223 async fn test_alloc_topics() {
224 common_telemetry::init_default_ut_logging();
225 maybe_skip_kafka_integration_test!();
226 let num_topics = 5;
227 let mut topic_pool = test_kafka_topic_pool(
228 get_kafka_endpoints(),
229 num_topics,
230 true,
231 Some("test_allocator_with_kafka"),
232 )
233 .await;
234 topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
235 let topics = topic_pool.topics.clone();
236 let topic_creator = topic_pool.topic_creator();
238 topic_creator.delete_topics(&topics).await.unwrap();
239
240 let got = (0..topics.len())
242 .map(|_| topic_pool.select().unwrap())
243 .cloned()
244 .collect::<Vec<_>>();
245 assert_eq!(got, topics);
246
247 let got = topic_pool
249 .select_batch(topics.len())
250 .unwrap()
251 .into_iter()
252 .map(ToString::to_string)
253 .collect::<Vec<_>>();
254 assert_eq!(got, topics);
255
256 let got = topic_pool
258 .select_batch(2 * topics.len())
259 .unwrap()
260 .into_iter()
261 .map(ToString::to_string)
262 .collect::<Vec<_>>();
263 let expected = vec![topics.clone(); 2]
264 .into_iter()
265 .flatten()
266 .collect::<Vec<_>>();
267 assert_eq!(got, expected);
268 }
269}