common_meta/
wal_provider.rs1mod selector;
16pub(crate) mod topic_creator;
17mod topic_manager;
18pub(crate) mod topic_pool;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use common_wal::config::MetasrvWalConfig;
25use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
26use snafu::{ResultExt, ensure};
27use store_api::storage::{RegionId, RegionNumber};
28
29use crate::ddl::allocator::wal_options::WalOptionsAllocator;
30use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
31use crate::key::TOPIC_NAME_PATTERN_REGEX;
32use crate::kv_backend::KvBackendRef;
33use crate::leadership_notifier::LeadershipChangeListener;
34pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator};
35use crate::wal_provider::topic_pool::KafkaTopicPool;
36
37#[derive(Default, Debug)]
39pub enum WalProvider {
40 #[default]
41 RaftEngine,
42 Kafka(KafkaTopicPool),
43}
44
45pub type WalProviderRef = Arc<WalProvider>;
47
48#[async_trait::async_trait]
49impl WalOptionsAllocator for WalProvider {
50 async fn allocate(
51 &self,
52 region_numbers: &[RegionNumber],
53 skip_wal: bool,
54 ) -> Result<HashMap<RegionNumber, String>> {
55 let wal_options = self
56 .alloc_batch(region_numbers.len(), skip_wal)?
57 .into_iter()
58 .map(|wal_options| {
59 serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
60 })
61 .collect::<Result<Vec<_>>>()?;
62
63 Ok(region_numbers.iter().copied().zip(wal_options).collect())
64 }
65}
66
67impl WalProvider {
68 pub async fn start(&self) -> Result<()> {
70 match self {
71 Self::RaftEngine => Ok(()),
72 Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
73 }
74 }
75
76 pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
79 if skip_wal {
80 return Ok(vec![WalOptions::Noop; num_regions]);
81 }
82 match self {
83 WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
84 WalProvider::Kafka(topic_manager) => {
85 let options_batch = topic_manager
86 .select_batch(num_regions)?
87 .into_iter()
88 .map(|topic| {
89 WalOptions::Kafka(KafkaWalOptions {
90 topic: topic.clone(),
91 })
92 })
93 .collect();
94 Ok(options_batch)
95 }
96 }
97 }
98
99 pub fn is_remote_wal(&self) -> bool {
101 matches!(&self, WalProvider::Kafka(_))
102 }
103}
104
105#[async_trait]
106impl LeadershipChangeListener for WalProvider {
107 fn name(&self) -> &str {
108 "WalProvider"
109 }
110
111 async fn on_leader_start(&self) -> Result<()> {
112 self.start().await
113 }
114
115 async fn on_leader_stop(&self) -> Result<()> {
116 Ok(())
117 }
118}
119
120pub async fn build_wal_provider(
122 config: &MetasrvWalConfig,
123 kv_backend: KvBackendRef,
124) -> Result<WalProvider> {
125 match config {
126 MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine),
127 MetasrvWalConfig::Kafka(kafka_config) => {
128 let prefix = &kafka_config.kafka_topic.topic_name_prefix;
129 ensure!(
130 TOPIC_NAME_PATTERN_REGEX.is_match(prefix),
131 InvalidTopicNamePrefixSnafu { prefix }
132 );
133 let topic_creator =
134 build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
135 .await?;
136 let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
137 Ok(WalProvider::Kafka(topic_pool))
138 }
139 }
140}
141
142pub fn prepare_wal_options(
144 options: &mut HashMap<String, String>,
145 region_id: RegionId,
146 region_wal_options: &HashMap<RegionNumber, String>,
147) {
148 if let Some(wal_options) = region_wal_options.get(®ion_id.region_number()) {
149 options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
150 }
151}
152
153pub fn extract_topic_from_wal_options(
155 region_id: RegionId,
156 region_options: &HashMap<RegionNumber, String>,
157) -> Option<String> {
158 region_options
159 .get(®ion_id.region_number())
160 .and_then(|wal_options| {
161 serde_json::from_str::<WalOptions>(wal_options)
162 .ok()
163 .and_then(|wal_options| {
164 if let WalOptions::Kafka(kafka_wal_option) = wal_options {
165 Some(kafka_wal_option.topic)
166 } else {
167 None
168 }
169 })
170 })
171}
172
173#[cfg(test)]
174mod tests {
175 use std::assert_matches::assert_matches;
176
177 use common_wal::config::kafka::MetasrvKafkaConfig;
178 use common_wal::config::kafka::common::KafkaTopicConfig;
179 use common_wal::maybe_skip_kafka_integration_test;
180 use common_wal::test_util::get_kafka_endpoints;
181
182 use super::*;
183 use crate::error::Error;
184 use crate::kv_backend::memory::MemoryKvBackend;
185 use crate::test_util::test_kafka_topic_pool;
186 use crate::wal_provider::selector::RoundRobinTopicSelector;
187
188 #[tokio::test]
190 async fn test_provider_with_raft_engine() {
191 let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
192 let wal_config = MetasrvWalConfig::RaftEngine;
193 let provider = build_wal_provider(&wal_config, kv_backend).await.unwrap();
194 provider.start().await.unwrap();
195
196 let num_regions = 32;
197 let regions = (0..num_regions).collect::<Vec<_>>();
198 let got = provider.allocate(®ions, false).await.unwrap();
199
200 let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
201 let expected = regions
202 .into_iter()
203 .zip(vec![encoded_wal_options; num_regions as usize])
204 .collect();
205 assert_eq!(got, expected);
206 }
207
208 #[tokio::test]
209 async fn test_refuse_invalid_topic_name_prefix() {
210 let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
211 let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
212 kafka_topic: KafkaTopicConfig {
213 topic_name_prefix: "``````".to_string(),
214 ..Default::default()
215 },
216 ..Default::default()
217 });
218 let got = build_wal_provider(&wal_config, kv_backend)
219 .await
220 .unwrap_err();
221 assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
222 }
223
224 #[tokio::test]
225 async fn test_provider_with_kafka_allocate_wal_options() {
226 common_telemetry::init_default_ut_logging();
227 maybe_skip_kafka_integration_test!();
228 let num_topics = 5;
229 let mut topic_pool = test_kafka_topic_pool(
230 get_kafka_endpoints(),
231 num_topics,
232 true,
233 Some("test_allocator_with_kafka"),
234 )
235 .await;
236 topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
237 let topics = topic_pool.topics.clone();
238 let topic_creator = topic_pool.topic_creator();
240 topic_creator.delete_topics(&topics).await.unwrap();
241
242 let provider = WalProvider::Kafka(topic_pool);
244 provider.start().await.unwrap();
245
246 let num_regions = 3;
247 let regions = (0..num_regions).collect::<Vec<_>>();
248 let got = provider.allocate(®ions, false).await.unwrap();
249
250 let expected = (0..num_regions)
252 .map(|i| {
253 let options = WalOptions::Kafka(KafkaWalOptions {
254 topic: topics[i as usize].clone(),
255 });
256 (i, serde_json::to_string(&options).unwrap())
257 })
258 .collect::<HashMap<_, _>>();
259 assert_eq!(got, expected);
260 }
261
262 #[tokio::test]
263 async fn test_provider_with_skip_wal() {
264 let provider = WalProvider::RaftEngine;
265 provider.start().await.unwrap();
266
267 let num_regions = 32;
268 let regions = (0..num_regions).collect::<Vec<_>>();
269 let got = provider.allocate(®ions, true).await.unwrap();
270 assert_eq!(got.len(), num_regions as usize);
271 for wal_options in got.values() {
272 assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");
273 }
274 }
275}