meta_srv/selector/
round_robin.rs1use std::sync::atomic::AtomicUsize;
16
17use common_meta::peer::Peer;
18use snafu::{ResultExt, ensure};
19
20use crate::error::{
21 ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result,
22};
23use crate::metasrv::{SelectTarget, SelectorContext};
24use crate::selector::{Selector, SelectorOptions};
25
26pub struct RoundRobinSelector {
34 select_target: SelectTarget,
35 counter: AtomicUsize,
36}
37
38impl Default for RoundRobinSelector {
39 fn default() -> Self {
40 Self {
41 select_target: SelectTarget::Datanode,
42 counter: AtomicUsize::new(0),
43 }
44 }
45}
46
47impl RoundRobinSelector {
48 pub fn new(select_target: SelectTarget) -> Self {
49 Self {
50 select_target,
51 ..Default::default()
52 }
53 }
54
55 async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
56 let mut peers = match self.select_target {
57 SelectTarget::Datanode => {
58 let alive_datanodes = ctx
60 .peer_discovery
61 .active_datanodes(opts.workload_filter)
62 .await
63 .context(ListActiveDatanodesSnafu)?;
64
65 alive_datanodes
67 .into_iter()
68 .filter(|p| !opts.exclude_peer_ids.contains(&p.id))
69 .collect::<Vec<_>>()
70 }
71 SelectTarget::Flownode => ctx
72 .peer_discovery
73 .active_flownodes(opts.workload_filter)
74 .await
75 .context(ListActiveFlownodesSnafu)?,
76 };
77
78 ensure!(
79 !peers.is_empty(),
80 NoEnoughAvailableNodeSnafu {
81 required: opts.min_required_items,
82 available: peers.len(),
83 select_target: self.select_target
84 }
85 );
86
87 peers.sort_by_key(|p| p.id);
89
90 Ok(peers)
91 }
92}
93
94#[async_trait::async_trait]
95impl Selector for RoundRobinSelector {
96 type Context = SelectorContext;
97 type Output = Vec<Peer>;
98
99 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
100 let peers = self.get_peers(&opts, ctx).await?;
101 let mut selected = Vec::with_capacity(opts.min_required_items);
103 for _ in 0..opts.min_required_items {
104 let idx = self
105 .counter
106 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
107 % peers.len();
108 selected.push(peers[idx].clone());
109 }
110
111 Ok(selected)
112 }
113}
114
115#[cfg(test)]
116mod test {
117 use std::collections::HashSet;
118
119 use super::*;
120 use crate::test_util::{create_meta_peer_client, put_datanodes};
121
122 #[tokio::test]
123 async fn test_round_robin_selector() {
124 let selector = RoundRobinSelector::default();
125 let meta_peer_client = create_meta_peer_client();
126 let ctx = SelectorContext {
127 peer_discovery: meta_peer_client.clone(),
128 };
129 let peer1 = Peer {
131 id: 2,
132 addr: "node1".to_string(),
133 };
134 let peer2 = Peer {
135 id: 5,
136 addr: "node2".to_string(),
137 };
138 let peer3 = Peer {
139 id: 8,
140 addr: "node3".to_string(),
141 };
142 let peers = vec![peer1.clone(), peer2.clone(), peer3.clone()];
143 put_datanodes(&meta_peer_client, peers).await;
144
145 let peers = selector
146 .select(
147 &ctx,
148 SelectorOptions {
149 min_required_items: 4,
150 allow_duplication: true,
151 exclude_peer_ids: HashSet::new(),
152 workload_filter: None,
153 },
154 )
155 .await
156 .unwrap();
157 assert_eq!(peers.len(), 4);
158 assert_eq!(
159 peers,
160 vec![peer1.clone(), peer2.clone(), peer3.clone(), peer1.clone()]
161 );
162
163 let peers = selector
164 .select(
165 &ctx,
166 SelectorOptions {
167 min_required_items: 2,
168 allow_duplication: true,
169 exclude_peer_ids: HashSet::new(),
170 workload_filter: None,
171 },
172 )
173 .await
174 .unwrap();
175 assert_eq!(peers.len(), 2);
176 assert_eq!(peers, vec![peer2.clone(), peer3.clone()]);
177 }
178
179 #[tokio::test]
180 async fn test_round_robin_selector_with_exclude_peer_ids() {
181 let selector = RoundRobinSelector::new(SelectTarget::Datanode);
182 let meta_peer_client = create_meta_peer_client();
183 let ctx = SelectorContext {
184 peer_discovery: meta_peer_client.clone(),
185 };
186 let peer1 = Peer {
188 id: 2,
189 addr: "node1".to_string(),
190 };
191 let peer2 = Peer {
192 id: 5,
193 addr: "node2".to_string(),
194 };
195 let peer3 = Peer {
196 id: 8,
197 addr: "node3".to_string(),
198 };
199 put_datanodes(
200 &meta_peer_client,
201 vec![peer1.clone(), peer2.clone(), peer3.clone()],
202 )
203 .await;
204
205 let peers = selector
206 .select(
207 &ctx,
208 SelectorOptions {
209 min_required_items: 1,
210 allow_duplication: true,
211 exclude_peer_ids: HashSet::from([2, 5]),
212 workload_filter: None,
213 },
214 )
215 .await
216 .unwrap();
217 assert_eq!(peers.len(), 1);
218 assert_eq!(peers, vec![peer3.clone()]);
219 }
220}