1use std::sync::Arc;
16
17use api::v1::region::region_request::Body as RegionRequestBody;
18use api::v1::region::{BuildIndexRequest, CompactRequest, FlushRequest, RegionRequestHeader};
19use catalog::CatalogManagerRef;
20use common_catalog::build_db_string;
21use common_meta::node_manager::{AffectedRows, NodeManagerRef};
22use common_meta::peer::Peer;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error, info};
25use futures_util::future;
26use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
27use session::context::QueryContextRef;
28use snafu::prelude::*;
29use store_api::storage::RegionId;
30use table::requests::{BuildIndexTableRequest, CompactTableRequest, FlushTableRequest};
31
32use crate::error::{
33 CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
34 RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu,
35};
36use crate::region_req_factory::RegionRequestFactory;
37
38pub struct Requester {
40 catalog_manager: CatalogManagerRef,
41 partition_manager: PartitionRuleManagerRef,
42 node_manager: NodeManagerRef,
43}
44
45pub type RequesterRef = Arc<Requester>;
46
47impl Requester {
48 pub fn new(
49 catalog_manager: CatalogManagerRef,
50 partition_manager: PartitionRuleManagerRef,
51 node_manager: NodeManagerRef,
52 ) -> Self {
53 Self {
54 catalog_manager,
55 partition_manager,
56 node_manager,
57 }
58 }
59
60 pub async fn handle_table_flush(
62 &self,
63 request: FlushTableRequest,
64 ctx: QueryContextRef,
65 ) -> Result<AffectedRows> {
66 let partitions = self
67 .get_table_partitions(
68 &request.catalog_name,
69 &request.schema_name,
70 &request.table_name,
71 )
72 .await?;
73
74 let requests = partitions
75 .into_iter()
76 .map(|partition| {
77 RegionRequestBody::Flush(FlushRequest {
78 region_id: partition.id.into(),
79 })
80 })
81 .collect();
82
83 info!("Handle table manual flush request: {:?}", request);
84
85 self.do_request(
86 requests,
87 Some(build_db_string(&request.catalog_name, &request.schema_name)),
88 &ctx,
89 )
90 .await
91 }
92
93 pub async fn handle_table_build_index(
95 &self,
96 request: BuildIndexTableRequest,
97 ctx: QueryContextRef,
98 ) -> Result<AffectedRows> {
99 let partitions = self
100 .get_table_partitions(
101 &request.catalog_name,
102 &request.schema_name,
103 &request.table_name,
104 )
105 .await?;
106
107 let requests = partitions
108 .into_iter()
109 .map(|partition| {
110 RegionRequestBody::BuildIndex(BuildIndexRequest {
111 region_id: partition.id.into(),
112 })
113 })
114 .collect();
115
116 info!(
117 "Handle table manual build index for table {}",
118 request.table_name
119 );
120 debug!("Request details: {:?}", request);
121
122 self.do_request(
123 requests,
124 Some(build_db_string(&request.catalog_name, &request.schema_name)),
125 &ctx,
126 )
127 .await
128 }
129
130 pub async fn handle_table_compaction(
132 &self,
133 request: CompactTableRequest,
134 ctx: QueryContextRef,
135 ) -> Result<AffectedRows> {
136 let partitions = self
137 .get_table_partitions(
138 &request.catalog_name,
139 &request.schema_name,
140 &request.table_name,
141 )
142 .await?;
143
144 let requests = partitions
145 .into_iter()
146 .map(|partition| {
147 RegionRequestBody::Compact(CompactRequest {
148 region_id: partition.id.into(),
149 parallelism: request.parallelism,
150 options: Some(request.compact_options),
151 })
152 })
153 .collect();
154
155 info!("Handle table manual compaction request: {:?}", request);
156
157 self.do_request(
158 requests,
159 Some(build_db_string(&request.catalog_name, &request.schema_name)),
160 &ctx,
161 )
162 .await
163 }
164
165 pub async fn handle_region_flush(
167 &self,
168 region_id: RegionId,
169 ctx: QueryContextRef,
170 ) -> Result<AffectedRows> {
171 let request = RegionRequestBody::Flush(FlushRequest {
172 region_id: region_id.into(),
173 });
174
175 info!("Handle region manual flush request: {region_id}");
176 self.do_request(vec![request], None, &ctx).await
177 }
178
179 pub async fn handle_region_compaction(
181 &self,
182 region_id: RegionId,
183 ctx: QueryContextRef,
184 ) -> Result<AffectedRows> {
185 let request = RegionRequestBody::Compact(CompactRequest {
186 region_id: region_id.into(),
187 parallelism: 1,
188 options: None, });
190
191 info!("Handle region manual compaction request: {region_id}");
192 self.do_request(vec![request], None, &ctx).await
193 }
194}
195
196impl Requester {
197 async fn do_request(
198 &self,
199 requests: Vec<RegionRequestBody>,
200 db_string: Option<String>,
201 ctx: &QueryContextRef,
202 ) -> Result<AffectedRows> {
203 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
204 tracing_context: TracingContext::from_current_span().to_w3c(),
205 dbname: db_string.unwrap_or_else(|| ctx.get_db_string()),
206 ..Default::default()
207 });
208
209 let tasks = requests.into_iter().map(|req_body| {
210 let request = request_factory.build_request(req_body.clone());
211 let partition_manager = self.partition_manager.clone();
212 let node_manager = self.node_manager.clone();
213 common_runtime::spawn_global(async move {
214 let peer =
215 Self::find_region_leader_by_request(partition_manager, &req_body).await?;
216 node_manager
217 .datanode(&peer)
218 .await
219 .handle(request)
220 .await
221 .context(RequestRegionSnafu)
222 })
223 });
224 let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
225
226 let affected_rows = results
227 .into_iter()
228 .map(|resp| resp.map(|r| r.affected_rows))
229 .sum::<Result<AffectedRows>>()?;
230
231 Ok(affected_rows)
232 }
233
234 async fn find_region_leader_by_request(
235 partition_manager: PartitionRuleManagerRef,
236 req: &RegionRequestBody,
237 ) -> Result<Peer> {
238 let region_id = match req {
239 RegionRequestBody::Flush(req) => req.region_id,
240 RegionRequestBody::Compact(req) => req.region_id,
241 RegionRequestBody::BuildIndex(req) => req.region_id,
242 _ => {
243 error!("Unsupported region request: {:?}", req);
244 return UnsupportedRegionRequestSnafu {}.fail();
245 }
246 };
247
248 partition_manager
249 .find_region_leader(region_id.into())
250 .await
251 .context(FindRegionLeaderSnafu)
252 }
253
254 async fn get_table_partitions(
255 &self,
256 catalog: &str,
257 schema: &str,
258 table_name: &str,
259 ) -> Result<Vec<PartitionInfo>> {
260 let table = self
261 .catalog_manager
262 .table(catalog, schema, table_name, None)
263 .await
264 .context(CatalogSnafu)?;
265
266 let table = table.with_context(|| TableNotFoundSnafu {
267 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
268 })?;
269 let table_info = table.table_info();
270
271 self.partition_manager
272 .find_table_partitions(table_info.ident.table_id)
273 .await
274 .with_context(|_| FindTablePartitionRuleSnafu {
275 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
276 })
277 }
278}