common_meta/ddl/alter_table/
region_request.rs1use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::{
19 alter_request, AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef,
20};
21use snafu::OptionExt;
22use table::metadata::RawTableInfo;
23
24use crate::ddl::alter_table::AlterTableProcedure;
25use crate::error::{InvalidProtoMsgSnafu, Result};
26
27impl AlterTableProcedure {
28 pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
31 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
33 let table_info = self.data.table_info().unwrap();
35 let kind = create_proto_alter_kind(table_info, alter_kind)?;
36
37 Ok(kind)
38 }
39}
40
41fn create_proto_alter_kind(
46 table_info: &RawTableInfo,
47 alter_kind: &Kind,
48) -> Result<Option<alter_request::Kind>> {
49 match alter_kind {
50 Kind::AddColumns(x) => {
51 let existing_columns: HashSet<_> = table_info
53 .meta
54 .schema
55 .column_schemas
56 .iter()
57 .map(|col| &col.name)
58 .collect();
59 let mut next_column_id = table_info.meta.next_column_id;
60
61 let mut add_columns = Vec::with_capacity(x.add_columns.len());
62 for add_column in &x.add_columns {
63 let column_def = add_column
64 .column_def
65 .as_ref()
66 .context(InvalidProtoMsgSnafu {
67 err_msg: "'column_def' is absent",
68 })?;
69
70 if existing_columns.contains(&column_def.name) {
72 continue;
73 }
74
75 let column_id = next_column_id;
76 next_column_id += 1;
77 let column_def = RegionColumnDef {
78 column_def: Some(column_def.clone()),
79 column_id,
80 };
81
82 add_columns.push(AddColumn {
83 column_def: Some(column_def),
84 location: add_column.location.clone(),
85 });
86 }
87
88 Ok(Some(alter_request::Kind::AddColumns(AddColumns {
89 add_columns,
90 })))
91 }
92 Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
93 Kind::DropColumns(x) => {
94 let drop_columns = x
95 .drop_columns
96 .iter()
97 .map(|x| DropColumn {
98 name: x.name.clone(),
99 })
100 .collect::<Vec<_>>();
101
102 Ok(Some(alter_request::Kind::DropColumns(DropColumns {
103 drop_columns,
104 })))
105 }
106 Kind::RenameTable(_) => Ok(None),
107 Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
108 Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
109 Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
110 Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
111 Kind::SetIndexes(v) => Ok(Some(alter_request::Kind::SetIndexes(v.clone()))),
112 Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
113 Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
114 Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
115 }
116}
117
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::add_column_location::LocationType;
    use api::v1::alter_table_expr::Kind;
    use api::v1::region::region_request::Body;
    use api::v1::region::RegionColumnDef;
    use api::v1::{
        region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
    };
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use store_api::storage::{RegionId, TableId};

    use crate::ddl::alter_table::executor::make_alter_region_request;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::test_util::columns::TestColumnDefBuilder;
    use crate::ddl::test_util::create_table::{
        build_raw_table_info_from_expr, TestCreateTableExprBuilder,
    };
    use crate::ddl::DdlContext;
    use crate::key::table_route::TableRouteValue;
    use crate::peer::Peer;
    use crate::rpc::ddl::AlterTableTask;
    use crate::rpc::router::{Region, RegionRoute};
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Registers the metadata of a test table and returns everything the tests need.
    ///
    /// The table is named `foo`, has id 1024 and three columns: `ts` (time index),
    /// `host` (tag, primary key) and `cpu` (nullable field). A single region route
    /// with region number 1 is stored alongside it.
    ///
    /// Returns the DDL context, the table id, the region id and the table name.
    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let table_name = "foo";

        let ts_column = TestColumnDefBuilder::default()
            .name("ts")
            .data_type(ColumnDataType::TimestampMillisecond)
            .semantic_type(SemanticType::Timestamp)
            .build()
            .unwrap();
        let host_column = TestColumnDefBuilder::default()
            .name("host")
            .data_type(ColumnDataType::String)
            .semantic_type(SemanticType::Tag)
            .build()
            .unwrap();
        let cpu_column = TestColumnDefBuilder::default()
            .name("cpu")
            .data_type(ColumnDataType::Float64)
            .semantic_type(SemanticType::Field)
            .is_nullable(true)
            .build()
            .unwrap();

        let create_table_expr = TestCreateTableExprBuilder::default()
            .column_defs([ts_column.into(), host_column.into(), cpu_column.into()])
            .table_id(table_id)
            .time_index("ts")
            .primary_keys(["host".into()])
            .table_name(table_name)
            .build()
            .unwrap()
            .into();
        let raw_table_info = build_raw_table_info_from_expr(&create_table_expr);

        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };

        ddl_context
            .table_metadata_manager
            .create_table_metadata(
                raw_table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::new(),
            )
            .await
            .unwrap();

        (ddl_context, table_id, region_id, table_name.to_string())
    }

    /// Wraps `kind` in an alter-table task targeting `table_name` in the default
    /// catalog and schema.
    fn new_alter_task(table_name: String, kind: Kind) -> AlterTableTask {
        AlterTableTask {
            alter_table: AlterTableExpr {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name,
                kind: Some(kind),
            },
        }
    }

    /// Definition of the nullable string tag column `my_tag3` used by the add-column test.
    fn my_tag3_column_def() -> PbColumnDef {
        PbColumnDef {
            name: "my_tag3".to_string(),
            data_type: ColumnDataType::String as i32,
            is_nullable: true,
            default_constraint: Vec::new(),
            semantic_type: SemanticType::Tag as i32,
            comment: String::new(),
            ..Default::default()
        }
    }

    /// Column location "after the `host` column".
    fn after_host_location() -> AddColumnLocation {
        AddColumnLocation {
            location_type: LocationType::After as i32,
            after_column_name: "host".to_string(),
        }
    }

    #[tokio::test]
    async fn test_make_alter_region_request() {
        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;

        let add_my_tag3 = Kind::AddColumns(AddColumns {
            add_columns: vec![AddColumn {
                column_def: Some(my_tag3_column_def()),
                location: Some(after_host_location()),
                add_if_not_exists: false,
            }],
        });
        let task = new_alter_task(table_name, add_my_tag3);

        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
        procedure.on_prepare().await.unwrap();
        let region_kind = procedure.make_region_alter_kind().unwrap();

        let region_request = make_alter_region_request(region_id, region_kind);
        let Some(Body::Alter(alter)) = region_request.body else {
            unreachable!()
        };

        assert_eq!(alter.region_id, region_id.as_u64());
        assert_eq!(alter.schema_version, 0);

        // The new column is expected to receive column id 3.
        let expected_kind = region::alter_request::Kind::AddColumns(region::AddColumns {
            add_columns: vec![region::AddColumn {
                column_def: Some(RegionColumnDef {
                    column_def: Some(my_tag3_column_def()),
                    column_id: 3,
                }),
                location: Some(after_host_location()),
            }],
        });
        assert_eq!(alter.kind, Some(expected_kind));
    }

    #[tokio::test]
    async fn test_make_alter_column_type_region_request() {
        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;

        let cpu_to_string = ModifyColumnTypes {
            modify_column_types: vec![ModifyColumnType {
                column_name: "cpu".to_string(),
                target_type: ColumnDataType::String as i32,
                target_type_extension: None,
            }],
        };
        let task = new_alter_task(table_name, Kind::ModifyColumnTypes(cpu_to_string.clone()));

        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
        procedure.on_prepare().await.unwrap();
        let region_kind = procedure.make_region_alter_kind().unwrap();

        let region_request = make_alter_region_request(region_id, region_kind);
        let Some(Body::Alter(alter)) = region_request.body else {
            unreachable!()
        };

        assert_eq!(alter.region_id, region_id.as_u64());
        assert_eq!(alter.schema_version, 0);

        // The region-level request is expected to carry the payload unchanged.
        assert_eq!(
            alter.kind,
            Some(region::alter_request::Kind::ModifyColumnTypes(cpu_to_string))
        );
    }
}