common_meta/wal_options_allocator/
selector.rs1use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::Arc;
17
18use rand::Rng;
19use snafu::ensure;
20
21use crate::error::{EmptyTopicPoolSnafu, Result};
22
23pub(crate) trait TopicSelector: Send + Sync {
25 fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String>;
27}
28
29pub(crate) type TopicSelectorRef = Arc<dyn TopicSelector>;
31
/// Topic selector that walks through the pool in round-robin order.
///
/// The `Default` value starts with the cursor at zero, i.e. the first
/// selection yields the first topic of the pool.
#[derive(Debug, Default)]
pub(crate) struct RoundRobinTopicSelector {
    /// Monotonically increasing counter; it is reduced modulo the pool length
    /// at selection time to obtain the index of the next topic. Atomic so the
    /// selector can be advanced through a shared reference.
    cursor: AtomicUsize,
}
37
38impl RoundRobinTopicSelector {
39 pub fn with_shuffle() -> Self {
42 let offset = rand::rng().random_range(0..64);
43 Self {
44 cursor: AtomicUsize::new(offset),
45 }
46 }
47}
48
49impl TopicSelector for RoundRobinTopicSelector {
50 fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String> {
51 ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu);
52 let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
53 Ok(&topic_pool[which])
54 }
55}
56
#[cfg(test)]
mod tests {
    use super::*;

    /// Selecting from an empty pool must yield an error.
    #[test]
    fn test_empty_topic_pool() {
        let selector = RoundRobinTopicSelector::default();
        let topic_pool: Vec<String> = Vec::new();

        assert!(selector.select(&topic_pool).is_err());
    }

    /// A default selector walks the pool in order and wraps around; a
    /// shuffled selector still returns a member of the pool.
    #[test]
    fn test_round_robin_topic_selector() {
        let topic_pool: Vec<String> = (0..3).map(|v| v.to_string()).collect();

        // Starting from cursor zero, four picks cover the pool once and wrap.
        let selector = RoundRobinTopicSelector::default();
        for expected in ["0", "1", "2", "0"] {
            assert_eq!(selector.select(&topic_pool).unwrap(), expected);
        }

        // The starting offset is random, so only membership can be asserted.
        let selector = RoundRobinTopicSelector::with_shuffle();
        let topic = selector.select(&topic_pool).unwrap();
        assert!(topic_pool.contains(topic));
    }
}