operator/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::region::region_request::Body as RegionRequestBody;
18use api::v1::region::{BuildIndexRequest, CompactRequest, FlushRequest, RegionRequestHeader};
19use catalog::CatalogManagerRef;
20use common_catalog::build_db_string;
21use common_meta::node_manager::{AffectedRows, NodeManagerRef};
22use common_meta::peer::Peer;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error, info};
25use futures_util::future;
26use partition::cache::PhysicalPartitionInfo;
27use partition::manager::PartitionRuleManagerRef;
28use session::context::QueryContextRef;
29use snafu::prelude::*;
30use store_api::storage::RegionId;
31use table::requests::{BuildIndexTableRequest, CompactTableRequest, FlushTableRequest};
32
33use crate::error::{
34    CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
35    RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu,
36};
37use crate::region_req_factory::RegionRequestFactory;
38
39/// Region requester which processes flush, compact requests etc.
40pub struct Requester {
41    catalog_manager: CatalogManagerRef,
42    partition_manager: PartitionRuleManagerRef,
43    node_manager: NodeManagerRef,
44}
45
46pub type RequesterRef = Arc<Requester>;
47
48impl Requester {
49    pub fn new(
50        catalog_manager: CatalogManagerRef,
51        partition_manager: PartitionRuleManagerRef,
52        node_manager: NodeManagerRef,
53    ) -> Self {
54        Self {
55            catalog_manager,
56            partition_manager,
57            node_manager,
58        }
59    }
60
61    /// Handle the request to flush table.
62    pub async fn handle_table_flush(
63        &self,
64        request: FlushTableRequest,
65        ctx: QueryContextRef,
66    ) -> Result<AffectedRows> {
67        let partitions = &self
68            .get_table_partition_info(
69                &request.catalog_name,
70                &request.schema_name,
71                &request.table_name,
72            )
73            .await?
74            .partitions;
75
76        let requests = partitions
77            .iter()
78            .map(|partition| {
79                RegionRequestBody::Flush(FlushRequest {
80                    region_id: partition.id.into(),
81                })
82            })
83            .collect();
84
85        info!("Handle table manual flush request: {:?}", request);
86
87        self.do_request(
88            requests,
89            Some(build_db_string(&request.catalog_name, &request.schema_name)),
90            &ctx,
91        )
92        .await
93    }
94
95    /// Handle the request to build index for table.
96    pub async fn handle_table_build_index(
97        &self,
98        request: BuildIndexTableRequest,
99        ctx: QueryContextRef,
100    ) -> Result<AffectedRows> {
101        let partitions = &self
102            .get_table_partition_info(
103                &request.catalog_name,
104                &request.schema_name,
105                &request.table_name,
106            )
107            .await?
108            .partitions;
109
110        let requests = partitions
111            .iter()
112            .map(|partition| {
113                RegionRequestBody::BuildIndex(BuildIndexRequest {
114                    region_id: partition.id.into(),
115                })
116            })
117            .collect();
118
119        info!(
120            "Handle table manual build index for table {}",
121            request.table_name
122        );
123        debug!("Request details: {:?}", request);
124
125        self.do_request(
126            requests,
127            Some(build_db_string(&request.catalog_name, &request.schema_name)),
128            &ctx,
129        )
130        .await
131    }
132
133    /// Handle the request to compact table.
134    pub async fn handle_table_compaction(
135        &self,
136        request: CompactTableRequest,
137        ctx: QueryContextRef,
138    ) -> Result<AffectedRows> {
139        let partitions = &self
140            .get_table_partition_info(
141                &request.catalog_name,
142                &request.schema_name,
143                &request.table_name,
144            )
145            .await?
146            .partitions;
147
148        let requests = partitions
149            .iter()
150            .map(|partition| {
151                RegionRequestBody::Compact(CompactRequest {
152                    region_id: partition.id.into(),
153                    parallelism: request.parallelism,
154                    options: Some(request.compact_options),
155                })
156            })
157            .collect();
158
159        info!("Handle table manual compaction request: {:?}", request);
160
161        self.do_request(
162            requests,
163            Some(build_db_string(&request.catalog_name, &request.schema_name)),
164            &ctx,
165        )
166        .await
167    }
168
169    /// Handle the request to flush the region.
170    pub async fn handle_region_flush(
171        &self,
172        region_id: RegionId,
173        ctx: QueryContextRef,
174    ) -> Result<AffectedRows> {
175        let request = RegionRequestBody::Flush(FlushRequest {
176            region_id: region_id.into(),
177        });
178
179        info!("Handle region manual flush request: {region_id}");
180        self.do_request(vec![request], None, &ctx).await
181    }
182
183    /// Handle the request to compact the region.
184    pub async fn handle_region_compaction(
185        &self,
186        region_id: RegionId,
187        ctx: QueryContextRef,
188    ) -> Result<AffectedRows> {
189        let request = RegionRequestBody::Compact(CompactRequest {
190            region_id: region_id.into(),
191            parallelism: 1,
192            options: None, // todo(hl): maybe also support parameters in region compaction.
193        });
194
195        info!("Handle region manual compaction request: {region_id}");
196        self.do_request(vec![request], None, &ctx).await
197    }
198}
199
200impl Requester {
201    async fn do_request(
202        &self,
203        requests: Vec<RegionRequestBody>,
204        db_string: Option<String>,
205        ctx: &QueryContextRef,
206    ) -> Result<AffectedRows> {
207        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
208            tracing_context: TracingContext::from_current_span().to_w3c(),
209            dbname: db_string.unwrap_or_else(|| ctx.get_db_string()),
210            ..Default::default()
211        });
212
213        let tasks = requests.into_iter().map(|req_body| {
214            let request = request_factory.build_request(req_body.clone());
215            let partition_manager = self.partition_manager.clone();
216            let node_manager = self.node_manager.clone();
217            common_runtime::spawn_global(async move {
218                let peer =
219                    Self::find_region_leader_by_request(partition_manager, &req_body).await?;
220                node_manager
221                    .datanode(&peer)
222                    .await
223                    .handle(request)
224                    .await
225                    .context(RequestRegionSnafu)
226            })
227        });
228        let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
229
230        let affected_rows = results
231            .into_iter()
232            .map(|resp| resp.map(|r| r.affected_rows))
233            .sum::<Result<AffectedRows>>()?;
234
235        Ok(affected_rows)
236    }
237
238    async fn find_region_leader_by_request(
239        partition_manager: PartitionRuleManagerRef,
240        req: &RegionRequestBody,
241    ) -> Result<Peer> {
242        let region_id = match req {
243            RegionRequestBody::Flush(req) => req.region_id,
244            RegionRequestBody::Compact(req) => req.region_id,
245            RegionRequestBody::BuildIndex(req) => req.region_id,
246            _ => {
247                error!("Unsupported region request: {:?}", req);
248                return UnsupportedRegionRequestSnafu {}.fail();
249            }
250        };
251
252        partition_manager
253            .find_region_leader(region_id.into())
254            .await
255            .context(FindRegionLeaderSnafu)
256    }
257
258    async fn get_table_partition_info(
259        &self,
260        catalog: &str,
261        schema: &str,
262        table_name: &str,
263    ) -> Result<Arc<PhysicalPartitionInfo>> {
264        let table = self
265            .catalog_manager
266            .table(catalog, schema, table_name, None)
267            .await
268            .context(CatalogSnafu)?;
269
270        let table = table.with_context(|| TableNotFoundSnafu {
271            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
272        })?;
273        let table_info = table.table_info();
274
275        self.partition_manager
276            .find_physical_partition_info(table_info.ident.table_id)
277            .await
278            .with_context(|_| FindTablePartitionRuleSnafu {
279                table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
280            })
281    }
282}