meta_srv/selector/
round_robin.rs1use std::sync::atomic::AtomicUsize;
16
17use common_meta::peer::Peer;
18use snafu::{ResultExt, ensure};
19
20use crate::error::{
21 ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result,
22};
23use crate::metasrv::{SelectTarget, SelectorContext};
24use crate::selector::{Selector, SelectorOptions};
25
26pub struct RoundRobinSelector {
34 select_target: SelectTarget,
35 counter: AtomicUsize,
36}
37
38impl Default for RoundRobinSelector {
39 fn default() -> Self {
40 Self {
41 select_target: SelectTarget::Datanode,
42 counter: AtomicUsize::new(0),
43 }
44 }
45}
46
47impl RoundRobinSelector {
48 pub fn new(select_target: SelectTarget) -> Self {
49 Self {
50 select_target,
51 ..Default::default()
52 }
53 }
54
55 async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
56 let mut peers = match self.select_target {
57 SelectTarget::Datanode => {
58 let alive_datanodes = ctx
60 .peer_discovery
61 .active_datanodes(opts.workload_filter)
62 .await
63 .context(ListActiveDatanodesSnafu)?;
64
65 alive_datanodes
67 .into_iter()
68 .map(|node| node.peer)
69 .filter(|p| !opts.exclude_peer_ids.contains(&p.id))
70 .collect::<Vec<_>>()
71 }
72 SelectTarget::Flownode => ctx
73 .peer_discovery
74 .active_flownodes(opts.workload_filter)
75 .await
76 .context(ListActiveFlownodesSnafu)?
77 .into_iter()
78 .map(|node| node.peer)
79 .collect::<Vec<_>>(),
80 };
81
82 ensure!(
83 !peers.is_empty(),
84 NoEnoughAvailableNodeSnafu {
85 required: opts.min_required_items,
86 available: peers.len(),
87 select_target: self.select_target
88 }
89 );
90
91 peers.sort_by_key(|p| p.id);
93
94 Ok(peers)
95 }
96}
97
98#[async_trait::async_trait]
99impl Selector for RoundRobinSelector {
100 type Context = SelectorContext;
101 type Output = Vec<Peer>;
102
103 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
104 let peers = self.get_peers(&opts, ctx).await?;
105 let mut selected = Vec::with_capacity(opts.min_required_items);
107 for _ in 0..opts.min_required_items {
108 let idx = self
109 .counter
110 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
111 % peers.len();
112 selected.push(peers[idx].clone());
113 }
114
115 Ok(selected)
116 }
117}
118
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use super::*;
    use crate::test_util::{create_meta_peer_client, put_datanodes};

    /// Builds the three peers shared by the tests: ids 2, 5 and 8, in ascending id order.
    fn sample_peers() -> [Peer; 3] {
        [(2, "node1"), (5, "node2"), (8, "node3")].map(|(id, addr)| Peer {
            id,
            addr: addr.to_string(),
        })
    }

    /// Consecutive selections continue the rotation where the previous one stopped.
    #[tokio::test]
    async fn test_round_robin_selector() {
        let [peer1, peer2, peer3] = sample_peers();

        let meta_peer_client = create_meta_peer_client();
        let ctx = SelectorContext {
            peer_discovery: meta_peer_client.clone(),
        };
        put_datanodes(
            &meta_peer_client,
            vec![peer1.clone(), peer2.clone(), peer3.clone()],
        )
        .await;

        let selector = RoundRobinSelector::default();
        let request = |min_required_items| SelectorOptions {
            min_required_items,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
            workload_filter: None,
            extensions: Default::default(),
        };

        // Four picks over three nodes wrap around to the first peer.
        let first_round = selector.select(&ctx, request(4)).await.unwrap();
        assert_eq!(first_round.len(), 4);
        assert_eq!(
            first_round,
            vec![peer1.clone(), peer2.clone(), peer3.clone(), peer1.clone()]
        );

        // The counter is now at four, so the rotation resumes at the second peer.
        let second_round = selector.select(&ctx, request(2)).await.unwrap();
        assert_eq!(second_round.len(), 2);
        assert_eq!(second_round, vec![peer2.clone(), peer3.clone()]);
    }

    /// Excluded datanodes are never returned.
    #[tokio::test]
    async fn test_round_robin_selector_with_exclude_peer_ids() {
        let [peer1, peer2, peer3] = sample_peers();

        let meta_peer_client = create_meta_peer_client();
        let ctx = SelectorContext {
            peer_discovery: meta_peer_client.clone(),
        };
        put_datanodes(
            &meta_peer_client,
            vec![peer1.clone(), peer2.clone(), peer3.clone()],
        )
        .await;

        let selector = RoundRobinSelector::new(SelectTarget::Datanode);
        let opts = SelectorOptions {
            min_required_items: 1,
            allow_duplication: true,
            exclude_peer_ids: HashSet::from([2, 5]),
            workload_filter: None,
            extensions: Default::default(),
        };

        // Only the peer with id 8 remains once ids 2 and 5 are excluded.
        let picked = selector.select(&ctx, opts).await.unwrap();
        assert_eq!(picked.len(), 1);
        assert_eq!(picked, vec![peer3.clone()]);
    }
}