common_meta/ddl/alter_table/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::region_request::Body;
19use api::v1::region::{
20    alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
21    RegionRequest, RegionRequestHeader,
22};
23use common_telemetry::tracing_context::TracingContext;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26use table::metadata::RawTableInfo;
27
28use crate::ddl::alter_table::AlterTableProcedure;
29use crate::error::{InvalidProtoMsgSnafu, Result};
30
31impl AlterTableProcedure {
32    /// Makes alter region request from existing an alter kind.
33    /// Region alter request always add columns if not exist.
34    pub(crate) fn make_alter_region_request(
35        &self,
36        region_id: RegionId,
37        kind: Option<alter_request::Kind>,
38    ) -> Result<RegionRequest> {
39        // Safety: checked
40        let table_info = self.data.table_info().unwrap();
41
42        Ok(RegionRequest {
43            header: Some(RegionRequestHeader {
44                tracing_context: TracingContext::from_current_span().to_w3c(),
45                ..Default::default()
46            }),
47            body: Some(Body::Alter(AlterRequest {
48                region_id: region_id.as_u64(),
49                schema_version: table_info.ident.version,
50                kind,
51            })),
52        })
53    }
54
55    /// Makes alter kind proto that all regions can reuse.
56    /// Region alter request always add columns if not exist.
57    pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
58        // Safety: Checked in `AlterTableProcedure::new`.
59        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
60        // Safety: checked
61        let table_info = self.data.table_info().unwrap();
62        let kind = create_proto_alter_kind(table_info, alter_kind)?;
63
64        Ok(kind)
65    }
66}
67
68/// Creates region proto alter kind from `table_info` and `alter_kind`.
69///
70/// It always adds column if not exists and drops column if exists.
71/// It skips the column if it already exists in the table.
72fn create_proto_alter_kind(
73    table_info: &RawTableInfo,
74    alter_kind: &Kind,
75) -> Result<Option<alter_request::Kind>> {
76    match alter_kind {
77        Kind::AddColumns(x) => {
78            // Construct a set of existing columns in the table.
79            let existing_columns: HashSet<_> = table_info
80                .meta
81                .schema
82                .column_schemas
83                .iter()
84                .map(|col| &col.name)
85                .collect();
86            let mut next_column_id = table_info.meta.next_column_id;
87
88            let mut add_columns = Vec::with_capacity(x.add_columns.len());
89            for add_column in &x.add_columns {
90                let column_def = add_column
91                    .column_def
92                    .as_ref()
93                    .context(InvalidProtoMsgSnafu {
94                        err_msg: "'column_def' is absent",
95                    })?;
96
97                // Skips existing columns.
98                if existing_columns.contains(&column_def.name) {
99                    continue;
100                }
101
102                let column_id = next_column_id;
103                next_column_id += 1;
104                let column_def = RegionColumnDef {
105                    column_def: Some(column_def.clone()),
106                    column_id,
107                };
108
109                add_columns.push(AddColumn {
110                    column_def: Some(column_def),
111                    location: add_column.location.clone(),
112                });
113            }
114
115            Ok(Some(alter_request::Kind::AddColumns(AddColumns {
116                add_columns,
117            })))
118        }
119        Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
120        Kind::DropColumns(x) => {
121            let drop_columns = x
122                .drop_columns
123                .iter()
124                .map(|x| DropColumn {
125                    name: x.name.clone(),
126                })
127                .collect::<Vec<_>>();
128
129            Ok(Some(alter_request::Kind::DropColumns(DropColumns {
130                drop_columns,
131            })))
132        }
133        Kind::RenameTable(_) => Ok(None),
134        Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
135        Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
136        Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
137        Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use std::collections::HashMap;
144    use std::sync::Arc;
145
146    use api::v1::add_column_location::LocationType;
147    use api::v1::alter_table_expr::Kind;
148    use api::v1::region::region_request::Body;
149    use api::v1::region::RegionColumnDef;
150    use api::v1::{
151        region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
152        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
153    };
154    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
155    use store_api::storage::{RegionId, TableId};
156
157    use crate::ddl::alter_table::AlterTableProcedure;
158    use crate::ddl::test_util::columns::TestColumnDefBuilder;
159    use crate::ddl::test_util::create_table::{
160        build_raw_table_info_from_expr, TestCreateTableExprBuilder,
161    };
162    use crate::ddl::DdlContext;
163    use crate::key::table_route::TableRouteValue;
164    use crate::peer::Peer;
165    use crate::rpc::ddl::AlterTableTask;
166    use crate::rpc::router::{Region, RegionRoute};
167    use crate::test_util::{new_ddl_context, MockDatanodeManager};
168
169    /// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
170    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
171        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
172        let ddl_context = new_ddl_context(datanode_manager);
173        let table_id = 1024;
174        let region_id = RegionId::new(table_id, 1);
175        let table_name = "foo";
176
177        let create_table = TestCreateTableExprBuilder::default()
178            .column_defs([
179                TestColumnDefBuilder::default()
180                    .name("ts")
181                    .data_type(ColumnDataType::TimestampMillisecond)
182                    .semantic_type(SemanticType::Timestamp)
183                    .build()
184                    .unwrap()
185                    .into(),
186                TestColumnDefBuilder::default()
187                    .name("host")
188                    .data_type(ColumnDataType::String)
189                    .semantic_type(SemanticType::Tag)
190                    .build()
191                    .unwrap()
192                    .into(),
193                TestColumnDefBuilder::default()
194                    .name("cpu")
195                    .data_type(ColumnDataType::Float64)
196                    .semantic_type(SemanticType::Field)
197                    .is_nullable(true)
198                    .build()
199                    .unwrap()
200                    .into(),
201            ])
202            .table_id(table_id)
203            .time_index("ts")
204            .primary_keys(["host".into()])
205            .table_name(table_name)
206            .build()
207            .unwrap()
208            .into();
209        let table_info = build_raw_table_info_from_expr(&create_table);
210
211        // Puts a value to table name key.
212        ddl_context
213            .table_metadata_manager
214            .create_table_metadata(
215                table_info,
216                TableRouteValue::physical(vec![RegionRoute {
217                    region: Region::new_test(region_id),
218                    leader_peer: Some(Peer::empty(1)),
219                    follower_peers: vec![],
220                    leader_state: None,
221                    leader_down_since: None,
222                }]),
223                HashMap::new(),
224            )
225            .await
226            .unwrap();
227        (ddl_context, table_id, region_id, table_name.to_string())
228    }
229
230    #[tokio::test]
231    async fn test_make_alter_region_request() {
232        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
233
234        let task = AlterTableTask {
235            alter_table: AlterTableExpr {
236                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
237                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
238                table_name,
239                kind: Some(Kind::AddColumns(AddColumns {
240                    add_columns: vec![AddColumn {
241                        column_def: Some(PbColumnDef {
242                            name: "my_tag3".to_string(),
243                            data_type: ColumnDataType::String as i32,
244                            is_nullable: true,
245                            default_constraint: Vec::new(),
246                            semantic_type: SemanticType::Tag as i32,
247                            comment: String::new(),
248                            ..Default::default()
249                        }),
250                        location: Some(AddColumnLocation {
251                            location_type: LocationType::After as i32,
252                            after_column_name: "host".to_string(),
253                        }),
254                        add_if_not_exists: false,
255                    }],
256                })),
257            },
258        };
259
260        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
261        procedure.on_prepare().await.unwrap();
262        let alter_kind = procedure.make_region_alter_kind().unwrap();
263        let Some(Body::Alter(alter_region_request)) = procedure
264            .make_alter_region_request(region_id, alter_kind)
265            .unwrap()
266            .body
267        else {
268            unreachable!()
269        };
270        assert_eq!(alter_region_request.region_id, region_id.as_u64());
271        assert_eq!(alter_region_request.schema_version, 1);
272        assert_eq!(
273            alter_region_request.kind,
274            Some(region::alter_request::Kind::AddColumns(
275                region::AddColumns {
276                    add_columns: vec![region::AddColumn {
277                        column_def: Some(RegionColumnDef {
278                            column_def: Some(PbColumnDef {
279                                name: "my_tag3".to_string(),
280                                data_type: ColumnDataType::String as i32,
281                                is_nullable: true,
282                                default_constraint: Vec::new(),
283                                semantic_type: SemanticType::Tag as i32,
284                                comment: String::new(),
285                                ..Default::default()
286                            }),
287                            column_id: 3,
288                        }),
289                        location: Some(AddColumnLocation {
290                            location_type: LocationType::After as i32,
291                            after_column_name: "host".to_string(),
292                        }),
293                    }]
294                }
295            ))
296        );
297    }
298
299    #[tokio::test]
300    async fn test_make_alter_column_type_region_request() {
301        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
302
303        let task = AlterTableTask {
304            alter_table: AlterTableExpr {
305                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
306                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
307                table_name,
308                kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
309                    modify_column_types: vec![ModifyColumnType {
310                        column_name: "cpu".to_string(),
311                        target_type: ColumnDataType::String as i32,
312                        target_type_extension: None,
313                    }],
314                })),
315            },
316        };
317
318        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
319        procedure.on_prepare().await.unwrap();
320        let alter_kind = procedure.make_region_alter_kind().unwrap();
321        let Some(Body::Alter(alter_region_request)) = procedure
322            .make_alter_region_request(region_id, alter_kind)
323            .unwrap()
324            .body
325        else {
326            unreachable!()
327        };
328        assert_eq!(alter_region_request.region_id, region_id.as_u64());
329        assert_eq!(alter_region_request.schema_version, 1);
330        assert_eq!(
331            alter_region_request.kind,
332            Some(region::alter_request::Kind::ModifyColumnTypes(
333                ModifyColumnTypes {
334                    modify_column_types: vec![ModifyColumnType {
335                        column_name: "cpu".to_string(),
336                        target_type: ColumnDataType::String as i32,
337                        target_type_extension: None,
338                    }]
339                }
340            ))
341        );
342    }
343}