operator/request.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::region::region_request::Body as RegionRequestBody;
use api::v1::region::{BuildIndexRequest, CompactRequest, FlushRequest, RegionRequestHeader};
use catalog::CatalogManagerRef;
use common_catalog::build_db_string;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, error, info};
use futures_util::future;
use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
use session::context::QueryContextRef;
use snafu::prelude::*;
use store_api::storage::RegionId;
use table::requests::{BuildIndexTableRequest, CompactTableRequest, FlushTableRequest};

use crate::error::{
    CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
    RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu,
};
use crate::region_req_factory::RegionRequestFactory;

/// Region requester that handles flush, compaction and index build requests for tables and regions.
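///
/// A minimal usage sketch (marked `ignore`, not compiled as a doc test). The
/// `catalog_manager`, `partition_manager`, `node_manager` and `ctx` handles are
/// assumed to be provided by the surrounding setup, and the field set of
/// `FlushTableRequest` shown here is only illustrative:
///
/// ```ignore
/// use table::requests::FlushTableRequest;
///
/// let requester = Requester::new(catalog_manager, partition_manager, node_manager);
/// let request = FlushTableRequest {
///     catalog_name: "greptime".to_string(),
///     schema_name: "public".to_string(),
///     table_name: "monitor".to_string(),
/// };
/// // Flushes every region of the table and returns the summed affected rows.
/// let affected_rows = requester.handle_table_flush(request, ctx).await?;
/// ```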
pub struct Requester {
    catalog_manager: CatalogManagerRef,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
}

pub type RequesterRef = Arc<Requester>;

impl Requester {
    pub fn new(
        catalog_manager: CatalogManagerRef,
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            node_manager,
        }
    }

    /// Handle the request to flush a table.
    pub async fn handle_table_flush(
        &self,
        request: FlushTableRequest,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        let partitions = self
            .get_table_partitions(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
            )
            .await?;

        let requests = partitions
            .into_iter()
            .map(|partition| {
                RegionRequestBody::Flush(FlushRequest {
                    region_id: partition.id.into(),
                })
            })
            .collect();

        info!("Handle table manual flush request: {:?}", request);

        self.do_request(
            requests,
            Some(build_db_string(&request.catalog_name, &request.schema_name)),
            &ctx,
        )
        .await
    }

    /// Handle the request to build an index for a table.
    pub async fn handle_table_build_index(
        &self,
        request: BuildIndexTableRequest,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        let partitions = self
            .get_table_partitions(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
            )
            .await?;

        let requests = partitions
            .into_iter()
            .map(|partition| {
                RegionRequestBody::BuildIndex(BuildIndexRequest {
                    region_id: partition.id.into(),
                })
            })
            .collect();

        info!(
            "Handle table manual build index request for table {}",
            request.table_name
        );
        debug!("Request details: {:?}", request);

        self.do_request(
            requests,
            Some(build_db_string(&request.catalog_name, &request.schema_name)),
            &ctx,
        )
        .await
    }

    /// Handle the request to compact a table.
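    ///
    /// Every region of the table receives its own `CompactRequest`, carrying the
    /// table-level `compact_options` and `parallelism` unchanged.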
    pub async fn handle_table_compaction(
        &self,
        request: CompactTableRequest,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        let partitions = self
            .get_table_partitions(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
            )
            .await?;

        let requests = partitions
            .into_iter()
            .map(|partition| {
                RegionRequestBody::Compact(CompactRequest {
                    region_id: partition.id.into(),
                    parallelism: request.parallelism,
                    options: Some(request.compact_options),
                })
            })
            .collect();

        info!("Handle table manual compaction request: {:?}", request);

        self.do_request(
            requests,
            Some(build_db_string(&request.catalog_name, &request.schema_name)),
            &ctx,
        )
        .await
    }

    /// Handle the request to flush the region.
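    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doc test), assuming a
    /// `RegionId` built from a table id and a region number via `RegionId::new`:
    ///
    /// ```ignore
    /// let region_id = RegionId::new(table_id, 0);
    /// let affected_rows = requester.handle_region_flush(region_id, ctx).await?;
    /// ```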
    pub async fn handle_region_flush(
        &self,
        region_id: RegionId,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        let request = RegionRequestBody::Flush(FlushRequest {
            region_id: region_id.into(),
        });

        info!("Handle region manual flush request: {region_id}");
        self.do_request(vec![request], None, &ctx).await
    }

    /// Handle the request to compact the region.
    pub async fn handle_region_compaction(
        &self,
        region_id: RegionId,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        let request = RegionRequestBody::Compact(CompactRequest {
            region_id: region_id.into(),
            parallelism: 1,
            options: None, // todo(hl): maybe also support parameters in region compaction.
        });

        info!("Handle region manual compaction request: {region_id}");
        self.do_request(vec![request], None, &ctx).await
    }
}

impl Requester {
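    /// Builds a region request for each body, sends the requests to the leader
    /// datanodes of the corresponding regions concurrently, and sums the affected
    /// rows reported by all responses.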
    async fn do_request(
        &self,
        requests: Vec<RegionRequestBody>,
        db_string: Option<String>,
        ctx: &QueryContextRef,
    ) -> Result<AffectedRows> {
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: db_string.unwrap_or_else(|| ctx.get_db_string()),
            ..Default::default()
        });

        let tasks = requests.into_iter().map(|req_body| {
            let request = request_factory.build_request(req_body.clone());
            let partition_manager = self.partition_manager.clone();
            let node_manager = self.node_manager.clone();
            common_runtime::spawn_global(async move {
                let peer =
                    Self::find_region_leader_by_request(partition_manager, &req_body).await?;
                node_manager
                    .datanode(&peer)
                    .await
                    .handle(request)
                    .await
                    .context(RequestRegionSnafu)
            })
        });
        let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;

        Ok(affected_rows)
    }

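    /// Extracts the region id from a flush, compaction or index build request body
    /// and looks up the peer that currently holds the region leader.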
    async fn find_region_leader_by_request(
        partition_manager: PartitionRuleManagerRef,
        req: &RegionRequestBody,
    ) -> Result<Peer> {
        let region_id = match req {
            RegionRequestBody::Flush(req) => req.region_id,
            RegionRequestBody::Compact(req) => req.region_id,
            RegionRequestBody::BuildIndex(req) => req.region_id,
            _ => {
                error!("Unsupported region request: {:?}", req);
                return UnsupportedRegionRequestSnafu {}.fail();
            }
        };

        partition_manager
            .find_region_leader(region_id.into())
            .await
            .context(FindRegionLeaderSnafu)
    }

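    /// Resolves the table by name through the catalog manager and returns its
    /// partitions (one per region) from the partition rule manager.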
    async fn get_table_partitions(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
    ) -> Result<Vec<PartitionInfo>> {
        let table = self
            .catalog_manager
            .table(catalog, schema, table_name, None)
            .await
            .context(CatalogSnafu)?;

        let table = table.with_context(|| TableNotFoundSnafu {
            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
        })?;
        let table_info = table.table_info();

        self.partition_manager
            .find_table_partitions(table_info.ident.table_id)
            .await
            .with_context(|_| FindTablePartitionRuleSnafu {
                table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
            })
    }
}