common_meta/ddl/alter_table/
region_request.rs1use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::{
19 AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, alter_request,
20};
21use snafu::OptionExt;
22use table::metadata::TableInfo;
23
24use crate::ddl::alter_table::AlterTableProcedure;
25use crate::error::{self, InvalidProtoMsgSnafu, Result};
26
27impl AlterTableProcedure {
28 pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
31 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
33 let table_info = self.data.table_info().unwrap();
35 let kind = create_proto_alter_kind(table_info, alter_kind)?;
36
37 Ok(kind)
38 }
39}
40
41fn create_proto_alter_kind(
46 table_info: &TableInfo,
47 alter_kind: &Kind,
48) -> Result<Option<alter_request::Kind>> {
49 match alter_kind {
50 Kind::AddColumns(x) => {
51 let existing_columns: HashSet<_> = table_info
53 .meta
54 .schema
55 .column_schemas()
56 .iter()
57 .map(|col| &col.name)
58 .collect();
59 let mut next_column_id = table_info.meta.next_column_id;
60
61 let mut add_columns = Vec::with_capacity(x.add_columns.len());
62 for add_column in &x.add_columns {
63 let column_def = add_column
64 .column_def
65 .as_ref()
66 .context(InvalidProtoMsgSnafu {
67 err_msg: "'column_def' is absent",
68 })?;
69
70 if existing_columns.contains(&column_def.name) {
72 continue;
73 }
74
75 let column_id = next_column_id;
76 next_column_id += 1;
77 let column_def = RegionColumnDef {
78 column_def: Some(column_def.clone()),
79 column_id,
80 };
81
82 add_columns.push(AddColumn {
83 column_def: Some(column_def),
84 location: add_column.location.clone(),
85 });
86 }
87
88 Ok(Some(alter_request::Kind::AddColumns(AddColumns {
89 add_columns,
90 })))
91 }
92 Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
93 Kind::DropColumns(x) => {
94 let drop_columns = x
95 .drop_columns
96 .iter()
97 .map(|x| DropColumn {
98 name: x.name.clone(),
99 })
100 .collect::<Vec<_>>();
101
102 Ok(Some(alter_request::Kind::DropColumns(DropColumns {
103 drop_columns,
104 })))
105 }
106 Kind::RenameTable(_) => Ok(None),
107 Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
108 Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
109 Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
110 Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
111 Kind::SetIndexes(v) => Ok(Some(alter_request::Kind::SetIndexes(v.clone()))),
112 Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
113 Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
114 Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
115 Kind::Repartition(_) => error::UnexpectedSnafu {
116 err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
117 }
118 .fail()?,
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use std::collections::HashMap;
125 use std::sync::Arc;
126
127 use api::v1::add_column_location::LocationType;
128 use api::v1::alter_table_expr::Kind;
129 use api::v1::region::RegionColumnDef;
130 use api::v1::region::region_request::Body;
131 use api::v1::{
132 AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
133 ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType, region,
134 };
135 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
136 use store_api::storage::{RegionId, TableId};
137
138 use crate::ddl::DdlContext;
139 use crate::ddl::alter_table::AlterTableProcedure;
140 use crate::ddl::alter_table::executor::make_alter_region_request;
141 use crate::ddl::test_util::columns::TestColumnDefBuilder;
142 use crate::ddl::test_util::create_table::{
143 TestCreateTableExprBuilder, build_raw_table_info_from_expr,
144 };
145 use crate::key::table_route::TableRouteValue;
146 use crate::peer::Peer;
147 use crate::rpc::ddl::AlterTableTask;
148 use crate::rpc::router::{Region, RegionRoute};
149 use crate::test_util::{MockDatanodeManager, new_ddl_context};
150
151 async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
153 let datanode_manager = Arc::new(MockDatanodeManager::new(()));
154 let ddl_context = new_ddl_context(datanode_manager);
155 let table_id = 1024;
156 let region_id = RegionId::new(table_id, 1);
157 let table_name = "foo";
158
159 let create_table = TestCreateTableExprBuilder::default()
160 .column_defs([
161 TestColumnDefBuilder::default()
162 .name("ts")
163 .data_type(ColumnDataType::TimestampMillisecond)
164 .semantic_type(SemanticType::Timestamp)
165 .build()
166 .unwrap()
167 .into(),
168 TestColumnDefBuilder::default()
169 .name("host")
170 .data_type(ColumnDataType::String)
171 .semantic_type(SemanticType::Tag)
172 .build()
173 .unwrap()
174 .into(),
175 TestColumnDefBuilder::default()
176 .name("cpu")
177 .data_type(ColumnDataType::Float64)
178 .semantic_type(SemanticType::Field)
179 .is_nullable(true)
180 .build()
181 .unwrap()
182 .into(),
183 ])
184 .table_id(table_id)
185 .time_index("ts")
186 .primary_keys(["host".into()])
187 .table_name(table_name)
188 .build()
189 .unwrap()
190 .into();
191 let table_info = build_raw_table_info_from_expr(&create_table);
192
193 ddl_context
195 .table_metadata_manager
196 .create_table_metadata(
197 table_info,
198 TableRouteValue::physical(vec![RegionRoute {
199 region: Region::new_test(region_id),
200 leader_peer: Some(Peer::empty(1)),
201 follower_peers: vec![],
202 leader_state: None,
203 leader_down_since: None,
204 write_route_policy: None,
205 }]),
206 HashMap::new(),
207 )
208 .await
209 .unwrap();
210 (ddl_context, table_id, region_id, table_name.to_string())
211 }
212
213 #[tokio::test]
214 async fn test_make_alter_region_request() {
215 let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
216
217 let task = AlterTableTask {
218 alter_table: AlterTableExpr {
219 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
220 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
221 table_name,
222 kind: Some(Kind::AddColumns(AddColumns {
223 add_columns: vec![AddColumn {
224 column_def: Some(PbColumnDef {
225 name: "my_tag3".to_string(),
226 data_type: ColumnDataType::String as i32,
227 is_nullable: true,
228 default_constraint: Vec::new(),
229 semantic_type: SemanticType::Tag as i32,
230 comment: String::new(),
231 ..Default::default()
232 }),
233 location: Some(AddColumnLocation {
234 location_type: LocationType::After as i32,
235 after_column_name: "host".to_string(),
236 }),
237 add_if_not_exists: false,
238 }],
239 })),
240 },
241 };
242
243 let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
244 procedure.on_prepare().await.unwrap();
245 let alter_kind = procedure.make_region_alter_kind().unwrap();
246 let Some(Body::Alter(alter_region_request)) =
247 make_alter_region_request(region_id, alter_kind).body
248 else {
249 unreachable!()
250 };
251 assert_eq!(alter_region_request.region_id, region_id.as_u64());
252 assert_eq!(alter_region_request.schema_version, 0);
253 assert_eq!(
254 alter_region_request.kind,
255 Some(region::alter_request::Kind::AddColumns(
256 region::AddColumns {
257 add_columns: vec![region::AddColumn {
258 column_def: Some(RegionColumnDef {
259 column_def: Some(PbColumnDef {
260 name: "my_tag3".to_string(),
261 data_type: ColumnDataType::String as i32,
262 is_nullable: true,
263 default_constraint: Vec::new(),
264 semantic_type: SemanticType::Tag as i32,
265 comment: String::new(),
266 ..Default::default()
267 }),
268 column_id: 3,
269 }),
270 location: Some(AddColumnLocation {
271 location_type: LocationType::After as i32,
272 after_column_name: "host".to_string(),
273 }),
274 }]
275 }
276 ))
277 );
278 }
279
280 #[tokio::test]
281 async fn test_make_alter_column_type_region_request() {
282 let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
283
284 let task = AlterTableTask {
285 alter_table: AlterTableExpr {
286 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
287 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
288 table_name,
289 kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
290 modify_column_types: vec![ModifyColumnType {
291 column_name: "cpu".to_string(),
292 target_type: ColumnDataType::String as i32,
293 target_type_extension: None,
294 }],
295 })),
296 },
297 };
298
299 let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
300 procedure.on_prepare().await.unwrap();
301 let alter_kind = procedure.make_region_alter_kind().unwrap();
302 let Some(Body::Alter(alter_region_request)) =
303 make_alter_region_request(region_id, alter_kind).body
304 else {
305 unreachable!()
306 };
307 assert_eq!(alter_region_request.region_id, region_id.as_u64());
308 assert_eq!(alter_region_request.schema_version, 0);
309 assert_eq!(
310 alter_region_request.kind,
311 Some(region::alter_request::Kind::ModifyColumnTypes(
312 ModifyColumnTypes {
313 modify_column_types: vec![ModifyColumnType {
314 column_name: "cpu".to_string(),
315 target_type: ColumnDataType::String as i32,
316 target_type_extension: None,
317 }]
318 }
319 ))
320 );
321 }
322}