common_meta/
wal_options_allocator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod selector;
16pub(crate) mod topic_creator;
17mod topic_manager;
18pub(crate) mod topic_pool;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use common_wal::config::MetasrvWalConfig;
25use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
26use snafu::{ensure, ResultExt};
27use store_api::storage::{RegionId, RegionNumber};
28
29use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
30use crate::key::NAME_PATTERN_REGEX;
31use crate::kv_backend::KvBackendRef;
32use crate::leadership_notifier::LeadershipChangeListener;
33pub use crate::wal_options_allocator::topic_creator::{
34    build_kafka_client, build_kafka_topic_creator,
35};
36use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
37
38/// Allocates wal options in region granularity.
39#[derive(Default, Debug)]
40pub enum WalOptionsAllocator {
41    #[default]
42    RaftEngine,
43    Kafka(KafkaTopicPool),
44}
45
46/// Arc wrapper of WalOptionsAllocator.
47pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
48
49impl WalOptionsAllocator {
50    /// Tries to start the allocator.
51    pub async fn start(&self) -> Result<()> {
52        match self {
53            Self::RaftEngine => Ok(()),
54            Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
55        }
56    }
57
58    /// Allocates a batch of wal options where each wal options goes to a region.
59    /// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
60    pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
61        if skip_wal {
62            return Ok(vec![WalOptions::Noop; num_regions]);
63        }
64        match self {
65            WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
66            WalOptionsAllocator::Kafka(topic_manager) => {
67                let options_batch = topic_manager
68                    .select_batch(num_regions)?
69                    .into_iter()
70                    .map(|topic| {
71                        WalOptions::Kafka(KafkaWalOptions {
72                            topic: topic.clone(),
73                        })
74                    })
75                    .collect();
76                Ok(options_batch)
77            }
78        }
79    }
80
81    /// Returns true if it's the remote WAL.
82    pub fn is_remote_wal(&self) -> bool {
83        matches!(&self, WalOptionsAllocator::Kafka(_))
84    }
85}
86
87#[async_trait]
88impl LeadershipChangeListener for WalOptionsAllocator {
89    fn name(&self) -> &str {
90        "WalOptionsAllocator"
91    }
92
93    async fn on_leader_start(&self) -> Result<()> {
94        self.start().await
95    }
96
97    async fn on_leader_stop(&self) -> Result<()> {
98        Ok(())
99    }
100}
101
102/// Builds a wal options allocator based on the given configuration.
103pub async fn build_wal_options_allocator(
104    config: &MetasrvWalConfig,
105    kv_backend: KvBackendRef,
106) -> Result<WalOptionsAllocator> {
107    match config {
108        MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
109        MetasrvWalConfig::Kafka(kafka_config) => {
110            let prefix = &kafka_config.kafka_topic.topic_name_prefix;
111            ensure!(
112                NAME_PATTERN_REGEX.is_match(prefix),
113                InvalidTopicNamePrefixSnafu { prefix }
114            );
115            let topic_creator =
116                build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
117                    .await?;
118            let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
119            Ok(WalOptionsAllocator::Kafka(topic_pool))
120        }
121    }
122}
123
124/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
125pub fn allocate_region_wal_options(
126    regions: Vec<RegionNumber>,
127    wal_options_allocator: &WalOptionsAllocator,
128    skip_wal: bool,
129) -> Result<HashMap<RegionNumber, String>> {
130    let wal_options = wal_options_allocator
131        .alloc_batch(regions.len(), skip_wal)?
132        .into_iter()
133        .map(|wal_options| {
134            serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
135        })
136        .collect::<Result<Vec<_>>>()?;
137
138    Ok(regions.into_iter().zip(wal_options).collect())
139}
140
141/// Inserts wal options into options.
142pub fn prepare_wal_options(
143    options: &mut HashMap<String, String>,
144    region_id: RegionId,
145    region_wal_options: &HashMap<RegionNumber, String>,
146) {
147    if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
148        options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use std::assert_matches::assert_matches;
155
156    use common_wal::config::kafka::common::KafkaTopicConfig;
157    use common_wal::config::kafka::MetasrvKafkaConfig;
158    use common_wal::maybe_skip_kafka_integration_test;
159    use common_wal::test_util::get_kafka_endpoints;
160
161    use super::*;
162    use crate::error::Error;
163    use crate::kv_backend::memory::MemoryKvBackend;
164    use crate::test_util::test_kafka_topic_pool;
165    use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
166
167    // Tests that the wal options allocator could successfully allocate raft-engine wal options.
168    #[tokio::test]
169    async fn test_allocator_with_raft_engine() {
170        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
171        let wal_config = MetasrvWalConfig::RaftEngine;
172        let allocator = build_wal_options_allocator(&wal_config, kv_backend)
173            .await
174            .unwrap();
175        allocator.start().await.unwrap();
176
177        let num_regions = 32;
178        let regions = (0..num_regions).collect::<Vec<_>>();
179        let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
180
181        let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
182        let expected = regions
183            .into_iter()
184            .zip(vec![encoded_wal_options; num_regions as usize])
185            .collect();
186        assert_eq!(got, expected);
187    }
188
189    #[tokio::test]
190    async fn test_refuse_invalid_topic_name_prefix() {
191        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
192        let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
193            kafka_topic: KafkaTopicConfig {
194                topic_name_prefix: "``````".to_string(),
195                ..Default::default()
196            },
197            ..Default::default()
198        });
199        let got = build_wal_options_allocator(&wal_config, kv_backend)
200            .await
201            .unwrap_err();
202        assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
203    }
204
205    #[tokio::test]
206    async fn test_allocator_with_kafka_allocate_wal_options() {
207        common_telemetry::init_default_ut_logging();
208        maybe_skip_kafka_integration_test!();
209        let num_topics = 5;
210        let mut topic_pool = test_kafka_topic_pool(
211            get_kafka_endpoints(),
212            num_topics,
213            true,
214            Some("test_allocator_with_kafka"),
215        )
216        .await;
217        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
218        let topics = topic_pool.topics.clone();
219        // clean up the topics before test
220        let topic_creator = topic_pool.topic_creator();
221        topic_creator.delete_topics(&topics).await.unwrap();
222
223        // Creates an options allocator.
224        let allocator = WalOptionsAllocator::Kafka(topic_pool);
225        allocator.start().await.unwrap();
226
227        let num_regions = 3;
228        let regions = (0..num_regions).collect::<Vec<_>>();
229        let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
230
231        // Check the allocated wal options contain the expected topics.
232        let expected = (0..num_regions)
233            .map(|i| {
234                let options = WalOptions::Kafka(KafkaWalOptions {
235                    topic: topics[i as usize].clone(),
236                });
237                (i, serde_json::to_string(&options).unwrap())
238            })
239            .collect::<HashMap<_, _>>();
240        assert_eq!(got, expected);
241    }
242
243    #[tokio::test]
244    async fn test_allocator_with_skip_wal() {
245        let allocator = WalOptionsAllocator::RaftEngine;
246        allocator.start().await.unwrap();
247
248        let num_regions = 32;
249        let regions = (0..num_regions).collect::<Vec<_>>();
250        let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
251        assert_eq!(got.len(), num_regions as usize);
252        for wal_options in got.values() {
253            assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");
254        }
255    }
256}