common_meta/wal_options_allocator/kafka/
topic_selector.rsuse std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use rand::Rng;
use snafu::ensure;
use crate::error::{EmptyTopicPoolSnafu, Result};
pub(crate) trait TopicSelector: Send + Sync {
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String>;
}
pub(crate) type TopicSelectorRef = Arc<dyn TopicSelector>;
#[derive(Default)]
pub(crate) struct RoundRobinTopicSelector {
cursor: AtomicUsize,
}
impl RoundRobinTopicSelector {
pub fn with_shuffle() -> Self {
let offset = rand::thread_rng().gen_range(0..64);
Self {
cursor: AtomicUsize::new(offset),
}
}
}
impl TopicSelector for RoundRobinTopicSelector {
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String> {
ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu);
let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
Ok(&topic_pool[which])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_topic_pool() {
let topic_pool = vec![];
let selector = RoundRobinTopicSelector::default();
assert!(selector.select(&topic_pool).is_err());
}
#[test]
fn test_round_robin_topic_selector() {
let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();
let selector = RoundRobinTopicSelector::default();
assert_eq!(selector.select(&topic_pool).unwrap(), "0");
assert_eq!(selector.select(&topic_pool).unwrap(), "1");
assert_eq!(selector.select(&topic_pool).unwrap(), "2");
assert_eq!(selector.select(&topic_pool).unwrap(), "0");
let selector = RoundRobinTopicSelector::with_shuffle();
let topic = selector.select(&topic_pool).unwrap();
assert!(topic_pool.contains(topic));
}
}