common_meta/ddl/alter_table/
region_request.rs1use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::region_request::Body;
19use api::v1::region::{
20 alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
21 RegionRequest, RegionRequestHeader,
22};
23use common_telemetry::tracing_context::TracingContext;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26use table::metadata::RawTableInfo;
27
28use crate::ddl::alter_table::AlterTableProcedure;
29use crate::error::{InvalidProtoMsgSnafu, Result};
30
31impl AlterTableProcedure {
32 pub(crate) fn make_alter_region_request(
35 &self,
36 region_id: RegionId,
37 kind: Option<alter_request::Kind>,
38 ) -> Result<RegionRequest> {
39 let table_info = self.data.table_info().unwrap();
41
42 Ok(RegionRequest {
43 header: Some(RegionRequestHeader {
44 tracing_context: TracingContext::from_current_span().to_w3c(),
45 ..Default::default()
46 }),
47 body: Some(Body::Alter(AlterRequest {
48 region_id: region_id.as_u64(),
49 schema_version: table_info.ident.version,
50 kind,
51 })),
52 })
53 }
54
55 pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
58 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
60 let table_info = self.data.table_info().unwrap();
62 let kind = create_proto_alter_kind(table_info, alter_kind)?;
63
64 Ok(kind)
65 }
66}
67
68fn create_proto_alter_kind(
73 table_info: &RawTableInfo,
74 alter_kind: &Kind,
75) -> Result<Option<alter_request::Kind>> {
76 match alter_kind {
77 Kind::AddColumns(x) => {
78 let existing_columns: HashSet<_> = table_info
80 .meta
81 .schema
82 .column_schemas
83 .iter()
84 .map(|col| &col.name)
85 .collect();
86 let mut next_column_id = table_info.meta.next_column_id;
87
88 let mut add_columns = Vec::with_capacity(x.add_columns.len());
89 for add_column in &x.add_columns {
90 let column_def = add_column
91 .column_def
92 .as_ref()
93 .context(InvalidProtoMsgSnafu {
94 err_msg: "'column_def' is absent",
95 })?;
96
97 if existing_columns.contains(&column_def.name) {
99 continue;
100 }
101
102 let column_id = next_column_id;
103 next_column_id += 1;
104 let column_def = RegionColumnDef {
105 column_def: Some(column_def.clone()),
106 column_id,
107 };
108
109 add_columns.push(AddColumn {
110 column_def: Some(column_def),
111 location: add_column.location.clone(),
112 });
113 }
114
115 Ok(Some(alter_request::Kind::AddColumns(AddColumns {
116 add_columns,
117 })))
118 }
119 Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
120 Kind::DropColumns(x) => {
121 let drop_columns = x
122 .drop_columns
123 .iter()
124 .map(|x| DropColumn {
125 name: x.name.clone(),
126 })
127 .collect::<Vec<_>>();
128
129 Ok(Some(alter_request::Kind::DropColumns(DropColumns {
130 drop_columns,
131 })))
132 }
133 Kind::RenameTable(_) => Ok(None),
134 Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
135 Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
136 Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
137 Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
138 Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use std::collections::HashMap;
145 use std::sync::Arc;
146
147 use api::v1::add_column_location::LocationType;
148 use api::v1::alter_table_expr::Kind;
149 use api::v1::region::region_request::Body;
150 use api::v1::region::RegionColumnDef;
151 use api::v1::{
152 region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
153 ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
154 };
155 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
156 use store_api::storage::{RegionId, TableId};
157
158 use crate::ddl::alter_table::AlterTableProcedure;
159 use crate::ddl::test_util::columns::TestColumnDefBuilder;
160 use crate::ddl::test_util::create_table::{
161 build_raw_table_info_from_expr, TestCreateTableExprBuilder,
162 };
163 use crate::ddl::DdlContext;
164 use crate::key::table_route::TableRouteValue;
165 use crate::peer::Peer;
166 use crate::rpc::ddl::AlterTableTask;
167 use crate::rpc::router::{Region, RegionRoute};
168 use crate::test_util::{new_ddl_context, MockDatanodeManager};
169
170 async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
172 let datanode_manager = Arc::new(MockDatanodeManager::new(()));
173 let ddl_context = new_ddl_context(datanode_manager);
174 let table_id = 1024;
175 let region_id = RegionId::new(table_id, 1);
176 let table_name = "foo";
177
178 let create_table = TestCreateTableExprBuilder::default()
179 .column_defs([
180 TestColumnDefBuilder::default()
181 .name("ts")
182 .data_type(ColumnDataType::TimestampMillisecond)
183 .semantic_type(SemanticType::Timestamp)
184 .build()
185 .unwrap()
186 .into(),
187 TestColumnDefBuilder::default()
188 .name("host")
189 .data_type(ColumnDataType::String)
190 .semantic_type(SemanticType::Tag)
191 .build()
192 .unwrap()
193 .into(),
194 TestColumnDefBuilder::default()
195 .name("cpu")
196 .data_type(ColumnDataType::Float64)
197 .semantic_type(SemanticType::Field)
198 .is_nullable(true)
199 .build()
200 .unwrap()
201 .into(),
202 ])
203 .table_id(table_id)
204 .time_index("ts")
205 .primary_keys(["host".into()])
206 .table_name(table_name)
207 .build()
208 .unwrap()
209 .into();
210 let table_info = build_raw_table_info_from_expr(&create_table);
211
212 ddl_context
214 .table_metadata_manager
215 .create_table_metadata(
216 table_info,
217 TableRouteValue::physical(vec![RegionRoute {
218 region: Region::new_test(region_id),
219 leader_peer: Some(Peer::empty(1)),
220 follower_peers: vec![],
221 leader_state: None,
222 leader_down_since: None,
223 }]),
224 HashMap::new(),
225 )
226 .await
227 .unwrap();
228 (ddl_context, table_id, region_id, table_name.to_string())
229 }
230
231 #[tokio::test]
232 async fn test_make_alter_region_request() {
233 let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
234
235 let task = AlterTableTask {
236 alter_table: AlterTableExpr {
237 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
238 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
239 table_name,
240 kind: Some(Kind::AddColumns(AddColumns {
241 add_columns: vec![AddColumn {
242 column_def: Some(PbColumnDef {
243 name: "my_tag3".to_string(),
244 data_type: ColumnDataType::String as i32,
245 is_nullable: true,
246 default_constraint: Vec::new(),
247 semantic_type: SemanticType::Tag as i32,
248 comment: String::new(),
249 ..Default::default()
250 }),
251 location: Some(AddColumnLocation {
252 location_type: LocationType::After as i32,
253 after_column_name: "host".to_string(),
254 }),
255 add_if_not_exists: false,
256 }],
257 })),
258 },
259 };
260
261 let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
262 procedure.on_prepare().await.unwrap();
263 let alter_kind = procedure.make_region_alter_kind().unwrap();
264 let Some(Body::Alter(alter_region_request)) = procedure
265 .make_alter_region_request(region_id, alter_kind)
266 .unwrap()
267 .body
268 else {
269 unreachable!()
270 };
271 assert_eq!(alter_region_request.region_id, region_id.as_u64());
272 assert_eq!(alter_region_request.schema_version, 1);
273 assert_eq!(
274 alter_region_request.kind,
275 Some(region::alter_request::Kind::AddColumns(
276 region::AddColumns {
277 add_columns: vec![region::AddColumn {
278 column_def: Some(RegionColumnDef {
279 column_def: Some(PbColumnDef {
280 name: "my_tag3".to_string(),
281 data_type: ColumnDataType::String as i32,
282 is_nullable: true,
283 default_constraint: Vec::new(),
284 semantic_type: SemanticType::Tag as i32,
285 comment: String::new(),
286 ..Default::default()
287 }),
288 column_id: 3,
289 }),
290 location: Some(AddColumnLocation {
291 location_type: LocationType::After as i32,
292 after_column_name: "host".to_string(),
293 }),
294 }]
295 }
296 ))
297 );
298 }
299
300 #[tokio::test]
301 async fn test_make_alter_column_type_region_request() {
302 let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;
303
304 let task = AlterTableTask {
305 alter_table: AlterTableExpr {
306 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
307 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
308 table_name,
309 kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
310 modify_column_types: vec![ModifyColumnType {
311 column_name: "cpu".to_string(),
312 target_type: ColumnDataType::String as i32,
313 target_type_extension: None,
314 }],
315 })),
316 },
317 };
318
319 let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
320 procedure.on_prepare().await.unwrap();
321 let alter_kind = procedure.make_region_alter_kind().unwrap();
322 let Some(Body::Alter(alter_region_request)) = procedure
323 .make_alter_region_request(region_id, alter_kind)
324 .unwrap()
325 .body
326 else {
327 unreachable!()
328 };
329 assert_eq!(alter_region_request.region_id, region_id.as_u64());
330 assert_eq!(alter_region_request.schema_version, 1);
331 assert_eq!(
332 alter_region_request.kind,
333 Some(region::alter_request::Kind::ModifyColumnTypes(
334 ModifyColumnTypes {
335 modify_column_types: vec![ModifyColumnType {
336 column_name: "cpu".to_string(),
337 target_type: ColumnDataType::String as i32,
338 target_type_extension: None,
339 }]
340 }
341 ))
342 );
343 }
344}