1mod alter_table;
16mod create_table;
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use clap::Parser;
23use client::api::v1::CreateTableExpr;
24use client::client_manager::NodeClients;
25use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
26use common_error::ext::{BoxedError, ErrorExt};
27use common_error::status_code::StatusCode;
28use common_grpc::channel_manager::ChannelConfig;
29use common_meta::error::Error as CommonMetaError;
30use common_meta::key::TableMetadataManager;
31use common_meta::kv_backend::KvBackendRef;
32use common_meta::node_manager::NodeManagerRef;
33use common_meta::peer::Peer;
34use common_meta::rpc::router::{find_leaders, RegionRoute};
35use common_telemetry::{error, info, warn};
36use futures::TryStreamExt;
37use snafu::{ensure, ResultExt};
38use store_api::storage::TableId;
39
40use crate::error::{
41 InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
42};
43use crate::metadata::common::StoreConfig;
44use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
45use crate::Tool;
46
/// Command-line arguments for repairing logical tables.
///
/// At least one of `--table-names` or `--table-ids` must be given. When both
/// are given, the table names are used and the table ids are ignored.
#[derive(Debug, Default, Parser)]
pub struct RepairLogicalTablesCommand {
    /// Names of the tables to repair, separated by commas.
    #[clap(long, value_delimiter = ',', alias = "table-name")]
    table_names: Vec<String>,

    /// Ids of the tables to repair, separated by commas.
    #[clap(long, value_delimiter = ',', alias = "table-id")]
    table_ids: Vec<TableId>,

    /// Schema that the tables given by `--table-names` belong to.
    #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
    schema_name: String,

    /// Catalog that the tables given by `--table-names` belong to.
    #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
    catalog_name: String,

    /// Stop at the first table that fails to repair instead of continuing.
    #[clap(long)]
    fail_fast: bool,

    // Flattened store options; `build` uses them to create the key-value
    // backend that table metadata is read from.
    #[clap(flatten)]
    store: StoreConfig,

    /// Timeout, in seconds, of the channel used to send requests to datanodes.
    #[clap(long, default_value_t = 30)]
    client_timeout_secs: u64,

    /// Connect timeout, in seconds, of the channel used to reach datanodes.
    #[clap(long, default_value_t = 3)]
    client_connect_timeout_secs: u64,
}
81
82impl RepairLogicalTablesCommand {
83 fn validate(&self) -> Result<()> {
84 ensure!(
85 !self.table_names.is_empty() || !self.table_ids.is_empty(),
86 InvalidArgumentsSnafu {
87 msg: "You must specify --table-names or --table-ids.",
88 }
89 );
90 Ok(())
91 }
92}
93
94impl RepairLogicalTablesCommand {
95 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
96 self.validate().map_err(BoxedError::new)?;
97 let kv_backend = self.store.build().await?;
98 let node_client_channel_config = ChannelConfig::new()
99 .timeout(Duration::from_secs(self.client_timeout_secs))
100 .connect_timeout(Duration::from_secs(self.client_connect_timeout_secs));
101 let node_manager = Arc::new(NodeClients::new(node_client_channel_config));
102
103 Ok(Box::new(RepairTool {
104 table_names: self.table_names.clone(),
105 table_ids: self.table_ids.clone(),
106 schema_name: self.schema_name.clone(),
107 catalog_name: self.catalog_name.clone(),
108 fail_fast: self.fail_fast,
109 kv_backend,
110 node_manager,
111 }))
112 }
113}
114
/// The tool that performs the repair; built by `RepairLogicalTablesCommand::build`.
struct RepairTool {
    /// Names of the tables to repair; takes precedence over `table_ids` when non-empty.
    table_names: Vec<String>,
    /// Ids of the tables to repair; only used when `table_names` is empty.
    table_ids: Vec<TableId>,
    /// Schema combined with each entry of `table_names` to form the full table name.
    schema_name: String,
    /// Catalog combined with each entry of `table_names` to form the full table name.
    catalog_name: String,
    /// Whether to stop at the first failure instead of continuing with the next item.
    fail_fast: bool,
    /// Key-value backend that table metadata and table routes are read from.
    kv_backend: KvBackendRef,
    /// Provides the datanode clients that alter/create requests are sent through.
    node_manager: NodeManagerRef,
}
124
125#[async_trait]
126impl Tool for RepairTool {
127 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
128 self.repair_tables().await.map_err(BoxedError::new)
129 }
130}
131
132impl RepairTool {
133 fn generate_iterator_input(&self) -> Result<IteratorInput> {
134 if !self.table_names.is_empty() {
135 let table_names = &self.table_names;
136 let catalog = &self.catalog_name;
137 let schema_name = &self.schema_name;
138
139 let table_names = table_names
140 .iter()
141 .map(|table_name| {
142 (
143 catalog.to_string(),
144 schema_name.to_string(),
145 table_name.to_string(),
146 )
147 })
148 .collect::<Vec<_>>();
149 return Ok(IteratorInput::new_table_names(table_names));
150 } else if !self.table_ids.is_empty() {
151 return Ok(IteratorInput::new_table_ids(self.table_ids.clone()));
152 };
153
154 InvalidArgumentsSnafu {
155 msg: "You must specify --table-names or --table-id.",
156 }
157 .fail()
158 }
159
160 async fn repair_tables(&self) -> Result<()> {
161 let input = self.generate_iterator_input()?;
162 let mut table_metadata_iterator =
163 Box::pin(TableMetadataIterator::new(self.kv_backend.clone(), input).into_stream());
164 let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
165
166 let mut skipped_table = 0;
167 let mut success_table = 0;
168 while let Some(full_table_metadata) = table_metadata_iterator.try_next().await? {
169 let full_table_name = full_table_metadata.full_table_name();
170 if !full_table_metadata.is_metric_engine() {
171 warn!(
172 "Skipping repair for non-metric engine table: {}",
173 full_table_name
174 );
175 skipped_table += 1;
176 continue;
177 }
178
179 if full_table_metadata.is_physical_table() {
180 warn!("Skipping repair for physical table: {}", full_table_name);
181 skipped_table += 1;
182 continue;
183 }
184
185 let (physical_table_id, physical_table_route) = table_metadata_manager
186 .table_route_manager()
187 .get_physical_table_route(full_table_metadata.table_id)
188 .await
189 .context(TableMetadataSnafu)?;
190
191 if let Err(err) = self
192 .repair_table(
193 &full_table_metadata,
194 physical_table_id,
195 &physical_table_route.region_routes,
196 )
197 .await
198 {
199 error!(
200 err;
201 "Failed to repair table: {}, skipped table: {}",
202 full_table_name,
203 skipped_table,
204 );
205
206 if self.fail_fast {
207 return Err(err);
208 }
209 } else {
210 success_table += 1;
211 }
212 }
213
214 info!(
215 "Repair logical tables result: {} tables repaired, {} tables skipped",
216 success_table, skipped_table
217 );
218
219 Ok(())
220 }
221
222 async fn alter_table_on_datanodes(
223 &self,
224 full_table_metadata: &FullTableMetadata,
225 physical_region_routes: &[RegionRoute],
226 ) -> Result<Vec<(Peer, CommonMetaError)>> {
227 let logical_table_id = full_table_metadata.table_id;
228 let alter_table_expr = alter_table::generate_alter_table_expr_for_all_columns(
229 &full_table_metadata.table_info,
230 )?;
231 let node_manager = self.node_manager.clone();
232
233 let mut failed_peers = Vec::new();
234 info!(
235 "Sending alter table requests to all datanodes for table: {}, number of regions:{}.",
236 full_table_metadata.full_table_name(),
237 physical_region_routes.len()
238 );
239 let leaders = find_leaders(physical_region_routes);
240 for peer in &leaders {
241 let alter_table_request = alter_table::make_alter_region_request_for_peer(
242 logical_table_id,
243 &alter_table_expr,
244 full_table_metadata.table_info.ident.version,
245 peer,
246 physical_region_routes,
247 )?;
248 let datanode = node_manager.datanode(peer).await;
249 if let Err(err) = datanode.handle(alter_table_request).await {
250 failed_peers.push((peer.clone(), err));
251 }
252 }
253
254 Ok(failed_peers)
255 }
256
257 async fn create_table_on_datanode(
258 &self,
259 create_table_expr: &CreateTableExpr,
260 logical_table_id: TableId,
261 physical_table_id: TableId,
262 peer: &Peer,
263 physical_region_routes: &[RegionRoute],
264 ) -> Result<()> {
265 let node_manager = self.node_manager.clone();
266 let datanode = node_manager.datanode(peer).await;
267 let create_table_request = create_table::make_create_region_request_for_peer(
268 logical_table_id,
269 physical_table_id,
270 create_table_expr,
271 peer,
272 physical_region_routes,
273 )?;
274
275 datanode
276 .handle(create_table_request)
277 .await
278 .with_context(|_| SendRequestToDatanodeSnafu { peer: peer.clone() })?;
279
280 Ok(())
281 }
282
283 async fn repair_table(
284 &self,
285 full_table_metadata: &FullTableMetadata,
286 physical_table_id: TableId,
287 physical_region_routes: &[RegionRoute],
288 ) -> Result<()> {
289 let full_table_name = full_table_metadata.full_table_name();
290 let failed_peers = self
292 .alter_table_on_datanodes(full_table_metadata, physical_region_routes)
293 .await?;
294
295 if failed_peers.is_empty() {
296 info!(
297 "All alter table requests sent successfully for table: {}",
298 full_table_name
299 );
300 return Ok(());
301 }
302 warn!(
303 "Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}",
304 full_table_name,
305 failed_peers.iter().map(|(peer, _)| peer.id).collect::<Vec<_>>()
306 );
307
308 let create_table_expr =
309 create_table::generate_create_table_expr(&full_table_metadata.table_info)?;
310
311 let mut errors = Vec::new();
312 for (peer, err) in failed_peers {
313 if err.status_code() != StatusCode::RegionNotFound {
314 error!(
315 err;
316 "Sending alter table requests to datanode: {} for table: {} failed",
317 peer.id,
318 full_table_name,
319 );
320 continue;
321 }
322 info!(
323 "Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode",
324 full_table_name,
325 peer.id
326 );
327
328 if let Err(err) = self
331 .create_table_on_datanode(
332 &create_table_expr,
333 full_table_metadata.table_id,
334 physical_table_id,
335 &peer,
336 physical_region_routes,
337 )
338 .await
339 {
340 error!(
341 err;
342 "Failed to create table on datanode: {} for table: {}",
343 peer.id, full_table_name
344 );
345 errors.push(err);
346 if self.fail_fast {
347 break;
348 }
349 } else {
350 info!(
351 "Created table on datanode: {} for table: {}",
352 peer.id, full_table_name
353 );
354 }
355 }
356
357 if !errors.is_empty() {
358 return UnexpectedSnafu {
359 msg: format!(
360 "Failed to create table on datanodes for table: {}",
361 full_table_name,
362 ),
363 }
364 .fail();
365 }
366
367 Ok(())
368 }
369}