meta_srv/selector/
round_robin.rs1use std::collections::HashSet;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18use std::time::Duration;
19
20use common_meta::peer::Peer;
21use snafu::ensure;
22
23use crate::error::{NoEnoughAvailableNodeSnafu, Result};
24use crate::lease;
25use crate::metasrv::{SelectTarget, SelectorContext};
26use crate::node_excluder::NodeExcluderRef;
27use crate::selector::{Selector, SelectorOptions};
28
29pub struct RoundRobinSelector {
37 select_target: SelectTarget,
38 counter: AtomicUsize,
39 node_excluder: NodeExcluderRef,
40}
41
42impl Default for RoundRobinSelector {
43 fn default() -> Self {
44 Self {
45 select_target: SelectTarget::Datanode,
46 counter: AtomicUsize::new(0),
47 node_excluder: Arc::new(Vec::new()),
48 }
49 }
50}
51
52impl RoundRobinSelector {
53 pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self {
54 Self {
55 select_target,
56 node_excluder,
57 ..Default::default()
58 }
59 }
60
61 async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
62 let mut peers = match self.select_target {
63 SelectTarget::Datanode => {
64 let lease_kvs = lease::alive_datanodes(
66 &ctx.meta_peer_client,
67 Duration::from_secs(ctx.datanode_lease_secs),
68 )
69 .with_condition(lease::is_datanode_accept_ingest_workload)
70 .await?;
71
72 let mut exclude_peer_ids = self
73 .node_excluder
74 .excluded_datanode_ids()
75 .iter()
76 .cloned()
77 .collect::<HashSet<_>>();
78 exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
79 lease_kvs
81 .into_iter()
82 .filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id))
83 .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
84 .collect::<Vec<_>>()
85 }
86 SelectTarget::Flownode => {
87 let lease_kvs = lease::alive_flownodes(
89 &ctx.meta_peer_client,
90 Duration::from_secs(ctx.flownode_lease_secs),
91 )
92 .await?;
93
94 lease_kvs
96 .into_iter()
97 .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
98 .collect::<Vec<_>>()
99 }
100 };
101
102 ensure!(
103 !peers.is_empty(),
104 NoEnoughAvailableNodeSnafu {
105 required: opts.min_required_items,
106 available: peers.len(),
107 select_target: self.select_target
108 }
109 );
110
111 peers.sort_by_key(|p| p.id);
113
114 Ok(peers)
115 }
116}
117
118#[async_trait::async_trait]
119impl Selector for RoundRobinSelector {
120 type Context = SelectorContext;
121 type Output = Vec<Peer>;
122
123 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
124 let peers = self.get_peers(&opts, ctx).await?;
125 let mut selected = Vec::with_capacity(opts.min_required_items);
127 for _ in 0..opts.min_required_items {
128 let idx = self
129 .counter
130 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
131 % peers.len();
132 selected.push(peers[idx].clone());
133 }
134
135 Ok(selected)
136 }
137}
138
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use super::*;
    use crate::test_util::{create_selector_context, put_datanodes};

    /// Selecting more items than there are peers wraps around, and a later
    /// call continues from where the previous one stopped.
    #[tokio::test]
    async fn test_round_robin_selector() {
        let selector = RoundRobinSelector::default();
        let ctx = create_selector_context();

        // Three datanodes with non-contiguous ids (2, 5, 8).
        let [peer1, peer2, peer3] =
            [(2, "node1"), (5, "node2"), (8, "node3")].map(|(id, addr)| Peer {
                id,
                addr: addr.to_string(),
            });
        put_datanodes(
            &ctx.meta_peer_client,
            vec![peer1.clone(), peer2.clone(), peer3.clone()],
        )
        .await;

        // First round: four picks out of three peers, so the first peer repeats.
        let first_opts = SelectorOptions {
            min_required_items: 4,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
        };
        let first_round = selector.select(&ctx, first_opts).await.unwrap();
        assert_eq!(first_round.len(), 4);
        assert_eq!(
            first_round,
            vec![peer1.clone(), peer2.clone(), peer3.clone(), peer1.clone()]
        );

        // Second round resumes after the last pick of the first round.
        let second_opts = SelectorOptions {
            min_required_items: 2,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
        };
        let second_round = selector.select(&ctx, second_opts).await.unwrap();
        assert_eq!(second_round.len(), 2);
        assert_eq!(second_round, vec![peer2.clone(), peer3.clone()]);
    }

    /// Peers excluded by the node excluder (id 5) and by the request options
    /// (id 2) are both skipped, leaving only the peer with id 8.
    #[tokio::test]
    async fn test_round_robin_selector_with_exclude_peer_ids() {
        let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5]));
        let ctx = create_selector_context();

        let [peer1, peer2, peer3] =
            [(2, "node1"), (5, "node2"), (8, "node3")].map(|(id, addr)| Peer {
                id,
                addr: addr.to_string(),
            });
        put_datanodes(
            &ctx.meta_peer_client,
            vec![peer1.clone(), peer2.clone(), peer3.clone()],
        )
        .await;

        let opts = SelectorOptions {
            min_required_items: 1,
            allow_duplication: true,
            exclude_peer_ids: HashSet::from([2]),
        };
        let picked = selector.select(&ctx, opts).await.unwrap();
        assert_eq!(picked.len(), 1);
        assert_eq!(picked, vec![peer3.clone()]);
    }
}