common_meta/wal_options_allocator/
selector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::Arc;
17
18use rand::Rng;
19use snafu::ensure;
20
21use crate::error::{EmptyTopicPoolSnafu, Result};
22
23/// Controls topic selection.
24pub(crate) trait TopicSelector: Send + Sync {
25    /// Selects a topic from the topic pool.
26    fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String>;
27}
28
/// Shared, reference-counted handle to a type-erased [`TopicSelector`].
pub(crate) type TopicSelectorRef = Arc<dyn TopicSelector>;
31
/// A topic selector with the round-robin strategy, i.e. selects topics in a round-robin manner.
///
/// The `Default` implementation starts the cursor at zero, so the first selection
/// returns the first topic of the pool.
#[derive(Default)]
pub(crate) struct RoundRobinTopicSelector {
    /// Monotonically increasing counter; its value modulo the pool size is the next index.
    cursor: AtomicUsize,
}
37
38impl RoundRobinTopicSelector {
39    // The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes.
40    // Introducing a shuffling strategy may help mitigate this issue.
41    pub fn with_shuffle() -> Self {
42        let offset = rand::rng().random_range(0..64);
43        Self {
44            cursor: AtomicUsize::new(offset),
45        }
46    }
47}
48
49impl TopicSelector for RoundRobinTopicSelector {
50    fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String> {
51        ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu);
52        let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
53        Ok(&topic_pool[which])
54    }
55}
56
#[cfg(test)]
mod tests {
    use super::*;

    /// Selecting from an empty topic pool must report an error.
    #[test]
    fn test_empty_topic_pool() {
        let selector = RoundRobinTopicSelector::default();
        let topic_pool: Vec<String> = Vec::new();

        let outcome = selector.select(&topic_pool);
        assert!(outcome.is_err());
    }

    /// A default selector walks the pool in order and wraps around; a shuffled
    /// selector still returns a member of the pool.
    #[test]
    fn test_round_robin_topic_selector() {
        let topic_pool: Vec<String> = (0..3).map(|v| v.to_string()).collect();
        let selector = RoundRobinTopicSelector::default();

        // Three picks cover the pool in order, the fourth wraps back to the start.
        for expected in ["0", "1", "2", "0"] {
            assert_eq!(selector.select(&topic_pool).unwrap(), expected);
        }

        // Creates a round-robin selector with shuffle.
        let shuffled = RoundRobinTopicSelector::with_shuffle();
        let picked = shuffled.select(&topic_pool).unwrap();
        assert!(topic_pool.iter().any(|topic| topic == picked));
    }
}