1use std::collections::HashSet;
16
17use async_trait::async_trait;
18use clap::{Parser, Subcommand};
19use common_error::ext::BoxedError;
20use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo};
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::key::{
24 DeserializedValueWithBytes, MetadataValue, RegionDistribution, TableMetadataManager,
25};
26use common_meta::kv_backend::KvBackendRef;
27use common_meta::rpc::router::{RegionRoute, region_distribution};
28use snafu::{OptionExt, ensure};
29use store_api::storage::TableId;
30use table::metadata::TableInfo;
31
32use crate::Tool;
33use crate::common::StoreConfig;
34use crate::error::{Error, InvalidArgumentsSnafu, TableNotFoundSnafu, UnexpectedSnafu};
35use crate::metadata::control::put::read_value;
36use crate::metadata::control::selector::TableSelector;
37
// Subcommands for overwriting a table's metadata: one variant per kind of
// value (table info or table route). Each variant wraps the argument struct
// whose `build` produces the tool that performs the write.
#[derive(Subcommand)]
pub enum PutTableCommand {
    // Overwrite the table info value.
    Info(PutTableInfoCommand),
    // Overwrite the table route value.
    Route(PutTableRouteCommand),
}
44
45impl PutTableCommand {
46 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
47 match self {
48 PutTableCommand::Info(cmd) => cmd.build().await,
49 PutTableCommand::Route(cmd) => cmd.build().await,
50 }
51 }
52}
53
// Command-line arguments for replacing a table's info value. `build` reads
// the replacement value from standard input.
#[derive(Debug, Parser)]
pub struct PutTableInfoCommand {
    // Selects the target table; validated in `build_tool` before any value is read.
    #[clap(flatten)]
    selector: TableSelector,

    // Mandatory flag. The field itself is never read in this file: `build`
    // always takes the value from standard input.
    #[clap(long, required = true)]
    value_stdin: bool,

    // Settings used to build the key-value backend that holds the metadata.
    #[clap(flatten)]
    store: StoreConfig,
}
67
68impl PutTableInfoCommand {
69 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
70 let kv_backend = self.store.build().await?;
71 self.build_tool(tokio::io::stdin(), kv_backend).await
72 }
73
74 async fn build_tool<R>(
75 &self,
76 reader: R,
77 kv_backend: KvBackendRef,
78 ) -> Result<Box<dyn Tool>, BoxedError>
79 where
80 R: tokio::io::AsyncRead + Unpin,
81 {
82 self.selector.validate()?;
83 Ok(Box::new(PutTableInfoTool {
84 kv_backend,
85 selector: self.selector.clone(),
86 value: read_value(reader).await?,
87 }))
88 }
89}
90
/// Tool that replaces the stored table info of the selected table.
struct PutTableInfoTool {
    /// Backend holding the table metadata.
    kv_backend: KvBackendRef,
    /// Identifies the table whose info is replaced.
    selector: TableSelector,
    /// Raw bytes of the replacement value; parsed as a `TableInfoValue` in `do_work`.
    value: Vec<u8>,
}
96
97#[async_trait]
98impl Tool for PutTableInfoTool {
99 async fn do_work(&self) -> Result<(), BoxedError> {
100 let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
101 let Some(table_id) = self
102 .selector
103 .resolve_table_id(table_metadata_manager.table_name_manager())
104 .await?
105 else {
106 return Err(BoxedError::new(
107 UnexpectedSnafu {
108 msg: format!("Table({}) not found", self.selector.formatted_table_name()),
109 }
110 .build(),
111 ));
112 };
113
114 let (current_table_info, current_table_route) =
115 load_table_metadata(&table_metadata_manager, table_id).await?;
116 let new_table_info = TableInfoValue::try_from_raw_value(&self.value)
117 .map_err(|e| {
118 BoxedError::new(
119 InvalidArgumentsSnafu {
120 msg: format!("Invalid table info JSON: {e}"),
121 }
122 .build(),
123 )
124 })?
125 .table_info;
126 validate_table_info(table_id, ¤t_table_info.table_info, &new_table_info)
127 .map_err(BoxedError::new)?;
128
129 let region_distribution =
130 physical_region_distribution(current_table_route.get_inner_ref())?;
131
132 if current_table_info.table_info != new_table_info {
133 table_metadata_manager
134 .update_table_info(¤t_table_info, region_distribution, new_table_info)
135 .await
136 .map_err(BoxedError::new)?;
137 println!("Table({table_id}) info updated");
138 }
139
140 Ok(())
141 }
142}
143
// Command-line arguments for replacing a table's route value. `build` reads
// the replacement value from standard input.
#[derive(Debug, Parser)]
pub struct PutTableRouteCommand {
    // Selects the target table; validated in `build_tool` before any value is read.
    #[clap(flatten)]
    selector: TableSelector,

    // Mandatory flag. The field itself is never read in this file: `build`
    // always takes the value from standard input.
    #[clap(long, required = true)]
    value_stdin: bool,

    // Settings used to build the key-value backend that holds the metadata.
    #[clap(flatten)]
    store: StoreConfig,
}
157
158impl PutTableRouteCommand {
159 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
160 let kv_backend = self.store.build().await?;
161 self.build_tool(tokio::io::stdin(), kv_backend).await
162 }
163
164 async fn build_tool<R>(
165 &self,
166 reader: R,
167 kv_backend: KvBackendRef,
168 ) -> Result<Box<dyn Tool>, BoxedError>
169 where
170 R: tokio::io::AsyncRead + Unpin,
171 {
172 self.selector.validate()?;
173 Ok(Box::new(PutTableRouteTool {
174 kv_backend,
175 selector: self.selector.clone(),
176 value: read_value(reader).await?,
177 }))
178 }
179}
180
/// Tool that replaces the stored table route of the selected table.
struct PutTableRouteTool {
    /// Backend holding the table metadata.
    kv_backend: KvBackendRef,
    /// Identifies the table whose route is replaced.
    selector: TableSelector,
    /// Raw bytes of the replacement value; parsed as a `TableRouteValue` in `do_work`.
    value: Vec<u8>,
}
186
187#[async_trait]
188impl Tool for PutTableRouteTool {
189 async fn do_work(&self) -> Result<(), BoxedError> {
190 let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
191 let Some(table_id) = self
192 .selector
193 .resolve_table_id(table_metadata_manager.table_name_manager())
194 .await?
195 else {
196 return Err(BoxedError::new(
197 UnexpectedSnafu {
198 msg: format!("Table({}) not found", self.selector.formatted_table_name()),
199 }
200 .build(),
201 ));
202 };
203
204 let (current_table_info, current_table_route) =
205 load_table_metadata(&table_metadata_manager, table_id).await?;
206 let current_region_routes = current_table_route
207 .region_routes()
208 .map_err(BoxedError::new)?;
209 let new_table_route = TableRouteValue::try_from_raw_value(&self.value).map_err(|e| {
210 BoxedError::new(
211 InvalidArgumentsSnafu {
212 msg: format!("Invalid table route JSON: {e}"),
213 }
214 .build(),
215 )
216 })?;
217 let new_region_routes = new_table_route.region_routes().map_err(BoxedError::new)?;
218 validate_table_route(table_id, new_region_routes, current_region_routes)
219 .map_err(BoxedError::new)?;
220 let region_info =
221 load_region_info(&table_metadata_manager, table_id, current_region_routes).await?;
222 let new_region_options = current_table_info.table_info.to_region_options();
223 let new_region_wal_options = region_info.region_wal_options.clone();
224
225 if current_table_route.get_inner_ref() != &new_table_route {
226 table_metadata_manager
227 .update_table_route(
228 table_id,
229 region_info,
230 ¤t_table_route,
231 new_region_routes.clone(),
232 &new_region_options,
233 &new_region_wal_options,
234 )
235 .await
236 .map_err(BoxedError::new)?;
237 println!("Table({table_id}) route updated");
238 }
239
240 Ok(())
241 }
242}
243
244fn validate_table_route(
245 table_id: TableId,
246 new_region_routes: &[RegionRoute],
247 current_region_route: &[RegionRoute],
248) -> Result<(), Error> {
249 let current_region_ids = current_region_route
250 .iter()
251 .map(|r| r.region.id)
252 .collect::<HashSet<_>>();
253 for route in new_region_routes {
254 ensure!(
255 route.region.id.table_id() == table_id,
256 InvalidArgumentsSnafu {
257 msg: format!(
258 "Invalid table route: all region routes must have table id {table_id}, but got {}",
259 route.region.id.table_id()
260 ),
261 }
262 );
263 current_region_ids
265 .contains(&route.region.id)
266 .then_some(())
267 .context(InvalidArgumentsSnafu {
268 msg: format!(
269 "Invalid table route: region {} does not exist in current routes",
270 route.region.id
271 ),
272 })?;
273 }
274
275 Ok(())
276}
277
278fn validate_table_info(
279 table_id: TableId,
280 current_table_info: &TableInfo,
281 new_table_info: &TableInfo,
282) -> Result<(), Error> {
283 ensure!(
284 new_table_info.ident.table_id == table_id,
285 InvalidArgumentsSnafu {
286 msg: format!(
287 "Invalid table info: expected table id {table_id}, got {}",
288 new_table_info.ident.table_id
289 ),
290 }
291 );
292
293 ensure!(
294 current_table_info.catalog_name == new_table_info.catalog_name,
295 InvalidArgumentsSnafu {
296 msg: format!(
297 "Invalid table info: catalog name is immutable, expected {}, got {}",
298 current_table_info.catalog_name, new_table_info.catalog_name
299 ),
300 }
301 );
302
303 ensure!(
304 current_table_info.schema_name == new_table_info.schema_name,
305 InvalidArgumentsSnafu {
306 msg: format!(
307 "Invalid table info: schema name is immutable, expected {}, got {}",
308 current_table_info.schema_name, new_table_info.schema_name
309 ),
310 }
311 );
312
313 ensure!(
314 current_table_info.name == new_table_info.name,
315 InvalidArgumentsSnafu {
316 msg: format!(
317 "Invalid table info: table name is immutable, expected {}, got {}",
318 current_table_info.name, new_table_info.name
319 ),
320 }
321 );
322
323 Ok(())
324}
325
326async fn load_region_info(
327 table_metadata_manager: &TableMetadataManager,
328 table_id: TableId,
329 region_routes: &[RegionRoute],
330) -> Result<RegionInfo, BoxedError> {
331 let datanode_id = region_distribution(region_routes)
332 .into_keys()
333 .next()
334 .ok_or_else(|| {
335 BoxedError::new(
336 UnexpectedSnafu {
337 msg: format!(
338 "Missing datanode assignment for physical table route: {table_id}"
339 ),
340 }
341 .build(),
342 )
343 })?;
344
345 table_metadata_manager
346 .datanode_table_manager()
347 .get(&DatanodeTableKey::new(datanode_id, table_id))
348 .await
349 .map_err(BoxedError::new)?
350 .map(|value| value.region_info)
351 .ok_or_else(|| {
352 BoxedError::new(
353 UnexpectedSnafu {
354 msg: format!(
355 "Missing datanode table metadata for physical table route: {table_id}"
356 ),
357 }
358 .build(),
359 )
360 })
361}
362
363async fn load_table_metadata(
364 table_metadata_manager: &TableMetadataManager,
365 table_id: TableId,
366) -> Result<
367 (
368 DeserializedValueWithBytes<TableInfoValue>,
369 DeserializedValueWithBytes<TableRouteValue>,
370 ),
371 BoxedError,
372> {
373 let (table_info, table_route) = table_metadata_manager
374 .get_full_table_info(table_id)
375 .await
376 .map_err(BoxedError::new)?;
377 let table_info =
378 table_info.ok_or_else(|| BoxedError::new(TableNotFoundSnafu { table_id }.build()))?;
379 let table_route =
380 table_route.ok_or_else(|| BoxedError::new(TableNotFoundSnafu { table_id }.build()))?;
381 Ok((table_info, table_route))
382}
383
384fn physical_region_distribution(
385 table_route: &TableRouteValue,
386) -> Result<Option<RegionDistribution>, BoxedError> {
387 if !table_route.is_physical() {
388 return Ok(None);
389 }
390
391 table_route
392 .region_routes()
393 .map(|routes| Some(region_distribution(routes)))
394 .map_err(BoxedError::new)
395}
396
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use clap::Parser;
    use common_error::ext::{BoxedError, ErrorExt};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableManager};
    use common_meta::key::table_info::TableInfoValue;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::RegionRoute;
    use store_api::storage::RegionId;
    use tokio::io::BufReader;

    use super::{
        PutTableInfoCommand, PutTableInfoTool, PutTableRouteCommand, PutTableRouteTool,
        validate_table_route,
    };
    use crate::Tool;
    use crate::metadata::control::selector::TableSelector;
    use crate::metadata::control::test_utils::prepare_physical_table_metadata;

    impl PutTableInfoCommand {
        /// Test-only entry point that builds the tool from an arbitrary reader
        /// and backend instead of standard input and the configured store.
        async fn build_for_test<R>(
            &self,
            reader: R,
            kv_backend: KvBackendRef,
        ) -> Result<Box<dyn Tool>, BoxedError>
        where
            R: tokio::io::AsyncRead + Unpin,
        {
            self.build_tool(reader, kv_backend).await
        }
    }

    impl PutTableRouteCommand {
        /// Test-only entry point that builds the tool from an arbitrary reader
        /// and backend instead of standard input and the configured store.
        async fn build_for_test<R>(
            &self,
            reader: R,
            kv_backend: KvBackendRef,
        ) -> Result<Box<dyn Tool>, BoxedError>
        where
            R: tokio::io::AsyncRead + Unpin,
        {
            self.build_tool(reader, kv_backend).await
        }
    }

    /// Building without `--table-id` or `--table-name` must fail validation.
    #[tokio::test]
    async fn test_put_table_selector_validation() {
        let command = PutTableInfoCommand::parse_from([
            "info",
            "--value-stdin",
            "--backend",
            "memory-store",
            "--store-addrs",
            "memory://",
        ]);

        let err = match command.build().await {
            Ok(_) => panic!("expected validation failure"),
            Err(err) => err,
        };
        assert!(
            err.output_msg()
                .contains("You must specify either --table-id or --table-name.")
        );
    }

    /// A table name alone is enough for the tool to be built.
    #[tokio::test]
    async fn test_put_table_command_builds_tool_with_table_name() {
        let command = PutTableInfoCommand::parse_from([
            "info",
            "--table-name",
            "my_table",
            "--value-stdin",
            "--backend",
            "memory-store",
            "--store-addrs",
            "memory://",
        ]);

        let _tool = command
            .build_for_test(
                BufReader::new(&b"{}"[..]),
                Arc::new(MemoryKvBackend::new()) as KvBackendRef,
            )
            .await
            .unwrap();
    }

    /// Changing the table name through `put table info` is rejected.
    #[tokio::test]
    async fn test_put_table_info_rejects_table_name_change() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_id = 1024;
        let (table_info, table_route) =
            prepare_physical_table_metadata("old_table", table_id).await;
        table_metadata_manager
            .create_table_metadata(
                table_info.clone(),
                TableRouteValue::Physical(table_route),
                HashMap::new(),
            )
            .await
            .unwrap();

        let mut new_table_info = table_info;
        new_table_info.name = "new_table".to_string();
        let tool = PutTableInfoTool {
            kv_backend: kv_backend.clone(),
            selector: TableSelector::with_table_id(table_id),
            value: serde_json::to_vec(&TableInfoValue::new(new_table_info)).unwrap(),
        };

        let err = tool.do_work().await.unwrap_err();
        assert!(
            err.output_msg()
                .contains("Invalid table info: table name is immutable")
        );
    }

    /// Changing the schema name through `put table info` is rejected.
    #[tokio::test]
    async fn test_put_table_info_rejects_schema_change() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_id = 1024;
        let (table_info, table_route) =
            prepare_physical_table_metadata("old_table", table_id).await;
        table_metadata_manager
            .create_table_metadata(
                table_info.clone(),
                TableRouteValue::Physical(table_route),
                HashMap::new(),
            )
            .await
            .unwrap();

        let mut new_table_info = table_info;
        new_table_info.schema_name = "another_schema".to_string();
        let tool = PutTableInfoTool {
            kv_backend,
            selector: TableSelector::with_table_id(table_id),
            value: serde_json::to_vec(&TableInfoValue::new(new_table_info)).unwrap(),
        };

        let err = tool.do_work().await.unwrap_err();
        assert!(
            err.output_msg()
                .contains("Invalid table info: schema name is immutable")
        );
    }

    /// Moving a region's leader to another datanode updates the stored route
    /// and creates a datanode-table entry for the new datanode.
    #[tokio::test]
    async fn test_put_table_route_updates_route_and_datanode_table() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_id = 1024;
        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::Physical(table_route.clone()),
                HashMap::new(),
            )
            .await
            .unwrap();

        let mut region_routes = table_route.region_routes.clone();
        region_routes[0].leader_peer = Some(Peer::empty(2));
        let new_table_route = TableRouteValue::physical(region_routes);
        let tool = PutTableRouteTool {
            kv_backend: kv_backend.clone(),
            selector: TableSelector::with_table_id(table_id),
            value: serde_json::to_vec(&new_table_route).unwrap(),
        };

        tool.do_work().await.unwrap();

        let (_, current_route) = table_metadata_manager
            .get_full_table_info(table_id)
            .await
            .unwrap();
        let current_route = current_route.unwrap().into_inner();
        assert_eq!(
            current_route.region_routes().unwrap(),
            new_table_route.region_routes().unwrap()
        );

        let datanode_table_manager = DatanodeTableManager::new(kv_backend);
        let updated = datanode_table_manager
            .get(&DatanodeTableKey::new(2, table_id))
            .await
            .unwrap();
        assert!(updated.is_some());
    }

    /// Supplying a logical route for a physical table is rejected.
    #[tokio::test]
    async fn test_put_table_route_rejects_logical_route() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_id = 1024;
        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::Physical(table_route),
                HashMap::new(),
            )
            .await
            .unwrap();

        let tool = PutTableRouteTool {
            kv_backend,
            selector: TableSelector::with_table_id(table_id),
            value: serde_json::to_vec(&TableRouteValue::logical(table_id + 1)).unwrap(),
        };

        let err = tool.do_work().await.unwrap_err();
        assert!(err.output_msg().contains("non-physical TableRouteValue."));
    }

    /// A new route that names a region missing from the current routes is
    /// rejected, while a subset of the current regions is accepted.
    #[test]
    fn test_validate_table_route_rejects_new_region_not_in_current_route() {
        let table_id = 1024;
        let current_region_routes = vec![
            RegionRoute {
                region: common_meta::rpc::router::Region {
                    id: RegionId::new(table_id, 1),
                    ..Default::default()
                },
                ..Default::default()
            },
            RegionRoute {
                region: common_meta::rpc::router::Region {
                    id: RegionId::new(table_id, 2),
                    ..Default::default()
                },
                ..Default::default()
            },
        ];
        let new_region_routes = vec![
            RegionRoute {
                region: common_meta::rpc::router::Region {
                    id: RegionId::new(table_id, 1),
                    ..Default::default()
                },
                ..Default::default()
            },
            RegionRoute {
                region: common_meta::rpc::router::Region {
                    id: RegionId::new(table_id, 3),
                    ..Default::default()
                },
                ..Default::default()
            },
        ];

        // Argument order follows the signature: new routes first, current second.
        let err =
            validate_table_route(table_id, &new_region_routes, &current_region_routes).unwrap_err();

        assert!(err.to_string().contains("does not exist in current routes"));

        // A subset of the current regions is valid. This pins the argument
        // order, since the same call with the slices swapped would be rejected.
        validate_table_route(table_id, &current_region_routes[..1], &current_region_routes)
            .unwrap();
    }

    /// Building the route tool only reads the value; it is not parsed or
    /// validated until `do_work`, so even a logical route builds fine.
    #[tokio::test]
    async fn test_put_table_command_builds_tool() {
        let value = serde_json::to_vec(&TableRouteValue::logical(1025)).unwrap();
        let command = PutTableRouteCommand::parse_from([
            "route",
            "--table-id",
            "1024",
            "--value-stdin",
            "--backend",
            "memory-store",
            "--store-addrs",
            "memory://",
        ]);

        let _tool = command
            .build_for_test(
                BufReader::new(value.as_slice()),
                Arc::new(MemoryKvBackend::new()) as KvBackendRef,
            )
            .await
            .unwrap();
    }
}