cli/metadata/repair/
partition_column.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use async_trait::async_trait;
18use clap::Parser;
19use common_error::ext::BoxedError;
20use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
21use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
22use common_meta::key::{MetadataKey, MetadataValue};
23use common_meta::kv_backend::KvBackendRef;
24use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
25use common_meta::rpc::store::{PutRequest, RangeRequest};
26use common_telemetry::{info, warn};
27use futures::StreamExt;
28use partition::expr::PartitionExpr;
29use store_api::storage::TableId;
30use table::metadata::TableType;
31
32use crate::{StoreConfig, Tool};
33
34/// CLI command to repair partition column metadata mismatches.
35#[derive(Parser)]
36pub struct RepairPartitionColumnCommand {
37    #[clap(flatten)]
38    store_config: StoreConfig,
39
40    /// Whether to actually do the update in the underlying metadata store, or not.
41    #[clap(long)]
42    dry_run: bool,
43
44    /// The maximum count of update times.
45    #[clap(long)]
46    update_limit: Option<u32>,
47}
48
49impl RepairPartitionColumnCommand {
50    pub(super) async fn build(&self) -> Result<RepairPartitionColumnTool, BoxedError> {
51        let kv_backend = self.store_config.build().await?;
52        Ok(RepairPartitionColumnTool {
53            kv_backend,
54            dry_run: self.dry_run,
55            update_limit: self.update_limit,
56        })
57    }
58}
59
/// Repair tool that reconciles partition columns between table info and routes.
///
/// The partition key indices stored in a table's info are compared with the column names
/// referenced by the partition expressions of its physical route; on mismatch the table info
/// is rewritten to follow the partition expressions.
pub(crate) struct RepairPartitionColumnTool {
    /// Metadata store that is scanned for table infos/routes and written to on repair.
    kv_backend: KvBackendRef,
    /// When `true`, intended updates are only logged and nothing is written.
    dry_run: bool,
    /// Maximum number of updates per run; `None` means no limit.
    update_limit: Option<u32>,
}
66
67impl RepairPartitionColumnTool {
68    async fn do_repair(
69        &self,
70        table_infos: HashMap<TableId, TableInfoValue>,
71        table_routes: HashMap<TableId, TableRouteValue>,
72    ) -> Result<(), BoxedError> {
73        let mut update_count = 0;
74        for (table_id, table_info_value) in &table_infos {
75            let table_meta = &table_info_value.table_info.meta;
76            let mut partition_columns = Vec::with_capacity(table_meta.partition_key_indices.len());
77            for i in &table_meta.partition_key_indices {
78                if let Some(x) = table_meta.schema.column_schemas().get(*i) {
79                    partition_columns.push(&x.name);
80                } else {
81                    warn!(
82                        "Partition column not found by index: {i}, table: {}, id: {}",
83                        table_info_value.table_name(),
84                        table_id
85                    );
86                }
87            }
88
89            let Some(TableRouteValue::Physical(table_route)) = table_routes.get(table_id) else {
90                continue;
91            };
92            let mut partition_expr_columns = HashSet::new();
93            for region_route in &table_route.region_routes {
94                let partition_expr_result =
95                    PartitionExpr::from_json_str(&region_route.region.partition_expr());
96                let partition_expr = match partition_expr_result {
97                    Ok(Some(expr)) => expr,
98                    Ok(None) => {
99                        // No partition expression found, which might be valid.
100                        continue;
101                    }
102                    Err(e) => {
103                        warn!(
104                            e;
105                            "Failed to deserialize partition expression for region: {:?}, table: {}",
106                            region_route.region.id,
107                            table_info_value.table_name()
108                        );
109                        continue;
110                    }
111                };
112                partition_expr.collect_column_names(&mut partition_expr_columns);
113            }
114
115            let mut partition_expr_columns = partition_expr_columns.iter().collect::<Vec<_>>();
116            partition_expr_columns.sort();
117            partition_columns.sort();
118            if partition_expr_columns != partition_columns {
119                warn!(
120                    "Columns in partition exprs: {:?} do not match partition columns: {:?} in table ‘{}’",
121                    partition_expr_columns,
122                    partition_columns,
123                    table_info_value.table_name(),
124                );
125
126                if let Some(update_limit) = self.update_limit
127                    && update_count >= update_limit
128                {
129                    warn!(
130                        "Reached update limit: {update_limit}. Stopping further table metadata updates."
131                    );
132                    return Ok(());
133                }
134                self.update_partition_columns(partition_expr_columns, table_info_value)
135                    .await?;
136                update_count += 1;
137            }
138        }
139        Ok(())
140    }
141
142    async fn update_partition_columns(
143        &self,
144        partition_expr_columns: Vec<&String>,
145        table_info_value: &TableInfoValue,
146    ) -> Result<(), BoxedError> {
147        let column_schemas = table_info_value.table_info.meta.schema.column_schemas();
148        let mut partition_column_indices = Vec::with_capacity(partition_expr_columns.len());
149        for column_name in &partition_expr_columns {
150            if let Some((i, _)) = column_schemas
151                .iter()
152                .enumerate()
153                .find(|(_, x)| &x.name == *column_name)
154            {
155                partition_column_indices.push(i);
156            } else {
157                warn!(
158                    "Partition column '{}' from partition expression not found in table schema '{}'. Skipping this column for update.",
159                    column_name,
160                    table_info_value.table_name()
161                );
162            }
163        }
164
165        info!(
166            "Updating partition columns to {:?} (by column indices: {:?}) in table '{}'",
167            partition_expr_columns,
168            partition_column_indices,
169            table_info_value.table_name(),
170        );
171        if self.dry_run {
172            info!("Dry run enabled, do nothing");
173            return Ok(());
174        }
175
176        let mut new_table_info = table_info_value.table_info.clone();
177        new_table_info.meta.partition_key_indices = partition_column_indices;
178        let table_info = table_info_value.update(new_table_info);
179
180        let request = PutRequest::new()
181            .with_key(TableInfoKey::new(table_info.table_info.ident.table_id).to_bytes())
182            .with_value(table_info.try_as_raw_value().map_err(BoxedError::new)?);
183        let _ = self
184            .kv_backend
185            .put(request)
186            .await
187            .map_err(BoxedError::new)?;
188        Ok(())
189    }
190}
191
192#[async_trait]
193impl Tool for RepairPartitionColumnTool {
194    async fn do_work(&self) -> Result<(), BoxedError> {
195        let key_values = PaginationStream::new(
196            self.kv_backend.clone(),
197            RangeRequest::new().with_range(vec![0], vec![0]),
198            DEFAULT_PAGE_SIZE,
199            Ok,
200        )
201        .into_stream();
202        let mut key_values = Box::pin(key_values);
203
204        let mut table_infos = HashMap::new();
205        let mut table_routes = HashMap::new();
206        while let Some(result) = key_values.next().await {
207            match result {
208                Ok(kv) => {
209                    if let Ok(key) = TableInfoKey::from_bytes(kv.key()) {
210                        let Ok(value) = TableInfoValue::try_from_raw_value(&kv.value) else {
211                            warn!("Skip corrupted key: {key}");
212                            continue;
213                        };
214                        if value.table_info.table_type == TableType::Base {
215                            table_infos.insert(value.table_info.ident.table_id, value);
216                        }
217                    } else if let Ok(key) = TableRouteKey::from_bytes(kv.key()) {
218                        let Ok(value) = TableRouteValue::try_from_raw_value(&kv.value) else {
219                            warn!("Skip corrupted key: {key}");
220                            continue;
221                        };
222                        if value.is_physical() {
223                            table_routes.insert(key.table_id, value);
224                        }
225                    }
226                }
227                Err(e) => {
228                    warn!(e; "Failed to get next key")
229                }
230            }
231        }
232
233        self.do_repair(table_infos, table_routes).await
234    }
235}
236
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_meta::kv_backend::KvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;

    use super::*;

    /// Runs the repair tool once against `kv_backend` with the given settings.
    async fn run_tool(kv_backend: KvBackendRef, dry_run: bool, update_limit: Option<u32>) {
        let tool = RepairPartitionColumnTool {
            kv_backend,
            dry_run,
            update_limit,
        };
        tool.do_work().await.unwrap();
    }

    /// Reads the value stored under `key` and returns it as a UTF-8 string.
    async fn stored_value(kv_backend: &KvBackendRef, key: &[u8]) -> String {
        let kv = kv_backend.get(key).await.unwrap().unwrap();
        String::from_utf8(kv.value).unwrap()
    }

    /// The stored table info records `c3` (index 3) as partition column while the route's
    /// partition expressions use `c4`. The table info must stay untouched in dry-run mode and
    /// with an update limit of zero, and be rewritten to index 4 otherwise.
    #[tokio::test]
    async fn test_repair_partition_column() {
        common_telemetry::init_default_ut_logging();

        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;

        let table_info_key = TableInfoKey::new(1282).to_bytes();
        let table_info_value = r#"{"table_info":{"ident":{"table_id":1282,"version":2},"name":"foo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"c0","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c1","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c2","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c3","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c4","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c5","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c6","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c7","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c8","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c9","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c10","data_type":{"Timestamp":{"Nanosecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":10,"version":2},"primary_key_indices":[4,7],"value_indices":[0,1,2,3,5,6,8,9,10],"engine":"mito","next_column_id":11,"region_numbers":[0,1,2],"options":{"write_buffer_size":null,"ttl":"14days","skip_wal":false,"extra_options":{"append_mode":"true"}},"created_on":"2025-09-25T01:39:28.702584510Z","partition_key_indices":[3]},"table_type":"Base"},"version":2}"#;
        kv_backend
            .put(
                PutRequest::new()
                    .with_key(table_info_key.clone())
                    .with_value(table_info_value),
            )
            .await
            .unwrap();

        let table_route_key = TableRouteKey::new(1282).to_bytes();
        let table_route_value = r#"{"type":"physical","region_routes":[{"region":{"id":5506148073472,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"Lt\",\"rhs\":{\"Value\":{\"Int32\":1}}}}"]},"attrs":{}},"leader_peer":{"id":12,"addr":"192.168.1.1:3001"},"follower_peers":[],"leader_down_since":null},{"region":{"id":5506148073473,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":1}}}},\"op\":\"And\",\"rhs\":{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"Lt\",\"rhs\":{\"Value\":{\"Int32\":2}}}}}}"]},"attrs":{}},"leader_peer":{"id":13,"addr":"192.168.1.2:3001"},"follower_peers":[],"leader_down_since":null},{"region":{"id":5506148073474,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":2}}}}"]},"attrs":{}},"leader_peer":{"id":10,"addr":"192.168.1.3:3001"},"follower_peers":[],"leader_down_since":null}],"version":0}"#;
        kv_backend
            .put(
                PutRequest::new()
                    .with_key(table_route_key)
                    .with_value(table_route_value),
            )
            .await
            .unwrap();

        // Dry run: the mismatch is only logged, the stored table info stays as is.
        run_tool(kv_backend.clone(), true, None).await;
        let actual = stored_value(&kv_backend, &table_info_key).await;
        assert_eq!(actual, table_info_value);

        // An update limit of zero stops before the first write.
        run_tool(kv_backend.clone(), false, Some(0)).await;
        let actual = stored_value(&kv_backend, &table_info_key).await;
        assert_eq!(actual, table_info_value);

        // With one update allowed, the table info is rewritten: `partition_key_indices`
        // becomes `[4]` and the value's version is bumped to 3.
        run_tool(kv_backend.clone(), false, Some(1)).await;
        let actual = stored_value(&kv_backend, &table_info_key).await;
        // Keep this raw string on a single line: a line break inside it becomes part of the
        // expected value.
        let expected = r#"{"table_info":{"ident":{"table_id":1282,"version":2},"name":"foo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"c0","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c1","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c2","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c3","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c4","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c5","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c6","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c7","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c8","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c9","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c10","data_type":{"Timestamp":{"Nanosecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"version":2},"primary_key_indices":[4,7],"value_indices":[0,1,2,3,5,6,8,9,10],"engine":"mito","next_column_id":11,"options":{"write_buffer_size":null,"ttl":"14days","skip_wal":false,"extra_options":{"append_mode":"true"}},"created_on":"2025-09-25T01:39:28.702584510Z","updated_on":"2025-09-25T01:39:28.702584510Z","partition_key_indices":[4],"column_ids":[]},"table_type":"Base"},"version":3}"#;
        assert_eq!(actual, expected);
    }
}