common_meta/ddl/alter_table/
region_request.rs1use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::region::region_request::Body;
19use api::v1::region::{
20 alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
21 RegionRequest, RegionRequestHeader,
22};
23use common_telemetry::tracing_context::TracingContext;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26use table::metadata::RawTableInfo;
27
28use crate::ddl::alter_table::AlterTableProcedure;
29use crate::error::{InvalidProtoMsgSnafu, Result};
30
31impl AlterTableProcedure {
32 pub(crate) fn make_alter_region_request(
35 &self,
36 region_id: RegionId,
37 kind: Option<alter_request::Kind>,
38 ) -> Result<RegionRequest> {
39 let table_info = self.data.table_info().unwrap();
41
42 Ok(RegionRequest {
43 header: Some(RegionRequestHeader {
44 tracing_context: TracingContext::from_current_span().to_w3c(),
45 ..Default::default()
46 }),
47 body: Some(Body::Alter(AlterRequest {
48 region_id: region_id.as_u64(),
49 schema_version: table_info.ident.version,
50 kind,
51 })),
52 })
53 }
54
55 pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
58 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
60 let table_info = self.data.table_info().unwrap();
62 let kind = create_proto_alter_kind(table_info, alter_kind)?;
63
64 Ok(kind)
65 }
66}
67
68fn create_proto_alter_kind(
73 table_info: &RawTableInfo,
74 alter_kind: &Kind,
75) -> Result<Option<alter_request::Kind>> {
76 match alter_kind {
77 Kind::AddColumns(x) => {
78 let existing_columns: HashSet<_> = table_info
80 .meta
81 .schema
82 .column_schemas
83 .iter()
84 .map(|col| &col.name)
85 .collect();
86 let mut next_column_id = table_info.meta.next_column_id;
87
88 let mut add_columns = Vec::with_capacity(x.add_columns.len());
89 for add_column in &x.add_columns {
90 let column_def = add_column
91 .column_def
92 .as_ref()
93 .context(InvalidProtoMsgSnafu {
94 err_msg: "'column_def' is absent",
95 })?;
96
97 if existing_columns.contains(&column_def.name) {
99 continue;
100 }
101
102 let column_id = next_column_id;
103 next_column_id += 1;
104 let column_def = RegionColumnDef {
105 column_def: Some(column_def.clone()),
106 column_id,
107 };
108
109 add_columns.push(AddColumn {
110 column_def: Some(column_def),
111 location: add_column.location.clone(),
112 });
113 }
114
115 Ok(Some(alter_request::Kind::AddColumns(AddColumns {
116 add_columns,
117 })))
118 }
119 Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
120 Kind::DropColumns(x) => {
121 let drop_columns = x
122 .drop_columns
123 .iter()
124 .map(|x| DropColumn {
125 name: x.name.clone(),
126 })
127 .collect::<Vec<_>>();
128
129 Ok(Some(alter_request::Kind::DropColumns(DropColumns {
130 drop_columns,
131 })))
132 }
133 Kind::RenameTable(_) => Ok(None),
134 Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
135 Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
136 Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
137 Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
138 }
139}
140
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::add_column_location::LocationType;
    use api::v1::alter_table_expr::Kind;
    use api::v1::region::region_request::Body;
    use api::v1::region::RegionColumnDef;
    use api::v1::{
        region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
        ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
    };
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use store_api::storage::{RegionId, TableId};

    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::test_util::columns::TestColumnDefBuilder;
    use crate::ddl::test_util::create_table::{
        build_raw_table_info_from_expr, TestCreateTableExprBuilder,
    };
    use crate::ddl::DdlContext;
    use crate::key::table_route::TableRouteValue;
    use crate::peer::Peer;
    use crate::rpc::ddl::AlterTableTask;
    use crate::rpc::router::{Region, RegionRoute};
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Name of the table registered by `prepare_ddl_context`.
    const TABLE_NAME: &str = "foo";

    /// Registers a table with columns `ts` (time index), `host` (tag, primary
    /// key) and `cpu` (nullable field) plus a single-region physical route, and
    /// returns the context together with the table id, region id and table name.
    async fn prepare_ddl_context() -> (DdlContext, TableId, RegionId, String) {
        let table_id: TableId = 1024;
        let region_id = RegionId::new(table_id, 1);
        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));

        let create_expr = TestCreateTableExprBuilder::default()
            .column_defs([
                TestColumnDefBuilder::default()
                    .name("ts")
                    .data_type(ColumnDataType::TimestampMillisecond)
                    .semantic_type(SemanticType::Timestamp)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("host")
                    .data_type(ColumnDataType::String)
                    .semantic_type(SemanticType::Tag)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("cpu")
                    .data_type(ColumnDataType::Float64)
                    .semantic_type(SemanticType::Field)
                    .is_nullable(true)
                    .build()
                    .unwrap()
                    .into(),
            ])
            .table_id(table_id)
            .time_index("ts")
            .primary_keys(["host".into()])
            .table_name(TABLE_NAME)
            .build()
            .unwrap()
            .into();
        let table_info = build_raw_table_info_from_expr(&create_expr);

        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        }];
        ddl_context
            .table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::new(),
            )
            .await
            .unwrap();

        (ddl_context, table_id, region_id, TABLE_NAME.to_string())
    }

    /// Wraps `kind` into an alter task targeting `table_name` in the default
    /// catalog and schema.
    fn alter_task(table_name: String, kind: Kind) -> AlterTableTask {
        AlterTableTask {
            alter_table: AlterTableExpr {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name,
                kind: Some(kind),
            },
        }
    }

    /// Runs the procedure through `on_prepare` and returns the alter request
    /// body it builds for `region_id`.
    async fn build_alter_request(
        ddl_context: DdlContext,
        table_id: TableId,
        region_id: RegionId,
        task: AlterTableTask,
    ) -> region::AlterRequest {
        let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
        procedure.on_prepare().await.unwrap();

        let kind = procedure.make_region_alter_kind().unwrap();
        let request = procedure
            .make_alter_region_request(region_id, kind)
            .unwrap();

        match request.body {
            Some(Body::Alter(alter)) => alter,
            _ => unreachable!(),
        }
    }

    /// Definition of the nullable string tag column `my_tag3`.
    fn my_tag3_column_def() -> PbColumnDef {
        PbColumnDef {
            name: "my_tag3".to_string(),
            data_type: ColumnDataType::String as i32,
            is_nullable: true,
            default_constraint: Vec::new(),
            semantic_type: SemanticType::Tag as i32,
            comment: String::new(),
            ..Default::default()
        }
    }

    /// Location placing a new column right after `host`.
    fn after_host_location() -> Option<AddColumnLocation> {
        Some(AddColumnLocation {
            location_type: LocationType::After as i32,
            after_column_name: "host".to_string(),
        })
    }

    /// Request changing the type of column `cpu` to string.
    fn cpu_to_string_types() -> ModifyColumnTypes {
        ModifyColumnTypes {
            modify_column_types: vec![ModifyColumnType {
                column_name: "cpu".to_string(),
                target_type: ColumnDataType::String as i32,
                target_type_extension: None,
            }],
        }
    }

    #[tokio::test]
    async fn test_make_alter_region_request() {
        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;

        let task = alter_task(
            table_name,
            Kind::AddColumns(AddColumns {
                add_columns: vec![AddColumn {
                    column_def: Some(my_tag3_column_def()),
                    location: after_host_location(),
                    add_if_not_exists: false,
                }],
            }),
        );

        let alter = build_alter_request(ddl_context, table_id, region_id, task).await;

        assert_eq!(alter.region_id, region_id.as_u64());
        assert_eq!(alter.schema_version, 1);

        // The new column gets id 3, following the three existing columns.
        let expected = region::alter_request::Kind::AddColumns(region::AddColumns {
            add_columns: vec![region::AddColumn {
                column_def: Some(RegionColumnDef {
                    column_def: Some(my_tag3_column_def()),
                    column_id: 3,
                }),
                location: after_host_location(),
            }],
        });
        assert_eq!(alter.kind, Some(expected));
    }

    #[tokio::test]
    async fn test_make_alter_column_type_region_request() {
        let (ddl_context, table_id, region_id, table_name) = prepare_ddl_context().await;

        let task = alter_task(table_name, Kind::ModifyColumnTypes(cpu_to_string_types()));

        let alter = build_alter_request(ddl_context, table_id, region_id, task).await;

        assert_eq!(alter.region_id, region_id.as_u64());
        assert_eq!(alter.schema_version, 1);
        assert_eq!(
            alter.kind,
            Some(region::alter_request::Kind::ModifyColumnTypes(
                cpu_to_string_types()
            ))
        );
    }
}