meta_srv/selector/
round_robin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18use std::time::Duration;
19
20use common_meta::peer::Peer;
21use snafu::ensure;
22
23use crate::error::{NoEnoughAvailableNodeSnafu, Result};
24use crate::lease;
25use crate::metasrv::{SelectTarget, SelectorContext};
26use crate::node_excluder::NodeExcluderRef;
27use crate::selector::{Selector, SelectorOptions};
28
/// Round-robin selector that returns the next peer in the list in sequence.
/// Candidate peers are ordered by their node id before one is picked.
///
/// This selector is useful when you want to distribute the load evenly across
/// all datanodes. But **it's not recommended** to use this selector in serious
/// production environments because it doesn't take into account the load of
/// each datanode.
pub struct RoundRobinSelector {
    /// Kind of node (datanode or flownode) whose alive leases form the
    /// candidate list.
    select_target: SelectTarget,
    /// Shared, atomically advanced cursor; its value modulo the number of
    /// candidate peers is the index of the next peer to hand out.
    counter: AtomicUsize,
    /// Supplies datanode ids that are filtered out of the candidate list when
    /// the target is `SelectTarget::Datanode`.
    node_excluder: NodeExcluderRef,
}
41
42impl Default for RoundRobinSelector {
43    fn default() -> Self {
44        Self {
45            select_target: SelectTarget::Datanode,
46            counter: AtomicUsize::new(0),
47            node_excluder: Arc::new(Vec::new()),
48        }
49    }
50}
51
52impl RoundRobinSelector {
53    pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self {
54        Self {
55            select_target,
56            node_excluder,
57            ..Default::default()
58        }
59    }
60
61    async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
62        let mut peers = match self.select_target {
63            SelectTarget::Datanode => {
64                // 1. get alive datanodes.
65                let lease_kvs = lease::alive_datanodes(
66                    &ctx.meta_peer_client,
67                    Duration::from_secs(ctx.datanode_lease_secs),
68                )
69                .with_condition(lease::is_datanode_accept_ingest_workload)
70                .await?;
71
72                let mut exclude_peer_ids = self
73                    .node_excluder
74                    .excluded_datanode_ids()
75                    .iter()
76                    .cloned()
77                    .collect::<HashSet<_>>();
78                exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
79                // 2. map into peers
80                lease_kvs
81                    .into_iter()
82                    .filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id))
83                    .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
84                    .collect::<Vec<_>>()
85            }
86            SelectTarget::Flownode => {
87                // 1. get alive flownodes.
88                let lease_kvs = lease::alive_flownodes(
89                    &ctx.meta_peer_client,
90                    Duration::from_secs(ctx.flownode_lease_secs),
91                )
92                .await?;
93
94                // 2. map into peers
95                lease_kvs
96                    .into_iter()
97                    .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
98                    .collect::<Vec<_>>()
99            }
100        };
101
102        ensure!(
103            !peers.is_empty(),
104            NoEnoughAvailableNodeSnafu {
105                required: opts.min_required_items,
106                available: peers.len(),
107                select_target: self.select_target
108            }
109        );
110
111        // 3. sort by node id
112        peers.sort_by_key(|p| p.id);
113
114        Ok(peers)
115    }
116}
117
118#[async_trait::async_trait]
119impl Selector for RoundRobinSelector {
120    type Context = SelectorContext;
121    type Output = Vec<Peer>;
122
123    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
124        let peers = self.get_peers(&opts, ctx).await?;
125        // choose peers
126        let mut selected = Vec::with_capacity(opts.min_required_items);
127        for _ in 0..opts.min_required_items {
128            let idx = self
129                .counter
130                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
131                % peers.len();
132            selected.push(peers[idx].clone());
133        }
134
135        Ok(selected)
136    }
137}
138
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use super::*;
    use crate::test_util::{create_selector_context, put_datanodes};

    /// Fixture shared by the tests: three peers with ids 2, 5 and 8.
    fn sample_peers() -> [Peer; 3] {
        [(2, "node1"), (5, "node2"), (8, "node3")].map(|(id, addr)| Peer {
            id,
            addr: addr.to_string(),
        })
    }

    /// Consecutive selections continue the rotation where the previous one
    /// stopped and wrap around the id-sorted peer list.
    #[tokio::test]
    async fn test_round_robin_selector() {
        let [first, second, third] = sample_peers();
        let ctx = create_selector_context();
        let selector = RoundRobinSelector::default();

        // Register the three nodes.
        put_datanodes(
            &ctx.meta_peer_client,
            vec![first.clone(), second.clone(), third.clone()],
        )
        .await;

        // Four picks out of three nodes: the rotation wraps to the first one.
        let four_opts = SelectorOptions {
            min_required_items: 4,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
        };
        let picked = selector.select(&ctx, four_opts).await.unwrap();
        assert_eq!(picked.len(), 4);
        assert_eq!(
            picked,
            vec![first.clone(), second.clone(), third.clone(), first.clone()]
        );

        // The next call resumes right after the last pick.
        let two_opts = SelectorOptions {
            min_required_items: 2,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
        };
        let picked = selector.select(&ctx, two_opts).await.unwrap();
        assert_eq!(picked.len(), 2);
        assert_eq!(picked, vec![second.clone(), third.clone()]);
    }

    /// Ids excluded through the node excluder (5) and through the request
    /// options (2) are both skipped, leaving only the peer with id 8.
    #[tokio::test]
    async fn test_round_robin_selector_with_exclude_peer_ids() {
        let [first, second, third] = sample_peers();
        let ctx = create_selector_context();
        let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5]));

        // Register the three nodes.
        put_datanodes(
            &ctx.meta_peer_client,
            vec![first.clone(), second.clone(), third.clone()],
        )
        .await;

        let opts = SelectorOptions {
            min_required_items: 1,
            allow_duplication: true,
            exclude_peer_ids: HashSet::from([2]),
        };
        let picked = selector.select(&ctx, opts).await.unwrap();
        assert_eq!(picked.len(), 1);
        assert_eq!(picked, vec![third.clone()]);
    }
}