meta_srv/
table_meta_alloc.rs1use std::collections::HashSet;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::ddl::table_meta::PeerAllocator;
20use common_meta::error::{ExternalSnafu, Result as MetaResult};
21use common_meta::peer::Peer;
22use snafu::{ensure, ResultExt};
23use store_api::storage::MAX_REGION_SEQ;
24
25use crate::error::{self, Result, TooManyPartitionsSnafu};
26use crate::metasrv::{SelectTarget, SelectorContext, SelectorRef};
27use crate::selector::SelectorOptions;
28
29pub struct MetasrvPeerAllocator {
30 ctx: SelectorContext,
31 selector: SelectorRef,
32}
33
34impl MetasrvPeerAllocator {
35 pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
37 Self { ctx, selector }
38 }
39
40 async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
47 ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu);
48
49 let mut peers = self
50 .selector
51 .select(
52 &self.ctx,
53 SelectorOptions {
54 min_required_items: regions,
55 allow_duplication: true,
56 exclude_peer_ids: HashSet::new(),
57 },
58 )
59 .await?;
60
61 ensure!(
62 peers.len() >= regions,
63 error::NoEnoughAvailableNodeSnafu {
64 required: regions,
65 available: peers.len(),
66 select_target: SelectTarget::Datanode
67 }
68 );
69
70 peers.truncate(regions);
71
72 Ok(peers)
73 }
74}
75
76#[async_trait]
77impl PeerAllocator for MetasrvPeerAllocator {
78 async fn alloc(&self, regions: usize) -> MetaResult<Vec<Peer>> {
79 self.alloc(regions)
80 .await
81 .map_err(BoxedError::new)
82 .context(ExternalSnafu)
83 }
84}