meta_srv/selector/
round_robin.rs1use std::collections::HashSet;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_meta::peer::Peer;
20use snafu::ensure;
21
22use crate::error::{NoEnoughAvailableNodeSnafu, Result};
23use crate::lease;
24use crate::metasrv::{SelectTarget, SelectorContext};
25use crate::node_excluder::NodeExcluderRef;
26use crate::selector::{Selector, SelectorOptions};
27
28pub struct RoundRobinSelector {
36 select_target: SelectTarget,
37 counter: AtomicUsize,
38 node_excluder: NodeExcluderRef,
39}
40
41impl Default for RoundRobinSelector {
42 fn default() -> Self {
43 Self {
44 select_target: SelectTarget::Datanode,
45 counter: AtomicUsize::new(0),
46 node_excluder: Arc::new(Vec::new()),
47 }
48 }
49}
50
51impl RoundRobinSelector {
52 pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self {
53 Self {
54 select_target,
55 node_excluder,
56 ..Default::default()
57 }
58 }
59
60 async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
61 let mut peers = match self.select_target {
62 SelectTarget::Datanode => {
63 let lease_kvs =
65 lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
66 .with_condition(lease::is_datanode_accept_ingest_workload)
67 .await?;
68
69 let mut exclude_peer_ids = self
70 .node_excluder
71 .excluded_datanode_ids()
72 .iter()
73 .cloned()
74 .collect::<HashSet<_>>();
75 exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
76 lease_kvs
78 .into_iter()
79 .filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id))
80 .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
81 .collect::<Vec<_>>()
82 }
83 SelectTarget::Flownode => {
84 let lease_kvs =
86 lease::alive_flownodes(&ctx.meta_peer_client, ctx.flownode_lease_secs).await?;
87
88 lease_kvs
90 .into_iter()
91 .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
92 .collect::<Vec<_>>()
93 }
94 };
95
96 ensure!(
97 !peers.is_empty(),
98 NoEnoughAvailableNodeSnafu {
99 required: opts.min_required_items,
100 available: peers.len(),
101 select_target: self.select_target
102 }
103 );
104
105 peers.sort_by_key(|p| p.id);
107
108 Ok(peers)
109 }
110}
111
112#[async_trait::async_trait]
113impl Selector for RoundRobinSelector {
114 type Context = SelectorContext;
115 type Output = Vec<Peer>;
116
117 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
118 let peers = self.get_peers(&opts, ctx).await?;
119 let mut selected = Vec::with_capacity(opts.min_required_items);
121 for _ in 0..opts.min_required_items {
122 let idx = self
123 .counter
124 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
125 % peers.len();
126 selected.push(peers[idx].clone());
127 }
128
129 Ok(selected)
130 }
131}
132
133#[cfg(test)]
134mod test {
135 use std::collections::HashSet;
136
137 use super::*;
138 use crate::test_util::{create_selector_context, put_datanodes};
139
140 #[tokio::test]
141 async fn test_round_robin_selector() {
142 let selector = RoundRobinSelector::default();
143 let ctx = create_selector_context();
144 let peer1 = Peer {
146 id: 2,
147 addr: "node1".to_string(),
148 };
149 let peer2 = Peer {
150 id: 5,
151 addr: "node2".to_string(),
152 };
153 let peer3 = Peer {
154 id: 8,
155 addr: "node3".to_string(),
156 };
157 let peers = vec![peer1.clone(), peer2.clone(), peer3.clone()];
158 put_datanodes(&ctx.meta_peer_client, peers).await;
159
160 let peers = selector
161 .select(
162 &ctx,
163 SelectorOptions {
164 min_required_items: 4,
165 allow_duplication: true,
166 exclude_peer_ids: HashSet::new(),
167 },
168 )
169 .await
170 .unwrap();
171 assert_eq!(peers.len(), 4);
172 assert_eq!(
173 peers,
174 vec![peer1.clone(), peer2.clone(), peer3.clone(), peer1.clone()]
175 );
176
177 let peers = selector
178 .select(
179 &ctx,
180 SelectorOptions {
181 min_required_items: 2,
182 allow_duplication: true,
183 exclude_peer_ids: HashSet::new(),
184 },
185 )
186 .await
187 .unwrap();
188 assert_eq!(peers.len(), 2);
189 assert_eq!(peers, vec![peer2.clone(), peer3.clone()]);
190 }
191
192 #[tokio::test]
193 async fn test_round_robin_selector_with_exclude_peer_ids() {
194 let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5]));
195 let ctx = create_selector_context();
196 let peer1 = Peer {
198 id: 2,
199 addr: "node1".to_string(),
200 };
201 let peer2 = Peer {
202 id: 5,
203 addr: "node2".to_string(),
204 };
205 let peer3 = Peer {
206 id: 8,
207 addr: "node3".to_string(),
208 };
209 put_datanodes(
210 &ctx.meta_peer_client,
211 vec![peer1.clone(), peer2.clone(), peer3.clone()],
212 )
213 .await;
214
215 let peers = selector
216 .select(
217 &ctx,
218 SelectorOptions {
219 min_required_items: 1,
220 allow_duplication: true,
221 exclude_peer_ids: HashSet::from([2]),
222 },
223 )
224 .await
225 .unwrap();
226 assert_eq!(peers.len(), 1);
227 assert_eq!(peers, vec![peer3.clone()]);
228 }
229}