common_meta/
wal_provider.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod selector;
16pub(crate) mod topic_creator;
17mod topic_manager;
18pub(crate) mod topic_pool;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use common_wal::config::MetasrvWalConfig;
25use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
26use snafu::{ResultExt, ensure};
27use store_api::storage::{RegionId, RegionNumber};
28
29use crate::ddl::allocator::wal_options::WalOptionsAllocator;
30use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
31use crate::key::TOPIC_NAME_PATTERN_REGEX;
32use crate::kv_backend::KvBackendRef;
33use crate::leadership_notifier::LeadershipChangeListener;
34pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator};
35use crate::wal_provider::topic_pool::KafkaTopicPool;
36
37/// Provides wal options in region granularity.
38#[derive(Default, Debug)]
39pub enum WalProvider {
40    #[default]
41    RaftEngine,
42    Kafka(KafkaTopicPool),
43}
44
45/// Arc wrapper of WalProvider.
46pub type WalProviderRef = Arc<WalProvider>;
47
48#[async_trait::async_trait]
49impl WalOptionsAllocator for WalProvider {
50    async fn allocate(
51        &self,
52        region_numbers: &[RegionNumber],
53        skip_wal: bool,
54    ) -> Result<HashMap<RegionNumber, String>> {
55        let wal_options = self
56            .alloc_batch(region_numbers.len(), skip_wal)?
57            .into_iter()
58            .map(|wal_options| {
59                serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
60            })
61            .collect::<Result<Vec<_>>>()?;
62
63        Ok(region_numbers.iter().copied().zip(wal_options).collect())
64    }
65}
66
67impl WalProvider {
68    /// Tries to start the provider.
69    pub async fn start(&self) -> Result<()> {
70        match self {
71            Self::RaftEngine => Ok(()),
72            Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
73        }
74    }
75
76    /// Allocates a batch of wal options where each wal options goes to a region.
77    /// If skip_wal is true, the wal options will be set to Noop regardless of the provider type.
78    pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
79        if skip_wal {
80            return Ok(vec![WalOptions::Noop; num_regions]);
81        }
82        match self {
83            WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
84            WalProvider::Kafka(topic_manager) => {
85                let options_batch = topic_manager
86                    .select_batch(num_regions)?
87                    .into_iter()
88                    .map(|topic| {
89                        WalOptions::Kafka(KafkaWalOptions {
90                            topic: topic.clone(),
91                        })
92                    })
93                    .collect();
94                Ok(options_batch)
95            }
96        }
97    }
98
99    /// Returns true if it's the remote WAL.
100    pub fn is_remote_wal(&self) -> bool {
101        matches!(&self, WalProvider::Kafka(_))
102    }
103}
104
105#[async_trait]
106impl LeadershipChangeListener for WalProvider {
107    fn name(&self) -> &str {
108        "WalProvider"
109    }
110
111    async fn on_leader_start(&self) -> Result<()> {
112        self.start().await
113    }
114
115    async fn on_leader_stop(&self) -> Result<()> {
116        Ok(())
117    }
118}
119
120/// Builds a wal provider based on the given configuration.
121pub async fn build_wal_provider(
122    config: &MetasrvWalConfig,
123    kv_backend: KvBackendRef,
124) -> Result<WalProvider> {
125    match config {
126        MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine),
127        MetasrvWalConfig::Kafka(kafka_config) => {
128            let prefix = &kafka_config.kafka_topic.topic_name_prefix;
129            ensure!(
130                TOPIC_NAME_PATTERN_REGEX.is_match(prefix),
131                InvalidTopicNamePrefixSnafu { prefix }
132            );
133            let topic_creator =
134                build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
135                    .await?;
136            let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
137            Ok(WalProvider::Kafka(topic_pool))
138        }
139    }
140}
141
142/// Inserts wal options into options.
143pub fn prepare_wal_options(
144    options: &mut HashMap<String, String>,
145    region_id: RegionId,
146    region_wal_options: &HashMap<RegionNumber, String>,
147) {
148    if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
149        options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
150    }
151}
152
153/// Extracts the topic from the wal options.
154pub fn extract_topic_from_wal_options(
155    region_id: RegionId,
156    region_options: &HashMap<RegionNumber, String>,
157) -> Option<String> {
158    region_options
159        .get(&region_id.region_number())
160        .and_then(|wal_options| {
161            serde_json::from_str::<WalOptions>(wal_options)
162                .ok()
163                .and_then(|wal_options| {
164                    if let WalOptions::Kafka(kafka_wal_option) = wal_options {
165                        Some(kafka_wal_option.topic)
166                    } else {
167                        None
168                    }
169                })
170        })
171}
172
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_wal::config::kafka::MetasrvKafkaConfig;
    use common_wal::config::kafka::common::KafkaTopicConfig;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::test_util::test_kafka_topic_pool;
    use crate::wal_provider::selector::RoundRobinTopicSelector;

    /// Returns a fresh in-memory kv backend.
    fn memory_kv_backend() -> KvBackendRef {
        Arc::new(MemoryKvBackend::new()) as KvBackendRef
    }

    // The raft-engine provider gives every region the same encoded `RaftEngine` options.
    #[tokio::test]
    async fn test_provider_with_raft_engine() {
        let wal_config = MetasrvWalConfig::RaftEngine;
        let provider = build_wal_provider(&wal_config, memory_kv_backend())
            .await
            .unwrap();
        provider.start().await.unwrap();

        let regions: Vec<RegionNumber> = (0..32).collect();
        let allocated = provider.allocate(&regions, false).await.unwrap();

        let raft_engine_json = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
        let expected: HashMap<RegionNumber, String> = regions
            .iter()
            .map(|&region| (region, raft_engine_json.clone()))
            .collect();
        assert_eq!(allocated, expected);
    }

    // A prefix made only of backticks must be rejected by the topic-name prefix check.
    #[tokio::test]
    async fn test_refuse_invalid_topic_name_prefix() {
        let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
            kafka_topic: KafkaTopicConfig {
                topic_name_prefix: "``````".to_string(),
                ..Default::default()
            },
            ..Default::default()
        });
        let err = build_wal_provider(&wal_config, memory_kv_backend())
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidTopicNamePrefix { .. });
    }

    // With a round-robin selector, region `i` receives the `i`-th topic of the pool.
    #[tokio::test]
    async fn test_provider_with_kafka_allocate_wal_options() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let mut topic_pool = test_kafka_topic_pool(
            get_kafka_endpoints(),
            5,
            true,
            Some("test_allocator_with_kafka"),
        )
        .await;
        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
        let topics = topic_pool.topics.clone();

        // Remove leftovers from earlier runs before the provider is started.
        topic_pool
            .topic_creator()
            .delete_topics(&topics)
            .await
            .unwrap();

        let provider = WalProvider::Kafka(topic_pool);
        provider.start().await.unwrap();

        let regions: Vec<RegionNumber> = (0..3).collect();
        let allocated = provider.allocate(&regions, false).await.unwrap();

        let expected: HashMap<RegionNumber, String> = regions
            .iter()
            .map(|&region| {
                let options = WalOptions::Kafka(KafkaWalOptions {
                    topic: topics[region as usize].clone(),
                });
                (region, serde_json::to_string(&options).unwrap())
            })
            .collect();
        assert_eq!(allocated, expected);
    }

    // `skip_wal` overrides the provider: every region is assigned `Noop` options.
    #[tokio::test]
    async fn test_provider_with_skip_wal() {
        let provider = WalProvider::RaftEngine;
        provider.start().await.unwrap();

        let regions: Vec<RegionNumber> = (0..32).collect();
        let allocated = provider.allocate(&regions, true).await.unwrap();

        assert_eq!(allocated.len(), regions.len());
        for encoded in allocated.values() {
            assert_eq!(encoded, &"{\"wal.provider\":\"noop\"}");
        }
    }
}