common_meta/ddl/alter_table/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::{
19    AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, alter_request,
20};
21use snafu::OptionExt;
22use table::metadata::RawTableInfo;
23
24use crate::ddl::alter_table::AlterTableProcedure;
25use crate::error::{self, InvalidProtoMsgSnafu, Result};
26
27impl AlterTableProcedure {
28    /// Makes alter kind proto that all regions can reuse.
29    /// Region alter request always add columns if not exist.
30    pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
31        // Safety: Checked in `AlterTableProcedure::new`.
32        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
33        // Safety: checked
34        let table_info = self.data.table_info().unwrap();
35        let kind = create_proto_alter_kind(table_info, alter_kind)?;
36
37        Ok(kind)
38    }
39}
40
41/// Creates region proto alter kind from `table_info` and `alter_kind`.
42///
43/// It always adds column if not exists and drops column if exists.
44/// It skips the column if it already exists in the table.
45fn create_proto_alter_kind(
46    table_info: &RawTableInfo,
47    alter_kind: &Kind,
48) -> Result<Option<alter_request::Kind>> {
49    match alter_kind {
50        Kind::AddColumns(x) => {
51            // Construct a set of existing columns in the table.
52            let existing_columns: HashSet<_> = table_info
53                .meta
54                .schema
55                .column_schemas
56                .iter()
57                .map(|col| &col.name)
58                .collect();
59            let mut next_column_id = table_info.meta.next_column_id;
60
61            let mut add_columns = Vec::with_capacity(x.add_columns.len());
62            for add_column in &x.add_columns {
63                let column_def = add_column
64                    .column_def
65                    .as_ref()
66                    .context(InvalidProtoMsgSnafu {
67                        err_msg: "'column_def' is absent",
68                    })?;
69
70                // Skips existing columns.
71                if existing_columns.contains(&column_def.name) {
72                    continue;
73                }
74
75                let column_id = next_column_id;
76                next_column_id += 1;
77                let column_def = RegionColumnDef {
78                    column_def: Some(column_def.clone()),
79                    column_id,
80                };
81
82                add_columns.push(AddColumn {
83                    column_def: Some(column_def),
84                    location: add_column.location.clone(),
85                });
86            }
87
88            Ok(Some(alter_request::Kind::AddColumns(AddColumns {
89                add_columns,
90            })))
91        }
92        Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
93        Kind::DropColumns(x) => {
94            let drop_columns = x
95                .drop_columns
96                .iter()
97                .map(|x| DropColumn {
98                    name: x.name.clone(),
99                })
100                .collect::<Vec<_>>();
101
102            Ok(Some(alter_request::Kind::DropColumns(DropColumns {
103                drop_columns,
104            })))
105        }
106        Kind::RenameTable(_) => Ok(None),
107        Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
108        Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
109        Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
110        Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
111        Kind::SetIndexes(v) => Ok(Some(alter_request::Kind::SetIndexes(v.clone()))),
112        Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
113        Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
114        Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
115        Kind::Repartition(_) => error::UnexpectedSnafu {
116            err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
117        }
118        .fail()?,
119    }
120}
121
122#[cfg(test)]
123mod tests {
124    use std::collections::HashMap;
125    use std::sync::Arc;
126
127    use api::v1::add_column_location::LocationType;
128    use api::v1::alter_table_expr::Kind;
129    use api::v1::region::RegionColumnDef;
130    use api::v1::region::region_request::Body;
131    use api::v1::{
132        AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
133        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType, region,
134    };
135    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
136    use store_api::storage::{RegionId, TableId};
137
138    use crate::ddl::DdlContext;
139    use crate::ddl::alter_table::AlterTableProcedure;
140    use crate::ddl::alter_table::executor::make_alter_region_request;
141    use crate::ddl::test_util::columns::TestColumnDefBuilder;
142    use crate::ddl::test_util::create_table::{
143        TestCreateTableExprBuilder, build_raw_table_info_from_expr,
144    };
145    use crate::key::table_route::TableRouteValue;
146    use crate::peer::Peer;
147    use crate::rpc::ddl::AlterTableTask;
148    use crate::rpc::router::{Region, RegionRoute};
149    use crate::test_util::{MockDatanodeManager, new_ddl_context};
150
151    /// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
152    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
153        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
154        let ddl_context = new_ddl_context(datanode_manager);
155        let table_id = 1024;
156        let region_id = RegionId::new(table_id, 1);
157        let table_name = "foo";
158
159        let create_table = TestCreateTableExprBuilder::default()
160            .column_defs([
161                TestColumnDefBuilder::default()
162                    .name("ts")
163                    .data_type(ColumnDataType::TimestampMillisecond)
164                    .semantic_type(SemanticType::Timestamp)
165                    .build()
166                    .unwrap()
167                    .into(),
168                TestColumnDefBuilder::default()
169                    .name("host")
170                    .data_type(ColumnDataType::String)
171                    .semantic_type(SemanticType::Tag)
172                    .build()
173                    .unwrap()
174                    .into(),
175                TestColumnDefBuilder::default()
176                    .name("cpu")
177                    .data_type(ColumnDataType::Float64)
178                    .semantic_type(SemanticType::Field)
179                    .is_nullable(true)
180                    .build()
181                    .unwrap()
182                    .into(),
183            ])
184            .table_id(table_id)
185            .time_index("ts")
186            .primary_keys(["host".into()])
187            .table_name(table_name)
188            .build()
189            .unwrap()
190            .into();
191        let table_info = build_raw_table_info_from_expr(&create_table);
192
193        // Puts a value to table name key.
194        ddl_context
195            .table_metadata_manager
196            .create_table_metadata(
197                table_info,
198                TableRouteValue::physical(vec![RegionRoute {
199                    region: Region::new_test(region_id),
200                    leader_peer: Some(Peer::empty(1)),
201                    follower_peers: vec![],
202                    leader_state: None,
203                    leader_down_since: None,
204                }]),
205                HashMap::new(),
206            )
207            .await
208            .unwrap();
209        (ddl_context, table_id, region_id, table_name.to_string())
210    }
211
212    #[tokio::test]
213    async fn test_make_alter_region_request() {
214        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
215
216        let task = AlterTableTask {
217            alter_table: AlterTableExpr {
218                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
219                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
220                table_name,
221                kind: Some(Kind::AddColumns(AddColumns {
222                    add_columns: vec![AddColumn {
223                        column_def: Some(PbColumnDef {
224                            name: "my_tag3".to_string(),
225                            data_type: ColumnDataType::String as i32,
226                            is_nullable: true,
227                            default_constraint: Vec::new(),
228                            semantic_type: SemanticType::Tag as i32,
229                            comment: String::new(),
230                            ..Default::default()
231                        }),
232                        location: Some(AddColumnLocation {
233                            location_type: LocationType::After as i32,
234                            after_column_name: "host".to_string(),
235                        }),
236                        add_if_not_exists: false,
237                    }],
238                })),
239            },
240        };
241
242        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
243        procedure.on_prepare().await.unwrap();
244        let alter_kind = procedure.make_region_alter_kind().unwrap();
245        let Some(Body::Alter(alter_region_request)) =
246            make_alter_region_request(region_id, alter_kind).body
247        else {
248            unreachable!()
249        };
250        assert_eq!(alter_region_request.region_id, region_id.as_u64());
251        assert_eq!(alter_region_request.schema_version, 0);
252        assert_eq!(
253            alter_region_request.kind,
254            Some(region::alter_request::Kind::AddColumns(
255                region::AddColumns {
256                    add_columns: vec![region::AddColumn {
257                        column_def: Some(RegionColumnDef {
258                            column_def: Some(PbColumnDef {
259                                name: "my_tag3".to_string(),
260                                data_type: ColumnDataType::String as i32,
261                                is_nullable: true,
262                                default_constraint: Vec::new(),
263                                semantic_type: SemanticType::Tag as i32,
264                                comment: String::new(),
265                                ..Default::default()
266                            }),
267                            column_id: 3,
268                        }),
269                        location: Some(AddColumnLocation {
270                            location_type: LocationType::After as i32,
271                            after_column_name: "host".to_string(),
272                        }),
273                    }]
274                }
275            ))
276        );
277    }
278
279    #[tokio::test]
280    async fn test_make_alter_column_type_region_request() {
281        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
282
283        let task = AlterTableTask {
284            alter_table: AlterTableExpr {
285                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
286                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
287                table_name,
288                kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
289                    modify_column_types: vec![ModifyColumnType {
290                        column_name: "cpu".to_string(),
291                        target_type: ColumnDataType::String as i32,
292                        target_type_extension: None,
293                    }],
294                })),
295            },
296        };
297
298        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
299        procedure.on_prepare().await.unwrap();
300        let alter_kind = procedure.make_region_alter_kind().unwrap();
301        let Some(Body::Alter(alter_region_request)) =
302            make_alter_region_request(region_id, alter_kind).body
303        else {
304            unreachable!()
305        };
306        assert_eq!(alter_region_request.region_id, region_id.as_u64());
307        assert_eq!(alter_region_request.schema_version, 0);
308        assert_eq!(
309            alter_region_request.kind,
310            Some(region::alter_request::Kind::ModifyColumnTypes(
311                ModifyColumnTypes {
312                    modify_column_types: vec![ModifyColumnType {
313                        column_name: "cpu".to_string(),
314                        target_type: ColumnDataType::String as i32,
315                        target_type_extension: None,
316                    }]
317                }
318            ))
319        );
320    }
321}