operator/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::region::region_request::Body as RegionRequestBody;
18use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader};
19use catalog::CatalogManagerRef;
20use common_catalog::build_db_string;
21use common_meta::node_manager::{AffectedRows, NodeManagerRef};
22use common_meta::peer::Peer;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{error, info};
25use futures_util::future;
26use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
27use session::context::QueryContextRef;
28use snafu::prelude::*;
29use store_api::storage::RegionId;
30use table::requests::{CompactTableRequest, FlushTableRequest};
31
32use crate::error::{
33    CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
34    RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu,
35};
36use crate::region_req_factory::RegionRequestFactory;
37
38/// Region requester which processes flush, compact requests etc.
39pub struct Requester {
40    catalog_manager: CatalogManagerRef,
41    partition_manager: PartitionRuleManagerRef,
42    node_manager: NodeManagerRef,
43}
44
45pub type RequesterRef = Arc<Requester>;
46
47impl Requester {
48    pub fn new(
49        catalog_manager: CatalogManagerRef,
50        partition_manager: PartitionRuleManagerRef,
51        node_manager: NodeManagerRef,
52    ) -> Self {
53        Self {
54            catalog_manager,
55            partition_manager,
56            node_manager,
57        }
58    }
59
60    /// Handle the request to flush table.
61    pub async fn handle_table_flush(
62        &self,
63        request: FlushTableRequest,
64        ctx: QueryContextRef,
65    ) -> Result<AffectedRows> {
66        let partitions = self
67            .get_table_partitions(
68                &request.catalog_name,
69                &request.schema_name,
70                &request.table_name,
71            )
72            .await?;
73
74        let requests = partitions
75            .into_iter()
76            .map(|partition| {
77                RegionRequestBody::Flush(FlushRequest {
78                    region_id: partition.id.into(),
79                })
80            })
81            .collect();
82
83        info!("Handle table manual flush request: {:?}", request);
84
85        self.do_request(
86            requests,
87            Some(build_db_string(&request.catalog_name, &request.schema_name)),
88            &ctx,
89        )
90        .await
91    }
92
93    /// Handle the request to compact table.
94    pub async fn handle_table_compaction(
95        &self,
96        request: CompactTableRequest,
97        ctx: QueryContextRef,
98    ) -> Result<AffectedRows> {
99        let partitions = self
100            .get_table_partitions(
101                &request.catalog_name,
102                &request.schema_name,
103                &request.table_name,
104            )
105            .await?;
106
107        let requests = partitions
108            .into_iter()
109            .map(|partition| {
110                RegionRequestBody::Compact(CompactRequest {
111                    region_id: partition.id.into(),
112                    options: Some(request.compact_options),
113                })
114            })
115            .collect();
116
117        info!("Handle table manual compaction request: {:?}", request);
118
119        self.do_request(
120            requests,
121            Some(build_db_string(&request.catalog_name, &request.schema_name)),
122            &ctx,
123        )
124        .await
125    }
126
127    /// Handle the request to flush the region.
128    pub async fn handle_region_flush(
129        &self,
130        region_id: RegionId,
131        ctx: QueryContextRef,
132    ) -> Result<AffectedRows> {
133        let request = RegionRequestBody::Flush(FlushRequest {
134            region_id: region_id.into(),
135        });
136
137        info!("Handle region manual flush request: {region_id}");
138        self.do_request(vec![request], None, &ctx).await
139    }
140
141    /// Handle the request to compact the region.
142    pub async fn handle_region_compaction(
143        &self,
144        region_id: RegionId,
145        ctx: QueryContextRef,
146    ) -> Result<AffectedRows> {
147        let request = RegionRequestBody::Compact(CompactRequest {
148            region_id: region_id.into(),
149            options: None, // todo(hl): maybe also support parameters in region compaction.
150        });
151
152        info!("Handle region manual compaction request: {region_id}");
153        self.do_request(vec![request], None, &ctx).await
154    }
155}
156
157impl Requester {
158    async fn do_request(
159        &self,
160        requests: Vec<RegionRequestBody>,
161        db_string: Option<String>,
162        ctx: &QueryContextRef,
163    ) -> Result<AffectedRows> {
164        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
165            tracing_context: TracingContext::from_current_span().to_w3c(),
166            dbname: db_string.unwrap_or_else(|| ctx.get_db_string()),
167            ..Default::default()
168        });
169
170        let tasks = requests.into_iter().map(|req_body| {
171            let request = request_factory.build_request(req_body.clone());
172            let partition_manager = self.partition_manager.clone();
173            let node_manager = self.node_manager.clone();
174            common_runtime::spawn_global(async move {
175                let peer =
176                    Self::find_region_leader_by_request(partition_manager, &req_body).await?;
177                node_manager
178                    .datanode(&peer)
179                    .await
180                    .handle(request)
181                    .await
182                    .context(RequestRegionSnafu)
183            })
184        });
185        let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
186
187        let affected_rows = results
188            .into_iter()
189            .map(|resp| resp.map(|r| r.affected_rows))
190            .sum::<Result<AffectedRows>>()?;
191
192        Ok(affected_rows)
193    }
194
195    async fn find_region_leader_by_request(
196        partition_manager: PartitionRuleManagerRef,
197        req: &RegionRequestBody,
198    ) -> Result<Peer> {
199        let region_id = match req {
200            RegionRequestBody::Flush(req) => req.region_id,
201            RegionRequestBody::Compact(req) => req.region_id,
202            _ => {
203                error!("Unsupported region request: {:?}", req);
204                return UnsupportedRegionRequestSnafu {}.fail();
205            }
206        };
207
208        partition_manager
209            .find_region_leader(region_id.into())
210            .await
211            .context(FindRegionLeaderSnafu)
212    }
213
214    async fn get_table_partitions(
215        &self,
216        catalog: &str,
217        schema: &str,
218        table_name: &str,
219    ) -> Result<Vec<PartitionInfo>> {
220        let table = self
221            .catalog_manager
222            .table(catalog, schema, table_name, None)
223            .await
224            .context(CatalogSnafu)?;
225
226        let table = table.with_context(|| TableNotFoundSnafu {
227            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
228        })?;
229        let table_info = table.table_info();
230
231        self.partition_manager
232            .find_table_partitions(table_info.ident.table_id)
233            .await
234            .with_context(|_| FindTablePartitionRuleSnafu {
235                table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
236            })
237    }
238}