cli/metadata/
repair.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_table;
16mod create_table;
17mod partition_column;
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use clap::{Parser, Subcommand};
24use client::api::v1::CreateTableExpr;
25use client::client_manager::NodeClients;
26use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
27use common_error::ext::{BoxedError, ErrorExt};
28use common_error::status_code::StatusCode;
29use common_grpc::channel_manager::ChannelConfig;
30use common_meta::error::Error as CommonMetaError;
31use common_meta::key::TableMetadataManager;
32use common_meta::kv_backend::KvBackendRef;
33use common_meta::node_manager::NodeManagerRef;
34use common_meta::peer::Peer;
35use common_meta::rpc::router::{RegionRoute, find_leaders};
36use common_telemetry::{error, info, warn};
37use futures::TryStreamExt;
38use partition_column::RepairPartitionColumnCommand;
39use snafu::{ResultExt, ensure};
40use store_api::storage::TableId;
41
42use crate::Tool;
43use crate::common::StoreConfig;
44use crate::error::{
45    InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
46};
47use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
48
// Subcommands of the metadata `repair` CLI command.
//
// NOTE: plain `//` comments are used deliberately — clap's derive would pick up `///` doc
// comments on this enum and its variants as subcommand help text.
#[derive(Subcommand)]
pub enum RepairCommand {
    // Repairs the metadata of logical tables; see `RepairLogicalTablesCommand`.
    LogicalTables(RepairLogicalTablesCommand),
    // Dispatches to `RepairPartitionColumnCommand` from the `partition_column` submodule.
    PartitionColumn(RepairPartitionColumnCommand),
}
54
55impl RepairCommand {
56    pub(super) async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
57        match self {
58            Self::LogicalTables(x) => x.build().await,
59            Self::PartitionColumn(x) => x.build().await.map(|x| Box::new(x) as _),
60        }
61    }
62}
63
64/// Repair metadata of logical tables.
65#[derive(Debug, Default, Parser)]
66pub struct RepairLogicalTablesCommand {
67    /// The names of the tables to repair.
68    #[clap(long, value_delimiter = ',', alias = "table-name")]
69    table_names: Vec<String>,
70
71    /// The id of the table to repair.
72    #[clap(long, value_delimiter = ',', alias = "table-id")]
73    table_ids: Vec<TableId>,
74
75    /// The schema of the tables to repair.
76    #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
77    schema_name: String,
78
79    /// The catalog of the tables to repair.
80    #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
81    catalog_name: String,
82
83    /// Whether to fail fast if any repair operation fails.
84    #[clap(long)]
85    fail_fast: bool,
86
87    #[clap(flatten)]
88    store: StoreConfig,
89
90    /// The timeout for the client to operate the datanode.
91    #[clap(long, default_value_t = 30)]
92    client_timeout_secs: u64,
93
94    /// The timeout for the client to connect to the datanode.
95    #[clap(long, default_value_t = 3)]
96    client_connect_timeout_secs: u64,
97}
98
99impl RepairLogicalTablesCommand {
100    fn validate(&self) -> Result<()> {
101        ensure!(
102            !self.table_names.is_empty() || !self.table_ids.is_empty(),
103            InvalidArgumentsSnafu {
104                msg: "You must specify --table-names or --table-ids.",
105            }
106        );
107        Ok(())
108    }
109}
110
111impl RepairLogicalTablesCommand {
112    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
113        self.validate().map_err(BoxedError::new)?;
114        let kv_backend = self.store.build().await?;
115        let node_client_channel_config = ChannelConfig::new()
116            .timeout(Some(Duration::from_secs(self.client_timeout_secs)))
117            .connect_timeout(Duration::from_secs(self.client_connect_timeout_secs));
118        let node_manager = Arc::new(NodeClients::new(node_client_channel_config));
119
120        Ok(Box::new(RepairTool {
121            table_names: self.table_names.clone(),
122            table_ids: self.table_ids.clone(),
123            schema_name: self.schema_name.clone(),
124            catalog_name: self.catalog_name.clone(),
125            fail_fast: self.fail_fast,
126            kv_backend,
127            node_manager,
128        }))
129    }
130}
131
/// Runnable tool built by `RepairLogicalTablesCommand::build`.
///
/// Holds the table selectors and options copied from the command line, plus the clients used
/// to read table metadata and to send region requests to datanodes.
struct RepairTool {
    /// Names of the tables to repair; when non-empty they take precedence over `table_ids`.
    table_names: Vec<String>,
    /// Ids of the tables to repair; only used when `table_names` is empty.
    table_ids: Vec<TableId>,
    /// Schema used to qualify every entry of `table_names`.
    schema_name: String,
    /// Catalog used to qualify every entry of `table_names`.
    catalog_name: String,
    /// Stop at the first failed table (or failed datanode create) instead of continuing.
    fail_fast: bool,
    /// Metadata store used to iterate table metadata and look up table routes.
    kv_backend: KvBackendRef,
    /// Source of datanode clients for sending alter/create region requests.
    node_manager: NodeManagerRef,
}
141
142#[async_trait]
143impl Tool for RepairTool {
144    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
145        self.repair_tables().await.map_err(BoxedError::new)
146    }
147}
148
149impl RepairTool {
150    fn generate_iterator_input(&self) -> Result<IteratorInput> {
151        if !self.table_names.is_empty() {
152            let table_names = &self.table_names;
153            let catalog = &self.catalog_name;
154            let schema_name = &self.schema_name;
155
156            let table_names = table_names
157                .iter()
158                .map(|table_name| (catalog.clone(), schema_name.clone(), table_name.clone()))
159                .collect::<Vec<_>>();
160            return Ok(IteratorInput::new_table_names(table_names));
161        } else if !self.table_ids.is_empty() {
162            return Ok(IteratorInput::new_table_ids(self.table_ids.clone()));
163        };
164
165        InvalidArgumentsSnafu {
166            msg: "You must specify --table-names or --table-id.",
167        }
168        .fail()
169    }
170
171    async fn repair_tables(&self) -> Result<()> {
172        let input = self.generate_iterator_input()?;
173        let mut table_metadata_iterator =
174            Box::pin(TableMetadataIterator::new(self.kv_backend.clone(), input).into_stream());
175        let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
176
177        let mut skipped_table = 0;
178        let mut success_table = 0;
179        while let Some(full_table_metadata) = table_metadata_iterator.try_next().await? {
180            let full_table_name = full_table_metadata.full_table_name();
181            if !full_table_metadata.is_metric_engine() {
182                warn!(
183                    "Skipping repair for non-metric engine table: {}",
184                    full_table_name
185                );
186                skipped_table += 1;
187                continue;
188            }
189
190            if full_table_metadata.is_physical_table() {
191                warn!("Skipping repair for physical table: {}", full_table_name);
192                skipped_table += 1;
193                continue;
194            }
195
196            let (physical_table_id, physical_table_route) = table_metadata_manager
197                .table_route_manager()
198                .get_physical_table_route(full_table_metadata.table_id)
199                .await
200                .context(TableMetadataSnafu)?;
201
202            if let Err(err) = self
203                .repair_table(
204                    &full_table_metadata,
205                    physical_table_id,
206                    &physical_table_route.region_routes,
207                )
208                .await
209            {
210                error!(
211                    err;
212                    "Failed to repair table: {}, skipped table: {}",
213                    full_table_name,
214                    skipped_table,
215                );
216
217                if self.fail_fast {
218                    return Err(err);
219                }
220            } else {
221                success_table += 1;
222            }
223        }
224
225        info!(
226            "Repair logical tables result: {} tables repaired, {} tables skipped",
227            success_table, skipped_table
228        );
229
230        Ok(())
231    }
232
233    async fn alter_table_on_datanodes(
234        &self,
235        full_table_metadata: &FullTableMetadata,
236        physical_region_routes: &[RegionRoute],
237    ) -> Result<Vec<(Peer, CommonMetaError)>> {
238        let logical_table_id = full_table_metadata.table_id;
239        let alter_table_expr = alter_table::generate_alter_table_expr_for_all_columns(
240            &full_table_metadata.table_info,
241        )?;
242        let node_manager = self.node_manager.clone();
243
244        let mut failed_peers = Vec::new();
245        info!(
246            "Sending alter table requests to all datanodes for table: {}, number of regions:{}.",
247            full_table_metadata.full_table_name(),
248            physical_region_routes.len()
249        );
250        let leaders = find_leaders(physical_region_routes);
251        for peer in &leaders {
252            let alter_table_request = alter_table::make_alter_region_request_for_peer(
253                logical_table_id,
254                &alter_table_expr,
255                peer,
256                physical_region_routes,
257            )?;
258            let datanode = node_manager.datanode(peer).await;
259            if let Err(err) = datanode.handle(alter_table_request).await {
260                failed_peers.push((peer.clone(), err));
261            }
262        }
263
264        Ok(failed_peers)
265    }
266
267    async fn create_table_on_datanode(
268        &self,
269        create_table_expr: &CreateTableExpr,
270        logical_table_id: TableId,
271        physical_table_id: TableId,
272        peer: &Peer,
273        physical_region_routes: &[RegionRoute],
274    ) -> Result<()> {
275        let node_manager = self.node_manager.clone();
276        let datanode = node_manager.datanode(peer).await;
277        let create_table_request = create_table::make_create_region_request_for_peer(
278            logical_table_id,
279            physical_table_id,
280            create_table_expr,
281            peer,
282            physical_region_routes,
283        )?;
284
285        datanode
286            .handle(create_table_request)
287            .await
288            .with_context(|_| SendRequestToDatanodeSnafu { peer: peer.clone() })?;
289
290        Ok(())
291    }
292
293    async fn repair_table(
294        &self,
295        full_table_metadata: &FullTableMetadata,
296        physical_table_id: TableId,
297        physical_region_routes: &[RegionRoute],
298    ) -> Result<()> {
299        let full_table_name = full_table_metadata.full_table_name();
300        // First we sends alter table requests to all datanodes with all columns.
301        let failed_peers = self
302            .alter_table_on_datanodes(full_table_metadata, physical_region_routes)
303            .await?;
304
305        if failed_peers.is_empty() {
306            info!(
307                "All alter table requests sent successfully for table: {}",
308                full_table_name
309            );
310            return Ok(());
311        }
312        warn!(
313            "Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}",
314            full_table_name,
315            failed_peers
316                .iter()
317                .map(|(peer, _)| peer.id)
318                .collect::<Vec<_>>()
319        );
320
321        let create_table_expr =
322            create_table::generate_create_table_expr(&full_table_metadata.table_info)?;
323
324        let mut errors = Vec::new();
325        for (peer, err) in failed_peers {
326            if err.status_code() != StatusCode::RegionNotFound {
327                error!(
328                    err;
329                    "Sending alter table requests to datanode: {} for table: {} failed",
330                    peer.id,
331                    full_table_name,
332                );
333                continue;
334            }
335            info!(
336                "Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode",
337                full_table_name, peer.id
338            );
339
340            // If the alter table request fails for any datanode, we attempt to create the table on that datanode
341            // as a fallback mechanism to ensure table consistency across the cluster.
342            if let Err(err) = self
343                .create_table_on_datanode(
344                    &create_table_expr,
345                    full_table_metadata.table_id,
346                    physical_table_id,
347                    &peer,
348                    physical_region_routes,
349                )
350                .await
351            {
352                error!(
353                    err;
354                    "Failed to create table on datanode: {} for table: {}",
355                    peer.id, full_table_name
356                );
357                errors.push(err);
358                if self.fail_fast {
359                    break;
360                }
361            } else {
362                info!(
363                    "Created table on datanode: {} for table: {}",
364                    peer.id, full_table_name
365                );
366            }
367        }
368
369        if !errors.is_empty() {
370            return UnexpectedSnafu {
371                msg: format!(
372                    "Failed to create table on datanodes for table: {}",
373                    full_table_name,
374                ),
375            }
376            .fail();
377        }
378
379        Ok(())
380    }
381}