1use async_trait::async_trait;
16use client::Output;
17use common_base::AffectedRows;
18use common_error::ext::BoxedError;
19use common_function::handlers::TableMutationHandler;
20use common_query::error as query_error;
21use common_query::error::Result as QueryResult;
22use session::context::QueryContextRef;
23use snafu::ResultExt;
24use store_api::storage::RegionId;
25use table::requests::{
26 CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest,
27 InsertRequest as TableInsertRequest,
28};
29
30use crate::delete::DeleterRef;
31use crate::insert::InserterRef;
32use crate::request::RequesterRef;
33
34pub struct TableMutationOperator {
35 inserter: InserterRef,
36 deleter: DeleterRef,
37 requester: RequesterRef,
38}
39
40impl TableMutationOperator {
41 pub fn new(inserter: InserterRef, deleter: DeleterRef, requester: RequesterRef) -> Self {
42 Self {
43 inserter,
44 deleter,
45 requester,
46 }
47 }
48}
49
50#[async_trait]
51impl TableMutationHandler for TableMutationOperator {
52 async fn insert(
53 &self,
54 request: TableInsertRequest,
55 ctx: QueryContextRef,
56 ) -> QueryResult<Output> {
57 self.inserter
58 .handle_table_insert(request, ctx)
59 .await
60 .map_err(BoxedError::new)
61 .context(query_error::TableMutationSnafu)
62 }
63
64 async fn delete(
65 &self,
66 request: TableDeleteRequest,
67 ctx: QueryContextRef,
68 ) -> QueryResult<AffectedRows> {
69 self.deleter
70 .handle_table_delete(request, ctx)
71 .await
72 .map_err(BoxedError::new)
73 .context(query_error::TableMutationSnafu)
74 }
75
76 async fn flush(
77 &self,
78 request: FlushTableRequest,
79 ctx: QueryContextRef,
80 ) -> QueryResult<AffectedRows> {
81 self.requester
82 .handle_table_flush(request, ctx)
83 .await
84 .map_err(BoxedError::new)
85 .context(query_error::TableMutationSnafu)
86 }
87
88 async fn compact(
89 &self,
90 request: CompactTableRequest,
91 ctx: QueryContextRef,
92 ) -> QueryResult<AffectedRows> {
93 self.requester
94 .handle_table_compaction(request, ctx)
95 .await
96 .map_err(BoxedError::new)
97 .context(query_error::TableMutationSnafu)
98 }
99
100 async fn flush_region(
101 &self,
102 region_id: RegionId,
103 ctx: QueryContextRef,
104 ) -> QueryResult<AffectedRows> {
105 self.requester
106 .handle_region_flush(region_id, ctx)
107 .await
108 .map_err(BoxedError::new)
109 .context(query_error::TableMutationSnafu)
110 }
111
112 async fn compact_region(
113 &self,
114 region_id: RegionId,
115 ctx: QueryContextRef,
116 ) -> QueryResult<AffectedRows> {
117 self.requester
118 .handle_region_compaction(region_id, ctx)
119 .await
120 .map_err(BoxedError::new)
121 .context(query_error::TableMutationSnafu)
122 }
123}