1use std::sync::Arc;
16
17use api::v1::region::region_request::Body as RegionRequestBody;
18use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader};
19use catalog::CatalogManagerRef;
20use common_catalog::build_db_string;
21use common_meta::node_manager::{AffectedRows, NodeManagerRef};
22use common_meta::peer::Peer;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{error, info};
25use futures_util::future;
26use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
27use session::context::QueryContextRef;
28use snafu::prelude::*;
29use store_api::storage::RegionId;
30use table::requests::{CompactTableRequest, FlushTableRequest};
31
32use crate::error::{
33    CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu,
34    RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu,
35};
36use crate::region_req_factory::RegionRequestFactory;
37
38pub struct Requester {
40    catalog_manager: CatalogManagerRef,
41    partition_manager: PartitionRuleManagerRef,
42    node_manager: NodeManagerRef,
43}
44
45pub type RequesterRef = Arc<Requester>;
46
47impl Requester {
48    pub fn new(
49        catalog_manager: CatalogManagerRef,
50        partition_manager: PartitionRuleManagerRef,
51        node_manager: NodeManagerRef,
52    ) -> Self {
53        Self {
54            catalog_manager,
55            partition_manager,
56            node_manager,
57        }
58    }
59
60    pub async fn handle_table_flush(
62        &self,
63        request: FlushTableRequest,
64        ctx: QueryContextRef,
65    ) -> Result<AffectedRows> {
66        let partitions = self
67            .get_table_partitions(
68                &request.catalog_name,
69                &request.schema_name,
70                &request.table_name,
71            )
72            .await?;
73
74        let requests = partitions
75            .into_iter()
76            .map(|partition| {
77                RegionRequestBody::Flush(FlushRequest {
78                    region_id: partition.id.into(),
79                })
80            })
81            .collect();
82
83        info!("Handle table manual flush request: {:?}", request);
84
85        self.do_request(
86            requests,
87            Some(build_db_string(&request.catalog_name, &request.schema_name)),
88            &ctx,
89        )
90        .await
91    }
92
93    pub async fn handle_table_compaction(
95        &self,
96        request: CompactTableRequest,
97        ctx: QueryContextRef,
98    ) -> Result<AffectedRows> {
99        let partitions = self
100            .get_table_partitions(
101                &request.catalog_name,
102                &request.schema_name,
103                &request.table_name,
104            )
105            .await?;
106
107        let requests = partitions
108            .into_iter()
109            .map(|partition| {
110                RegionRequestBody::Compact(CompactRequest {
111                    region_id: partition.id.into(),
112                    parallelism: request.parallelism,
113                    options: Some(request.compact_options),
114                })
115            })
116            .collect();
117
118        info!("Handle table manual compaction request: {:?}", request);
119
120        self.do_request(
121            requests,
122            Some(build_db_string(&request.catalog_name, &request.schema_name)),
123            &ctx,
124        )
125        .await
126    }
127
128    pub async fn handle_region_flush(
130        &self,
131        region_id: RegionId,
132        ctx: QueryContextRef,
133    ) -> Result<AffectedRows> {
134        let request = RegionRequestBody::Flush(FlushRequest {
135            region_id: region_id.into(),
136        });
137
138        info!("Handle region manual flush request: {region_id}");
139        self.do_request(vec![request], None, &ctx).await
140    }
141
142    pub async fn handle_region_compaction(
144        &self,
145        region_id: RegionId,
146        ctx: QueryContextRef,
147    ) -> Result<AffectedRows> {
148        let request = RegionRequestBody::Compact(CompactRequest {
149            region_id: region_id.into(),
150            parallelism: 1,
151            options: None, });
153
154        info!("Handle region manual compaction request: {region_id}");
155        self.do_request(vec![request], None, &ctx).await
156    }
157}
158
159impl Requester {
160    async fn do_request(
161        &self,
162        requests: Vec<RegionRequestBody>,
163        db_string: Option<String>,
164        ctx: &QueryContextRef,
165    ) -> Result<AffectedRows> {
166        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
167            tracing_context: TracingContext::from_current_span().to_w3c(),
168            dbname: db_string.unwrap_or_else(|| ctx.get_db_string()),
169            ..Default::default()
170        });
171
172        let tasks = requests.into_iter().map(|req_body| {
173            let request = request_factory.build_request(req_body.clone());
174            let partition_manager = self.partition_manager.clone();
175            let node_manager = self.node_manager.clone();
176            common_runtime::spawn_global(async move {
177                let peer =
178                    Self::find_region_leader_by_request(partition_manager, &req_body).await?;
179                node_manager
180                    .datanode(&peer)
181                    .await
182                    .handle(request)
183                    .await
184                    .context(RequestRegionSnafu)
185            })
186        });
187        let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
188
189        let affected_rows = results
190            .into_iter()
191            .map(|resp| resp.map(|r| r.affected_rows))
192            .sum::<Result<AffectedRows>>()?;
193
194        Ok(affected_rows)
195    }
196
197    async fn find_region_leader_by_request(
198        partition_manager: PartitionRuleManagerRef,
199        req: &RegionRequestBody,
200    ) -> Result<Peer> {
201        let region_id = match req {
202            RegionRequestBody::Flush(req) => req.region_id,
203            RegionRequestBody::Compact(req) => req.region_id,
204            _ => {
205                error!("Unsupported region request: {:?}", req);
206                return UnsupportedRegionRequestSnafu {}.fail();
207            }
208        };
209
210        partition_manager
211            .find_region_leader(region_id.into())
212            .await
213            .context(FindRegionLeaderSnafu)
214    }
215
216    async fn get_table_partitions(
217        &self,
218        catalog: &str,
219        schema: &str,
220        table_name: &str,
221    ) -> Result<Vec<PartitionInfo>> {
222        let table = self
223            .catalog_manager
224            .table(catalog, schema, table_name, None)
225            .await
226            .context(CatalogSnafu)?;
227
228        let table = table.with_context(|| TableNotFoundSnafu {
229            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
230        })?;
231        let table_info = table.table_info();
232
233        self.partition_manager
234            .find_table_partitions(table_info.ident.table_id)
235            .await
236            .with_context(|_| FindTablePartitionRuleSnafu {
237                table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
238            })
239    }
240}