meta_srv/peer.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::error::{ExternalSnafu, Result as MetaResult};
20use common_meta::peer::{Peer, PeerAllocator};
21use snafu::{ResultExt, ensure};
22
23use crate::discovery::utils::accept_ingest_workload;
24use crate::error::{Result, TooManyPartitionsSnafu};
25use crate::metasrv::{SelectorContext, SelectorRef};
26use crate::selector::SelectorOptions;
27
/// Allocates [`Peer`]s by delegating the choice to a [`SelectorRef`].
pub struct MetasrvPeerAllocator {
    /// Context passed to the selector on every `select` call.
    ctx: SelectorContext,
    /// Selector that chooses the peers.
    selector: SelectorRef,
    /// Upper bound on the number of peers a single allocation may request;
    /// `None` means the request size is not checked.
    max_items: Option<u32>,
}
33
34impl MetasrvPeerAllocator {
35 /// Creates a new [`MetasrvPeerAllocator`] with the given [`SelectorContext`] and [`SelectorRef`].
36 pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
37 Self {
38 ctx,
39 selector,
40 max_items: None,
41 }
42 }
43
44 pub fn with_max_items(self, max_items: u32) -> Self {
45 Self {
46 ctx: self.ctx,
47 selector: self.selector,
48 max_items: Some(max_items),
49 }
50 }
51
52 /// Allocates a specified number (by `regions`) of [`Peer`] instances based on the number of
53 /// regions. The returned peers will have the same length as the number of regions.
54 ///
55 /// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
56 /// no guarantee that how the returned peers are used, like whether they are from the same
57 /// table or not. So this method isn't idempotent.
58 async fn alloc(&self, min_required_items: usize) -> Result<Vec<Peer>> {
59 if let Some(max_items) = self.max_items {
60 ensure!(
61 min_required_items <= max_items as usize,
62 TooManyPartitionsSnafu
63 );
64 }
65
66 self.selector
67 .select(
68 &self.ctx,
69 SelectorOptions {
70 min_required_items,
71 allow_duplication: true,
72 exclude_peer_ids: HashSet::new(),
73 workload_filter: Some(accept_ingest_workload),
74 },
75 )
76 .await
77 }
78}
79
80#[async_trait]
81impl PeerAllocator for MetasrvPeerAllocator {
82 async fn alloc(&self, regions: usize) -> MetaResult<Vec<Peer>> {
83 self.alloc(regions)
84 .await
85 .map_err(BoxedError::new)
86 .context(ExternalSnafu)
87 }
88}