meta_srv/
peer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::error::{ExternalSnafu, Result as MetaResult};
20use common_meta::peer::{Peer, PeerAllocator};
21use snafu::{ResultExt, ensure};
22
23use crate::discovery::utils::accept_ingest_workload;
24use crate::error::{Result, TooManyPartitionsSnafu};
25use crate::metasrv::{SelectorContext, SelectorRef};
26use crate::selector::SelectorOptions;
27
28pub struct MetasrvPeerAllocator {
29    ctx: SelectorContext,
30    selector: SelectorRef,
31    max_items: Option<u32>,
32}
33
34impl MetasrvPeerAllocator {
35    /// Creates a new [`MetasrvPeerAllocator`] with the given [`SelectorContext`] and [`SelectorRef`].
36    pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
37        Self {
38            ctx,
39            selector,
40            max_items: None,
41        }
42    }
43
44    pub fn with_max_items(self, max_items: u32) -> Self {
45        Self {
46            ctx: self.ctx,
47            selector: self.selector,
48            max_items: Some(max_items),
49        }
50    }
51
52    /// Allocates a specified number (by `regions`) of [`Peer`] instances based on the number of
53    /// regions. The returned peers will have the same length as the number of regions.
54    ///
55    /// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
56    /// no guarantee that how the returned peers are used, like whether they are from the same
57    /// table or not. So this method isn't idempotent.
58    async fn alloc(&self, min_required_items: usize) -> Result<Vec<Peer>> {
59        if let Some(max_items) = self.max_items {
60            ensure!(
61                min_required_items <= max_items as usize,
62                TooManyPartitionsSnafu
63            );
64        }
65
66        self.selector
67            .select(
68                &self.ctx,
69                SelectorOptions {
70                    min_required_items,
71                    allow_duplication: true,
72                    exclude_peer_ids: HashSet::new(),
73                    workload_filter: Some(accept_ingest_workload),
74                },
75            )
76            .await
77    }
78}
79
80#[async_trait]
81impl PeerAllocator for MetasrvPeerAllocator {
82    async fn alloc(&self, regions: usize) -> MetaResult<Vec<Peer>> {
83        self.alloc(regions)
84            .await
85            .map_err(BoxedError::new)
86            .context(ExternalSnafu)
87    }
88}