common_meta/
wal_options_allocator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod selector;
16pub(crate) mod topic_creator;
17mod topic_manager;
18pub(crate) mod topic_pool;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use common_wal::config::MetasrvWalConfig;
25use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
26use snafu::{ResultExt, ensure};
27use store_api::storage::{RegionId, RegionNumber};
28
29use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
30use crate::key::TOPIC_NAME_PATTERN_REGEX;
31use crate::kv_backend::KvBackendRef;
32use crate::leadership_notifier::LeadershipChangeListener;
33pub use crate::wal_options_allocator::topic_creator::{
34    build_kafka_client, build_kafka_topic_creator,
35};
36use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
37
38/// Allocates wal options in region granularity.
39#[derive(Default, Debug)]
40pub enum WalOptionsAllocator {
41    #[default]
42    RaftEngine,
43    Kafka(KafkaTopicPool),
44}
45
46/// Arc wrapper of WalOptionsAllocator.
47pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
48
49impl WalOptionsAllocator {
50    /// Tries to start the allocator.
51    pub async fn start(&self) -> Result<()> {
52        match self {
53            Self::RaftEngine => Ok(()),
54            Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
55        }
56    }
57
58    /// Allocates a batch of wal options where each wal options goes to a region.
59    /// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
60    pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
61        if skip_wal {
62            return Ok(vec![WalOptions::Noop; num_regions]);
63        }
64        match self {
65            WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
66            WalOptionsAllocator::Kafka(topic_manager) => {
67                let options_batch = topic_manager
68                    .select_batch(num_regions)?
69                    .into_iter()
70                    .map(|topic| {
71                        WalOptions::Kafka(KafkaWalOptions {
72                            topic: topic.clone(),
73                        })
74                    })
75                    .collect();
76                Ok(options_batch)
77            }
78        }
79    }
80
81    /// Returns true if it's the remote WAL.
82    pub fn is_remote_wal(&self) -> bool {
83        matches!(&self, WalOptionsAllocator::Kafka(_))
84    }
85}
86
87#[async_trait]
88impl LeadershipChangeListener for WalOptionsAllocator {
89    fn name(&self) -> &str {
90        "WalOptionsAllocator"
91    }
92
93    async fn on_leader_start(&self) -> Result<()> {
94        self.start().await
95    }
96
97    async fn on_leader_stop(&self) -> Result<()> {
98        Ok(())
99    }
100}
101
102/// Builds a wal options allocator based on the given configuration.
103pub async fn build_wal_options_allocator(
104    config: &MetasrvWalConfig,
105    kv_backend: KvBackendRef,
106) -> Result<WalOptionsAllocator> {
107    match config {
108        MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
109        MetasrvWalConfig::Kafka(kafka_config) => {
110            let prefix = &kafka_config.kafka_topic.topic_name_prefix;
111            ensure!(
112                TOPIC_NAME_PATTERN_REGEX.is_match(prefix),
113                InvalidTopicNamePrefixSnafu { prefix }
114            );
115            let topic_creator =
116                build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
117                    .await?;
118            let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
119            Ok(WalOptionsAllocator::Kafka(topic_pool))
120        }
121    }
122}
123
124/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
125pub fn allocate_region_wal_options(
126    regions: Vec<RegionNumber>,
127    wal_options_allocator: &WalOptionsAllocator,
128    skip_wal: bool,
129) -> Result<HashMap<RegionNumber, String>> {
130    let wal_options = wal_options_allocator
131        .alloc_batch(regions.len(), skip_wal)?
132        .into_iter()
133        .map(|wal_options| {
134            serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
135        })
136        .collect::<Result<Vec<_>>>()?;
137
138    Ok(regions.into_iter().zip(wal_options).collect())
139}
140
141/// Inserts wal options into options.
142pub fn prepare_wal_options(
143    options: &mut HashMap<String, String>,
144    region_id: RegionId,
145    region_wal_options: &HashMap<RegionNumber, String>,
146) {
147    if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
148        options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
149    }
150}
151
152/// Extracts the topic from the wal options.
153pub fn extract_topic_from_wal_options(
154    region_id: RegionId,
155    region_options: &HashMap<RegionNumber, String>,
156) -> Option<String> {
157    region_options
158        .get(&region_id.region_number())
159        .and_then(|wal_options| {
160            serde_json::from_str::<WalOptions>(wal_options)
161                .ok()
162                .and_then(|wal_options| {
163                    if let WalOptions::Kafka(kafka_wal_option) = wal_options {
164                        Some(kafka_wal_option.topic)
165                    } else {
166                        None
167                    }
168                })
169        })
170}
171
172#[cfg(test)]
173mod tests {
174    use std::assert_matches::assert_matches;
175
176    use common_wal::config::kafka::MetasrvKafkaConfig;
177    use common_wal::config::kafka::common::KafkaTopicConfig;
178    use common_wal::maybe_skip_kafka_integration_test;
179    use common_wal::test_util::get_kafka_endpoints;
180
181    use super::*;
182    use crate::error::Error;
183    use crate::kv_backend::memory::MemoryKvBackend;
184    use crate::test_util::test_kafka_topic_pool;
185    use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
186
187    // Tests that the wal options allocator could successfully allocate raft-engine wal options.
188    #[tokio::test]
189    async fn test_allocator_with_raft_engine() {
190        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
191        let wal_config = MetasrvWalConfig::RaftEngine;
192        let allocator = build_wal_options_allocator(&wal_config, kv_backend)
193            .await
194            .unwrap();
195        allocator.start().await.unwrap();
196
197        let num_regions = 32;
198        let regions = (0..num_regions).collect::<Vec<_>>();
199        let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
200
201        let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
202        let expected = regions
203            .into_iter()
204            .zip(vec![encoded_wal_options; num_regions as usize])
205            .collect();
206        assert_eq!(got, expected);
207    }
208
209    #[tokio::test]
210    async fn test_refuse_invalid_topic_name_prefix() {
211        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
212        let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
213            kafka_topic: KafkaTopicConfig {
214                topic_name_prefix: "``````".to_string(),
215                ..Default::default()
216            },
217            ..Default::default()
218        });
219        let got = build_wal_options_allocator(&wal_config, kv_backend)
220            .await
221            .unwrap_err();
222        assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
223    }
224
225    #[tokio::test]
226    async fn test_allocator_with_kafka_allocate_wal_options() {
227        common_telemetry::init_default_ut_logging();
228        maybe_skip_kafka_integration_test!();
229        let num_topics = 5;
230        let mut topic_pool = test_kafka_topic_pool(
231            get_kafka_endpoints(),
232            num_topics,
233            true,
234            Some("test_allocator_with_kafka"),
235        )
236        .await;
237        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
238        let topics = topic_pool.topics.clone();
239        // clean up the topics before test
240        let topic_creator = topic_pool.topic_creator();
241        topic_creator.delete_topics(&topics).await.unwrap();
242
243        // Creates an options allocator.
244        let allocator = WalOptionsAllocator::Kafka(topic_pool);
245        allocator.start().await.unwrap();
246
247        let num_regions = 3;
248        let regions = (0..num_regions).collect::<Vec<_>>();
249        let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
250
251        // Check the allocated wal options contain the expected topics.
252        let expected = (0..num_regions)
253            .map(|i| {
254                let options = WalOptions::Kafka(KafkaWalOptions {
255                    topic: topics[i as usize].clone(),
256                });
257                (i, serde_json::to_string(&options).unwrap())
258            })
259            .collect::<HashMap<_, _>>();
260        assert_eq!(got, expected);
261    }
262
263    #[tokio::test]
264    async fn test_allocator_with_skip_wal() {
265        let allocator = WalOptionsAllocator::RaftEngine;
266        allocator.start().await.unwrap();
267
268        let num_regions = 32;
269        let regions = (0..num_regions).collect::<Vec<_>>();
270        let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
271        assert_eq!(got.len(), num_regions as usize);
272        for wal_options in got.values() {
273            assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");
274        }
275    }
276}