1mod alter_table;
16mod create_table;
17mod partition_column;
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use clap::{Parser, Subcommand};
24use client::api::v1::CreateTableExpr;
25use client::client_manager::NodeClients;
26use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
27use common_error::ext::{BoxedError, ErrorExt};
28use common_error::status_code::StatusCode;
29use common_grpc::channel_manager::ChannelConfig;
30use common_meta::error::Error as CommonMetaError;
31use common_meta::key::TableMetadataManager;
32use common_meta::kv_backend::KvBackendRef;
33use common_meta::node_manager::NodeManagerRef;
34use common_meta::peer::Peer;
35use common_meta::rpc::router::{RegionRoute, find_leaders};
36use common_telemetry::{error, info, warn};
37use futures::TryStreamExt;
38use partition_column::RepairPartitionColumnCommand;
39use snafu::{ResultExt, ensure};
40use store_api::storage::TableId;
41
42use crate::Tool;
43use crate::common::StoreConfig;
44use crate::error::{
45 InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
46};
47use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
48
// The repair subcommands. Each variant carries the parsed arguments of one repair
// command; `RepairCommand::build` turns the selected one into a runnable `Tool`.
//
// NOTE: these are deliberately plain `//` comments. clap's derive macros turn `///`
// doc comments into help text, so adding them here would change the CLI output.
#[derive(Subcommand)]
pub enum RepairCommand {
    // Arguments for repairing logical tables (see `RepairLogicalTablesCommand`).
    LogicalTables(RepairLogicalTablesCommand),
    // Arguments for the command defined in the `partition_column` module
    // (presumably repairs partition columns; confirm against that module).
    PartitionColumn(RepairPartitionColumnCommand),
}
54
55impl RepairCommand {
56 pub(super) async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
57 match self {
58 Self::LogicalTables(x) => x.build().await,
59 Self::PartitionColumn(x) => x.build().await.map(|x| Box::new(x) as _),
60 }
61 }
62}
63
64#[derive(Debug, Default, Parser)]
66pub struct RepairLogicalTablesCommand {
67 #[clap(long, value_delimiter = ',', alias = "table-name")]
69 table_names: Vec<String>,
70
71 #[clap(long, value_delimiter = ',', alias = "table-id")]
73 table_ids: Vec<TableId>,
74
75 #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
77 schema_name: String,
78
79 #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
81 catalog_name: String,
82
83 #[clap(long)]
85 fail_fast: bool,
86
87 #[clap(flatten)]
88 store: StoreConfig,
89
90 #[clap(long, default_value_t = 30)]
92 client_timeout_secs: u64,
93
94 #[clap(long, default_value_t = 3)]
96 client_connect_timeout_secs: u64,
97}
98
99impl RepairLogicalTablesCommand {
100 fn validate(&self) -> Result<()> {
101 ensure!(
102 !self.table_names.is_empty() || !self.table_ids.is_empty(),
103 InvalidArgumentsSnafu {
104 msg: "You must specify --table-names or --table-ids.",
105 }
106 );
107 Ok(())
108 }
109}
110
111impl RepairLogicalTablesCommand {
112 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
113 self.validate().map_err(BoxedError::new)?;
114 let kv_backend = self.store.build().await?;
115 let node_client_channel_config = ChannelConfig::new()
116 .timeout(Some(Duration::from_secs(self.client_timeout_secs)))
117 .connect_timeout(Duration::from_secs(self.client_connect_timeout_secs));
118 let node_manager = Arc::new(NodeClients::new(node_client_channel_config));
119
120 Ok(Box::new(RepairTool {
121 table_names: self.table_names.clone(),
122 table_ids: self.table_ids.clone(),
123 schema_name: self.schema_name.clone(),
124 catalog_name: self.catalog_name.clone(),
125 fail_fast: self.fail_fast,
126 kv_backend,
127 node_manager,
128 }))
129 }
130}
131
/// The tool that repairs logical tables on datanodes; built by
/// `RepairLogicalTablesCommand::build`.
struct RepairTool {
    /// Names of the tables to repair. When non-empty, these are used and `table_ids` is ignored.
    table_names: Vec<String>,
    /// IDs of the tables to repair. Only consulted when `table_names` is empty.
    table_ids: Vec<TableId>,
    /// Schema combined with each entry of `table_names` to form the full table name.
    schema_name: String,
    /// Catalog combined with each entry of `table_names` to form the full table name.
    catalog_name: String,
    /// When set, the first table that fails to repair aborts the whole run.
    fail_fast: bool,
    /// Key-value backend the table metadata and table routes are read from.
    kv_backend: KvBackendRef,
    /// Hands out the datanode clients that repair requests are sent through.
    node_manager: NodeManagerRef,
}
141
142#[async_trait]
143impl Tool for RepairTool {
144 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
145 self.repair_tables().await.map_err(BoxedError::new)
146 }
147}
148
149impl RepairTool {
150 fn generate_iterator_input(&self) -> Result<IteratorInput> {
151 if !self.table_names.is_empty() {
152 let table_names = &self.table_names;
153 let catalog = &self.catalog_name;
154 let schema_name = &self.schema_name;
155
156 let table_names = table_names
157 .iter()
158 .map(|table_name| (catalog.clone(), schema_name.clone(), table_name.clone()))
159 .collect::<Vec<_>>();
160 return Ok(IteratorInput::new_table_names(table_names));
161 } else if !self.table_ids.is_empty() {
162 return Ok(IteratorInput::new_table_ids(self.table_ids.clone()));
163 };
164
165 InvalidArgumentsSnafu {
166 msg: "You must specify --table-names or --table-id.",
167 }
168 .fail()
169 }
170
171 async fn repair_tables(&self) -> Result<()> {
172 let input = self.generate_iterator_input()?;
173 let mut table_metadata_iterator =
174 Box::pin(TableMetadataIterator::new(self.kv_backend.clone(), input).into_stream());
175 let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
176
177 let mut skipped_table = 0;
178 let mut success_table = 0;
179 while let Some(full_table_metadata) = table_metadata_iterator.try_next().await? {
180 let full_table_name = full_table_metadata.full_table_name();
181 if !full_table_metadata.is_metric_engine() {
182 warn!(
183 "Skipping repair for non-metric engine table: {}",
184 full_table_name
185 );
186 skipped_table += 1;
187 continue;
188 }
189
190 if full_table_metadata.is_physical_table() {
191 warn!("Skipping repair for physical table: {}", full_table_name);
192 skipped_table += 1;
193 continue;
194 }
195
196 let (physical_table_id, physical_table_route) = table_metadata_manager
197 .table_route_manager()
198 .get_physical_table_route(full_table_metadata.table_id)
199 .await
200 .context(TableMetadataSnafu)?;
201
202 if let Err(err) = self
203 .repair_table(
204 &full_table_metadata,
205 physical_table_id,
206 &physical_table_route.region_routes,
207 )
208 .await
209 {
210 error!(
211 err;
212 "Failed to repair table: {}, skipped table: {}",
213 full_table_name,
214 skipped_table,
215 );
216
217 if self.fail_fast {
218 return Err(err);
219 }
220 } else {
221 success_table += 1;
222 }
223 }
224
225 info!(
226 "Repair logical tables result: {} tables repaired, {} tables skipped",
227 success_table, skipped_table
228 );
229
230 Ok(())
231 }
232
233 async fn alter_table_on_datanodes(
234 &self,
235 full_table_metadata: &FullTableMetadata,
236 physical_region_routes: &[RegionRoute],
237 ) -> Result<Vec<(Peer, CommonMetaError)>> {
238 let logical_table_id = full_table_metadata.table_id;
239 let alter_table_expr = alter_table::generate_alter_table_expr_for_all_columns(
240 &full_table_metadata.table_info,
241 )?;
242 let node_manager = self.node_manager.clone();
243
244 let mut failed_peers = Vec::new();
245 info!(
246 "Sending alter table requests to all datanodes for table: {}, number of regions:{}.",
247 full_table_metadata.full_table_name(),
248 physical_region_routes.len()
249 );
250 let leaders = find_leaders(physical_region_routes);
251 for peer in &leaders {
252 let alter_table_request = alter_table::make_alter_region_request_for_peer(
253 logical_table_id,
254 &alter_table_expr,
255 peer,
256 physical_region_routes,
257 )?;
258 let datanode = node_manager.datanode(peer).await;
259 if let Err(err) = datanode.handle(alter_table_request).await {
260 failed_peers.push((peer.clone(), err));
261 }
262 }
263
264 Ok(failed_peers)
265 }
266
267 async fn create_table_on_datanode(
268 &self,
269 create_table_expr: &CreateTableExpr,
270 logical_table_id: TableId,
271 physical_table_id: TableId,
272 peer: &Peer,
273 physical_region_routes: &[RegionRoute],
274 ) -> Result<()> {
275 let node_manager = self.node_manager.clone();
276 let datanode = node_manager.datanode(peer).await;
277 let create_table_request = create_table::make_create_region_request_for_peer(
278 logical_table_id,
279 physical_table_id,
280 create_table_expr,
281 peer,
282 physical_region_routes,
283 )?;
284
285 datanode
286 .handle(create_table_request)
287 .await
288 .with_context(|_| SendRequestToDatanodeSnafu { peer: peer.clone() })?;
289
290 Ok(())
291 }
292
293 async fn repair_table(
294 &self,
295 full_table_metadata: &FullTableMetadata,
296 physical_table_id: TableId,
297 physical_region_routes: &[RegionRoute],
298 ) -> Result<()> {
299 let full_table_name = full_table_metadata.full_table_name();
300 let failed_peers = self
302 .alter_table_on_datanodes(full_table_metadata, physical_region_routes)
303 .await?;
304
305 if failed_peers.is_empty() {
306 info!(
307 "All alter table requests sent successfully for table: {}",
308 full_table_name
309 );
310 return Ok(());
311 }
312 warn!(
313 "Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}",
314 full_table_name,
315 failed_peers
316 .iter()
317 .map(|(peer, _)| peer.id)
318 .collect::<Vec<_>>()
319 );
320
321 let create_table_expr =
322 create_table::generate_create_table_expr(&full_table_metadata.table_info)?;
323
324 let mut errors = Vec::new();
325 for (peer, err) in failed_peers {
326 if err.status_code() != StatusCode::RegionNotFound {
327 error!(
328 err;
329 "Sending alter table requests to datanode: {} for table: {} failed",
330 peer.id,
331 full_table_name,
332 );
333 continue;
334 }
335 info!(
336 "Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode",
337 full_table_name, peer.id
338 );
339
340 if let Err(err) = self
343 .create_table_on_datanode(
344 &create_table_expr,
345 full_table_metadata.table_id,
346 physical_table_id,
347 &peer,
348 physical_region_routes,
349 )
350 .await
351 {
352 error!(
353 err;
354 "Failed to create table on datanode: {} for table: {}",
355 peer.id, full_table_name
356 );
357 errors.push(err);
358 if self.fail_fast {
359 break;
360 }
361 } else {
362 info!(
363 "Created table on datanode: {} for table: {}",
364 peer.id, full_table_name
365 );
366 }
367 }
368
369 if !errors.is_empty() {
370 return UnexpectedSnafu {
371 msg: format!(
372 "Failed to create table on datanodes for table: {}",
373 full_table_name,
374 ),
375 }
376 .fail();
377 }
378
379 Ok(())
380 }
381}