common_meta/wal_options_allocator/
topic_pool.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Formatter};
16use std::sync::Arc;
17
18use common_telemetry::info;
19use common_wal::config::kafka::MetasrvKafkaConfig;
20use common_wal::TopicSelectorType;
21use snafu::ensure;
22
23use crate::error::{InvalidNumTopicsSnafu, Result};
24use crate::kv_backend::KvBackendRef;
25use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef};
26use crate::wal_options_allocator::topic_creator::KafkaTopicCreator;
27use crate::wal_options_allocator::topic_manager::KafkaTopicManager;
28
29/// Topic pool for kafka remote wal.
30/// Responsible for:
31/// 1. Persists topics in kvbackend.
32/// 2. Creates topics in kafka.
33/// 3. Selects topics for regions.
34pub struct KafkaTopicPool {
35    pub(crate) topics: Vec<String>,
36    // Manages topics in kvbackend.
37    topic_manager: KafkaTopicManager,
38    // Creates topics in kafka.
39    topic_creator: KafkaTopicCreator,
40    pub(crate) selector: TopicSelectorRef,
41    auto_create_topics: bool,
42}
43
44impl fmt::Debug for KafkaTopicPool {
45    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
46        f.debug_struct("KafkaTopicPool")
47            .field("topics", &self.topics)
48            .field("auto_create_topics", &self.auto_create_topics)
49            .finish()
50    }
51}
52
53impl KafkaTopicPool {
54    pub fn new(
55        config: &MetasrvKafkaConfig,
56        kvbackend: KvBackendRef,
57        topic_creator: KafkaTopicCreator,
58    ) -> Self {
59        let num_topics = config.kafka_topic.num_topics;
60        let prefix = config.kafka_topic.topic_name_prefix.clone();
61        let topics = (0..num_topics)
62            .map(|i| format!("{}_{}", prefix, i))
63            .collect();
64
65        let selector = match config.kafka_topic.selector_type {
66            TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
67        };
68
69        let topic_manager = KafkaTopicManager::new(kvbackend);
70
71        Self {
72            topics,
73            topic_manager,
74            topic_creator,
75            selector: Arc::new(selector),
76            auto_create_topics: config.auto_create_topics,
77        }
78    }
79
80    /// Tries to activate the topic manager when metasrv becomes the leader.
81    ///
82    /// First tries to restore persisted topics from the kv backend.
83    /// If there are unprepared topics (topics that exist in the configuration but not in the kv backend),
84    /// it will create these topics in Kafka if `auto_create_topics` is enabled.
85    ///
86    /// Then it prepares all unprepared topics by appending a noop record if the topic is empty,
87    /// and persists them in the kv backend for future use.
88    pub async fn activate(&self) -> Result<()> {
89        let num_topics = self.topics.len();
90        ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
91
92        let unprepared_topics = self.topic_manager.unprepare_topics(&self.topics).await?;
93
94        if !unprepared_topics.is_empty() {
95            if self.auto_create_topics {
96                info!("Creating {} topics", unprepared_topics.len());
97                self.topic_creator.create_topics(&unprepared_topics).await?;
98            } else {
99                info!("Auto create topics is disabled, skipping topic creation.");
100            }
101            self.topic_creator
102                .prepare_topics(&unprepared_topics)
103                .await?;
104            self.topic_manager
105                .persist_prepared_topics(&unprepared_topics)
106                .await?;
107        }
108        info!("Activated topic pool with {} topics", self.topics.len());
109
110        Ok(())
111    }
112
113    /// Selects one topic from the topic pool through the topic selector.
114    pub fn select(&self) -> Result<&String> {
115        self.selector.select(&self.topics)
116    }
117
118    /// Selects a batch of topics from the topic pool through the topic selector.
119    pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&String>> {
120        (0..num_topics)
121            .map(|_| self.selector.select(&self.topics))
122            .collect()
123    }
124}
125
126#[cfg(test)]
127impl KafkaTopicPool {
128    pub(crate) fn topic_manager(&self) -> &KafkaTopicManager {
129        &self.topic_manager
130    }
131
132    pub(crate) fn topic_creator(&self) -> &KafkaTopicCreator {
133        &self.topic_creator
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use std::assert_matches::assert_matches;
140
141    use common_wal::maybe_skip_kafka_integration_test;
142    use common_wal::test_util::get_kafka_endpoints;
143
144    use super::*;
145    use crate::error::Error;
146    use crate::test_util::test_kafka_topic_pool;
147    use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
148
149    #[tokio::test]
150    async fn test_pool_invalid_number_topics_err() {
151        common_telemetry::init_default_ut_logging();
152        maybe_skip_kafka_integration_test!();
153        let endpoints = get_kafka_endpoints();
154
155        let pool = test_kafka_topic_pool(endpoints.clone(), 0, false, None).await;
156        let err = pool.activate().await.unwrap_err();
157        assert_matches!(err, Error::InvalidNumTopics { .. });
158
159        let pool = test_kafka_topic_pool(endpoints, 0, true, None).await;
160        let err = pool.activate().await.unwrap_err();
161        assert_matches!(err, Error::InvalidNumTopics { .. });
162    }
163
164    #[tokio::test]
165    async fn test_pool_activate_unknown_topics_err() {
166        common_telemetry::init_default_ut_logging();
167        maybe_skip_kafka_integration_test!();
168        let pool =
169            test_kafka_topic_pool(get_kafka_endpoints(), 1, false, Some("unknown_topic")).await;
170        let err = pool.activate().await.unwrap_err();
171        assert_matches!(err, Error::KafkaPartitionClient { .. });
172    }
173
174    #[tokio::test]
175    async fn test_pool_activate() {
176        common_telemetry::init_default_ut_logging();
177        maybe_skip_kafka_integration_test!();
178        let pool =
179            test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some("pool_activate")).await;
180        // clean up the topics before test
181        let topic_creator = pool.topic_creator();
182        topic_creator.delete_topics(&pool.topics).await.unwrap();
183
184        let topic_manager = pool.topic_manager();
185        pool.activate().await.unwrap();
186        let topics = topic_manager.list_topics().await.unwrap();
187        assert_eq!(topics.len(), 2);
188    }
189
190    #[tokio::test]
191    async fn test_pool_activate_with_existing_topics() {
192        common_telemetry::init_default_ut_logging();
193        maybe_skip_kafka_integration_test!();
194        let prefix = "pool_activate_with_existing_topics";
195        let pool = test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some(prefix)).await;
196        let topic_creator = pool.topic_creator();
197        topic_creator.delete_topics(&pool.topics).await.unwrap();
198
199        let topic_manager = pool.topic_manager();
200        // persists one topic info, then pool.activate() will create new topics that not persisted.
201        topic_manager
202            .persist_prepared_topics(&pool.topics[0..1])
203            .await
204            .unwrap();
205
206        pool.activate().await.unwrap();
207        let topics = topic_manager.list_topics().await.unwrap();
208        assert_eq!(topics.len(), 2);
209
210        let client = pool.topic_creator().client();
211        let topics = client
212            .list_topics()
213            .await
214            .unwrap()
215            .into_iter()
216            .filter(|t| t.name.starts_with(prefix))
217            .collect::<Vec<_>>();
218        assert_eq!(topics.len(), 1);
219    }
220
221    /// Tests that the topic manager could allocate topics correctly.
222    #[tokio::test]
223    async fn test_alloc_topics() {
224        common_telemetry::init_default_ut_logging();
225        maybe_skip_kafka_integration_test!();
226        let num_topics = 5;
227        let mut topic_pool = test_kafka_topic_pool(
228            get_kafka_endpoints(),
229            num_topics,
230            true,
231            Some("test_allocator_with_kafka"),
232        )
233        .await;
234        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
235        let topics = topic_pool.topics.clone();
236        // clean up the topics before test
237        let topic_creator = topic_pool.topic_creator();
238        topic_creator.delete_topics(&topics).await.unwrap();
239
240        // Selects exactly the number of `num_topics` topics one by one.
241        let got = (0..topics.len())
242            .map(|_| topic_pool.select().unwrap())
243            .cloned()
244            .collect::<Vec<_>>();
245        assert_eq!(got, topics);
246
247        // Selects exactly the number of `num_topics` topics in a batching manner.
248        let got = topic_pool
249            .select_batch(topics.len())
250            .unwrap()
251            .into_iter()
252            .map(ToString::to_string)
253            .collect::<Vec<_>>();
254        assert_eq!(got, topics);
255
256        // Selects more than the number of `num_topics` topics.
257        let got = topic_pool
258            .select_batch(2 * topics.len())
259            .unwrap()
260            .into_iter()
261            .map(ToString::to_string)
262            .collect::<Vec<_>>();
263        let expected = vec![topics.clone(); 2]
264            .into_iter()
265            .flatten()
266            .collect::<Vec<_>>();
267        assert_eq!(got, expected);
268    }
269}