// common_meta/ddl/flow_meta.rs
use std::sync::Arc;
use tonic::async_trait;
use crate::error::Result;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::sequence::SequenceRef;
use crate::ClusterId;
/// Shared, reference-counted handle to a [`FlowMetadataAllocator`].
pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
/// Allocates the metadata needed to create a flow: a [`FlowId`] drawn from a
/// sequence, and a list of [`Peer`]s obtained from a [`PartitionPeerAllocator`].
#[derive(Clone)]
pub struct FlowMetadataAllocator {
// Sequence whose next value becomes the id of a newly allocated flow.
flow_id_sequence: SequenceRef,
// Allocator queried for the peers returned alongside the flow id.
partition_peer_allocator: PartitionPeerAllocatorRef,
}
impl FlowMetadataAllocator {
    /// Builds an allocator that takes flow ids from `flow_id_sequence` and uses
    /// the no-op peer allocator (`NoopPartitionPeerAllocator`) for peers.
    pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
        Self::with_peer_allocator(flow_id_sequence, Arc::new(NoopPartitionPeerAllocator))
    }

    /// Builds an allocator that takes flow ids from `flow_id_sequence` and
    /// delegates peer allocation to `peer_allocator`.
    pub fn with_peer_allocator(
        flow_id_sequence: SequenceRef,
        peer_allocator: Arc<dyn PartitionPeerAllocator>,
    ) -> Self {
        let partition_peer_allocator = peer_allocator;
        Self {
            flow_id_sequence,
            partition_peer_allocator,
        }
    }

    /// Takes the next value of the flow id sequence and returns it as a
    /// [`FlowId`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the sequence.
    pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
        let next_value = self.flow_id_sequence.next().await?;
        // `as` is an unchecked numeric conversion.
        // NOTE(review): confirm the sequence's range always fits in `FlowId`.
        Ok(next_value as FlowId)
    }

    /// Allocates a new [`FlowId`] and then asks the peer allocator for peers
    /// for `partitions` partitions in cluster `cluster_id`.
    ///
    /// The flow id is allocated first; if peer allocation then fails, the id
    /// that was already taken from the sequence is not returned to it.
    ///
    /// # Errors
    ///
    /// Propagates errors from the flow id sequence or from the peer allocator.
    pub async fn create(
        &self,
        cluster_id: ClusterId,
        partitions: usize,
    ) -> Result<(FlowId, Vec<Peer>)> {
        let allocated_id = self.allocate_flow_id().await?;

        let allocator = &self.partition_peer_allocator;
        let assigned_peers = allocator.alloc(cluster_id, partitions).await?;

        Ok((allocated_id, assigned_peers))
    }
}
/// Strategy for choosing the [`Peer`]s assigned to a flow's partitions.
///
/// Implementations must be `Send + Sync` so they can be shared behind an
/// [`Arc`] (see [`PartitionPeerAllocatorRef`]).
#[async_trait]
pub trait PartitionPeerAllocator: Send + Sync {
/// Allocates peers for `partitions` partitions in the cluster `cluster_id`.
///
/// NOTE(review): presumably the returned vector holds one peer per
/// partition — the trait itself does not enforce this; confirm with
/// implementors.
async fn alloc(&self, cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>>;
}
/// Shared, reference-counted handle to a dynamically dispatched
/// [`PartitionPeerAllocator`].
pub type PartitionPeerAllocatorRef = Arc<dyn PartitionPeerAllocator>;
/// A [`PartitionPeerAllocator`] that performs no real allocation: it returns
/// default-constructed [`Peer`]s.
struct NoopPartitionPeerAllocator;
#[async_trait]
impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
    /// Returns `partitions` copies of `Peer::default()`; the cluster id is
    /// ignored and the call never fails.
    async fn alloc(&self, _cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>> {
        let placeholder = Peer::default();
        let peers = vec![placeholder; partitions];
        Ok(peers)
    }
}