meta_srv/selector/
round_robin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_meta::peer::Peer;
20use snafu::ensure;
21
22use crate::error::{NoEnoughAvailableNodeSnafu, Result};
23use crate::lease;
24use crate::metasrv::{SelectTarget, SelectorContext};
25use crate::node_excluder::NodeExcluderRef;
26use crate::selector::{Selector, SelectorOptions};
27
28/// Round-robin selector that returns the next peer in the list in sequence.
29/// Datanodes are ordered by their node_id.
30///
31/// This selector is useful when you want to distribute the load evenly across
32/// all datanodes. But **it's not recommended** to use this selector in serious
33/// production environments because it doesn't take into account the load of
34/// each datanode.
35pub struct RoundRobinSelector {
36    select_target: SelectTarget,
37    counter: AtomicUsize,
38    node_excluder: NodeExcluderRef,
39}
40
41impl Default for RoundRobinSelector {
42    fn default() -> Self {
43        Self {
44            select_target: SelectTarget::Datanode,
45            counter: AtomicUsize::new(0),
46            node_excluder: Arc::new(Vec::new()),
47        }
48    }
49}
50
51impl RoundRobinSelector {
52    pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self {
53        Self {
54            select_target,
55            node_excluder,
56            ..Default::default()
57        }
58    }
59
60    async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
61        let mut peers = match self.select_target {
62            SelectTarget::Datanode => {
63                // 1. get alive datanodes.
64                let lease_kvs =
65                    lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
66                        .with_condition(lease::is_datanode_accept_ingest_workload)
67                        .await?;
68
69                let mut exclude_peer_ids = self
70                    .node_excluder
71                    .excluded_datanode_ids()
72                    .iter()
73                    .cloned()
74                    .collect::<HashSet<_>>();
75                exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
76                // 2. map into peers
77                lease_kvs
78                    .into_iter()
79                    .filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id))
80                    .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
81                    .collect::<Vec<_>>()
82            }
83            SelectTarget::Flownode => {
84                // 1. get alive flownodes.
85                let lease_kvs =
86                    lease::alive_flownodes(&ctx.meta_peer_client, ctx.flownode_lease_secs).await?;
87
88                // 2. map into peers
89                lease_kvs
90                    .into_iter()
91                    .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
92                    .collect::<Vec<_>>()
93            }
94        };
95
96        ensure!(
97            !peers.is_empty(),
98            NoEnoughAvailableNodeSnafu {
99                required: opts.min_required_items,
100                available: peers.len(),
101                select_target: self.select_target
102            }
103        );
104
105        // 3. sort by node id
106        peers.sort_by_key(|p| p.id);
107
108        Ok(peers)
109    }
110}
111
112#[async_trait::async_trait]
113impl Selector for RoundRobinSelector {
114    type Context = SelectorContext;
115    type Output = Vec<Peer>;
116
117    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
118        let peers = self.get_peers(&opts, ctx).await?;
119        // choose peers
120        let mut selected = Vec::with_capacity(opts.min_required_items);
121        for _ in 0..opts.min_required_items {
122            let idx = self
123                .counter
124                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
125                % peers.len();
126            selected.push(peers[idx].clone());
127        }
128
129        Ok(selected)
130    }
131}
132
133#[cfg(test)]
134mod test {
135    use std::collections::HashSet;
136
137    use super::*;
138    use crate::test_util::{create_selector_context, put_datanodes};
139
140    #[tokio::test]
141    async fn test_round_robin_selector() {
142        let selector = RoundRobinSelector::default();
143        let ctx = create_selector_context();
144        // add three nodes
145        let peer1 = Peer {
146            id: 2,
147            addr: "node1".to_string(),
148        };
149        let peer2 = Peer {
150            id: 5,
151            addr: "node2".to_string(),
152        };
153        let peer3 = Peer {
154            id: 8,
155            addr: "node3".to_string(),
156        };
157        let peers = vec![peer1.clone(), peer2.clone(), peer3.clone()];
158        put_datanodes(&ctx.meta_peer_client, peers).await;
159
160        let peers = selector
161            .select(
162                &ctx,
163                SelectorOptions {
164                    min_required_items: 4,
165                    allow_duplication: true,
166                    exclude_peer_ids: HashSet::new(),
167                },
168            )
169            .await
170            .unwrap();
171        assert_eq!(peers.len(), 4);
172        assert_eq!(
173            peers,
174            vec![peer1.clone(), peer2.clone(), peer3.clone(), peer1.clone()]
175        );
176
177        let peers = selector
178            .select(
179                &ctx,
180                SelectorOptions {
181                    min_required_items: 2,
182                    allow_duplication: true,
183                    exclude_peer_ids: HashSet::new(),
184                },
185            )
186            .await
187            .unwrap();
188        assert_eq!(peers.len(), 2);
189        assert_eq!(peers, vec![peer2.clone(), peer3.clone()]);
190    }
191
192    #[tokio::test]
193    async fn test_round_robin_selector_with_exclude_peer_ids() {
194        let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5]));
195        let ctx = create_selector_context();
196        // add three nodes
197        let peer1 = Peer {
198            id: 2,
199            addr: "node1".to_string(),
200        };
201        let peer2 = Peer {
202            id: 5,
203            addr: "node2".to_string(),
204        };
205        let peer3 = Peer {
206            id: 8,
207            addr: "node3".to_string(),
208        };
209        put_datanodes(
210            &ctx.meta_peer_client,
211            vec![peer1.clone(), peer2.clone(), peer3.clone()],
212        )
213        .await;
214
215        let peers = selector
216            .select(
217                &ctx,
218                SelectorOptions {
219                    min_required_items: 1,
220                    allow_duplication: true,
221                    exclude_peer_ids: HashSet::from([2]),
222                },
223            )
224            .await
225            .unwrap();
226        assert_eq!(peers.len(), 1);
227        assert_eq!(peers, vec![peer3.clone()]);
228    }
229}