// common_meta/ddl/alter_table/region_request.rs
use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::{
19 AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, alter_request,
20};
21use snafu::OptionExt;
22use table::metadata::RawTableInfo;
23
24use crate::ddl::alter_table::AlterTableProcedure;
25use crate::error::{self, InvalidProtoMsgSnafu, Result};
26
27impl AlterTableProcedure {
28 pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
31 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
33 let table_info = self.data.table_info().unwrap();
35 let kind = create_proto_alter_kind(table_info, alter_kind)?;
36
37 Ok(kind)
38 }
39}
40
41fn create_proto_alter_kind(
46 table_info: &RawTableInfo,
47 alter_kind: &Kind,
48) -> Result<Option<alter_request::Kind>> {
49 match alter_kind {
50 Kind::AddColumns(x) => {
51 let existing_columns: HashSet<_> = table_info
53 .meta
54 .schema
55 .column_schemas
56 .iter()
57 .map(|col| &col.name)
58 .collect();
59 let mut next_column_id = table_info.meta.next_column_id;
60
61 let mut add_columns = Vec::with_capacity(x.add_columns.len());
62 for add_column in &x.add_columns {
63 let column_def = add_column
64 .column_def
65 .as_ref()
66 .context(InvalidProtoMsgSnafu {
67 err_msg: "'column_def' is absent",
68 })?;
69
70 if existing_columns.contains(&column_def.name) {
72 continue;
73 }
74
75 let column_id = next_column_id;
76 next_column_id += 1;
77 let column_def = RegionColumnDef {
78 column_def: Some(column_def.clone()),
79 column_id,
80 };
81
82 add_columns.push(AddColumn {
83 column_def: Some(column_def),
84 location: add_column.location.clone(),
85 });
86 }
87
88 Ok(Some(alter_request::Kind::AddColumns(AddColumns {
89 add_columns,
90 })))
91 }
92 Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
93 Kind::DropColumns(x) => {
94 let drop_columns = x
95 .drop_columns
96 .iter()
97 .map(|x| DropColumn {
98 name: x.name.clone(),
99 })
100 .collect::<Vec<_>>();
101
102 Ok(Some(alter_request::Kind::DropColumns(DropColumns {
103 drop_columns,
104 })))
105 }
106 Kind::RenameTable(_) => Ok(None),
107 Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
108 Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
109 Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
110 Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
111 Kind::SetIndexes(v) => Ok(Some(alter_request::Kind::SetIndexes(v.clone()))),
112 Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
113 Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
114 Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
115 Kind::Repartition(_) => error::UnexpectedSnafu {
116 err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
117 }
118 .fail()?,
119 }
120}
121
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::add_column_location::LocationType;
    use api::v1::alter_table_expr::Kind;
    use api::v1::region::RegionColumnDef;
    use api::v1::region::region_request::Body;
    use api::v1::{
        AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType, region,
    };
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use store_api::storage::{RegionId, TableId};

    use crate::ddl::DdlContext;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::alter_table::executor::make_alter_region_request;
    use crate::ddl::test_util::columns::TestColumnDefBuilder;
    use crate::ddl::test_util::create_table::{
        TestCreateTableExprBuilder, build_raw_table_info_from_expr,
    };
    use crate::key::table_route::TableRouteValue;
    use crate::peer::Peer;
    use crate::rpc::ddl::AlterTableTask;
    use crate::rpc::router::{Region, RegionRoute};
    use crate::test_util::{MockDatanodeManager, new_ddl_context};

    /// Creates a DDL context whose metadata already contains table `foo`
    /// (id 1024) with columns `ts` (time index), `host` (tag, primary key)
    /// and `cpu` (nullable field), routed to a single region led by peer 1.
    ///
    /// Returns the context together with the table id, region id and table
    /// name so tests can address the table.
    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let table_name = "foo";

        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);

        let create_table = TestCreateTableExprBuilder::default()
            .column_defs([
                TestColumnDefBuilder::default()
                    .name("ts")
                    .data_type(ColumnDataType::TimestampMillisecond)
                    .semantic_type(SemanticType::Timestamp)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("host")
                    .data_type(ColumnDataType::String)
                    .semantic_type(SemanticType::Tag)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("cpu")
                    .data_type(ColumnDataType::Float64)
                    .semantic_type(SemanticType::Field)
                    .is_nullable(true)
                    .build()
                    .unwrap()
                    .into(),
            ])
            .table_id(table_id)
            .time_index("ts")
            .primary_keys(["host".into()])
            .table_name(table_name)
            .build()
            .unwrap()
            .into();
        let table_info = build_raw_table_info_from_expr(&create_table);

        // A single region whose leader is peer 1 and which has no followers.
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        }];

        ddl_context
            .table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::new(),
            )
            .await
            .unwrap();

        (ddl_context, table_id, region_id, table_name.to_string())
    }

    /// Runs an alter-table procedure for `kind` against the prepared table
    /// and returns the alter kind placed into the resulting region request.
    ///
    /// Also asserts the parts of the request that are the same for every
    /// kind: the target region id and a schema version of 0.
    async fn region_alter_kind_for(kind: Kind) -> Option<region::alter_request::Kind> {
        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;

        let alter_table = AlterTableExpr {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name,
            kind: Some(kind),
        };
        let task = AlterTableTask { alter_table };

        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
        procedure.on_prepare().await.unwrap();
        let alter_kind = procedure.make_region_alter_kind().unwrap();

        let request = make_alter_region_request(region_id, alter_kind);
        let Some(Body::Alter(alter_region_request)) = request.body else {
            unreachable!()
        };
        assert_eq!(alter_region_request.region_id, region_id.as_u64());
        assert_eq!(alter_region_request.schema_version, 0);

        alter_region_request.kind
    }

    /// Adding a new tag column must produce a region `AddColumns` request in
    /// which the column keeps its definition and location and is given
    /// column id 3.
    #[tokio::test]
    async fn test_make_alter_region_request() {
        let new_column = PbColumnDef {
            name: "my_tag3".to_string(),
            data_type: ColumnDataType::String as i32,
            is_nullable: true,
            default_constraint: Vec::new(),
            semantic_type: SemanticType::Tag as i32,
            comment: String::new(),
            ..Default::default()
        };
        let after_host = AddColumnLocation {
            location_type: LocationType::After as i32,
            after_column_name: "host".to_string(),
        };

        let requested = Kind::AddColumns(AddColumns {
            add_columns: vec![AddColumn {
                column_def: Some(new_column.clone()),
                location: Some(after_host.clone()),
                add_if_not_exists: false,
            }],
        });

        let expected = region::alter_request::Kind::AddColumns(region::AddColumns {
            add_columns: vec![region::AddColumn {
                column_def: Some(RegionColumnDef {
                    column_def: Some(new_column),
                    column_id: 3,
                }),
                location: Some(after_host),
            }],
        });

        assert_eq!(region_alter_kind_for(requested).await, Some(expected));
    }

    /// Changing the type of `cpu` must be forwarded to the region unchanged
    /// as a `ModifyColumnTypes` request.
    #[tokio::test]
    async fn test_make_alter_column_type_region_request() {
        let modification = ModifyColumnTypes {
            modify_column_types: vec![ModifyColumnType {
                column_name: "cpu".to_string(),
                target_type: ColumnDataType::String as i32,
                target_type_extension: None,
            }],
        };

        let requested = Kind::ModifyColumnTypes(modification.clone());
        let expected = region::alter_request::Kind::ModifyColumnTypes(modification);

        assert_eq!(region_alter_kind_for(requested).await, Some(expected));
    }
}