Skip to main content

cli/metadata/control/put/
table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use async_trait::async_trait;
18use clap::{Parser, Subcommand};
19use common_error::ext::BoxedError;
20use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo};
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::key::{
24    DeserializedValueWithBytes, MetadataValue, RegionDistribution, TableMetadataManager,
25};
26use common_meta::kv_backend::KvBackendRef;
27use common_meta::rpc::router::{RegionRoute, region_distribution};
28use snafu::{OptionExt, ensure};
29use store_api::storage::TableId;
30use table::metadata::TableInfo;
31
32use crate::Tool;
33use crate::common::StoreConfig;
34use crate::error::{Error, InvalidArgumentsSnafu, TableNotFoundSnafu, UnexpectedSnafu};
35use crate::metadata::control::put::read_value;
36use crate::metadata::control::selector::TableSelector;
37
38/// Put table metadata into the metadata store.
39#[derive(Subcommand)]
40pub enum PutTableCommand {
41    Info(PutTableInfoCommand),
42    Route(PutTableRouteCommand),
43}
44
45impl PutTableCommand {
46    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
47        match self {
48            PutTableCommand::Info(cmd) => cmd.build().await,
49            PutTableCommand::Route(cmd) => cmd.build().await,
50        }
51    }
52}
53
54/// Put table info into the metadata store.
55#[derive(Debug, Parser)]
56pub struct PutTableInfoCommand {
57    #[clap(flatten)]
58    selector: TableSelector,
59
60    /// Read the JSON-encoded [`TableInfoValue`] from standard input.
61    #[clap(long, required = true)]
62    value_stdin: bool,
63
64    #[clap(flatten)]
65    store: StoreConfig,
66}
67
68impl PutTableInfoCommand {
69    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
70        let kv_backend = self.store.build().await?;
71        self.build_tool(tokio::io::stdin(), kv_backend).await
72    }
73
74    async fn build_tool<R>(
75        &self,
76        reader: R,
77        kv_backend: KvBackendRef,
78    ) -> Result<Box<dyn Tool>, BoxedError>
79    where
80        R: tokio::io::AsyncRead + Unpin,
81    {
82        self.selector.validate()?;
83        Ok(Box::new(PutTableInfoTool {
84            kv_backend,
85            selector: self.selector.clone(),
86            value: read_value(reader).await?,
87        }))
88    }
89}
90
91struct PutTableInfoTool {
92    kv_backend: KvBackendRef,
93    selector: TableSelector,
94    value: Vec<u8>,
95}
96
97#[async_trait]
98impl Tool for PutTableInfoTool {
99    async fn do_work(&self) -> Result<(), BoxedError> {
100        let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
101        let Some(table_id) = self
102            .selector
103            .resolve_table_id(table_metadata_manager.table_name_manager())
104            .await?
105        else {
106            return Err(BoxedError::new(
107                UnexpectedSnafu {
108                    msg: format!("Table({}) not found", self.selector.formatted_table_name()),
109                }
110                .build(),
111            ));
112        };
113
114        let (current_table_info, current_table_route) =
115            load_table_metadata(&table_metadata_manager, table_id).await?;
116        let new_table_info = TableInfoValue::try_from_raw_value(&self.value)
117            .map_err(|e| {
118                BoxedError::new(
119                    InvalidArgumentsSnafu {
120                        msg: format!("Invalid table info JSON: {e}"),
121                    }
122                    .build(),
123                )
124            })?
125            .table_info;
126        validate_table_info(table_id, &current_table_info.table_info, &new_table_info)
127            .map_err(BoxedError::new)?;
128
129        let region_distribution =
130            physical_region_distribution(current_table_route.get_inner_ref())?;
131
132        if current_table_info.table_info != new_table_info {
133            table_metadata_manager
134                .update_table_info(&current_table_info, region_distribution, new_table_info)
135                .await
136                .map_err(BoxedError::new)?;
137            println!("Table({table_id}) info updated");
138        }
139
140        Ok(())
141    }
142}
143
144/// Put table route into the metadata store.
145#[derive(Debug, Parser)]
146pub struct PutTableRouteCommand {
147    #[clap(flatten)]
148    selector: TableSelector,
149
150    /// Read the JSON-encoded [`TableRouteValue`] from standard input.
151    #[clap(long, required = true)]
152    value_stdin: bool,
153
154    #[clap(flatten)]
155    store: StoreConfig,
156}
157
158impl PutTableRouteCommand {
159    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
160        let kv_backend = self.store.build().await?;
161        self.build_tool(tokio::io::stdin(), kv_backend).await
162    }
163
164    async fn build_tool<R>(
165        &self,
166        reader: R,
167        kv_backend: KvBackendRef,
168    ) -> Result<Box<dyn Tool>, BoxedError>
169    where
170        R: tokio::io::AsyncRead + Unpin,
171    {
172        self.selector.validate()?;
173        Ok(Box::new(PutTableRouteTool {
174            kv_backend,
175            selector: self.selector.clone(),
176            value: read_value(reader).await?,
177        }))
178    }
179}
180
181struct PutTableRouteTool {
182    kv_backend: KvBackendRef,
183    selector: TableSelector,
184    value: Vec<u8>,
185}
186
187#[async_trait]
188impl Tool for PutTableRouteTool {
189    async fn do_work(&self) -> Result<(), BoxedError> {
190        let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
191        let Some(table_id) = self
192            .selector
193            .resolve_table_id(table_metadata_manager.table_name_manager())
194            .await?
195        else {
196            return Err(BoxedError::new(
197                UnexpectedSnafu {
198                    msg: format!("Table({}) not found", self.selector.formatted_table_name()),
199                }
200                .build(),
201            ));
202        };
203
204        let (current_table_info, current_table_route) =
205            load_table_metadata(&table_metadata_manager, table_id).await?;
206        let current_region_routes = current_table_route
207            .region_routes()
208            .map_err(BoxedError::new)?;
209        let new_table_route = TableRouteValue::try_from_raw_value(&self.value).map_err(|e| {
210            BoxedError::new(
211                InvalidArgumentsSnafu {
212                    msg: format!("Invalid table route JSON: {e}"),
213                }
214                .build(),
215            )
216        })?;
217        let new_region_routes = new_table_route.region_routes().map_err(BoxedError::new)?;
218        validate_table_route(table_id, new_region_routes, current_region_routes)
219            .map_err(BoxedError::new)?;
220        let region_info =
221            load_region_info(&table_metadata_manager, table_id, current_region_routes).await?;
222        let new_region_options = current_table_info.table_info.to_region_options();
223        let new_region_wal_options = region_info.region_wal_options.clone();
224
225        if current_table_route.get_inner_ref() != &new_table_route {
226            table_metadata_manager
227                .update_table_route(
228                    table_id,
229                    region_info,
230                    &current_table_route,
231                    new_region_routes.clone(),
232                    &new_region_options,
233                    &new_region_wal_options,
234                )
235                .await
236                .map_err(BoxedError::new)?;
237            println!("Table({table_id}) route updated");
238        }
239
240        Ok(())
241    }
242}
243
244fn validate_table_route(
245    table_id: TableId,
246    new_region_routes: &[RegionRoute],
247    current_region_route: &[RegionRoute],
248) -> Result<(), Error> {
249    let current_region_ids = current_region_route
250        .iter()
251        .map(|r| r.region.id)
252        .collect::<HashSet<_>>();
253    for route in new_region_routes {
254        ensure!(
255            route.region.id.table_id() == table_id,
256            InvalidArgumentsSnafu {
257                msg: format!(
258                    "Invalid table route: all region routes must have table id {table_id}, but got {}",
259                    route.region.id.table_id()
260                ),
261            }
262        );
263        // Ensure the region in new route exists in current route
264        current_region_ids
265            .contains(&route.region.id)
266            .then_some(())
267            .context(InvalidArgumentsSnafu {
268                msg: format!(
269                    "Invalid table route: region {} does not exist in current routes",
270                    route.region.id
271                ),
272            })?;
273    }
274
275    Ok(())
276}
277
278fn validate_table_info(
279    table_id: TableId,
280    current_table_info: &TableInfo,
281    new_table_info: &TableInfo,
282) -> Result<(), Error> {
283    ensure!(
284        new_table_info.ident.table_id == table_id,
285        InvalidArgumentsSnafu {
286            msg: format!(
287                "Invalid table info: expected table id {table_id}, got {}",
288                new_table_info.ident.table_id
289            ),
290        }
291    );
292
293    ensure!(
294        current_table_info.catalog_name == new_table_info.catalog_name,
295        InvalidArgumentsSnafu {
296            msg: format!(
297                "Invalid table info: catalog name is immutable, expected {}, got {}",
298                current_table_info.catalog_name, new_table_info.catalog_name
299            ),
300        }
301    );
302
303    ensure!(
304        current_table_info.schema_name == new_table_info.schema_name,
305        InvalidArgumentsSnafu {
306            msg: format!(
307                "Invalid table info: schema name is immutable, expected {}, got {}",
308                current_table_info.schema_name, new_table_info.schema_name
309            ),
310        }
311    );
312
313    ensure!(
314        current_table_info.name == new_table_info.name,
315        InvalidArgumentsSnafu {
316            msg: format!(
317                "Invalid table info: table name is immutable, expected {}, got {}",
318                current_table_info.name, new_table_info.name
319            ),
320        }
321    );
322
323    Ok(())
324}
325
326async fn load_region_info(
327    table_metadata_manager: &TableMetadataManager,
328    table_id: TableId,
329    region_routes: &[RegionRoute],
330) -> Result<RegionInfo, BoxedError> {
331    let datanode_id = region_distribution(region_routes)
332        .into_keys()
333        .next()
334        .ok_or_else(|| {
335            BoxedError::new(
336                UnexpectedSnafu {
337                    msg: format!(
338                        "Missing datanode assignment for physical table route: {table_id}"
339                    ),
340                }
341                .build(),
342            )
343        })?;
344
345    table_metadata_manager
346        .datanode_table_manager()
347        .get(&DatanodeTableKey::new(datanode_id, table_id))
348        .await
349        .map_err(BoxedError::new)?
350        .map(|value| value.region_info)
351        .ok_or_else(|| {
352            BoxedError::new(
353                UnexpectedSnafu {
354                    msg: format!(
355                        "Missing datanode table metadata for physical table route: {table_id}"
356                    ),
357                }
358                .build(),
359            )
360        })
361}
362
363async fn load_table_metadata(
364    table_metadata_manager: &TableMetadataManager,
365    table_id: TableId,
366) -> Result<
367    (
368        DeserializedValueWithBytes<TableInfoValue>,
369        DeserializedValueWithBytes<TableRouteValue>,
370    ),
371    BoxedError,
372> {
373    let (table_info, table_route) = table_metadata_manager
374        .get_full_table_info(table_id)
375        .await
376        .map_err(BoxedError::new)?;
377    let table_info =
378        table_info.ok_or_else(|| BoxedError::new(TableNotFoundSnafu { table_id }.build()))?;
379    let table_route =
380        table_route.ok_or_else(|| BoxedError::new(TableNotFoundSnafu { table_id }.build()))?;
381    Ok((table_info, table_route))
382}
383
384fn physical_region_distribution(
385    table_route: &TableRouteValue,
386) -> Result<Option<RegionDistribution>, BoxedError> {
387    if !table_route.is_physical() {
388        return Ok(None);
389    }
390
391    table_route
392        .region_routes()
393        .map(|routes| Some(region_distribution(routes)))
394        .map_err(BoxedError::new)
395}
396
397#[cfg(test)]
398mod tests {
399    use std::collections::HashMap;
400    use std::sync::Arc;
401
402    use clap::Parser;
403    use common_error::ext::{BoxedError, ErrorExt};
404    use common_meta::key::TableMetadataManager;
405    use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableManager};
406    use common_meta::key::table_info::TableInfoValue;
407    use common_meta::key::table_route::TableRouteValue;
408    use common_meta::kv_backend::KvBackendRef;
409    use common_meta::kv_backend::memory::MemoryKvBackend;
410    use common_meta::peer::Peer;
411    use common_meta::rpc::router::RegionRoute;
412    use store_api::storage::RegionId;
413    use tokio::io::BufReader;
414
415    use super::{
416        PutTableInfoCommand, PutTableInfoTool, PutTableRouteCommand, PutTableRouteTool,
417        validate_table_route,
418    };
419    use crate::Tool;
420    use crate::metadata::control::selector::TableSelector;
421    use crate::metadata::control::test_utils::prepare_physical_table_metadata;
422
423    impl PutTableInfoCommand {
424        async fn build_for_test<R>(
425            &self,
426            reader: R,
427            kv_backend: KvBackendRef,
428        ) -> Result<Box<dyn Tool>, BoxedError>
429        where
430            R: tokio::io::AsyncRead + Unpin,
431        {
432            self.build_tool(reader, kv_backend).await
433        }
434    }
435
436    impl PutTableRouteCommand {
437        async fn build_for_test<R>(
438            &self,
439            reader: R,
440            kv_backend: KvBackendRef,
441        ) -> Result<Box<dyn Tool>, BoxedError>
442        where
443            R: tokio::io::AsyncRead + Unpin,
444        {
445            self.build_tool(reader, kv_backend).await
446        }
447    }
448
449    #[tokio::test]
450    async fn test_put_table_selector_validation() {
451        let command = PutTableInfoCommand::parse_from([
452            "info",
453            "--value-stdin",
454            "--backend",
455            "memory-store",
456            "--store-addrs",
457            "memory://",
458        ]);
459
460        let err = match command.build().await {
461            Ok(_) => panic!("expected validation failure"),
462            Err(err) => err,
463        };
464        assert!(
465            err.output_msg()
466                .contains("You must specify either --table-id or --table-name.")
467        );
468    }
469
470    #[tokio::test]
471    async fn test_put_table_command_builds_tool_with_table_name() {
472        let command = PutTableInfoCommand::parse_from([
473            "info",
474            "--table-name",
475            "my_table",
476            "--value-stdin",
477            "--backend",
478            "memory-store",
479            "--store-addrs",
480            "memory://",
481        ]);
482
483        let _tool = command
484            .build_for_test(
485                BufReader::new(&b"{}"[..]),
486                Arc::new(MemoryKvBackend::new()) as KvBackendRef,
487            )
488            .await
489            .unwrap();
490    }
491
492    #[tokio::test]
493    async fn test_put_table_info_rejects_table_name_change() {
494        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
495        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
496        let table_id = 1024;
497        let (table_info, table_route) =
498            prepare_physical_table_metadata("old_table", table_id).await;
499        table_metadata_manager
500            .create_table_metadata(
501                table_info.clone(),
502                TableRouteValue::Physical(table_route),
503                HashMap::new(),
504            )
505            .await
506            .unwrap();
507
508        let mut new_table_info = table_info;
509        new_table_info.name = "new_table".to_string();
510        let tool = PutTableInfoTool {
511            kv_backend: kv_backend.clone(),
512            selector: TableSelector::with_table_id(table_id),
513            value: serde_json::to_vec(&TableInfoValue::new(new_table_info)).unwrap(),
514        };
515
516        let err = tool.do_work().await.unwrap_err();
517        assert!(
518            err.output_msg()
519                .contains("Invalid table info: table name is immutable")
520        );
521    }
522
523    #[tokio::test]
524    async fn test_put_table_info_rejects_schema_change() {
525        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
526        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
527        let table_id = 1024;
528        let (table_info, table_route) =
529            prepare_physical_table_metadata("old_table", table_id).await;
530        table_metadata_manager
531            .create_table_metadata(
532                table_info.clone(),
533                TableRouteValue::Physical(table_route),
534                HashMap::new(),
535            )
536            .await
537            .unwrap();
538
539        let mut new_table_info = table_info;
540        new_table_info.schema_name = "another_schema".to_string();
541        let tool = PutTableInfoTool {
542            kv_backend,
543            selector: TableSelector::with_table_id(table_id),
544            value: serde_json::to_vec(&TableInfoValue::new(new_table_info)).unwrap(),
545        };
546
547        let err = tool.do_work().await.unwrap_err();
548        assert!(
549            err.output_msg()
550                .contains("Invalid table info: schema name is immutable")
551        );
552    }
553
554    #[tokio::test]
555    async fn test_put_table_route_updates_route_and_datanode_table() {
556        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
557        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
558        let table_id = 1024;
559        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
560        table_metadata_manager
561            .create_table_metadata(
562                table_info,
563                TableRouteValue::Physical(table_route.clone()),
564                HashMap::new(),
565            )
566            .await
567            .unwrap();
568
569        let mut region_routes = table_route.region_routes.clone();
570        region_routes[0].leader_peer = Some(Peer::empty(2));
571        let new_table_route = TableRouteValue::physical(region_routes);
572        let tool = PutTableRouteTool {
573            kv_backend: kv_backend.clone(),
574            selector: TableSelector::with_table_id(table_id),
575            value: serde_json::to_vec(&new_table_route).unwrap(),
576        };
577
578        tool.do_work().await.unwrap();
579
580        let (_, current_route) = table_metadata_manager
581            .get_full_table_info(table_id)
582            .await
583            .unwrap();
584        let current_route = current_route.unwrap().into_inner();
585        assert_eq!(
586            current_route.region_routes().unwrap(),
587            new_table_route.region_routes().unwrap()
588        );
589
590        let datanode_table_manager = DatanodeTableManager::new(kv_backend);
591        let updated = datanode_table_manager
592            .get(&DatanodeTableKey::new(2, table_id))
593            .await
594            .unwrap();
595        assert!(updated.is_some());
596    }
597
598    #[tokio::test]
599    async fn test_put_table_route_rejects_logical_route() {
600        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
601        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
602        let table_id = 1024;
603        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
604        table_metadata_manager
605            .create_table_metadata(
606                table_info,
607                TableRouteValue::Physical(table_route),
608                HashMap::new(),
609            )
610            .await
611            .unwrap();
612
613        let tool = PutTableRouteTool {
614            kv_backend,
615            selector: TableSelector::with_table_id(table_id),
616            value: serde_json::to_vec(&TableRouteValue::logical(table_id + 1)).unwrap(),
617        };
618
619        let err = tool.do_work().await.unwrap_err();
620        assert!(err.output_msg().contains("non-physical TableRouteValue."));
621    }
622
623    #[test]
624    fn test_validate_table_route_rejects_new_region_not_in_current_route() {
625        let table_id = 1024;
626        let current_region_routes = vec![
627            RegionRoute {
628                region: common_meta::rpc::router::Region {
629                    id: RegionId::new(table_id, 1),
630                    ..Default::default()
631                },
632                ..Default::default()
633            },
634            RegionRoute {
635                region: common_meta::rpc::router::Region {
636                    id: RegionId::new(table_id, 2),
637                    ..Default::default()
638                },
639                ..Default::default()
640            },
641        ];
642        let new_region_routes = vec![
643            RegionRoute {
644                region: common_meta::rpc::router::Region {
645                    id: RegionId::new(table_id, 1),
646                    ..Default::default()
647                },
648                ..Default::default()
649            },
650            RegionRoute {
651                region: common_meta::rpc::router::Region {
652                    id: RegionId::new(table_id, 3),
653                    ..Default::default()
654                },
655                ..Default::default()
656            },
657        ];
658
659        let err =
660            validate_table_route(table_id, &current_region_routes, &new_region_routes).unwrap_err();
661
662        assert!(err.to_string().contains("does not exist in current routes"));
663    }
664
665    #[tokio::test]
666    async fn test_put_table_command_builds_tool() {
667        let value = serde_json::to_vec(&TableRouteValue::logical(1025)).unwrap();
668        let command = PutTableRouteCommand::parse_from([
669            "route",
670            "--table-id",
671            "1024",
672            "--value-stdin",
673            "--backend",
674            "memory-store",
675            "--store-addrs",
676            "memory://",
677        ]);
678
679        let _tool = command
680            .build_for_test(
681                BufReader::new(value.as_slice()),
682                Arc::new(MemoryKvBackend::new()) as KvBackendRef,
683            )
684            .await
685            .unwrap();
686    }
687}