1use std::collections::HashSet;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::error::{ExternalSnafu, Result as MetaResult};
20use common_meta::peer::{Peer, PeerAllocContext, PeerAllocator};
21use snafu::{ResultExt, ensure};
22
23use crate::discovery::utils::accept_ingest_workload;
24use crate::error::{Result, TooManyPartitionsSnafu};
25use crate::metasrv::{SelectorContext, SelectorRef};
26use crate::selector::SelectorOptions;
27
/// Peer allocator that delegates the choice of peers to a selector.
///
/// An optional upper bound (`max_items`) can be configured; when it is set,
/// allocation requests asking for more items than the bound are rejected
/// before the selector is consulted.
pub struct MetasrvPeerAllocator {
    /// Context handed to the selector on every `select` call.
    ctx: SelectorContext,
    /// Selector that actually picks the peers.
    selector: SelectorRef,
    /// Upper bound on the number of items a single allocation may request;
    /// `None` means no bound is enforced.
    max_items: Option<u32>,
}
33
34impl MetasrvPeerAllocator {
35 pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
37 Self {
38 ctx,
39 selector,
40 max_items: None,
41 }
42 }
43
44 pub fn with_max_items(self, max_items: u32) -> Self {
45 Self {
46 ctx: self.ctx,
47 selector: self.selector,
48 max_items: Some(max_items),
49 }
50 }
51
52 async fn alloc(
59 &self,
60 min_required_items: usize,
61 alloc_context: &PeerAllocContext,
62 ) -> Result<Vec<Peer>> {
63 if let Some(max_items) = self.max_items {
64 ensure!(
65 min_required_items <= max_items as usize,
66 TooManyPartitionsSnafu
67 );
68 }
69
70 self.selector
71 .select(
72 &self.ctx,
73 SelectorOptions {
74 min_required_items,
75 allow_duplication: true,
76 exclude_peer_ids: HashSet::new(),
77 workload_filter: Some(accept_ingest_workload),
78 extensions: alloc_context.extensions.clone(),
79 },
80 )
81 .await
82 }
83}
84
85#[async_trait]
86impl PeerAllocator for MetasrvPeerAllocator {
87 async fn alloc(&self, regions: usize, ctx: &PeerAllocContext) -> MetaResult<Vec<Peer>> {
88 self.alloc(regions, ctx)
89 .await
90 .map_err(BoxedError::new)
91 .context(ExternalSnafu)
92 }
93}