use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::ProcedureDetailResponse;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::error::Result;
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::{ClusterId, DatanodeId};
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub mod create_view;
pub mod drop_database;
pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
mod physical_table_metadata;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
pub(crate) mod tests;
pub mod truncate_table;
pub mod utils;
#[derive(Debug, Default)]
pub struct ExecutorContext {
pub cluster_id: Option<u64>,
pub tracing_context: Option<W3cTrace>,
}
#[async_trait::async_trait]
pub trait ProcedureExecutor: Send + Sync {
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
async fn migrate_region(
&self,
ctx: &ExecutorContext,
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse>;
async fn query_procedure_state(
&self,
ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse>;
async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
}
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
pub struct TableMetadataAllocatorContext {
pub cluster_id: ClusterId,
}
#[derive(Default)]
pub struct TableMetadata {
pub table_id: TableId,
pub table_route: PhysicalTableRouteValue,
pub region_wal_options: HashMap<RegionNumber, String>,
}
pub type RegionFailureDetectorControllerRef = Arc<dyn RegionFailureDetectorController>;
pub type DetectingRegion = (ClusterId, DatanodeId, RegionId);
#[async_trait::async_trait]
pub trait RegionFailureDetectorController: Send + Sync {
async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
}
#[derive(Debug, Clone)]
pub struct NoopRegionFailureDetectorControl;
#[async_trait::async_trait]
impl RegionFailureDetectorController for NoopRegionFailureDetectorControl {
async fn register_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
async fn deregister_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
}
#[derive(Clone)]
pub struct DdlContext {
pub node_manager: NodeManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub memory_region_keeper: MemoryRegionKeeperRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub table_metadata_allocator: TableMetadataAllocatorRef,
pub flow_metadata_manager: FlowMetadataManagerRef,
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
}
impl DdlContext {
pub async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.register_failure_detectors(detecting_regions)
.await;
}
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.deregister_failure_detectors(detecting_regions)
.await;
}
}