meta_srv/
flow_meta_alloc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use common_error::ext::BoxedError;
18use common_meta::ddl::flow_meta::PartitionPeerAllocator;
19use common_meta::peer::Peer;
20use snafu::ResultExt;
21
22use crate::metasrv::{SelectorContext, SelectorRef};
23use crate::selector::SelectorOptions;
24
25pub struct FlowPeerAllocator {
26    ctx: SelectorContext,
27    selector: SelectorRef,
28}
29
30impl FlowPeerAllocator {
31    pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
32        Self { ctx, selector }
33    }
34}
35
36#[async_trait::async_trait]
37impl PartitionPeerAllocator for FlowPeerAllocator {
38    async fn alloc(&self, partitions: usize) -> common_meta::error::Result<Vec<Peer>> {
39        self.selector
40            .select(
41                &self.ctx,
42                SelectorOptions {
43                    min_required_items: partitions,
44                    allow_duplication: true,
45                    exclude_peer_ids: HashSet::new(),
46                },
47            )
48            .await
49            .map_err(BoxedError::new)
50            .context(common_meta::error::ExternalSnafu)
51    }
52}