use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::ProcedureServiceHandler;
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use snafu::ResultExt;
#[derive(Clone)]
pub struct ProcedureServiceOperator {
procedure_executor: ProcedureExecutorRef,
}
impl ProcedureServiceOperator {
pub fn new(procedure_executor: ProcedureExecutorRef) -> Self {
Self { procedure_executor }
}
}
#[async_trait]
impl ProcedureServiceHandler for ProcedureServiceOperator {
async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
Ok(self
.procedure_executor
.migrate_region(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)?
.pid
.map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
}
async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
self.procedure_executor
.query_procedure_state(&ExecutorContext::default(), pid)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
}