common_meta/ddl/alter_table/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::region_request::Body;
19use api::v1::region::{
20    alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
21    RegionRequest, RegionRequestHeader,
22};
23use common_telemetry::tracing_context::TracingContext;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26use table::metadata::RawTableInfo;
27
28use crate::ddl::alter_table::AlterTableProcedure;
29use crate::error::{InvalidProtoMsgSnafu, Result};
30
31impl AlterTableProcedure {
32    /// Makes alter region request from existing an alter kind.
33    /// Region alter request always add columns if not exist.
34    pub(crate) fn make_alter_region_request(
35        &self,
36        region_id: RegionId,
37        kind: Option<alter_request::Kind>,
38    ) -> Result<RegionRequest> {
39        // Safety: checked
40        let table_info = self.data.table_info().unwrap();
41
42        Ok(RegionRequest {
43            header: Some(RegionRequestHeader {
44                tracing_context: TracingContext::from_current_span().to_w3c(),
45                ..Default::default()
46            }),
47            body: Some(Body::Alter(AlterRequest {
48                region_id: region_id.as_u64(),
49                schema_version: table_info.ident.version,
50                kind,
51            })),
52        })
53    }
54
55    /// Makes alter kind proto that all regions can reuse.
56    /// Region alter request always add columns if not exist.
57    pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
58        // Safety: Checked in `AlterTableProcedure::new`.
59        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
60        // Safety: checked
61        let table_info = self.data.table_info().unwrap();
62        let kind = create_proto_alter_kind(table_info, alter_kind)?;
63
64        Ok(kind)
65    }
66}
67
68/// Creates region proto alter kind from `table_info` and `alter_kind`.
69///
70/// It always adds column if not exists and drops column if exists.
71/// It skips the column if it already exists in the table.
72fn create_proto_alter_kind(
73    table_info: &RawTableInfo,
74    alter_kind: &Kind,
75) -> Result<Option<alter_request::Kind>> {
76    match alter_kind {
77        Kind::AddColumns(x) => {
78            // Construct a set of existing columns in the table.
79            let existing_columns: HashSet<_> = table_info
80                .meta
81                .schema
82                .column_schemas
83                .iter()
84                .map(|col| &col.name)
85                .collect();
86            let mut next_column_id = table_info.meta.next_column_id;
87
88            let mut add_columns = Vec::with_capacity(x.add_columns.len());
89            for add_column in &x.add_columns {
90                let column_def = add_column
91                    .column_def
92                    .as_ref()
93                    .context(InvalidProtoMsgSnafu {
94                        err_msg: "'column_def' is absent",
95                    })?;
96
97                // Skips existing columns.
98                if existing_columns.contains(&column_def.name) {
99                    continue;
100                }
101
102                let column_id = next_column_id;
103                next_column_id += 1;
104                let column_def = RegionColumnDef {
105                    column_def: Some(column_def.clone()),
106                    column_id,
107                };
108
109                add_columns.push(AddColumn {
110                    column_def: Some(column_def),
111                    location: add_column.location.clone(),
112                });
113            }
114
115            Ok(Some(alter_request::Kind::AddColumns(AddColumns {
116                add_columns,
117            })))
118        }
119        Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
120        Kind::DropColumns(x) => {
121            let drop_columns = x
122                .drop_columns
123                .iter()
124                .map(|x| DropColumn {
125                    name: x.name.clone(),
126                })
127                .collect::<Vec<_>>();
128
129            Ok(Some(alter_request::Kind::DropColumns(DropColumns {
130                drop_columns,
131            })))
132        }
133        Kind::RenameTable(_) => Ok(None),
134        Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
135        Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
136        Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
137        Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
138        Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use std::collections::HashMap;
145    use std::sync::Arc;
146
147    use api::v1::add_column_location::LocationType;
148    use api::v1::alter_table_expr::Kind;
149    use api::v1::region::region_request::Body;
150    use api::v1::region::RegionColumnDef;
151    use api::v1::{
152        region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
153        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
154    };
155    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
156    use store_api::storage::{RegionId, TableId};
157
158    use crate::ddl::alter_table::AlterTableProcedure;
159    use crate::ddl::test_util::columns::TestColumnDefBuilder;
160    use crate::ddl::test_util::create_table::{
161        build_raw_table_info_from_expr, TestCreateTableExprBuilder,
162    };
163    use crate::ddl::DdlContext;
164    use crate::key::table_route::TableRouteValue;
165    use crate::peer::Peer;
166    use crate::rpc::ddl::AlterTableTask;
167    use crate::rpc::router::{Region, RegionRoute};
168    use crate::test_util::{new_ddl_context, MockDatanodeManager};
169
170    /// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
171    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
172        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
173        let ddl_context = new_ddl_context(datanode_manager);
174        let table_id = 1024;
175        let region_id = RegionId::new(table_id, 1);
176        let table_name = "foo";
177
178        let create_table = TestCreateTableExprBuilder::default()
179            .column_defs([
180                TestColumnDefBuilder::default()
181                    .name("ts")
182                    .data_type(ColumnDataType::TimestampMillisecond)
183                    .semantic_type(SemanticType::Timestamp)
184                    .build()
185                    .unwrap()
186                    .into(),
187                TestColumnDefBuilder::default()
188                    .name("host")
189                    .data_type(ColumnDataType::String)
190                    .semantic_type(SemanticType::Tag)
191                    .build()
192                    .unwrap()
193                    .into(),
194                TestColumnDefBuilder::default()
195                    .name("cpu")
196                    .data_type(ColumnDataType::Float64)
197                    .semantic_type(SemanticType::Field)
198                    .is_nullable(true)
199                    .build()
200                    .unwrap()
201                    .into(),
202            ])
203            .table_id(table_id)
204            .time_index("ts")
205            .primary_keys(["host".into()])
206            .table_name(table_name)
207            .build()
208            .unwrap()
209            .into();
210        let table_info = build_raw_table_info_from_expr(&create_table);
211
212        // Puts a value to table name key.
213        ddl_context
214            .table_metadata_manager
215            .create_table_metadata(
216                table_info,
217                TableRouteValue::physical(vec![RegionRoute {
218                    region: Region::new_test(region_id),
219                    leader_peer: Some(Peer::empty(1)),
220                    follower_peers: vec![],
221                    leader_state: None,
222                    leader_down_since: None,
223                }]),
224                HashMap::new(),
225            )
226            .await
227            .unwrap();
228        (ddl_context, table_id, region_id, table_name.to_string())
229    }
230
231    #[tokio::test]
232    async fn test_make_alter_region_request() {
233        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
234
235        let task = AlterTableTask {
236            alter_table: AlterTableExpr {
237                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
238                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
239                table_name,
240                kind: Some(Kind::AddColumns(AddColumns {
241                    add_columns: vec![AddColumn {
242                        column_def: Some(PbColumnDef {
243                            name: "my_tag3".to_string(),
244                            data_type: ColumnDataType::String as i32,
245                            is_nullable: true,
246                            default_constraint: Vec::new(),
247                            semantic_type: SemanticType::Tag as i32,
248                            comment: String::new(),
249                            ..Default::default()
250                        }),
251                        location: Some(AddColumnLocation {
252                            location_type: LocationType::After as i32,
253                            after_column_name: "host".to_string(),
254                        }),
255                        add_if_not_exists: false,
256                    }],
257                })),
258            },
259        };
260
261        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
262        procedure.on_prepare().await.unwrap();
263        let alter_kind = procedure.make_region_alter_kind().unwrap();
264        let Some(Body::Alter(alter_region_request)) = procedure
265            .make_alter_region_request(region_id, alter_kind)
266            .unwrap()
267            .body
268        else {
269            unreachable!()
270        };
271        assert_eq!(alter_region_request.region_id, region_id.as_u64());
272        assert_eq!(alter_region_request.schema_version, 1);
273        assert_eq!(
274            alter_region_request.kind,
275            Some(region::alter_request::Kind::AddColumns(
276                region::AddColumns {
277                    add_columns: vec![region::AddColumn {
278                        column_def: Some(RegionColumnDef {
279                            column_def: Some(PbColumnDef {
280                                name: "my_tag3".to_string(),
281                                data_type: ColumnDataType::String as i32,
282                                is_nullable: true,
283                                default_constraint: Vec::new(),
284                                semantic_type: SemanticType::Tag as i32,
285                                comment: String::new(),
286                                ..Default::default()
287                            }),
288                            column_id: 3,
289                        }),
290                        location: Some(AddColumnLocation {
291                            location_type: LocationType::After as i32,
292                            after_column_name: "host".to_string(),
293                        }),
294                    }]
295                }
296            ))
297        );
298    }
299
300    #[tokio::test]
301    async fn test_make_alter_column_type_region_request() {
302        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
303
304        let task = AlterTableTask {
305            alter_table: AlterTableExpr {
306                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
307                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
308                table_name,
309                kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
310                    modify_column_types: vec![ModifyColumnType {
311                        column_name: "cpu".to_string(),
312                        target_type: ColumnDataType::String as i32,
313                        target_type_extension: None,
314                    }],
315                })),
316            },
317        };
318
319        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
320        procedure.on_prepare().await.unwrap();
321        let alter_kind = procedure.make_region_alter_kind().unwrap();
322        let Some(Body::Alter(alter_region_request)) = procedure
323            .make_alter_region_request(region_id, alter_kind)
324            .unwrap()
325            .body
326        else {
327            unreachable!()
328        };
329        assert_eq!(alter_region_request.region_id, region_id.as_u64());
330        assert_eq!(alter_region_request.schema_version, 1);
331        assert_eq!(
332            alter_region_request.kind,
333            Some(region::alter_request::Kind::ModifyColumnTypes(
334                ModifyColumnTypes {
335                    modify_column_types: vec![ModifyColumnType {
336                        column_name: "cpu".to_string(),
337                        target_type: ColumnDataType::String as i32,
338                        target_type_extension: None,
339                    }]
340                }
341            ))
342        );
343    }
344}