Skip to main content

common_meta/
wal_provider.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod selector;
16pub(crate) mod topic_creator;
17mod topic_manager;
18pub(crate) mod topic_pool;
19
20use std::collections::{BTreeSet, HashMap};
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use common_procedure::Context as ProcedureContext;
25use common_procedure::local::DynamicKeyLockGuard;
26use common_wal::config::MetasrvWalConfig;
27use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
28use serde::{Deserialize, Deserializer, Serialize, Serializer};
29use snafu::ensure;
30use store_api::storage::{RegionId, RegionNumber};
31
32use crate::ddl::allocator::wal_options::WalOptionsAllocator;
33use crate::error::{InvalidTopicNamePrefixSnafu, Result};
34use crate::key::topic_name::TopicNameKey;
35use crate::key::{TOPIC_NAME_PATTERN_REGEX, TableMetadataManagerRef};
36use crate::kv_backend::KvBackendRef;
37use crate::leadership_notifier::LeadershipChangeListener;
38use crate::lock_key::RemoteWalLock;
39pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator};
40use crate::wal_provider::topic_pool::KafkaTopicPool;
41
/// WAL options allocated for each region, keyed by region number.
pub type RegionWalOptions = HashMap<RegionNumber, WalOptions>;
44
45/// Returns remote WAL topics referenced by region WAL options.
46pub fn remote_wal_topics(region_wal_options: &RegionWalOptions) -> Vec<&str> {
47    region_wal_options
48        .values()
49        .filter_map(|wal_options| match wal_options {
50            WalOptions::Kafka(kafka_options) => Some(kafka_options.topic.as_str()),
51            _ => None,
52        })
53        .collect::<BTreeSet<_>>()
54        .into_iter()
55        .collect()
56}
57
58/// Acquires per-topic read locks for remote WAL topics.
59pub async fn acquire_remote_wal_read_locks(
60    ctx: &ProcedureContext,
61    region_wal_options: &RegionWalOptions,
62) -> Vec<DynamicKeyLockGuard> {
63    let topics = remote_wal_topics(region_wal_options)
64        .into_iter()
65        .map(str::to_string)
66        .collect::<Vec<_>>();
67    let mut guards = Vec::with_capacity(topics.len());
68    for topic in topics {
69        let guard = ctx
70            .provider
71            .acquire_lock(&(RemoteWalLock::Read(topic).into()))
72            .await;
73        guards.push(guard);
74    }
75    guards
76}
77
78/// Refreshes initial pruned entry ids for Kafka WAL options.
79pub async fn refresh_initial_pruned_entry_ids(
80    table_metadata_manager: &TableMetadataManagerRef,
81    region_wal_options: &mut RegionWalOptions,
82) -> Result<()> {
83    let topics = remote_wal_topics(region_wal_options);
84    if topics.is_empty() {
85        return Ok(());
86    }
87
88    let topic_values = table_metadata_manager
89        .topic_name_manager()
90        .batch_get(
91            topics
92                .iter()
93                .map(|topic| TopicNameKey::new(topic))
94                .collect(),
95        )
96        .await?;
97
98    for wal_options in region_wal_options.values_mut() {
99        let WalOptions::Kafka(kafka_options) = wal_options else {
100            continue;
101        };
102        kafka_options.initial_pruned_entry_id = Some(
103            topic_values
104                .get(&kafka_options.topic)
105                .map(|value| value.pruned_entry_id)
106                .unwrap_or_default(),
107        );
108    }
109
110    Ok(())
111}
112
/// Representation of one region's WAL options accepted while deserializing.
///
/// The enum is untagged, so the variant is chosen from the shape of the value alone:
/// a string becomes `Encoded`, a structured value becomes `Structured`.
#[derive(Deserialize)]
#[serde(untagged)]
enum WalOptionsCompat {
    /// Legacy form: a string holding JSON-encoded [`WalOptions`].
    Encoded(String),
    /// Current form: [`WalOptions`] stored directly as a structured value.
    Structured(WalOptions),
}
119
120fn deserialize_region_wal_options<E>(
121    values: HashMap<String, WalOptionsCompat>,
122) -> std::result::Result<RegionWalOptions, E>
123where
124    E: serde::de::Error,
125{
126    values
127        .into_iter()
128        .map(|(region_number, wal_options)| {
129            let region_number = region_number.parse::<RegionNumber>().map_err(|err| {
130                E::custom(format!(
131                    "invalid region number in region_wal_options: {region_number}, err: {err}"
132                ))
133            })?;
134            let wal_options = match wal_options {
135                WalOptionsCompat::Encoded(encoded) => serde_json::from_str(&encoded).map_err(|err| {
136                    E::custom(format!(
137                        "failed to decode legacy wal options for region {region_number}: {encoded}, err: {err}"
138                    ))
139                })?,
140                WalOptionsCompat::Structured(wal_options) => wal_options,
141            };
142            Ok((region_number, wal_options))
143        })
144        .collect()
145}
146
147/// Serde helpers for [`RegionWalOptions`] persisted in metadata.
148///
149/// New metadata stores WAL options as structured JSON objects. The deserializer
150/// also accepts the legacy format whose map values are JSON strings encoded from
151/// [`WalOptions`].
152pub mod region_wal_options_serde {
153    use super::*;
154
155    /// Serializes region WAL options in structured form.
156    pub fn serialize<S>(
157        value: &RegionWalOptions,
158        serializer: S,
159    ) -> std::result::Result<S::Ok, S::Error>
160    where
161        S: Serializer,
162    {
163        value.serialize(serializer)
164    }
165
166    /// Deserializes region WAL options from either structured or legacy encoded form.
167    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<RegionWalOptions, D::Error>
168    where
169        D: Deserializer<'de>,
170    {
171        let values = HashMap::<String, WalOptionsCompat>::deserialize(deserializer)?;
172        deserialize_region_wal_options(values)
173    }
174}
175
176/// Serde helpers for optional [`RegionWalOptions`] persisted in procedure state.
177pub mod optional_region_wal_options_serde {
178    use super::*;
179
180    /// Serializes optional region WAL options in structured form.
181    pub fn serialize<S>(
182        value: &Option<RegionWalOptions>,
183        serializer: S,
184    ) -> std::result::Result<S::Ok, S::Error>
185    where
186        S: Serializer,
187    {
188        value.serialize(serializer)
189    }
190
191    /// Deserializes optional region WAL options from structured or legacy encoded form.
192    pub fn deserialize<'de, D>(
193        deserializer: D,
194    ) -> std::result::Result<Option<RegionWalOptions>, D::Error>
195    where
196        D: Deserializer<'de>,
197    {
198        let Some(values) = Option::<HashMap<String, WalOptionsCompat>>::deserialize(deserializer)?
199        else {
200            return Ok(None);
201        };
202
203        deserialize_region_wal_options(values).map(Some)
204    }
205}
206
/// Provides wal options in region granularity.
///
/// The default provider is [`WalProvider::RaftEngine`].
#[derive(Default, Debug)]
pub enum WalProvider {
    /// Raft-engine WAL; carries no state.
    #[default]
    RaftEngine,
    /// Kafka remote WAL backed by a pool of topics.
    Kafka(KafkaTopicPool),
}
214
/// Shared, reference-counted ([`Arc`]) handle to a [`WalProvider`].
pub type WalProviderRef = Arc<WalProvider>;
217
218#[async_trait::async_trait]
219impl WalOptionsAllocator for WalProvider {
220    async fn allocate(
221        &self,
222        region_numbers: &[RegionNumber],
223        skip_wal: bool,
224    ) -> Result<RegionWalOptions> {
225        let wal_options = self.alloc_batch(region_numbers.len(), skip_wal).await?;
226
227        Ok(region_numbers.iter().copied().zip(wal_options).collect())
228    }
229}
230
231impl WalProvider {
232    /// Tries to start the provider.
233    pub async fn start(&self) -> Result<()> {
234        match self {
235            Self::RaftEngine => Ok(()),
236            Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
237        }
238    }
239
240    /// Allocates a batch of wal options where each wal options goes to a region.
241    /// If skip_wal is true, the wal options will be set to Noop regardless of the provider type.
242    pub async fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
243        if skip_wal {
244            return Ok(vec![WalOptions::Noop; num_regions]);
245        }
246        match self {
247            WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
248            WalProvider::Kafka(topic_manager) => {
249                let options_batch = topic_manager
250                    .select_batch(num_regions)?
251                    .into_iter()
252                    .map(|topic| WalOptions::Kafka(KafkaWalOptions::new(topic.clone())))
253                    .collect();
254                Ok(options_batch)
255            }
256        }
257    }
258
259    /// Returns true if it's the remote WAL.
260    pub fn is_remote_wal(&self) -> bool {
261        matches!(&self, WalProvider::Kafka(_))
262    }
263}
264
265#[async_trait]
266impl LeadershipChangeListener for WalProvider {
267    fn name(&self) -> &str {
268        "WalProvider"
269    }
270
271    async fn on_leader_start(&self) -> Result<()> {
272        self.start().await
273    }
274
275    async fn on_leader_stop(&self) -> Result<()> {
276        Ok(())
277    }
278}
279
280/// Builds a wal provider based on the given configuration.
281pub async fn build_wal_provider(
282    config: &MetasrvWalConfig,
283    kv_backend: KvBackendRef,
284) -> Result<WalProvider> {
285    match config {
286        MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine),
287        MetasrvWalConfig::Kafka(kafka_config) => {
288            let prefix = &kafka_config.kafka_topic.topic_name_prefix;
289            ensure!(
290                TOPIC_NAME_PATTERN_REGEX.is_match(prefix),
291                InvalidTopicNamePrefixSnafu { prefix }
292            );
293            let topic_creator =
294                build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
295                    .await?;
296            let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
297            Ok(WalProvider::Kafka(topic_pool))
298        }
299    }
300}
301
302/// Serializes and inserts WAL options into the region options.
303pub fn serialize_wal_options(
304    options: &mut HashMap<String, String>,
305    region_id: RegionId,
306    region_wal_options: &RegionWalOptions,
307) -> std::result::Result<(), serde_json::Error> {
308    if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
309        let encoded = serde_json::to_string(wal_options)?;
310        options.insert(WAL_OPTIONS_KEY.to_string(), encoded);
311    }
312    Ok(())
313}
314
315/// Extracts the topic from the wal options.
316pub fn extract_topic_from_wal_options(
317    region_id: RegionId,
318    region_options: &RegionWalOptions,
319) -> Option<String> {
320    region_options
321        .get(&region_id.region_number())
322        .and_then(|wal_options| match wal_options {
323            WalOptions::Kafka(kafka_wal_option) => Some(kafka_wal_option.topic.clone()),
324            _ => None,
325        })
326}
327
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_wal::config::kafka::MetasrvKafkaConfig;
    use common_wal::config::kafka::common::KafkaTopicConfig;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::test_util::test_kafka_topic_pool;
    use crate::wal_provider::selector::RoundRobinTopicSelector;

    // The raft-engine provider must hand out raft-engine wal options to every region.
    #[tokio::test]
    async fn test_provider_with_raft_engine() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let provider = build_wal_provider(&MetasrvWalConfig::RaftEngine, kv_backend)
            .await
            .unwrap();
        provider.start().await.unwrap();

        let num_regions = 32;
        let regions = (0..num_regions).collect::<Vec<_>>();
        let got = provider.allocate(&regions, false).await.unwrap();

        let expected = regions
            .into_iter()
            .zip(vec![WalOptions::RaftEngine; num_regions as usize])
            .collect::<RegionWalOptions>();
        assert_eq!(got, expected);
    }

    // Building a Kafka provider with a malformed topic name prefix must be rejected.
    #[tokio::test]
    async fn test_refuse_invalid_topic_name_prefix() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let kafka_topic = KafkaTopicConfig {
            topic_name_prefix: "``````".to_string(),
            ..Default::default()
        };
        let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
            kafka_topic,
            ..Default::default()
        });

        let err = build_wal_provider(&wal_config, kv_backend)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidTopicNamePrefix { .. });
    }

    // With a round-robin selector, the first regions get the first topics of the pool.
    #[tokio::test]
    async fn test_provider_with_kafka_allocate_wal_options() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let num_topics = 5;
        let mut topic_pool = test_kafka_topic_pool(
            get_kafka_endpoints(),
            num_topics,
            true,
            Some("test_allocator_with_kafka"),
        )
        .await;
        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
        let topics = topic_pool.topics.clone();
        // Clean up the topics before the test.
        let topic_creator = topic_pool.topic_creator();
        topic_creator.delete_topics(&topics).await.unwrap();

        // Creates an options provider.
        let provider = WalProvider::Kafka(topic_pool);
        provider.start().await.unwrap();

        let num_regions = 3;
        let regions = (0..num_regions).collect::<Vec<_>>();
        let got = provider.allocate(&regions, false).await.unwrap();

        // Check the allocated wal options contain the expected topics.
        let mut expected = HashMap::new();
        for region_number in 0..num_regions {
            let topic = topics[region_number as usize].clone();
            expected.insert(
                region_number,
                WalOptions::Kafka(KafkaWalOptions::new(topic)),
            );
        }
        assert_eq!(got, expected);
    }

    // Minimal wrapper that routes a field through `region_wal_options_serde`.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct RegionWalOptionsWrapper {
        #[serde(with = "region_wal_options_serde")]
        region_wal_options: RegionWalOptions,
    }

    // Legacy metadata stores each region's wal options as a JSON-encoded string.
    #[test]
    fn test_deserialize_legacy_region_wal_options_from_encoded_map() {
        let encoded_raft_engine = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
        let encoded_kafka = serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions::new(
            "topic_a".to_string(),
        )))
        .unwrap();
        let legacy_region_wal_options =
            HashMap::from([(1, encoded_raft_engine), (2, encoded_kafka)]);
        let legacy_json = serde_json::json!({
            "region_wal_options": legacy_region_wal_options,
        });

        // Pin the exact legacy layout before decoding it.
        assert_eq!(
            legacy_json.to_string(),
            r#"{"region_wal_options":{"1":"{\"wal.provider\":\"raft_engine\"}","2":"{\"wal.provider\":\"kafka\",\"wal.kafka.topic\":\"topic_a\"}"}}"#
        );

        let decoded: RegionWalOptionsWrapper = serde_json::from_value(legacy_json).unwrap();

        let expected = HashMap::from([
            (1, WalOptions::RaftEngine),
            (
                2,
                WalOptions::Kafka(KafkaWalOptions::new("topic_a".to_string())),
            ),
        ]);
        assert_eq!(decoded.region_wal_options, expected);
    }

    // Structured metadata stores each region's wal options as a JSON object.
    #[test]
    fn test_deserialize_structured_region_wal_options() {
        let json = r#"{
            "region_wal_options": {
                "1": {"wal.provider":"raft_engine"},
                "2": {"wal.provider":"noop"}
            }
        }"#;

        let decoded: RegionWalOptionsWrapper = serde_json::from_str(json).unwrap();

        let expected = HashMap::from([(1, WalOptions::RaftEngine), (2, WalOptions::Noop)]);
        assert_eq!(decoded.region_wal_options, expected);
    }

    // Serialization always emits the structured form.
    #[test]
    fn test_serialize_structured_region_wal_options() {
        let wrapper = RegionWalOptionsWrapper {
            region_wal_options: HashMap::from([(1, WalOptions::RaftEngine)]),
        };

        let encoded = serde_json::to_string(&wrapper).unwrap();

        assert_eq!(
            encoded,
            r#"{"region_wal_options":{"1":{"wal.provider":"raft_engine"}}}"#
        );
    }

    // `skip_wal` forces noop wal options regardless of the provider type.
    #[tokio::test]
    async fn test_provider_with_skip_wal() {
        let provider = WalProvider::RaftEngine;
        provider.start().await.unwrap();

        let num_regions = 32;
        let regions = (0..num_regions).collect::<Vec<_>>();
        let got = provider.allocate(&regions, true).await.unwrap();

        assert_eq!(got.len(), num_regions as usize);
        for wal_options in got.values() {
            assert_eq!(wal_options, &WalOptions::Noop);
        }
    }
}