cli/metadata/repair.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod alter_table;
mod create_table;

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use clap::Parser;
use client::api::v1::CreateTableExpr;
use client::client_manager::NodeClients;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::error::Error as CommonMetaError;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use snafu::{ensure, ResultExt};
use store_api::storage::TableId;

use crate::error::{
    InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
};
use crate::metadata::common::StoreConfig;
use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
use crate::Tool;
/// Repair metadata of logical tables.
#[derive(Debug, Default, Parser)]
pub struct RepairLogicalTablesCommand {
    /// The names of the tables to repair.
    #[clap(long, value_delimiter = ',', alias = "table-name")]
    table_names: Vec<String>,

    /// The ids of the tables to repair.
    #[clap(long, value_delimiter = ',', alias = "table-id")]
    table_ids: Vec<TableId>,

    /// The schema of the tables to repair.
    #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
    schema_name: String,

    /// The catalog of the tables to repair.
    #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
    catalog_name: String,

    /// Whether to fail fast if any repair operation fails.
    #[clap(long)]
    fail_fast: bool,

    #[clap(flatten)]
    store: StoreConfig,

    /// The timeout in seconds for the client to operate on the datanode.
    #[clap(long, default_value_t = 30)]
    client_timeout_secs: u64,

    /// The timeout in seconds for the client to connect to the datanode.
    #[clap(long, default_value_t = 3)]
    client_connect_timeout_secs: u64,
}

impl RepairLogicalTablesCommand {
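    /// Ensures that at least one of `--table-names` or `--table-ids` is specified.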
    fn validate(&self) -> Result<()> {
        ensure!(
            !self.table_names.is_empty() || !self.table_ids.is_empty(),
            InvalidArgumentsSnafu {
                msg: "You must specify --table-names or --table-ids.",
            }
        );
        Ok(())
    }
}

impl RepairLogicalTablesCommand {
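    /// Builds the repair tool: constructs the kv backend from the store config and
    /// the datanode clients with the configured timeouts.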
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        self.validate().map_err(BoxedError::new)?;
        let kv_backend = self.store.build().await?;
        let node_client_channel_config = ChannelConfig::new()
            .timeout(Duration::from_secs(self.client_timeout_secs))
            .connect_timeout(Duration::from_secs(self.client_connect_timeout_secs));
        let node_manager = Arc::new(NodeClients::new(node_client_channel_config));

        Ok(Box::new(RepairTool {
            table_names: self.table_names.clone(),
            table_ids: self.table_ids.clone(),
            schema_name: self.schema_name.clone(),
            catalog_name: self.catalog_name.clone(),
            fail_fast: self.fail_fast,
            kv_backend,
            node_manager,
        }))
    }
}

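/// Repairs the metadata of logical tables by pushing their expected schemas to the
/// datanodes hosting the corresponding physical regions.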
struct RepairTool {
    table_names: Vec<String>,
    table_ids: Vec<TableId>,
    schema_name: String,
    catalog_name: String,
    fail_fast: bool,
    kv_backend: KvBackendRef,
    node_manager: NodeManagerRef,
}

#[async_trait]
impl Tool for RepairTool {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        self.repair_tables().await.map_err(BoxedError::new)
    }
}

impl RepairTool {
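    /// Builds the iterator input from the provided table names or table ids.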
    fn generate_iterator_input(&self) -> Result<IteratorInput> {
        if !self.table_names.is_empty() {
            let table_names = &self.table_names;
            let catalog = &self.catalog_name;
            let schema_name = &self.schema_name;

            let table_names = table_names
                .iter()
                .map(|table_name| {
                    (
                        catalog.to_string(),
                        schema_name.to_string(),
                        table_name.to_string(),
                    )
                })
                .collect::<Vec<_>>();
            return Ok(IteratorInput::new_table_names(table_names));
        } else if !self.table_ids.is_empty() {
            return Ok(IteratorInput::new_table_ids(self.table_ids.clone()));
        };

        InvalidArgumentsSnafu {
            msg: "You must specify --table-names or --table-ids.",
        }
        .fail()
    }

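    /// Iterates over the target tables and repairs each logical table, skipping
    /// physical tables and tables that are not on the metric engine.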
    async fn repair_tables(&self) -> Result<()> {
        let input = self.generate_iterator_input()?;
        let mut table_metadata_iterator =
            Box::pin(TableMetadataIterator::new(self.kv_backend.clone(), input).into_stream());
        let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());

        let mut skipped_table = 0;
        let mut success_table = 0;
        while let Some(full_table_metadata) = table_metadata_iterator.try_next().await? {
            let full_table_name = full_table_metadata.full_table_name();
            if !full_table_metadata.is_metric_engine() {
                warn!(
                    "Skipping repair for non-metric engine table: {}",
                    full_table_name
                );
                skipped_table += 1;
                continue;
            }

            if full_table_metadata.is_physical_table() {
                warn!("Skipping repair for physical table: {}", full_table_name);
                skipped_table += 1;
                continue;
            }

            let (physical_table_id, physical_table_route) = table_metadata_manager
                .table_route_manager()
                .get_physical_table_route(full_table_metadata.table_id)
                .await
                .context(TableMetadataSnafu)?;

            if let Err(err) = self
                .repair_table(
                    &full_table_metadata,
                    physical_table_id,
                    &physical_table_route.region_routes,
                )
                .await
            {
                error!(
                    err;
                    "Failed to repair table: {}, skipped tables: {}",
                    full_table_name,
                    skipped_table,
                );

                if self.fail_fast {
                    return Err(err);
                }
            } else {
                success_table += 1;
            }
        }

        info!(
            "Repair logical tables result: {} tables repaired, {} tables skipped",
            success_table, skipped_table
        );

        Ok(())
    }

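    /// Sends an alter table request containing all columns of the logical table to the
    /// leader datanode of every physical region, returning the peers that failed along
    /// with their errors.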
    async fn alter_table_on_datanodes(
        &self,
        full_table_metadata: &FullTableMetadata,
        physical_region_routes: &[RegionRoute],
    ) -> Result<Vec<(Peer, CommonMetaError)>> {
        let logical_table_id = full_table_metadata.table_id;
        let alter_table_expr = alter_table::generate_alter_table_expr_for_all_columns(
            &full_table_metadata.table_info,
        )?;
        let node_manager = self.node_manager.clone();

        let mut failed_peers = Vec::new();
        info!(
            "Sending alter table requests to all datanodes for table: {}, number of regions: {}.",
            full_table_metadata.full_table_name(),
            physical_region_routes.len()
        );
        let leaders = find_leaders(physical_region_routes);
        for peer in &leaders {
            let alter_table_request = alter_table::make_alter_region_request_for_peer(
                logical_table_id,
                &alter_table_expr,
                full_table_metadata.table_info.ident.version,
                peer,
                physical_region_routes,
            )?;
            let datanode = node_manager.datanode(peer).await;
            if let Err(err) = datanode.handle(alter_table_request).await {
                failed_peers.push((peer.clone(), err));
            }
        }

        Ok(failed_peers)
    }

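    /// Creates the logical table region on the given datanode.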
    async fn create_table_on_datanode(
        &self,
        create_table_expr: &CreateTableExpr,
        logical_table_id: TableId,
        physical_table_id: TableId,
        peer: &Peer,
        physical_region_routes: &[RegionRoute],
    ) -> Result<()> {
        let node_manager = self.node_manager.clone();
        let datanode = node_manager.datanode(peer).await;
        let create_table_request = create_table::make_create_region_request_for_peer(
            logical_table_id,
            physical_table_id,
            create_table_expr,
            peer,
            physical_region_routes,
        )?;

        datanode
            .handle(create_table_request)
            .await
            .with_context(|_| SendRequestToDatanodeSnafu { peer: peer.clone() })?;

        Ok(())
    }

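    /// Repairs a single logical table: alters it on all region leaders and, for peers
    /// that report `RegionNotFound`, recreates the logical table on those datanodes.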
    async fn repair_table(
        &self,
        full_table_metadata: &FullTableMetadata,
        physical_table_id: TableId,
        physical_region_routes: &[RegionRoute],
    ) -> Result<()> {
        let full_table_name = full_table_metadata.full_table_name();
        // First, we send alter table requests carrying all columns to all datanodes.
        let failed_peers = self
            .alter_table_on_datanodes(full_table_metadata, physical_region_routes)
            .await?;

        if failed_peers.is_empty() {
            info!(
                "All alter table requests sent successfully for table: {}",
                full_table_name
            );
            return Ok(());
        }
        warn!(
            "Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}",
            full_table_name,
            failed_peers.iter().map(|(peer, _)| peer.id).collect::<Vec<_>>()
        );

        let create_table_expr =
            create_table::generate_create_table_expr(&full_table_metadata.table_info)?;

        let mut errors = Vec::new();
        for (peer, err) in failed_peers {
            if err.status_code() != StatusCode::RegionNotFound {
                error!(
                    err;
                    "Sending alter table requests to datanode: {} for table: {} failed",
                    peer.id,
                    full_table_name,
                );
                continue;
            }
            info!(
                "Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode",
                full_table_name,
                peer.id
            );

            // If the alter table request fails with `RegionNotFound` on a datanode, we attempt to
            // create the logical table on that datanode as a fallback to restore table consistency
            // across the cluster.
            if let Err(err) = self
                .create_table_on_datanode(
                    &create_table_expr,
                    full_table_metadata.table_id,
                    physical_table_id,
                    &peer,
                    physical_region_routes,
                )
                .await
            {
                error!(
                    err;
                    "Failed to create table on datanode: {} for table: {}",
                    peer.id, full_table_name
                );
                errors.push(err);
                if self.fail_fast {
                    break;
                }
            } else {
                info!(
                    "Created table on datanode: {} for table: {}",
                    peer.id, full_table_name
                );
            }
        }

        if !errors.is_empty() {
            return UnexpectedSnafu {
                msg: format!(
                    "Failed to create table on datanodes for table: {}",
                    full_table_name,
                ),
            }
            .fail();
        }

        Ok(())
    }
}