1mod selector;
16pub(crate) mod topic_creator;
17mod topic_manager;
18pub(crate) mod topic_pool;
19
20use std::collections::{BTreeSet, HashMap};
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use common_procedure::Context as ProcedureContext;
25use common_procedure::local::DynamicKeyLockGuard;
26use common_wal::config::MetasrvWalConfig;
27use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
28use serde::{Deserialize, Deserializer, Serialize, Serializer};
29use snafu::ensure;
30use store_api::storage::{RegionId, RegionNumber};
31
32use crate::ddl::allocator::wal_options::WalOptionsAllocator;
33use crate::error::{InvalidTopicNamePrefixSnafu, Result};
34use crate::key::topic_name::TopicNameKey;
35use crate::key::{TOPIC_NAME_PATTERN_REGEX, TableMetadataManagerRef};
36use crate::kv_backend::KvBackendRef;
37use crate::leadership_notifier::LeadershipChangeListener;
38use crate::lock_key::RemoteWalLock;
39pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator};
40use crate::wal_provider::topic_pool::KafkaTopicPool;
41
/// Per-region WAL options, keyed by [`RegionNumber`].
pub type RegionWalOptions = HashMap<RegionNumber, WalOptions>;
44
45pub fn remote_wal_topics(region_wal_options: &RegionWalOptions) -> Vec<&str> {
47 region_wal_options
48 .values()
49 .filter_map(|wal_options| match wal_options {
50 WalOptions::Kafka(kafka_options) => Some(kafka_options.topic.as_str()),
51 _ => None,
52 })
53 .collect::<BTreeSet<_>>()
54 .into_iter()
55 .collect()
56}
57
58pub async fn acquire_remote_wal_read_locks(
60 ctx: &ProcedureContext,
61 region_wal_options: &RegionWalOptions,
62) -> Vec<DynamicKeyLockGuard> {
63 let topics = remote_wal_topics(region_wal_options)
64 .into_iter()
65 .map(str::to_string)
66 .collect::<Vec<_>>();
67 let mut guards = Vec::with_capacity(topics.len());
68 for topic in topics {
69 let guard = ctx
70 .provider
71 .acquire_lock(&(RemoteWalLock::Read(topic).into()))
72 .await;
73 guards.push(guard);
74 }
75 guards
76}
77
78pub async fn refresh_initial_pruned_entry_ids(
80 table_metadata_manager: &TableMetadataManagerRef,
81 region_wal_options: &mut RegionWalOptions,
82) -> Result<()> {
83 let topics = remote_wal_topics(region_wal_options);
84 if topics.is_empty() {
85 return Ok(());
86 }
87
88 let topic_values = table_metadata_manager
89 .topic_name_manager()
90 .batch_get(
91 topics
92 .iter()
93 .map(|topic| TopicNameKey::new(topic))
94 .collect(),
95 )
96 .await?;
97
98 for wal_options in region_wal_options.values_mut() {
99 let WalOptions::Kafka(kafka_options) = wal_options else {
100 continue;
101 };
102 kafka_options.initial_pruned_entry_id = Some(
103 topic_values
104 .get(&kafka_options.topic)
105 .map(|value| value.pruned_entry_id)
106 .unwrap_or_default(),
107 );
108 }
109
110 Ok(())
111}
112
/// Wire format of a single value in a serialized [`RegionWalOptions`] map.
///
/// Two representations are accepted:
/// - `Encoded`: the legacy form. The value is itself a JSON string holding the encoded
///   `WalOptions` (e.g. `"{\"wal.provider\":\"raft_engine\"}"`).
/// - `Structured`: the current form. The value is the `WalOptions` object itself.
///
/// `#[serde(untagged)]` makes serde try the variants in declaration order and keep the first
/// that succeeds. Keep `Encoded` ahead of `Structured` unless that ordering is re-verified.
#[derive(Deserialize)]
#[serde(untagged)]
enum WalOptionsCompat {
    /// Legacy: `WalOptions` serialized to a JSON string.
    Encoded(String),
    /// Current: `WalOptions` stored as a structured value.
    Structured(WalOptions),
}
119
120fn deserialize_region_wal_options<E>(
121 values: HashMap<String, WalOptionsCompat>,
122) -> std::result::Result<RegionWalOptions, E>
123where
124 E: serde::de::Error,
125{
126 values
127 .into_iter()
128 .map(|(region_number, wal_options)| {
129 let region_number = region_number.parse::<RegionNumber>().map_err(|err| {
130 E::custom(format!(
131 "invalid region number in region_wal_options: {region_number}, err: {err}"
132 ))
133 })?;
134 let wal_options = match wal_options {
135 WalOptionsCompat::Encoded(encoded) => serde_json::from_str(&encoded).map_err(|err| {
136 E::custom(format!(
137 "failed to decode legacy wal options for region {region_number}: {encoded}, err: {err}"
138 ))
139 })?,
140 WalOptionsCompat::Structured(wal_options) => wal_options,
141 };
142 Ok((region_number, wal_options))
143 })
144 .collect()
145}
146
147pub mod region_wal_options_serde {
153 use super::*;
154
155 pub fn serialize<S>(
157 value: &RegionWalOptions,
158 serializer: S,
159 ) -> std::result::Result<S::Ok, S::Error>
160 where
161 S: Serializer,
162 {
163 value.serialize(serializer)
164 }
165
166 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<RegionWalOptions, D::Error>
168 where
169 D: Deserializer<'de>,
170 {
171 let values = HashMap::<String, WalOptionsCompat>::deserialize(deserializer)?;
172 deserialize_region_wal_options(values)
173 }
174}
175
176pub mod optional_region_wal_options_serde {
178 use super::*;
179
180 pub fn serialize<S>(
182 value: &Option<RegionWalOptions>,
183 serializer: S,
184 ) -> std::result::Result<S::Ok, S::Error>
185 where
186 S: Serializer,
187 {
188 value.serialize(serializer)
189 }
190
191 pub fn deserialize<'de, D>(
193 deserializer: D,
194 ) -> std::result::Result<Option<RegionWalOptions>, D::Error>
195 where
196 D: Deserializer<'de>,
197 {
198 let Some(values) = Option::<HashMap<String, WalOptionsCompat>>::deserialize(deserializer)?
199 else {
200 return Ok(None);
201 };
202
203 deserialize_region_wal_options(values).map(Some)
204 }
205}
206
/// Source of WAL options for newly allocated regions.
///
/// Unless WAL is skipped, `RaftEngine` (the default) hands every region
/// [`WalOptions::RaftEngine`]. `Kafka` selects topics from its [`KafkaTopicPool`] and hands
/// out [`WalOptions::Kafka`] options.
#[derive(Default, Debug)]
pub enum WalProvider {
    /// Raft-engine WAL; the `Default` variant and not considered remote WAL.
    #[default]
    RaftEngine,
    /// Remote WAL on Kafka; the pool supplies the topics assigned to regions.
    Kafka(KafkaTopicPool),
}

/// Shared, reference-counted handle to a [`WalProvider`].
pub type WalProviderRef = Arc<WalProvider>;
217
218#[async_trait::async_trait]
219impl WalOptionsAllocator for WalProvider {
220 async fn allocate(
221 &self,
222 region_numbers: &[RegionNumber],
223 skip_wal: bool,
224 ) -> Result<RegionWalOptions> {
225 let wal_options = self.alloc_batch(region_numbers.len(), skip_wal).await?;
226
227 Ok(region_numbers.iter().copied().zip(wal_options).collect())
228 }
229}
230
231impl WalProvider {
232 pub async fn start(&self) -> Result<()> {
234 match self {
235 Self::RaftEngine => Ok(()),
236 Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
237 }
238 }
239
240 pub async fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
243 if skip_wal {
244 return Ok(vec![WalOptions::Noop; num_regions]);
245 }
246 match self {
247 WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
248 WalProvider::Kafka(topic_manager) => {
249 let options_batch = topic_manager
250 .select_batch(num_regions)?
251 .into_iter()
252 .map(|topic| WalOptions::Kafka(KafkaWalOptions::new(topic.clone())))
253 .collect();
254 Ok(options_batch)
255 }
256 }
257 }
258
259 pub fn is_remote_wal(&self) -> bool {
261 matches!(&self, WalProvider::Kafka(_))
262 }
263}
264
265#[async_trait]
266impl LeadershipChangeListener for WalProvider {
267 fn name(&self) -> &str {
268 "WalProvider"
269 }
270
271 async fn on_leader_start(&self) -> Result<()> {
272 self.start().await
273 }
274
275 async fn on_leader_stop(&self) -> Result<()> {
276 Ok(())
277 }
278}
279
280pub async fn build_wal_provider(
282 config: &MetasrvWalConfig,
283 kv_backend: KvBackendRef,
284) -> Result<WalProvider> {
285 match config {
286 MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine),
287 MetasrvWalConfig::Kafka(kafka_config) => {
288 let prefix = &kafka_config.kafka_topic.topic_name_prefix;
289 ensure!(
290 TOPIC_NAME_PATTERN_REGEX.is_match(prefix),
291 InvalidTopicNamePrefixSnafu { prefix }
292 );
293 let topic_creator =
294 build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
295 .await?;
296 let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
297 Ok(WalProvider::Kafka(topic_pool))
298 }
299 }
300}
301
302pub fn serialize_wal_options(
304 options: &mut HashMap<String, String>,
305 region_id: RegionId,
306 region_wal_options: &RegionWalOptions,
307) -> std::result::Result<(), serde_json::Error> {
308 if let Some(wal_options) = region_wal_options.get(®ion_id.region_number()) {
309 let encoded = serde_json::to_string(wal_options)?;
310 options.insert(WAL_OPTIONS_KEY.to_string(), encoded);
311 }
312 Ok(())
313}
314
315pub fn extract_topic_from_wal_options(
317 region_id: RegionId,
318 region_options: &RegionWalOptions,
319) -> Option<String> {
320 region_options
321 .get(®ion_id.region_number())
322 .and_then(|wal_options| match wal_options {
323 WalOptions::Kafka(kafka_wal_option) => Some(kafka_wal_option.topic.clone()),
324 _ => None,
325 })
326}
327
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_wal::config::kafka::MetasrvKafkaConfig;
    use common_wal::config::kafka::common::KafkaTopicConfig;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::test_util::test_kafka_topic_pool;
    use crate::wal_provider::selector::RoundRobinTopicSelector;

    /// A raft-engine provider assigns `WalOptions::RaftEngine` to every region.
    #[tokio::test]
    async fn test_provider_with_raft_engine() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let wal_config = MetasrvWalConfig::RaftEngine;
        let provider = build_wal_provider(&wal_config, kv_backend).await.unwrap();
        provider.start().await.unwrap();

        let num_regions = 32;
        let regions = (0..num_regions).collect::<Vec<_>>();
        let got = provider.allocate(&regions, false).await.unwrap();

        let expected = regions
            .into_iter()
            .zip(vec![WalOptions::RaftEngine; num_regions as usize])
            .collect();
        assert_eq!(got, expected);
    }

    /// A Kafka topic name prefix that fails `TOPIC_NAME_PATTERN_REGEX` is rejected.
    #[tokio::test]
    async fn test_refuse_invalid_topic_name_prefix() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
            kafka_topic: KafkaTopicConfig {
                topic_name_prefix: "``````".to_string(),
                ..Default::default()
            },
            ..Default::default()
        });
        let got = build_wal_provider(&wal_config, kv_backend)
            .await
            .unwrap_err();
        assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
    }

    /// With a round-robin selector, the Kafka provider assigns the pool's topics to regions
    /// in order. Needs a Kafka endpoint; presumably skipped by
    /// `maybe_skip_kafka_integration_test!` when none is configured.
    #[tokio::test]
    async fn test_provider_with_kafka_allocate_wal_options() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let num_topics = 5;
        let mut topic_pool = test_kafka_topic_pool(
            get_kafka_endpoints(),
            num_topics,
            true,
            Some("test_allocator_with_kafka"),
        )
        .await;
        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
        let topics = topic_pool.topics.clone();
        let topic_creator = topic_pool.topic_creator();
        // Remove the topics up front so the provider starts from a clean state.
        topic_creator.delete_topics(&topics).await.unwrap();

        let provider = WalProvider::Kafka(topic_pool);
        provider.start().await.unwrap();

        let num_regions = 3;
        let regions = (0..num_regions).collect::<Vec<_>>();
        let got = provider.allocate(&regions, false).await.unwrap();

        // Region `i` is expected to receive the `i`-th topic of the pool.
        let expected = (0..num_regions)
            .map(|i| {
                let options = WalOptions::Kafka(KafkaWalOptions::new(topics[i as usize].clone()));
                (i, options)
            })
            .collect::<HashMap<_, _>>();
        assert_eq!(got, expected);
    }

    /// Wrapper used to exercise `region_wal_options_serde` through a derived struct.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct RegionWalOptionsWrapper {
        #[serde(with = "region_wal_options_serde")]
        region_wal_options: RegionWalOptions,
    }

    /// The legacy format, where each value is a JSON-encoded string, still decodes.
    #[test]
    fn test_deserialize_legacy_region_wal_options_from_encoded_map() {
        let legacy_region_wal_options = HashMap::from([
            (1, serde_json::to_string(&WalOptions::RaftEngine).unwrap()),
            (
                2,
                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions::new(
                    "topic_a".to_string(),
                )))
                .unwrap(),
            ),
        ]);
        let legacy_json = serde_json::json!({
            "region_wal_options": legacy_region_wal_options,
        });

        assert_eq!(
            legacy_json.to_string(),
            r#"{"region_wal_options":{"1":"{\"wal.provider\":\"raft_engine\"}","2":"{\"wal.provider\":\"kafka\",\"wal.kafka.topic\":\"topic_a\"}"}}"#
        );

        let decoded: RegionWalOptionsWrapper = serde_json::from_value(legacy_json).unwrap();

        assert_eq!(
            decoded.region_wal_options,
            HashMap::from([
                (1, WalOptions::RaftEngine),
                (
                    2,
                    WalOptions::Kafka(KafkaWalOptions::new("topic_a".to_string())),
                ),
            ])
        );
    }

    /// The structured format, where each value is a `WalOptions` object, decodes.
    #[test]
    fn test_deserialize_structured_region_wal_options() {
        let json = r#"{
            "region_wal_options": {
                "1": {"wal.provider":"raft_engine"},
                "2": {"wal.provider":"noop"}
            }
        }"#;

        let decoded: RegionWalOptionsWrapper = serde_json::from_str(json).unwrap();

        assert_eq!(
            decoded.region_wal_options,
            HashMap::from([(1, WalOptions::RaftEngine), (2, WalOptions::Noop)])
        );
    }

    /// Serialization always emits the structured format.
    #[test]
    fn test_serialize_structured_region_wal_options() {
        let wrapper = RegionWalOptionsWrapper {
            region_wal_options: HashMap::from([(1, WalOptions::RaftEngine)]),
        };

        let encoded = serde_json::to_string(&wrapper).unwrap();

        assert_eq!(
            encoded,
            r#"{"region_wal_options":{"1":{"wal.provider":"raft_engine"}}}"#
        );
    }

    /// With `skip_wal`, every region gets `WalOptions::Noop`.
    #[tokio::test]
    async fn test_provider_with_skip_wal() {
        let provider = WalProvider::RaftEngine;
        provider.start().await.unwrap();

        let num_regions = 32;
        let regions = (0..num_regions).collect::<Vec<_>>();
        let got = provider.allocate(&regions, true).await.unwrap();
        assert_eq!(got.len(), num_regions as usize);
        for wal_options in got.values() {
            assert_eq!(wal_options, &WalOptions::Noop);
        }
    }
}