common_meta/ddl/alter_table/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::{
19    alter_request, AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef,
20};
21use snafu::OptionExt;
22use table::metadata::RawTableInfo;
23
24use crate::ddl::alter_table::AlterTableProcedure;
25use crate::error::{InvalidProtoMsgSnafu, Result};
26
27impl AlterTableProcedure {
28    /// Makes alter kind proto that all regions can reuse.
29    /// Region alter request always add columns if not exist.
30    pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
31        // Safety: Checked in `AlterTableProcedure::new`.
32        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
33        // Safety: checked
34        let table_info = self.data.table_info().unwrap();
35        let kind = create_proto_alter_kind(table_info, alter_kind)?;
36
37        Ok(kind)
38    }
39}
40
41/// Creates region proto alter kind from `table_info` and `alter_kind`.
42///
43/// It always adds column if not exists and drops column if exists.
44/// It skips the column if it already exists in the table.
45fn create_proto_alter_kind(
46    table_info: &RawTableInfo,
47    alter_kind: &Kind,
48) -> Result<Option<alter_request::Kind>> {
49    match alter_kind {
50        Kind::AddColumns(x) => {
51            // Construct a set of existing columns in the table.
52            let existing_columns: HashSet<_> = table_info
53                .meta
54                .schema
55                .column_schemas
56                .iter()
57                .map(|col| &col.name)
58                .collect();
59            let mut next_column_id = table_info.meta.next_column_id;
60
61            let mut add_columns = Vec::with_capacity(x.add_columns.len());
62            for add_column in &x.add_columns {
63                let column_def = add_column
64                    .column_def
65                    .as_ref()
66                    .context(InvalidProtoMsgSnafu {
67                        err_msg: "'column_def' is absent",
68                    })?;
69
70                // Skips existing columns.
71                if existing_columns.contains(&column_def.name) {
72                    continue;
73                }
74
75                let column_id = next_column_id;
76                next_column_id += 1;
77                let column_def = RegionColumnDef {
78                    column_def: Some(column_def.clone()),
79                    column_id,
80                };
81
82                add_columns.push(AddColumn {
83                    column_def: Some(column_def),
84                    location: add_column.location.clone(),
85                });
86            }
87
88            Ok(Some(alter_request::Kind::AddColumns(AddColumns {
89                add_columns,
90            })))
91        }
92        Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
93        Kind::DropColumns(x) => {
94            let drop_columns = x
95                .drop_columns
96                .iter()
97                .map(|x| DropColumn {
98                    name: x.name.clone(),
99                })
100                .collect::<Vec<_>>();
101
102            Ok(Some(alter_request::Kind::DropColumns(DropColumns {
103                drop_columns,
104            })))
105        }
106        Kind::RenameTable(_) => Ok(None),
107        Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
108        Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
109        Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
110        Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
111        Kind::SetIndexes(v) => Ok(Some(alter_request::Kind::SetIndexes(v.clone()))),
112        Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
113        Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
114        Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
115    }
116}
117
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::add_column_location::LocationType;
    use api::v1::alter_table_expr::Kind;
    use api::v1::region::region_request::Body;
    use api::v1::region::RegionColumnDef;
    use api::v1::{
        region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
    };
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use store_api::storage::{RegionId, TableId};

    use crate::ddl::alter_table::executor::make_alter_region_request;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::test_util::columns::TestColumnDefBuilder;
    use crate::ddl::test_util::create_table::{
        build_raw_table_info_from_expr, TestCreateTableExprBuilder,
    };
    use crate::ddl::DdlContext;
    use crate::key::table_route::TableRouteValue;
    use crate::peer::Peer;
    use crate::rpc::ddl::AlterTableTask;
    use crate::rpc::router::{Region, RegionRoute};
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Creates a DDL context holding the metadata of table `foo` (id 1024) with schema
    /// `[ts: Timestamp, host: Tag, cpu: Field]` and a single region.
    ///
    /// Returns the context together with the table id, the region id and the table name.
    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
        let table_id = 1024;
        let table_name = "foo";
        let region_id = RegionId::new(table_id, 1);

        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));

        let create_table = TestCreateTableExprBuilder::default()
            .column_defs([
                TestColumnDefBuilder::default()
                    .name("ts")
                    .data_type(ColumnDataType::TimestampMillisecond)
                    .semantic_type(SemanticType::Timestamp)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("host")
                    .data_type(ColumnDataType::String)
                    .semantic_type(SemanticType::Tag)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("cpu")
                    .data_type(ColumnDataType::Float64)
                    .semantic_type(SemanticType::Field)
                    .is_nullable(true)
                    .build()
                    .unwrap()
                    .into(),
            ])
            .table_id(table_id)
            .time_index("ts")
            .primary_keys(["host".into()])
            .table_name(table_name)
            .build()
            .unwrap()
            .into();
        let table_info = build_raw_table_info_from_expr(&create_table);

        // The table is served by exactly one region whose leader is peer 1.
        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };
        let table_route = TableRouteValue::physical(vec![region_route]);

        // Registers the table info and its route in the metadata manager.
        ddl_context
            .table_metadata_manager
            .create_table_metadata(table_info, table_route, HashMap::new())
            .await
            .unwrap();

        (ddl_context, table_id, region_id, table_name.to_string())
    }

    /// Runs an alter-table procedure carrying `kind` against the prepared table and
    /// asserts that the region request built from it targets the prepared region, uses
    /// schema version 0 and carries the `expected` region alter kind.
    async fn assert_region_alter_kind(kind: Kind, expected: region::alter_request::Kind) {
        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;

        let task = AlterTableTask {
            alter_table: AlterTableExpr {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name,
                kind: Some(kind),
            },
        };

        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
        procedure.on_prepare().await.unwrap();
        let alter_kind = procedure.make_region_alter_kind().unwrap();

        let request = make_alter_region_request(region_id, alter_kind);
        let Some(Body::Alter(alter_region_request)) = request.body else {
            unreachable!()
        };

        assert_eq!(alter_region_request.region_id, region_id.as_u64());
        assert_eq!(alter_region_request.schema_version, 0);
        assert_eq!(alter_region_request.kind, Some(expected));
    }

    /// Definition of the nullable string tag column `my_tag3` used by the add-column test.
    fn my_tag3_column_def() -> PbColumnDef {
        PbColumnDef {
            name: "my_tag3".to_string(),
            data_type: ColumnDataType::String as i32,
            is_nullable: true,
            default_constraint: Vec::new(),
            semantic_type: SemanticType::Tag as i32,
            comment: String::new(),
            ..Default::default()
        }
    }

    /// Location placing a new column right after the `host` column.
    fn after_host_location() -> AddColumnLocation {
        AddColumnLocation {
            location_type: LocationType::After as i32,
            after_column_name: "host".to_string(),
        }
    }

    /// Request changing the type of the `cpu` column to string.
    fn cpu_to_string_modification() -> ModifyColumnTypes {
        ModifyColumnTypes {
            modify_column_types: vec![ModifyColumnType {
                column_name: "cpu".to_string(),
                target_type: ColumnDataType::String as i32,
                target_type_extension: None,
            }],
        }
    }

    #[tokio::test]
    async fn test_make_alter_region_request() {
        let table_level_kind = Kind::AddColumns(AddColumns {
            add_columns: vec![AddColumn {
                column_def: Some(my_tag3_column_def()),
                location: Some(after_host_location()),
                add_if_not_exists: false,
            }],
        });

        // The new column is expected to get column id 3 (presumably because the three
        // columns of the prepared table occupy ids 0 to 2).
        let region_level_kind = region::alter_request::Kind::AddColumns(region::AddColumns {
            add_columns: vec![region::AddColumn {
                column_def: Some(RegionColumnDef {
                    column_def: Some(my_tag3_column_def()),
                    column_id: 3,
                }),
                location: Some(after_host_location()),
            }],
        });

        assert_region_alter_kind(table_level_kind, region_level_kind).await;
    }

    #[tokio::test]
    async fn test_make_alter_column_type_region_request() {
        // A column type modification is expected to reach the region unchanged.
        let table_level_kind = Kind::ModifyColumnTypes(cpu_to_string_modification());
        let region_level_kind =
            region::alter_request::Kind::ModifyColumnTypes(cpu_to_string_modification());

        assert_region_alter_kind(table_level_kind, region_level_kind).await;
    }
}