// common_meta/ddl/alter_logical_tables/region_request.rs
use api::v1;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::RegionId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::Result;
use crate::key::table_info::TableInfoValue;
use crate::peer::Peer;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, RegionRoute};
impl AlterLogicalTablesProcedure {
    /// Builds the [`RegionRequest`] that carries every alter request destined for `peer`.
    ///
    /// The header is filled with the W3C representation of the current span's tracing
    /// context; the body is the `Alters` batch produced by
    /// `make_alter_region_requests`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while building the individual alter requests.
    pub(crate) fn make_request(
        &self,
        peer: &Peer,
        region_routes: &[RegionRoute],
    ) -> Result<RegionRequest> {
        let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
        let request = RegionRequest {
            header: Some(RegionRequestHeader {
                tracing_context: TracingContext::from_current_span().to_w3c(),
                ..Default::default()
            }),
            body: Some(region_request::Body::Alters(alter_requests)),
        };

        Ok(request)
    }

    /// Builds one [`AlterRequest`] per (task, region) pair for `peer`.
    ///
    /// Tasks are paired positionally with `self.data.table_info_values`; for each pair, a
    /// request is emitted for every region number that `find_leader_regions` returns for
    /// `peer` given `region_routes`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `make_alter_region_request`.
    fn make_alter_region_requests(
        &self,
        peer: &Peer,
        region_routes: &[RegionRoute],
    ) -> Result<AlterRequests> {
        let tasks = &self.data.tasks;
        let regions_on_this_peer = find_leader_regions(region_routes, peer);
        // Upper bound on the output size: every task yields one request per region.
        let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());

        // `zip` stops at the shorter of the two sequences.
        // NOTE(review): presumably both always have the same length; confirm against
        // the code that populates `self.data`.
        for (task, table) in tasks.iter().zip(self.data.table_info_values.iter()) {
            for region_number in &regions_on_this_peer {
                let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
                let request = self.make_alter_region_request(region_id, task, table)?;
                requests.push(request);
            }
        }

        Ok(AlterRequests { requests })
    }

    /// Builds the [`AlterRequest`] for a single region from `task`.
    ///
    /// The request's schema version is taken from `table.table_info.ident.version`, and the
    /// task's `AddColumns` payload is converted with `to_region_add_columns`.
    ///
    /// # Panics
    ///
    /// Panics if the task's alter kind is anything other than `AddColumns` (including
    /// `None`).
    fn make_alter_region_request(
        &self,
        region_id: RegionId,
        task: &AlterTableTask,
        table: &TableInfoValue,
    ) -> Result<AlterRequest> {
        let region_id = region_id.as_u64();
        let schema_version = table.table_info.ident.version;
        let kind = match &task.alter_table.kind {
            Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
                to_region_add_columns(add_columns),
            )),
            // NOTE(review): presumably the kind is validated before this point so that
            // only `AddColumns` can reach here; confirm against the procedure's input checks.
            _ => unreachable!(),
        };

        Ok(AlterRequest {
            region_id,
            schema_version,
            kind,
        })
    }
}
/// Converts a table-level `v1::AddColumns` into the region-level [`AddColumns`].
///
/// Each input column's `column_def` is cloned into a [`RegionColumnDef`] and wrapped in an
/// [`AddColumn`]; every other field of both messages is left at its `Default` value.
/// Output order matches input order.
fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
    let source_columns = &add_columns.add_columns;
    let mut region_columns = Vec::with_capacity(source_columns.len());

    for source_column in source_columns {
        region_columns.push(AddColumn {
            column_def: Some(RegionColumnDef {
                column_def: source_column.column_def.clone(),
                ..Default::default()
            }),
            ..Default::default()
        });
    }

    AddColumns {
        add_columns: region_columns,
    }
}