common_meta/ddl/
flow_meta.rs1use std::sync::Arc;
16
17use crate::error::Result;
18use crate::key::FlowId;
19use crate::peer::{NoopPeerAllocator, Peer, PeerAllocatorRef};
20use crate::sequence::SequenceRef;
21
22pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
24
25#[derive(Clone)]
29pub struct FlowMetadataAllocator {
30 flow_id_sequence: SequenceRef,
31 peer_allocator: PeerAllocatorRef,
32}
33
34impl FlowMetadataAllocator {
35 pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
37 Self {
38 flow_id_sequence,
39 peer_allocator: Arc::new(NoopPeerAllocator),
40 }
41 }
42
43 pub fn with_peer_allocator(
44 flow_id_sequence: SequenceRef,
45 peer_allocator: PeerAllocatorRef,
46 ) -> Self {
47 Self {
48 flow_id_sequence,
49 peer_allocator,
50 }
51 }
52
53 pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
55 let flow_id = self.flow_id_sequence.next().await? as FlowId;
56 Ok(flow_id)
57 }
58
59 pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
61 let flow_id = self.allocate_flow_id().await?;
62 let peers = self.peer_allocator.alloc(partitions).await?;
63
64 Ok((flow_id, peers))
65 }
66}