// meta_srv/selector/common.rs
use common_meta::peer::Peer;
use snafu::ensure;
use super::weighted_choose::WeightedChoose;
use crate::error;
use crate::error::Result;
use crate::metasrv::SelectTarget;
use crate::selector::SelectorOptions;
/// Picks peers out of `weighted_choose` as requested by `opts`.
///
/// The selection strategy depends on the options:
///
/// - When exactly one item is required, a single `choose_one` call is made.
/// - When `opts.allow_duplication` is set, the request is split into
///   `min_required_items / len` calls to `choose_multiple(len)` followed by one
///   call for the remaining `min_required_items % len` items, and the results
///   are concatenated in that order.
/// - Otherwise a single `choose_multiple(min_required_items)` call is made,
///   after checking that the chooser holds at least that many items.
///
/// # Errors
///
/// Fails with the error built from `error::NoEnoughAvailableNodeSnafu` when the
/// chooser is empty, or when duplication is not allowed and the chooser holds
/// fewer items than `opts.min_required_items`. Any error returned by the
/// chooser itself is propagated unchanged.
pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
where
    W: WeightedChoose<Peer>,
{
    let required = opts.min_required_items;

    // An empty chooser can never satisfy a request; report zero available.
    ensure!(
        !weighted_choose.is_empty(),
        error::NoEnoughAvailableNodeSnafu {
            required,
            available: 0_usize,
            select_target: SelectTarget::Datanode
        }
    );

    // Fast path: a single item does not need the batching logic below.
    if required == 1 {
        let only = weighted_choose.choose_one()?;
        return Ok(vec![only]);
    }

    let candidates = weighted_choose.len();

    if !opts.allow_duplication {
        // Without duplication the chooser must hold enough items up front.
        ensure!(
            candidates >= required,
            error::NoEnoughAvailableNodeSnafu {
                required,
                available: candidates,
                select_target: SelectTarget::Datanode
            }
        );
        return weighted_choose.choose_multiple(required);
    }

    // Duplication allowed: serve the request in rounds of `candidates` items,
    // then top up with whatever is left over. `candidates` is non-zero here
    // because the emptiness check above already passed.
    let full_rounds = required / candidates;
    let remainder = required % candidates;

    if full_rounds == 0 {
        // Fewer items requested than available: one partial round suffices.
        return weighted_choose.choose_multiple(remainder);
    }

    let mut picked = Vec::with_capacity(required);
    for _ in 0..full_rounds {
        picked.extend(weighted_choose.choose_multiple(candidates)?);
    }
    picked.extend(weighted_choose.choose_multiple(remainder)?);

    Ok(picked)
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;

    use crate::selector::common::choose_items;
    use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
    use crate::selector::SelectorOptions;

    /// Checks `choose_items` against five equally weighted peers:
    /// distinct results without duplication, an error when more peers are
    /// requested than exist, and the exact requested count with duplication.
    #[test]
    fn test_choose_peers() {
        // Peers with ids 1 through 5, all sharing the same address and weight.
        let weight_array: Vec<_> = (1..=5)
            .map(|id| WeightedItem {
                item: Peer {
                    id,
                    addr: "127.0.0.1:3001".to_string(),
                },
                weight: 1,
            })
            .collect();

        // Without duplication, every request up to the pool size must yield
        // that many distinct peers.
        for required in 1..=5 {
            let opts = SelectorOptions {
                min_required_items: required,
                allow_duplication: false,
            };
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());
            let picked = choose_items(&opts, &mut chooser).unwrap();
            let distinct: HashSet<_> = picked.into_iter().collect();
            assert_eq!(required, distinct.len());
        }

        // Asking for more peers than exist must fail when duplication is off.
        let opts = SelectorOptions {
            min_required_items: 6,
            allow_duplication: false,
        };
        let mut chooser = RandomWeightedChoose::new(weight_array.clone());
        let selected_result = choose_items(&opts, &mut chooser);
        assert!(selected_result.is_err());

        // With duplication allowed, any request size returns exactly that
        // many peers, even well beyond the pool size.
        for required in 1..=50 {
            let opts = SelectorOptions {
                min_required_items: required,
                allow_duplication: true,
            };
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());
            let picked = choose_items(&opts, &mut chooser).unwrap();
            assert_eq!(required, picked.len());
        }
    }
}