Skip to main content

meta_srv/selector/
round_robin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::AtomicUsize;
16
17use common_meta::peer::Peer;
18use snafu::{ResultExt, ensure};
19
20use crate::error::{
21    ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result,
22};
23use crate::metasrv::{SelectTarget, SelectorContext};
24use crate::selector::{Selector, SelectorOptions};
25
26/// Round-robin selector that returns the next peer in the list in sequence.
27/// Datanodes are ordered by their node_id.
28///
29/// This selector is useful when you want to distribute the load evenly across
30/// all datanodes. But **it's not recommended** to use this selector in serious
31/// production environments because it doesn't take into account the load of
32/// each datanode.
33pub struct RoundRobinSelector {
34    select_target: SelectTarget,
35    counter: AtomicUsize,
36}
37
38impl Default for RoundRobinSelector {
39    fn default() -> Self {
40        Self {
41            select_target: SelectTarget::Datanode,
42            counter: AtomicUsize::new(0),
43        }
44    }
45}
46
47impl RoundRobinSelector {
48    pub fn new(select_target: SelectTarget) -> Self {
49        Self {
50            select_target,
51            ..Default::default()
52        }
53    }
54
55    async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
56        let mut peers = match self.select_target {
57            SelectTarget::Datanode => {
58                // 1. get alive datanodes.
59                let alive_datanodes = ctx
60                    .peer_discovery
61                    .active_datanodes(opts.workload_filter)
62                    .await
63                    .context(ListActiveDatanodesSnafu)?;
64
65                // 2. filter out excluded datanodes.
66                alive_datanodes
67                    .into_iter()
68                    .map(|node| node.peer)
69                    .filter(|p| !opts.exclude_peer_ids.contains(&p.id))
70                    .collect::<Vec<_>>()
71            }
72            SelectTarget::Flownode => ctx
73                .peer_discovery
74                .active_flownodes(opts.workload_filter)
75                .await
76                .context(ListActiveFlownodesSnafu)?
77                .into_iter()
78                .map(|node| node.peer)
79                .collect::<Vec<_>>(),
80        };
81
82        ensure!(
83            !peers.is_empty(),
84            NoEnoughAvailableNodeSnafu {
85                required: opts.min_required_items,
86                available: peers.len(),
87                select_target: self.select_target
88            }
89        );
90
91        // 3. sort by node id
92        peers.sort_by_key(|p| p.id);
93
94        Ok(peers)
95    }
96}
97
98#[async_trait::async_trait]
99impl Selector for RoundRobinSelector {
100    type Context = SelectorContext;
101    type Output = Vec<Peer>;
102
103    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
104        let peers = self.get_peers(&opts, ctx).await?;
105        // choose peers
106        let mut selected = Vec::with_capacity(opts.min_required_items);
107        for _ in 0..opts.min_required_items {
108            let idx = self
109                .counter
110                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
111                % peers.len();
112            selected.push(peers[idx].clone());
113        }
114
115        Ok(selected)
116    }
117}
118
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use super::*;
    use crate::test_util::{create_meta_peer_client, put_datanodes};

    /// Builds the three peers used by the tests, already in ascending id
    /// order: (2, "node1"), (5, "node2"), (8, "node3").
    fn sample_peers() -> Vec<Peer> {
        [(2, "node1"), (5, "node2"), (8, "node3")]
            .into_iter()
            .map(|(id, addr)| Peer {
                id,
                addr: addr.to_string(),
            })
            .collect()
    }

    /// Options asking for `min_required_items` peers, with duplicates allowed,
    /// nothing excluded and no workload filter.
    fn options(min_required_items: usize) -> SelectorOptions {
        SelectorOptions {
            min_required_items,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
            workload_filter: None,
            extensions: Default::default(),
        }
    }

    #[tokio::test]
    async fn test_round_robin_selector() {
        let meta_peer_client = create_meta_peer_client();
        let ctx = SelectorContext {
            peer_discovery: meta_peer_client.clone(),
        };
        let selector = RoundRobinSelector::default();

        // add three nodes
        let nodes = sample_peers();
        put_datanodes(&meta_peer_client, nodes.clone()).await;

        // Asking for four peers out of three wraps around to the first one.
        let first_round = selector.select(&ctx, options(4)).await.unwrap();
        assert_eq!(first_round.len(), 4);
        let expected: Vec<Peer> = [0usize, 1, 2, 0]
            .into_iter()
            .map(|i| nodes[i].clone())
            .collect();
        assert_eq!(first_round, expected);

        // The next call continues from where the previous one stopped.
        let second_round = selector.select(&ctx, options(2)).await.unwrap();
        assert_eq!(second_round.len(), 2);
        assert_eq!(second_round, nodes[1..].to_vec());
    }

    #[tokio::test]
    async fn test_round_robin_selector_with_exclude_peer_ids() {
        let meta_peer_client = create_meta_peer_client();
        let ctx = SelectorContext {
            peer_discovery: meta_peer_client.clone(),
        };
        let selector = RoundRobinSelector::new(SelectTarget::Datanode);

        // add three nodes
        let nodes = sample_peers();
        put_datanodes(&meta_peer_client, nodes.clone()).await;

        // With ids 2 and 5 excluded, only the peer with id 8 can be returned.
        let picked = selector
            .select(
                &ctx,
                SelectorOptions {
                    exclude_peer_ids: HashSet::from([2, 5]),
                    ..options(1)
                },
            )
            .await
            .unwrap();
        assert_eq!(picked.len(), 1);
        assert_eq!(picked, vec![nodes[2].clone()]);
    }
}