meta_srv/
table_meta_alloc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::ddl::table_meta::PeerAllocator;
20use common_meta::error::{ExternalSnafu, Result as MetaResult};
21use common_meta::peer::Peer;
22use snafu::{ensure, ResultExt};
23use store_api::storage::MAX_REGION_SEQ;
24
25use crate::error::{self, Result, TooManyPartitionsSnafu};
26use crate::metasrv::{SelectTarget, SelectorContext, SelectorRef};
27use crate::selector::SelectorOptions;
28
29pub struct MetasrvPeerAllocator {
30    ctx: SelectorContext,
31    selector: SelectorRef,
32}
33
34impl MetasrvPeerAllocator {
35    /// Creates a new [`MetasrvPeerAllocator`] with the given [`SelectorContext`] and [`SelectorRef`].
36    pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
37        Self { ctx, selector }
38    }
39
40    /// Allocates a specified number (by `regions`) of [`Peer`] instances based on the number of
41    /// regions. The returned peers will have the same length as the number of regions.
42    ///
43    /// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
44    /// no guarantee that how the returned peers are used, like whether they are from the same
45    /// table or not. So this method isn't idempotent.
46    async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
47        ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu);
48
49        let mut peers = self
50            .selector
51            .select(
52                &self.ctx,
53                SelectorOptions {
54                    min_required_items: regions,
55                    allow_duplication: true,
56                    exclude_peer_ids: HashSet::new(),
57                },
58            )
59            .await?;
60
61        ensure!(
62            peers.len() >= regions,
63            error::NoEnoughAvailableNodeSnafu {
64                required: regions,
65                available: peers.len(),
66                select_target: SelectTarget::Datanode
67            }
68        );
69
70        peers.truncate(regions);
71
72        Ok(peers)
73    }
74}
75
76#[async_trait]
77impl PeerAllocator for MetasrvPeerAllocator {
78    async fn alloc(&self, regions: usize) -> MetaResult<Vec<Peer>> {
79        self.alloc(regions)
80            .await
81            .map_err(BoxedError::new)
82            .context(ExternalSnafu)
83    }
84}