use async_trait::async_trait;
use client::Output;
use common_base::AffectedRows;
use common_error::ext::BoxedError;
use common_function::handlers::TableMutationHandler;
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest,
InsertRequest as TableInsertRequest,
};
use crate::delete::DeleterRef;
use crate::insert::InserterRef;
use crate::request::RequesterRef;
pub struct TableMutationOperator {
inserter: InserterRef,
deleter: DeleterRef,
requester: RequesterRef,
}
impl TableMutationOperator {
pub fn new(inserter: InserterRef, deleter: DeleterRef, requester: RequesterRef) -> Self {
Self {
inserter,
deleter,
requester,
}
}
}
#[async_trait]
impl TableMutationHandler for TableMutationOperator {
async fn insert(
&self,
request: TableInsertRequest,
ctx: QueryContextRef,
) -> QueryResult<Output> {
self.inserter
.handle_table_insert(request, ctx)
.await
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
async fn delete(
&self,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> QueryResult<AffectedRows> {
self.deleter
.handle_table_delete(request, ctx)
.await
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
async fn flush(
&self,
request: FlushTableRequest,
ctx: QueryContextRef,
) -> QueryResult<AffectedRows> {
self.requester
.handle_table_flush(request, ctx)
.await
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
async fn compact(
&self,
request: CompactTableRequest,
ctx: QueryContextRef,
) -> QueryResult<AffectedRows> {
self.requester
.handle_table_compaction(request, ctx)
.await
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
async fn flush_region(
&self,
region_id: RegionId,
ctx: QueryContextRef,
) -> QueryResult<AffectedRows> {
self.requester
.handle_region_flush(region_id, ctx)
.await
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
async fn compact_region(
&self,
region_id: RegionId,
ctx: QueryContextRef,
) -> QueryResult<AffectedRows> {
self.requester
.handle_region_compaction(region_id, ctx)
.await
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
}