1use std::sync::Arc;
16
17use api::v1::region::region_request::Body as RegionRequestBody;
18use api::v1::region::{BuildIndexRequest, CompactRequest, FlushRequest, RegionRequestHeader};
19use catalog::CatalogManagerRef;
20use common_catalog::build_db_string;
21use common_meta::node_manager::{AffectedRows, NodeManagerRef};
22use common_meta::peer::Peer;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error, info};
25use futures_util::future;
26use partition::cache::PhysicalPartitionInfo;
27use partition::manager::PartitionRuleManagerRef;
28use session::context::QueryContextRef;
29use snafu::prelude::*;
30use store_api::storage::RegionId;
31use table::requests::{BuildIndexTableRequest, CompactTableRequest, FlushTableRequest};
32
33use crate::error::{
34 CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
35 RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu,
36};
37use crate::region_req_factory::RegionRequestFactory;
38
39pub struct Requester {
41 catalog_manager: CatalogManagerRef,
42 partition_manager: PartitionRuleManagerRef,
43 node_manager: NodeManagerRef,
44}
45
46pub type RequesterRef = Arc<Requester>;
47
48impl Requester {
49 pub fn new(
50 catalog_manager: CatalogManagerRef,
51 partition_manager: PartitionRuleManagerRef,
52 node_manager: NodeManagerRef,
53 ) -> Self {
54 Self {
55 catalog_manager,
56 partition_manager,
57 node_manager,
58 }
59 }
60
61 pub async fn handle_table_flush(
63 &self,
64 request: FlushTableRequest,
65 ctx: QueryContextRef,
66 ) -> Result<AffectedRows> {
67 let partitions = &self
68 .get_table_partition_info(
69 &request.catalog_name,
70 &request.schema_name,
71 &request.table_name,
72 )
73 .await?
74 .partitions;
75
76 let requests = partitions
77 .iter()
78 .map(|partition| {
79 RegionRequestBody::Flush(FlushRequest {
80 region_id: partition.id.into(),
81 })
82 })
83 .collect();
84
85 info!("Handle table manual flush request: {:?}", request);
86
87 self.do_request(
88 requests,
89 Some(build_db_string(&request.catalog_name, &request.schema_name)),
90 &ctx,
91 )
92 .await
93 }
94
95 pub async fn handle_table_build_index(
97 &self,
98 request: BuildIndexTableRequest,
99 ctx: QueryContextRef,
100 ) -> Result<AffectedRows> {
101 let partitions = &self
102 .get_table_partition_info(
103 &request.catalog_name,
104 &request.schema_name,
105 &request.table_name,
106 )
107 .await?
108 .partitions;
109
110 let requests = partitions
111 .iter()
112 .map(|partition| {
113 RegionRequestBody::BuildIndex(BuildIndexRequest {
114 region_id: partition.id.into(),
115 })
116 })
117 .collect();
118
119 info!(
120 "Handle table manual build index for table {}",
121 request.table_name
122 );
123 debug!("Request details: {:?}", request);
124
125 self.do_request(
126 requests,
127 Some(build_db_string(&request.catalog_name, &request.schema_name)),
128 &ctx,
129 )
130 .await
131 }
132
133 pub async fn handle_table_compaction(
135 &self,
136 request: CompactTableRequest,
137 ctx: QueryContextRef,
138 ) -> Result<AffectedRows> {
139 let partitions = &self
140 .get_table_partition_info(
141 &request.catalog_name,
142 &request.schema_name,
143 &request.table_name,
144 )
145 .await?
146 .partitions;
147
148 let requests = partitions
149 .iter()
150 .map(|partition| {
151 RegionRequestBody::Compact(CompactRequest {
152 region_id: partition.id.into(),
153 parallelism: request.parallelism,
154 options: Some(request.compact_options),
155 })
156 })
157 .collect();
158
159 info!("Handle table manual compaction request: {:?}", request);
160
161 self.do_request(
162 requests,
163 Some(build_db_string(&request.catalog_name, &request.schema_name)),
164 &ctx,
165 )
166 .await
167 }
168
169 pub async fn handle_region_flush(
171 &self,
172 region_id: RegionId,
173 ctx: QueryContextRef,
174 ) -> Result<AffectedRows> {
175 let request = RegionRequestBody::Flush(FlushRequest {
176 region_id: region_id.into(),
177 });
178
179 info!("Handle region manual flush request: {region_id}");
180 self.do_request(vec![request], None, &ctx).await
181 }
182
183 pub async fn handle_region_compaction(
185 &self,
186 region_id: RegionId,
187 ctx: QueryContextRef,
188 ) -> Result<AffectedRows> {
189 let request = RegionRequestBody::Compact(CompactRequest {
190 region_id: region_id.into(),
191 parallelism: 1,
192 options: None, });
194
195 info!("Handle region manual compaction request: {region_id}");
196 self.do_request(vec![request], None, &ctx).await
197 }
198}
199
200impl Requester {
201 async fn do_request(
202 &self,
203 requests: Vec<RegionRequestBody>,
204 db_string: Option<String>,
205 ctx: &QueryContextRef,
206 ) -> Result<AffectedRows> {
207 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
208 tracing_context: TracingContext::from_current_span().to_w3c(),
209 dbname: db_string.unwrap_or_else(|| ctx.get_db_string()),
210 ..Default::default()
211 });
212
213 let tasks = requests.into_iter().map(|req_body| {
214 let request = request_factory.build_request(req_body.clone());
215 let partition_manager = self.partition_manager.clone();
216 let node_manager = self.node_manager.clone();
217 common_runtime::spawn_global(async move {
218 let peer =
219 Self::find_region_leader_by_request(partition_manager, &req_body).await?;
220 node_manager
221 .datanode(&peer)
222 .await
223 .handle(request)
224 .await
225 .context(RequestRegionSnafu)
226 })
227 });
228 let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
229
230 let affected_rows = results
231 .into_iter()
232 .map(|resp| resp.map(|r| r.affected_rows))
233 .sum::<Result<AffectedRows>>()?;
234
235 Ok(affected_rows)
236 }
237
238 async fn find_region_leader_by_request(
239 partition_manager: PartitionRuleManagerRef,
240 req: &RegionRequestBody,
241 ) -> Result<Peer> {
242 let region_id = match req {
243 RegionRequestBody::Flush(req) => req.region_id,
244 RegionRequestBody::Compact(req) => req.region_id,
245 RegionRequestBody::BuildIndex(req) => req.region_id,
246 _ => {
247 error!("Unsupported region request: {:?}", req);
248 return UnsupportedRegionRequestSnafu {}.fail();
249 }
250 };
251
252 partition_manager
253 .find_region_leader(region_id.into())
254 .await
255 .context(FindRegionLeaderSnafu)
256 }
257
258 async fn get_table_partition_info(
259 &self,
260 catalog: &str,
261 schema: &str,
262 table_name: &str,
263 ) -> Result<Arc<PhysicalPartitionInfo>> {
264 let table = self
265 .catalog_manager
266 .table(catalog, schema, table_name, None)
267 .await
268 .context(CatalogSnafu)?;
269
270 let table = table.with_context(|| TableNotFoundSnafu {
271 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
272 })?;
273 let table_info = table.table_info();
274
275 self.partition_manager
276 .find_physical_partition_info(table_info.ident.table_id)
277 .await
278 .with_context(|_| FindTablePartitionRuleSnafu {
279 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
280 })
281 }
282}