1use std::collections::{HashMap, HashSet};
16
17use async_trait::async_trait;
18use clap::Parser;
19use common_error::ext::BoxedError;
20use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
21use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
22use common_meta::key::{MetadataKey, MetadataValue};
23use common_meta::kv_backend::KvBackendRef;
24use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
25use common_meta::rpc::store::{PutRequest, RangeRequest};
26use common_telemetry::{info, warn};
27use futures::StreamExt;
28use partition::expr::PartitionExpr;
29use store_api::storage::TableId;
30use table::metadata::TableType;
31
32use crate::{StoreConfig, Tool};
33
34#[derive(Parser)]
36pub struct RepairPartitionColumnCommand {
37 #[clap(flatten)]
38 store_config: StoreConfig,
39
40 #[clap(long)]
42 dry_run: bool,
43
44 #[clap(long)]
46 update_limit: Option<u32>,
47}
48
49impl RepairPartitionColumnCommand {
50 pub(super) async fn build(&self) -> Result<RepairPartitionColumnTool, BoxedError> {
51 let kv_backend = self.store_config.build().await?;
52 Ok(RepairPartitionColumnTool {
53 kv_backend,
54 dry_run: self.dry_run,
55 update_limit: self.update_limit,
56 })
57 }
58}
59
/// Scans table metadata in a kv backend and rewrites table infos whose
/// `partition_key_indices` disagree with the columns used by their partition expressions.
pub(crate) struct RepairPartitionColumnTool {
    /// Backend that is scanned for table infos / table routes and written back to on repair.
    kv_backend: KvBackendRef,
    /// When `true`, intended updates are only logged and nothing is written.
    dry_run: bool,
    /// Maximum number of tables to update in one run; `None` means no limit.
    update_limit: Option<u32>,
}
66
67impl RepairPartitionColumnTool {
68 async fn do_repair(
69 &self,
70 table_infos: HashMap<TableId, TableInfoValue>,
71 table_routes: HashMap<TableId, TableRouteValue>,
72 ) -> Result<(), BoxedError> {
73 let mut update_count = 0;
74 for (table_id, table_info_value) in &table_infos {
75 let table_meta = &table_info_value.table_info.meta;
76 let mut partition_columns = Vec::with_capacity(table_meta.partition_key_indices.len());
77 for i in &table_meta.partition_key_indices {
78 if let Some(x) = table_meta.schema.column_schemas().get(*i) {
79 partition_columns.push(&x.name);
80 } else {
81 warn!(
82 "Partition column not found by index: {i}, table: {}, id: {}",
83 table_info_value.table_name(),
84 table_id
85 );
86 }
87 }
88
89 let Some(TableRouteValue::Physical(table_route)) = table_routes.get(table_id) else {
90 continue;
91 };
92 let mut partition_expr_columns = HashSet::new();
93 for region_route in &table_route.region_routes {
94 let partition_expr_result =
95 PartitionExpr::from_json_str(®ion_route.region.partition_expr());
96 let partition_expr = match partition_expr_result {
97 Ok(Some(expr)) => expr,
98 Ok(None) => {
99 continue;
101 }
102 Err(e) => {
103 warn!(
104 e;
105 "Failed to deserialize partition expression for region: {:?}, table: {}",
106 region_route.region.id,
107 table_info_value.table_name()
108 );
109 continue;
110 }
111 };
112 partition_expr.collect_column_names(&mut partition_expr_columns);
113 }
114
115 let mut partition_expr_columns = partition_expr_columns.iter().collect::<Vec<_>>();
116 partition_expr_columns.sort();
117 partition_columns.sort();
118 if partition_expr_columns != partition_columns {
119 warn!(
120 "Columns in partition exprs: {:?} do not match partition columns: {:?} in table ‘{}’",
121 partition_expr_columns,
122 partition_columns,
123 table_info_value.table_name(),
124 );
125
126 if let Some(update_limit) = self.update_limit
127 && update_count >= update_limit
128 {
129 warn!(
130 "Reached update limit: {update_limit}. Stopping further table metadata updates."
131 );
132 return Ok(());
133 }
134 self.update_partition_columns(partition_expr_columns, table_info_value)
135 .await?;
136 update_count += 1;
137 }
138 }
139 Ok(())
140 }
141
142 async fn update_partition_columns(
143 &self,
144 partition_expr_columns: Vec<&String>,
145 table_info_value: &TableInfoValue,
146 ) -> Result<(), BoxedError> {
147 let column_schemas = table_info_value.table_info.meta.schema.column_schemas();
148 let mut partition_column_indices = Vec::with_capacity(partition_expr_columns.len());
149 for column_name in &partition_expr_columns {
150 if let Some((i, _)) = column_schemas
151 .iter()
152 .enumerate()
153 .find(|(_, x)| &x.name == *column_name)
154 {
155 partition_column_indices.push(i);
156 } else {
157 warn!(
158 "Partition column '{}' from partition expression not found in table schema '{}'. Skipping this column for update.",
159 column_name,
160 table_info_value.table_name()
161 );
162 }
163 }
164
165 info!(
166 "Updating partition columns to {:?} (by column indices: {:?}) in table '{}'",
167 partition_expr_columns,
168 partition_column_indices,
169 table_info_value.table_name(),
170 );
171 if self.dry_run {
172 info!("Dry run enabled, do nothing");
173 return Ok(());
174 }
175
176 let mut new_table_info = table_info_value.table_info.clone();
177 new_table_info.meta.partition_key_indices = partition_column_indices;
178 let table_info = table_info_value.update(new_table_info);
179
180 let request = PutRequest::new()
181 .with_key(TableInfoKey::new(table_info.table_info.ident.table_id).to_bytes())
182 .with_value(table_info.try_as_raw_value().map_err(BoxedError::new)?);
183 let _ = self
184 .kv_backend
185 .put(request)
186 .await
187 .map_err(BoxedError::new)?;
188 Ok(())
189 }
190}
191
192#[async_trait]
193impl Tool for RepairPartitionColumnTool {
194 async fn do_work(&self) -> Result<(), BoxedError> {
195 let key_values = PaginationStream::new(
196 self.kv_backend.clone(),
197 RangeRequest::new().with_range(vec![0], vec![0]),
198 DEFAULT_PAGE_SIZE,
199 Ok,
200 )
201 .into_stream();
202 let mut key_values = Box::pin(key_values);
203
204 let mut table_infos = HashMap::new();
205 let mut table_routes = HashMap::new();
206 while let Some(result) = key_values.next().await {
207 match result {
208 Ok(kv) => {
209 if let Ok(key) = TableInfoKey::from_bytes(kv.key()) {
210 let Ok(value) = TableInfoValue::try_from_raw_value(&kv.value) else {
211 warn!("Skip corrupted key: {key}");
212 continue;
213 };
214 if value.table_info.table_type == TableType::Base {
215 table_infos.insert(value.table_info.ident.table_id, value);
216 }
217 } else if let Ok(key) = TableRouteKey::from_bytes(kv.key()) {
218 let Ok(value) = TableRouteValue::try_from_raw_value(&kv.value) else {
219 warn!("Skip corrupted key: {key}");
220 continue;
221 };
222 if value.is_physical() {
223 table_routes.insert(key.table_id, value);
224 }
225 }
226 }
227 Err(e) => {
228 warn!(e; "Failed to get next key")
229 }
230 }
231 }
232
233 self.do_repair(table_infos, table_routes).await
234 }
235}
236
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_meta::kv_backend::KvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;

    use super::*;

    /// Reads the value stored under `key` as a UTF-8 string, panicking if it is absent.
    async fn read_value(kv_backend: &KvBackendRef, key: &[u8]) -> String {
        let kv = kv_backend.get(key).await.unwrap().unwrap();
        String::from_utf8(kv.value).unwrap()
    }

    /// Runs the repair tool once against `kv_backend` with the given settings.
    async fn run_tool(kv_backend: &KvBackendRef, dry_run: bool, update_limit: Option<u32>) {
        let tool = RepairPartitionColumnTool {
            kv_backend: kv_backend.clone(),
            dry_run,
            update_limit,
        };
        tool.do_work().await.unwrap();
    }

    #[tokio::test]
    async fn test_repair_partition_column() {
        common_telemetry::init_default_ut_logging();

        let kv_backend: KvBackendRef = Arc::new(MemoryKvBackend::new());

        // Table info records partition_key_indices [3] (column `c3`).
        let table_info_key = TableInfoKey::new(1282).to_bytes();
        let table_info_value = r#"{"table_info":{"ident":{"table_id":1282,"version":2},"name":"foo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"c0","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c1","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c2","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c3","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c4","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c5","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c6","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c7","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c8","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c9","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c10","data_type":{"Timestamp":{"Nanosecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":10,"version":2},"primary_key_indices":[4,7],"value_indices":[0,1,2,3,5,6,8,9,10],"engine":"mito","next_column_id":11,"region_numbers":[0,1,2],"options":{"write_buffer_size":null,"ttl":"14days","skip_wal":false,"extra_options":{"append_mode":"true"}},"created_on":"2025-09-25T01:39:28.702584510Z","partition_key_indices":[3]},"table_type":"Base"},"version":2}"#;
        kv_backend
            .put(
                PutRequest::new()
                    .with_key(table_info_key.clone())
                    .with_value(table_info_value),
            )
            .await
            .unwrap();

        // All three region partition expressions reference only column `c4`.
        let table_route_key = TableRouteKey::new(1282).to_bytes();
        let table_route_value = r#"{"type":"physical","region_routes":[{"region":{"id":5506148073472,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"Lt\",\"rhs\":{\"Value\":{\"Int32\":1}}}}"]},"attrs":{}},"leader_peer":{"id":12,"addr":"192.168.1.1:3001"},"follower_peers":[],"leader_down_since":null},{"region":{"id":5506148073473,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":1}}}},\"op\":\"And\",\"rhs\":{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"Lt\",\"rhs\":{\"Value\":{\"Int32\":2}}}}}}"]},"attrs":{}},"leader_peer":{"id":13,"addr":"192.168.1.2:3001"},"follower_peers":[],"leader_down_since":null},{"region":{"id":5506148073474,"name":"","partition":{"column_list":["c4"],"value_list":["{\"Expr\":{\"lhs\":{\"Column\":\"c4\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":2}}}}"]},"attrs":{}},"leader_peer":{"id":10,"addr":"192.168.1.3:3001"},"follower_peers":[],"leader_down_since":null}],"version":0}"#;
        kv_backend
            .put(
                PutRequest::new()
                    .with_key(table_route_key)
                    .with_value(table_route_value),
            )
            .await
            .unwrap();

        // Dry run: the mismatch is only reported; metadata stays untouched.
        run_tool(&kv_backend, true, None).await;
        assert_eq!(
            read_value(&kv_backend, &table_info_key).await,
            table_info_value
        );

        // An update limit of zero stops before any write.
        run_tool(&kv_backend, false, Some(0)).await;
        assert_eq!(
            read_value(&kv_backend, &table_info_key).await,
            table_info_value
        );

        // With room for one update, partition_key_indices is rewritten from [3] to [4].
        run_tool(&kv_backend, false, Some(1)).await;
        let expected = r#"{"table_info":{"ident":{"table_id":1282,"version":2},"name":"foo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"c0","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c1","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c2","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c3","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c4","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c5","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c6","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c7","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c8","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c9","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"c10","data_type":{"Timestamp":{"Nanosecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"version":2},"primary_key_indices":[4,7],"value_indices":[0,1,2,3,5,6,8,9,10],"engine":"mito","next_column_id":11,"options":{"write_buffer_size":null,"ttl":"14days","skip_wal":false,"extra_options":{"append_mode":"true"}},"created_on":"2025-09-25T01:39:28.702584510Z","updated_on":"2025-09-25T01:39:28.702584510Z","partition_key_indices":[4],"column_ids":[]},"table_type":"Base"},"version":3}"#;
        assert_eq!(read_value(&kv_backend, &table_info_key).await, expected);
    }
}