common_meta/
wal_options_allocator.rs1mod selector;
16pub(crate) mod topic_creator;
17mod topic_manager;
18pub(crate) mod topic_pool;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use common_wal::config::MetasrvWalConfig;
25use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
26use snafu::{ensure, ResultExt};
27use store_api::storage::{RegionId, RegionNumber};
28
29use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
30use crate::key::NAME_PATTERN_REGEX;
31use crate::kv_backend::KvBackendRef;
32use crate::leadership_notifier::LeadershipChangeListener;
33pub use crate::wal_options_allocator::topic_creator::{
34 build_kafka_client, build_kafka_topic_creator,
35};
36use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
37
38#[derive(Default, Debug)]
40pub enum WalOptionsAllocator {
41 #[default]
42 RaftEngine,
43 Kafka(KafkaTopicPool),
44}
45
46pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
48
49impl WalOptionsAllocator {
50 pub async fn start(&self) -> Result<()> {
52 match self {
53 Self::RaftEngine => Ok(()),
54 Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
55 }
56 }
57
58 pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
61 if skip_wal {
62 return Ok(vec![WalOptions::Noop; num_regions]);
63 }
64 match self {
65 WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
66 WalOptionsAllocator::Kafka(topic_manager) => {
67 let options_batch = topic_manager
68 .select_batch(num_regions)?
69 .into_iter()
70 .map(|topic| {
71 WalOptions::Kafka(KafkaWalOptions {
72 topic: topic.clone(),
73 })
74 })
75 .collect();
76 Ok(options_batch)
77 }
78 }
79 }
80
81 pub fn is_remote_wal(&self) -> bool {
83 matches!(&self, WalOptionsAllocator::Kafka(_))
84 }
85}
86
87#[async_trait]
88impl LeadershipChangeListener for WalOptionsAllocator {
89 fn name(&self) -> &str {
90 "WalOptionsAllocator"
91 }
92
93 async fn on_leader_start(&self) -> Result<()> {
94 self.start().await
95 }
96
97 async fn on_leader_stop(&self) -> Result<()> {
98 Ok(())
99 }
100}
101
102pub async fn build_wal_options_allocator(
104 config: &MetasrvWalConfig,
105 kv_backend: KvBackendRef,
106) -> Result<WalOptionsAllocator> {
107 match config {
108 MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
109 MetasrvWalConfig::Kafka(kafka_config) => {
110 let prefix = &kafka_config.kafka_topic.topic_name_prefix;
111 ensure!(
112 NAME_PATTERN_REGEX.is_match(prefix),
113 InvalidTopicNamePrefixSnafu { prefix }
114 );
115 let topic_creator =
116 build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
117 .await?;
118 let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
119 Ok(WalOptionsAllocator::Kafka(topic_pool))
120 }
121 }
122}
123
124pub fn allocate_region_wal_options(
126 regions: Vec<RegionNumber>,
127 wal_options_allocator: &WalOptionsAllocator,
128 skip_wal: bool,
129) -> Result<HashMap<RegionNumber, String>> {
130 let wal_options = wal_options_allocator
131 .alloc_batch(regions.len(), skip_wal)?
132 .into_iter()
133 .map(|wal_options| {
134 serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
135 })
136 .collect::<Result<Vec<_>>>()?;
137
138 Ok(regions.into_iter().zip(wal_options).collect())
139}
140
141pub fn prepare_wal_options(
143 options: &mut HashMap<String, String>,
144 region_id: RegionId,
145 region_wal_options: &HashMap<RegionNumber, String>,
146) {
147 if let Some(wal_options) = region_wal_options.get(®ion_id.region_number()) {
148 options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
149 }
150}
151
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_wal::config::kafka::common::KafkaTopicConfig;
    use common_wal::config::kafka::MetasrvKafkaConfig;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::test_util::test_kafka_topic_pool;
    use crate::wal_options_allocator::selector::RoundRobinTopicSelector;

    /// With a raft-engine config, every region gets the encoded raft-engine
    /// WAL options.
    #[tokio::test]
    async fn test_allocator_with_raft_engine() {
        let backend: KvBackendRef = Arc::new(MemoryKvBackend::new());
        let config = MetasrvWalConfig::RaftEngine;
        let allocator = build_wal_options_allocator(&config, backend)
            .await
            .unwrap();
        allocator.start().await.unwrap();

        let num_regions = 32;
        let regions: Vec<_> = (0..num_regions).collect();
        let actual = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();

        let encoded = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
        let expected = regions
            .iter()
            .map(|&region| (region, encoded.clone()))
            .collect::<HashMap<_, _>>();
        assert_eq!(actual, expected);
    }

    /// Building an allocator with a topic name prefix that fails validation
    /// must return `InvalidTopicNamePrefix`.
    #[tokio::test]
    async fn test_refuse_invalid_topic_name_prefix() {
        let backend: KvBackendRef = Arc::new(MemoryKvBackend::new());
        let kafka_topic = KafkaTopicConfig {
            topic_name_prefix: "``````".to_string(),
            ..Default::default()
        };
        let config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
            kafka_topic,
            ..Default::default()
        });

        let err = build_wal_options_allocator(&config, backend)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidTopicNamePrefix { .. });
    }

    /// With a Kafka topic pool and a round-robin selector, the first regions
    /// are assigned the pool's topics in order.
    #[tokio::test]
    async fn test_allocator_with_kafka_allocate_wal_options() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();

        let num_topics = 5;
        let endpoints = get_kafka_endpoints();
        let mut pool = test_kafka_topic_pool(
            endpoints,
            num_topics,
            true,
            Some("test_allocator_with_kafka"),
        )
        .await;
        pool.selector = Arc::new(RoundRobinTopicSelector::default());
        let topics = pool.topics.clone();

        // Delete the pool's topics before the allocator is started
        // (presumably so leftovers from an earlier run cannot interfere).
        let creator = pool.topic_creator();
        creator.delete_topics(&topics).await.unwrap();

        let allocator = WalOptionsAllocator::Kafka(pool);
        allocator.start().await.unwrap();

        let num_regions = 3;
        let regions: Vec<_> = (0..num_regions).collect();
        let actual = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();

        // Region `i` is expected to receive the `i`-th topic of the pool.
        let mut expected = HashMap::new();
        for region in 0..num_regions {
            let wal_options = WalOptions::Kafka(KafkaWalOptions {
                topic: topics[region as usize].clone(),
            });
            expected.insert(region, serde_json::to_string(&wal_options).unwrap());
        }
        assert_eq!(actual, expected);
    }

    /// With `skip_wal` set, every region gets the encoded no-op WAL options.
    #[tokio::test]
    async fn test_allocator_with_skip_wal() {
        let allocator = WalOptionsAllocator::RaftEngine;
        allocator.start().await.unwrap();

        let num_regions = 32;
        let regions: Vec<_> = (0..num_regions).collect();
        let actual = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();

        assert_eq!(actual.len(), num_regions as usize);
        actual.values().for_each(|encoded| {
            assert_eq!(encoded, &"{\"wal.provider\":\"noop\"}");
        });
    }
}