common_meta/ddl/
flow_meta.rs1use std::sync::Arc;
16
17use tonic::async_trait;
18
19use crate::error::Result;
20use crate::key::FlowId;
21use crate::peer::Peer;
22use crate::sequence::SequenceRef;
23
24pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
26
27#[derive(Clone)]
31pub struct FlowMetadataAllocator {
32 flow_id_sequence: SequenceRef,
33 partition_peer_allocator: PartitionPeerAllocatorRef,
34}
35
36impl FlowMetadataAllocator {
37 pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
39 Self {
40 flow_id_sequence,
41 partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator),
42 }
43 }
44
45 pub fn with_peer_allocator(
46 flow_id_sequence: SequenceRef,
47 peer_allocator: Arc<dyn PartitionPeerAllocator>,
48 ) -> Self {
49 Self {
50 flow_id_sequence,
51 partition_peer_allocator: peer_allocator,
52 }
53 }
54
55 pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
57 let flow_id = self.flow_id_sequence.next().await? as FlowId;
58 Ok(flow_id)
59 }
60
61 pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
63 let flow_id = self.allocate_flow_id().await?;
64 let peers = self.partition_peer_allocator.alloc(partitions).await?;
65
66 Ok((flow_id, peers))
67 }
68}
69
70#[async_trait]
72pub trait PartitionPeerAllocator: Send + Sync {
73 async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
75}
76
77pub type PartitionPeerAllocatorRef = Arc<dyn PartitionPeerAllocator>;
79
80struct NoopPartitionPeerAllocator;
81
82#[async_trait]
83impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
84 async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
85 Ok(vec![Peer::default(); partitions])
86 }
87}