common_meta/ddl/alter_table/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::{
19    AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, alter_request,
20};
21use snafu::OptionExt;
22use table::metadata::TableInfo;
23
24use crate::ddl::alter_table::AlterTableProcedure;
25use crate::error::{self, InvalidProtoMsgSnafu, Result};
26
27impl AlterTableProcedure {
28    /// Makes alter kind proto that all regions can reuse.
29    /// Region alter request always add columns if not exist.
30    pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
31        // Safety: Checked in `AlterTableProcedure::new`.
32        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
33        // Safety: checked
34        let table_info = self.data.table_info().unwrap();
35        let kind = create_proto_alter_kind(table_info, alter_kind)?;
36
37        Ok(kind)
38    }
39}
40
41/// Creates region proto alter kind from `table_info` and `alter_kind`.
42///
43/// It always adds column if not exists and drops column if exists.
44/// It skips the column if it already exists in the table.
45fn create_proto_alter_kind(
46    table_info: &TableInfo,
47    alter_kind: &Kind,
48) -> Result<Option<alter_request::Kind>> {
49    match alter_kind {
50        Kind::AddColumns(x) => {
51            // Construct a set of existing columns in the table.
52            let existing_columns: HashSet<_> = table_info
53                .meta
54                .schema
55                .column_schemas()
56                .iter()
57                .map(|col| &col.name)
58                .collect();
59            let mut next_column_id = table_info.meta.next_column_id;
60
61            let mut add_columns = Vec::with_capacity(x.add_columns.len());
62            for add_column in &x.add_columns {
63                let column_def = add_column
64                    .column_def
65                    .as_ref()
66                    .context(InvalidProtoMsgSnafu {
67                        err_msg: "'column_def' is absent",
68                    })?;
69
70                // Skips existing columns.
71                if existing_columns.contains(&column_def.name) {
72                    continue;
73                }
74
75                let column_id = next_column_id;
76                next_column_id += 1;
77                let column_def = RegionColumnDef {
78                    column_def: Some(column_def.clone()),
79                    column_id,
80                };
81
82                add_columns.push(AddColumn {
83                    column_def: Some(column_def),
84                    location: add_column.location.clone(),
85                });
86            }
87
88            Ok(Some(alter_request::Kind::AddColumns(AddColumns {
89                add_columns,
90            })))
91        }
92        Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
93        Kind::DropColumns(x) => {
94            let drop_columns = x
95                .drop_columns
96                .iter()
97                .map(|x| DropColumn {
98                    name: x.name.clone(),
99                })
100                .collect::<Vec<_>>();
101
102            Ok(Some(alter_request::Kind::DropColumns(DropColumns {
103                drop_columns,
104            })))
105        }
106        Kind::RenameTable(_) => Ok(None),
107        Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
108        Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
109        Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
110        Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
111        Kind::SetIndexes(v) => Ok(Some(alter_request::Kind::SetIndexes(v.clone()))),
112        Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
113        Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
114        Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
115        Kind::Repartition(_) => error::UnexpectedSnafu {
116            err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
117        }
118        .fail()?,
119    }
120}
121
122#[cfg(test)]
123mod tests {
124    use std::collections::HashMap;
125    use std::sync::Arc;
126
127    use api::v1::add_column_location::LocationType;
128    use api::v1::alter_table_expr::Kind;
129    use api::v1::region::RegionColumnDef;
130    use api::v1::region::region_request::Body;
131    use api::v1::{
132        AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
133        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType, region,
134    };
135    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
136    use store_api::storage::{RegionId, TableId};
137
138    use crate::ddl::DdlContext;
139    use crate::ddl::alter_table::AlterTableProcedure;
140    use crate::ddl::alter_table::executor::make_alter_region_request;
141    use crate::ddl::test_util::columns::TestColumnDefBuilder;
142    use crate::ddl::test_util::create_table::{
143        TestCreateTableExprBuilder, build_raw_table_info_from_expr,
144    };
145    use crate::key::table_route::TableRouteValue;
146    use crate::peer::Peer;
147    use crate::rpc::ddl::AlterTableTask;
148    use crate::rpc::router::{Region, RegionRoute};
149    use crate::test_util::{MockDatanodeManager, new_ddl_context};
150
151    /// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
152    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
153        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
154        let ddl_context = new_ddl_context(datanode_manager);
155        let table_id = 1024;
156        let region_id = RegionId::new(table_id, 1);
157        let table_name = "foo";
158
159        let create_table = TestCreateTableExprBuilder::default()
160            .column_defs([
161                TestColumnDefBuilder::default()
162                    .name("ts")
163                    .data_type(ColumnDataType::TimestampMillisecond)
164                    .semantic_type(SemanticType::Timestamp)
165                    .build()
166                    .unwrap()
167                    .into(),
168                TestColumnDefBuilder::default()
169                    .name("host")
170                    .data_type(ColumnDataType::String)
171                    .semantic_type(SemanticType::Tag)
172                    .build()
173                    .unwrap()
174                    .into(),
175                TestColumnDefBuilder::default()
176                    .name("cpu")
177                    .data_type(ColumnDataType::Float64)
178                    .semantic_type(SemanticType::Field)
179                    .is_nullable(true)
180                    .build()
181                    .unwrap()
182                    .into(),
183            ])
184            .table_id(table_id)
185            .time_index("ts")
186            .primary_keys(["host".into()])
187            .table_name(table_name)
188            .build()
189            .unwrap()
190            .into();
191        let table_info = build_raw_table_info_from_expr(&create_table);
192
193        // Puts a value to table name key.
194        ddl_context
195            .table_metadata_manager
196            .create_table_metadata(
197                table_info,
198                TableRouteValue::physical(vec![RegionRoute {
199                    region: Region::new_test(region_id),
200                    leader_peer: Some(Peer::empty(1)),
201                    follower_peers: vec![],
202                    leader_state: None,
203                    leader_down_since: None,
204                    write_route_policy: None,
205                }]),
206                HashMap::new(),
207            )
208            .await
209            .unwrap();
210        (ddl_context, table_id, region_id, table_name.to_string())
211    }
212
213    #[tokio::test]
214    async fn test_make_alter_region_request() {
215        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
216
217        let task = AlterTableTask {
218            alter_table: AlterTableExpr {
219                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
220                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
221                table_name,
222                kind: Some(Kind::AddColumns(AddColumns {
223                    add_columns: vec![AddColumn {
224                        column_def: Some(PbColumnDef {
225                            name: "my_tag3".to_string(),
226                            data_type: ColumnDataType::String as i32,
227                            is_nullable: true,
228                            default_constraint: Vec::new(),
229                            semantic_type: SemanticType::Tag as i32,
230                            comment: String::new(),
231                            ..Default::default()
232                        }),
233                        location: Some(AddColumnLocation {
234                            location_type: LocationType::After as i32,
235                            after_column_name: "host".to_string(),
236                        }),
237                        add_if_not_exists: false,
238                    }],
239                })),
240            },
241        };
242
243        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
244        procedure.on_prepare().await.unwrap();
245        let alter_kind = procedure.make_region_alter_kind().unwrap();
246        let Some(Body::Alter(alter_region_request)) =
247            make_alter_region_request(region_id, alter_kind).body
248        else {
249            unreachable!()
250        };
251        assert_eq!(alter_region_request.region_id, region_id.as_u64());
252        assert_eq!(alter_region_request.schema_version, 0);
253        assert_eq!(
254            alter_region_request.kind,
255            Some(region::alter_request::Kind::AddColumns(
256                region::AddColumns {
257                    add_columns: vec![region::AddColumn {
258                        column_def: Some(RegionColumnDef {
259                            column_def: Some(PbColumnDef {
260                                name: "my_tag3".to_string(),
261                                data_type: ColumnDataType::String as i32,
262                                is_nullable: true,
263                                default_constraint: Vec::new(),
264                                semantic_type: SemanticType::Tag as i32,
265                                comment: String::new(),
266                                ..Default::default()
267                            }),
268                            column_id: 3,
269                        }),
270                        location: Some(AddColumnLocation {
271                            location_type: LocationType::After as i32,
272                            after_column_name: "host".to_string(),
273                        }),
274                    }]
275                }
276            ))
277        );
278    }
279
280    #[tokio::test]
281    async fn test_make_alter_column_type_region_request() {
282        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
283
284        let task = AlterTableTask {
285            alter_table: AlterTableExpr {
286                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
287                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
288                table_name,
289                kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
290                    modify_column_types: vec![ModifyColumnType {
291                        column_name: "cpu".to_string(),
292                        target_type: ColumnDataType::String as i32,
293                        target_type_extension: None,
294                    }],
295                })),
296            },
297        };
298
299        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
300        procedure.on_prepare().await.unwrap();
301        let alter_kind = procedure.make_region_alter_kind().unwrap();
302        let Some(Body::Alter(alter_region_request)) =
303            make_alter_region_request(region_id, alter_kind).body
304        else {
305            unreachable!()
306        };
307        assert_eq!(alter_region_request.region_id, region_id.as_u64());
308        assert_eq!(alter_region_request.schema_version, 0);
309        assert_eq!(
310            alter_region_request.kind,
311            Some(region::alter_request::Kind::ModifyColumnTypes(
312                ModifyColumnTypes {
313                    modify_column_types: vec![ModifyColumnType {
314                        column_name: "cpu".to_string(),
315                        target_type: ColumnDataType::String as i32,
316                        target_type_extension: None,
317                    }]
318                }
319            ))
320        );
321    }
322}