common_meta/ddl/alter_logical_tables/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_catalog::format_full_table_name;
16use snafu::OptionExt;
17use table::metadata::TableId;
18
19use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
20use crate::error::{
21    AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu,
22    TableRouteNotFoundSnafu,
23};
24use crate::key::table_info::TableInfoValue;
25use crate::key::table_name::TableNameKey;
26use crate::key::table_route::TableRouteValue;
27use crate::key::DeserializedValueWithBytes;
28use crate::rpc::ddl::AlterTableTask;
29
30impl AlterLogicalTablesProcedure {
31    pub(crate) fn filter_task(&mut self, finished_tasks: &[bool]) -> Result<()> {
32        debug_assert_eq!(finished_tasks.len(), self.data.tasks.len());
33        debug_assert_eq!(finished_tasks.len(), self.data.table_info_values.len());
34        self.data.tasks = self
35            .data
36            .tasks
37            .drain(..)
38            .zip(finished_tasks.iter())
39            .filter_map(|(task, finished)| if *finished { None } else { Some(task) })
40            .collect();
41        self.data.table_info_values = self
42            .data
43            .table_info_values
44            .drain(..)
45            .zip(finished_tasks.iter())
46            .filter_map(|(table_info_value, finished)| {
47                if *finished {
48                    None
49                } else {
50                    Some(table_info_value)
51                }
52            })
53            .collect();
54
55        Ok(())
56    }
57
58    pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
59        let (physical_table_info, physical_table_route) = self
60            .context
61            .table_metadata_manager
62            .get_full_table_info(self.data.physical_table_id)
63            .await?;
64
65        let physical_table_info = physical_table_info.with_context(|| TableInfoNotFoundSnafu {
66            table: format!("table id - {}", self.data.physical_table_id),
67        })?;
68        let physical_table_route = physical_table_route
69            .context(TableRouteNotFoundSnafu {
70                table_id: self.data.physical_table_id,
71            })?
72            .into_inner();
73
74        self.data.physical_table_info = Some(physical_table_info);
75        let TableRouteValue::Physical(physical_table_route) = physical_table_route else {
76            return AlterLogicalTablesInvalidArgumentsSnafu {
77                err_msg: format!(
78                    "expected a physical table but got a logical table: {:?}",
79                    self.data.physical_table_id
80                ),
81            }
82            .fail();
83        };
84        self.data.physical_table_route = Some(physical_table_route);
85
86        Ok(())
87    }
88
89    pub(crate) async fn fill_table_info_values(&mut self) -> Result<()> {
90        let table_ids = self.get_all_table_ids().await?;
91        let table_info_values = self.get_all_table_info_values(&table_ids).await?;
92        debug_assert_eq!(table_info_values.len(), self.data.tasks.len());
93        self.data.table_info_values = table_info_values;
94
95        Ok(())
96    }
97
98    async fn get_all_table_info_values(
99        &self,
100        table_ids: &[TableId],
101    ) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
102        let table_info_manager = self.context.table_metadata_manager.table_info_manager();
103        let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
104        let mut table_info_values = Vec::with_capacity(table_ids.len());
105        for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) {
106            let table_info_value =
107                table_info_map
108                    .remove(table_id)
109                    .with_context(|| TableInfoNotFoundSnafu {
110                        table: extract_table_name(task),
111                    })?;
112            table_info_values.push(table_info_value);
113        }
114
115        Ok(table_info_values)
116    }
117
118    async fn get_all_table_ids(&self) -> Result<Vec<TableId>> {
119        let table_name_manager = self.context.table_metadata_manager.table_name_manager();
120        let table_name_keys = self
121            .data
122            .tasks
123            .iter()
124            .map(|task| extract_table_name_key(task))
125            .collect();
126
127        let table_name_values = table_name_manager.batch_get(table_name_keys).await?;
128        let mut table_ids = Vec::with_capacity(table_name_values.len());
129        for (value, task) in table_name_values.into_iter().zip(self.data.tasks.iter()) {
130            let table_id = value
131                .with_context(|| TableNotFoundSnafu {
132                    table_name: extract_table_name(task),
133                })?
134                .table_id();
135            table_ids.push(table_id);
136        }
137
138        Ok(table_ids)
139    }
140}
141
142#[inline]
143fn extract_table_name(task: &AlterTableTask) -> String {
144    format_full_table_name(
145        &task.alter_table.catalog_name,
146        &task.alter_table.schema_name,
147        &task.alter_table.table_name,
148    )
149}
150
151#[inline]
152fn extract_table_name_key(task: &AlterTableTask) -> TableNameKey {
153    TableNameKey::new(
154        &task.alter_table.catalog_name,
155        &task.alter_table.schema_name,
156        &task.alter_table.table_name,
157    )
158}