common_meta/ddl/
flow_meta.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use tonic::async_trait;
18
19use crate::error::Result;
20use crate::key::FlowId;
21use crate::peer::Peer;
22use crate::sequence::SequenceRef;
23
24/// The reference of [FlowMetadataAllocator].
25pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
26
27/// [FlowMetadataAllocator] provides the ability of:
28/// - [FlowId] Allocation.
29/// - [FlownodeId] Selection.
30#[derive(Clone)]
31pub struct FlowMetadataAllocator {
32    flow_id_sequence: SequenceRef,
33    partition_peer_allocator: PartitionPeerAllocatorRef,
34}
35
36impl FlowMetadataAllocator {
37    /// Returns the [FlowMetadataAllocator] with [NoopPartitionPeerAllocator].
38    pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
39        Self {
40            flow_id_sequence,
41            partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator),
42        }
43    }
44
45    pub fn with_peer_allocator(
46        flow_id_sequence: SequenceRef,
47        peer_allocator: Arc<dyn PartitionPeerAllocator>,
48    ) -> Self {
49        Self {
50            flow_id_sequence,
51            partition_peer_allocator: peer_allocator,
52        }
53    }
54
55    /// Allocates a the [FlowId].
56    pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
57        let flow_id = self.flow_id_sequence.next().await? as FlowId;
58        Ok(flow_id)
59    }
60
61    /// Allocates the [FlowId] and [Peer]s.
62    pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
63        let flow_id = self.allocate_flow_id().await?;
64        let peers = self.partition_peer_allocator.alloc(partitions).await?;
65
66        Ok((flow_id, peers))
67    }
68}
69
70/// Allocates [Peer]s for partitions.
71#[async_trait]
72pub trait PartitionPeerAllocator: Send + Sync {
73    /// Allocates [Peer] nodes for storing partitions.
74    async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
75}
76
77/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions.
78pub type PartitionPeerAllocatorRef = Arc<dyn PartitionPeerAllocator>;
79
/// A [PartitionPeerAllocator] that performs no real selection and hands back
/// default-constructed [Peer]s.
struct NoopPartitionPeerAllocator;
81
82#[async_trait]
83impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
84    async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
85        Ok(vec![Peer::default(); partitions])
86    }
87}