operator/
table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use client::Output;
17use common_base::AffectedRows;
18use common_error::ext::BoxedError;
19use common_function::handlers::TableMutationHandler;
20use common_query::error as query_error;
21use common_query::error::Result as QueryResult;
22use session::context::QueryContextRef;
23use snafu::ResultExt;
24use store_api::storage::RegionId;
25use table::requests::{
26    CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest,
27    InsertRequest as TableInsertRequest,
28};
29
30use crate::delete::DeleterRef;
31use crate::insert::InserterRef;
32use crate::request::RequesterRef;
33
/// Bundles the handles that the [`TableMutationHandler`] implementation in
/// this file delegates to.
pub struct TableMutationOperator {
    /// Receives table insert requests.
    inserter: InserterRef,
    /// Receives table delete requests.
    deleter: DeleterRef,
    /// Receives flush and compaction requests, for both tables and regions.
    requester: RequesterRef,
}
39
40impl TableMutationOperator {
41    pub fn new(inserter: InserterRef, deleter: DeleterRef, requester: RequesterRef) -> Self {
42        Self {
43            inserter,
44            deleter,
45            requester,
46        }
47    }
48}
49
50#[async_trait]
51impl TableMutationHandler for TableMutationOperator {
52    async fn insert(
53        &self,
54        request: TableInsertRequest,
55        ctx: QueryContextRef,
56    ) -> QueryResult<Output> {
57        self.inserter
58            .handle_table_insert(request, ctx)
59            .await
60            .map_err(BoxedError::new)
61            .context(query_error::TableMutationSnafu)
62    }
63
64    async fn delete(
65        &self,
66        request: TableDeleteRequest,
67        ctx: QueryContextRef,
68    ) -> QueryResult<AffectedRows> {
69        self.deleter
70            .handle_table_delete(request, ctx)
71            .await
72            .map_err(BoxedError::new)
73            .context(query_error::TableMutationSnafu)
74    }
75
76    async fn flush(
77        &self,
78        request: FlushTableRequest,
79        ctx: QueryContextRef,
80    ) -> QueryResult<AffectedRows> {
81        self.requester
82            .handle_table_flush(request, ctx)
83            .await
84            .map_err(BoxedError::new)
85            .context(query_error::TableMutationSnafu)
86    }
87
88    async fn compact(
89        &self,
90        request: CompactTableRequest,
91        ctx: QueryContextRef,
92    ) -> QueryResult<AffectedRows> {
93        self.requester
94            .handle_table_compaction(request, ctx)
95            .await
96            .map_err(BoxedError::new)
97            .context(query_error::TableMutationSnafu)
98    }
99
100    async fn flush_region(
101        &self,
102        region_id: RegionId,
103        ctx: QueryContextRef,
104    ) -> QueryResult<AffectedRows> {
105        self.requester
106            .handle_region_flush(region_id, ctx)
107            .await
108            .map_err(BoxedError::new)
109            .context(query_error::TableMutationSnafu)
110    }
111
112    async fn compact_region(
113        &self,
114        region_id: RegionId,
115        ctx: QueryContextRef,
116    ) -> QueryResult<AffectedRows> {
117        self.requester
118            .handle_region_compaction(region_id, ctx)
119            .await
120            .map_err(BoxedError::new)
121            .context(query_error::TableMutationSnafu)
122    }
123}