1use std::sync::Arc;
16
17use api::v1::region::region_request::Body as RegionRequestBody;
18use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader};
19use catalog::CatalogManagerRef;
20use common_catalog::build_db_string;
21use common_meta::node_manager::{AffectedRows, NodeManagerRef};
22use common_meta::peer::Peer;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{error, info};
25use futures_util::future;
26use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
27use session::context::QueryContextRef;
28use snafu::prelude::*;
29use store_api::storage::RegionId;
30use table::requests::{CompactTableRequest, FlushTableRequest};
31
32use crate::error::{
33 CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
34 RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu,
35};
36use crate::region_req_factory::RegionRequestFactory;
37
38pub struct Requester {
40 catalog_manager: CatalogManagerRef,
41 partition_manager: PartitionRuleManagerRef,
42 node_manager: NodeManagerRef,
43}
44
45pub type RequesterRef = Arc<Requester>;
46
47impl Requester {
48 pub fn new(
49 catalog_manager: CatalogManagerRef,
50 partition_manager: PartitionRuleManagerRef,
51 node_manager: NodeManagerRef,
52 ) -> Self {
53 Self {
54 catalog_manager,
55 partition_manager,
56 node_manager,
57 }
58 }
59
60 pub async fn handle_table_flush(
62 &self,
63 request: FlushTableRequest,
64 ctx: QueryContextRef,
65 ) -> Result<AffectedRows> {
66 let partitions = self
67 .get_table_partitions(
68 &request.catalog_name,
69 &request.schema_name,
70 &request.table_name,
71 )
72 .await?;
73
74 let requests = partitions
75 .into_iter()
76 .map(|partition| {
77 RegionRequestBody::Flush(FlushRequest {
78 region_id: partition.id.into(),
79 })
80 })
81 .collect();
82
83 info!("Handle table manual flush request: {:?}", request);
84
85 self.do_request(
86 requests,
87 Some(build_db_string(&request.catalog_name, &request.schema_name)),
88 &ctx,
89 )
90 .await
91 }
92
93 pub async fn handle_table_compaction(
95 &self,
96 request: CompactTableRequest,
97 ctx: QueryContextRef,
98 ) -> Result<AffectedRows> {
99 let partitions = self
100 .get_table_partitions(
101 &request.catalog_name,
102 &request.schema_name,
103 &request.table_name,
104 )
105 .await?;
106
107 let requests = partitions
108 .into_iter()
109 .map(|partition| {
110 RegionRequestBody::Compact(CompactRequest {
111 region_id: partition.id.into(),
112 options: Some(request.compact_options),
113 })
114 })
115 .collect();
116
117 info!("Handle table manual compaction request: {:?}", request);
118
119 self.do_request(
120 requests,
121 Some(build_db_string(&request.catalog_name, &request.schema_name)),
122 &ctx,
123 )
124 .await
125 }
126
127 pub async fn handle_region_flush(
129 &self,
130 region_id: RegionId,
131 ctx: QueryContextRef,
132 ) -> Result<AffectedRows> {
133 let request = RegionRequestBody::Flush(FlushRequest {
134 region_id: region_id.into(),
135 });
136
137 info!("Handle region manual flush request: {region_id}");
138 self.do_request(vec![request], None, &ctx).await
139 }
140
141 pub async fn handle_region_compaction(
143 &self,
144 region_id: RegionId,
145 ctx: QueryContextRef,
146 ) -> Result<AffectedRows> {
147 let request = RegionRequestBody::Compact(CompactRequest {
148 region_id: region_id.into(),
149 options: None, });
151
152 info!("Handle region manual compaction request: {region_id}");
153 self.do_request(vec![request], None, &ctx).await
154 }
155}
156
157impl Requester {
158 async fn do_request(
159 &self,
160 requests: Vec<RegionRequestBody>,
161 db_string: Option<String>,
162 ctx: &QueryContextRef,
163 ) -> Result<AffectedRows> {
164 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
165 tracing_context: TracingContext::from_current_span().to_w3c(),
166 dbname: db_string.unwrap_or_else(|| ctx.get_db_string()),
167 ..Default::default()
168 });
169
170 let tasks = requests.into_iter().map(|req_body| {
171 let request = request_factory.build_request(req_body.clone());
172 let partition_manager = self.partition_manager.clone();
173 let node_manager = self.node_manager.clone();
174 common_runtime::spawn_global(async move {
175 let peer =
176 Self::find_region_leader_by_request(partition_manager, &req_body).await?;
177 node_manager
178 .datanode(&peer)
179 .await
180 .handle(request)
181 .await
182 .context(RequestRegionSnafu)
183 })
184 });
185 let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
186
187 let affected_rows = results
188 .into_iter()
189 .map(|resp| resp.map(|r| r.affected_rows))
190 .sum::<Result<AffectedRows>>()?;
191
192 Ok(affected_rows)
193 }
194
195 async fn find_region_leader_by_request(
196 partition_manager: PartitionRuleManagerRef,
197 req: &RegionRequestBody,
198 ) -> Result<Peer> {
199 let region_id = match req {
200 RegionRequestBody::Flush(req) => req.region_id,
201 RegionRequestBody::Compact(req) => req.region_id,
202 _ => {
203 error!("Unsupported region request: {:?}", req);
204 return UnsupportedRegionRequestSnafu {}.fail();
205 }
206 };
207
208 partition_manager
209 .find_region_leader(region_id.into())
210 .await
211 .context(FindRegionLeaderSnafu)
212 }
213
214 async fn get_table_partitions(
215 &self,
216 catalog: &str,
217 schema: &str,
218 table_name: &str,
219 ) -> Result<Vec<PartitionInfo>> {
220 let table = self
221 .catalog_manager
222 .table(catalog, schema, table_name, None)
223 .await
224 .context(CatalogSnafu)?;
225
226 let table = table.with_context(|| TableNotFoundSnafu {
227 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
228 })?;
229 let table_info = table.table_info();
230
231 self.partition_manager
232 .find_table_partitions(table_info.ident.table_id)
233 .await
234 .with_context(|_| FindTablePartitionRuleSnafu {
235 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
236 })
237 }
238}