meta_srv/selector/
round_robin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::AtomicUsize;
16
17use common_meta::peer::Peer;
18use snafu::{ResultExt, ensure};
19
20use crate::error::{
21    ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result,
22};
23use crate::metasrv::{SelectTarget, SelectorContext};
24use crate::selector::{Selector, SelectorOptions};
25
26/// Round-robin selector that returns the next peer in the list in sequence.
27/// Datanodes are ordered by their node_id.
28///
29/// This selector is useful when you want to distribute the load evenly across
30/// all datanodes. But **it's not recommended** to use this selector in serious
31/// production environments because it doesn't take into account the load of
32/// each datanode.
33pub struct RoundRobinSelector {
34    select_target: SelectTarget,
35    counter: AtomicUsize,
36}
37
38impl Default for RoundRobinSelector {
39    fn default() -> Self {
40        Self {
41            select_target: SelectTarget::Datanode,
42            counter: AtomicUsize::new(0),
43        }
44    }
45}
46
47impl RoundRobinSelector {
48    pub fn new(select_target: SelectTarget) -> Self {
49        Self {
50            select_target,
51            ..Default::default()
52        }
53    }
54
55    async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
56        let mut peers = match self.select_target {
57            SelectTarget::Datanode => {
58                // 1. get alive datanodes.
59                let alive_datanodes = ctx
60                    .peer_discovery
61                    .active_datanodes(opts.workload_filter)
62                    .await
63                    .context(ListActiveDatanodesSnafu)?;
64
65                // 2. filter out excluded datanodes.
66                alive_datanodes
67                    .into_iter()
68                    .filter(|p| !opts.exclude_peer_ids.contains(&p.id))
69                    .collect::<Vec<_>>()
70            }
71            SelectTarget::Flownode => ctx
72                .peer_discovery
73                .active_flownodes(opts.workload_filter)
74                .await
75                .context(ListActiveFlownodesSnafu)?,
76        };
77
78        ensure!(
79            !peers.is_empty(),
80            NoEnoughAvailableNodeSnafu {
81                required: opts.min_required_items,
82                available: peers.len(),
83                select_target: self.select_target
84            }
85        );
86
87        // 3. sort by node id
88        peers.sort_by_key(|p| p.id);
89
90        Ok(peers)
91    }
92}
93
94#[async_trait::async_trait]
95impl Selector for RoundRobinSelector {
96    type Context = SelectorContext;
97    type Output = Vec<Peer>;
98
99    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
100        let peers = self.get_peers(&opts, ctx).await?;
101        // choose peers
102        let mut selected = Vec::with_capacity(opts.min_required_items);
103        for _ in 0..opts.min_required_items {
104            let idx = self
105                .counter
106                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
107                % peers.len();
108            selected.push(peers[idx].clone());
109        }
110
111        Ok(selected)
112    }
113}
114
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use super::*;
    use crate::test_util::{create_meta_peer_client, put_datanodes};

    /// Three peers with non-contiguous ids, listed in ascending id order.
    fn sample_peers() -> [Peer; 3] {
        [(2, "node1"), (5, "node2"), (8, "node3")].map(|(id, addr)| Peer {
            id,
            addr: addr.to_string(),
        })
    }

    /// Registers `peers` as datanodes on a fresh meta peer client and returns
    /// a selector context backed by that client.
    async fn context_with_datanodes(peers: Vec<Peer>) -> SelectorContext {
        let client = create_meta_peer_client();
        let ctx = SelectorContext {
            peer_discovery: client.clone(),
        };
        put_datanodes(&client, peers).await;
        ctx
    }

    #[tokio::test]
    async fn test_round_robin_selector() {
        let selector = RoundRobinSelector::default();
        let [first, second, third] = sample_peers();
        let ctx =
            context_with_datanodes(vec![first.clone(), second.clone(), third.clone()]).await;

        // Asking for more peers than exist wraps around to the first one.
        let wrap_around = SelectorOptions {
            min_required_items: 4,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
            workload_filter: None,
        };
        let picked = selector.select(&ctx, wrap_around).await.unwrap();
        assert_eq!(picked.len(), 4);
        assert_eq!(
            picked,
            vec![first.clone(), second.clone(), third.clone(), first.clone()]
        );

        // The next call resumes the rotation where the previous one stopped.
        let resume = SelectorOptions {
            min_required_items: 2,
            allow_duplication: true,
            exclude_peer_ids: HashSet::new(),
            workload_filter: None,
        };
        let picked = selector.select(&ctx, resume).await.unwrap();
        assert_eq!(picked.len(), 2);
        assert_eq!(picked, vec![second.clone(), third.clone()]);
    }

    #[tokio::test]
    async fn test_round_robin_selector_with_exclude_peer_ids() {
        let selector = RoundRobinSelector::new(SelectTarget::Datanode);
        let [first, second, third] = sample_peers();
        let ctx = context_with_datanodes(vec![first, second, third.clone()]).await;

        // With ids 2 and 5 excluded, only the peer with id 8 can be picked.
        let only_last = SelectorOptions {
            min_required_items: 1,
            allow_duplication: true,
            exclude_peer_ids: HashSet::from([2, 5]),
            workload_filter: None,
        };
        let picked = selector.select(&ctx, only_last).await.unwrap();
        assert_eq!(picked.len(), 1);
        assert_eq!(picked, vec![third.clone()]);
    }
}