Skip to main content

meta_srv/
peer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::error::{ExternalSnafu, Result as MetaResult};
20use common_meta::peer::{Peer, PeerAllocContext, PeerAllocator};
21use snafu::{ResultExt, ensure};
22
23use crate::discovery::utils::accept_ingest_workload;
24use crate::error::{Result, TooManyPartitionsSnafu};
25use crate::metasrv::{SelectorContext, SelectorRef};
26use crate::selector::SelectorOptions;
27
28pub struct MetasrvPeerAllocator {
29    ctx: SelectorContext,
30    selector: SelectorRef,
31    max_items: Option<u32>,
32}
33
34impl MetasrvPeerAllocator {
35    /// Creates a new [`MetasrvPeerAllocator`] with the given [`SelectorContext`] and [`SelectorRef`].
36    pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
37        Self {
38            ctx,
39            selector,
40            max_items: None,
41        }
42    }
43
44    pub fn with_max_items(self, max_items: u32) -> Self {
45        Self {
46            ctx: self.ctx,
47            selector: self.selector,
48            max_items: Some(max_items),
49        }
50    }
51
52    /// Allocates a specified number (by `regions`) of [`Peer`] instances based on the number of
53    /// regions. The returned peers will have the same length as the number of regions.
54    ///
55    /// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
56    /// no guarantee that how the returned peers are used, like whether they are from the same
57    /// table or not. So this method isn't idempotent.
58    async fn alloc(
59        &self,
60        min_required_items: usize,
61        alloc_context: &PeerAllocContext,
62    ) -> Result<Vec<Peer>> {
63        if let Some(max_items) = self.max_items {
64            ensure!(
65                min_required_items <= max_items as usize,
66                TooManyPartitionsSnafu
67            );
68        }
69
70        self.selector
71            .select(
72                &self.ctx,
73                SelectorOptions {
74                    min_required_items,
75                    allow_duplication: true,
76                    exclude_peer_ids: HashSet::new(),
77                    workload_filter: Some(accept_ingest_workload),
78                    extensions: alloc_context.extensions.clone(),
79                },
80            )
81            .await
82    }
83}
84
85#[async_trait]
86impl PeerAllocator for MetasrvPeerAllocator {
87    async fn alloc(&self, regions: usize, ctx: &PeerAllocContext) -> MetaResult<Vec<Peer>> {
88        self.alloc(regions, ctx)
89            .await
90            .map_err(BoxedError::new)
91            .context(ExternalSnafu)
92    }
93}