meta_srv/procedure/region_migration/
close_downgraded_region.rs1use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::RegionIdent;
19use common_meta::distributed_time_constants::default_distributed_time_constants;
20use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
21use common_procedure::{Context as ProcedureContext, Status};
22use common_telemetry::{info, warn};
23use serde::{Deserialize, Serialize};
24use snafu::ResultExt;
25
26use crate::error::{self, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
29use crate::procedure::region_migration::{Context, State};
30use crate::service::mailbox::Channel;
31
32#[derive(Debug, Serialize, Deserialize)]
33pub struct CloseDowngradedRegion;
34
35#[async_trait::async_trait]
36#[typetag::serde]
37impl State for CloseDowngradedRegion {
38 async fn next(
39 &mut self,
40 ctx: &mut Context,
41 _procedure_ctx: &ProcedureContext,
42 ) -> Result<(Box<dyn State>, Status)> {
43 if let Err(err) = self.close_downgraded_leader_region(ctx).await {
44 let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
45 let region_ids = &ctx.persistent_ctx.region_ids;
46 warn!(err; "Failed to close downgraded leader regions: {region_ids:?} on datanode {:?}", downgrade_leader_datanode);
47 }
48 info!(
49 "Region migration is finished: regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
50 ctx.persistent_ctx.region_ids,
51 ctx.persistent_ctx.from_peer,
52 ctx.persistent_ctx.to_peer,
53 ctx.persistent_ctx.trigger_reason,
54 ctx.volatile_ctx.metrics,
55 );
56 Ok((Box::new(RegionMigrationEnd), Status::done()))
57 }
58
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62}
63
64impl CloseDowngradedRegion {
65 async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
70 let pc = &ctx.persistent_ctx;
71 let downgrade_leader_datanode_id = pc.from_peer.id;
72 let region_ids = &ctx.persistent_ctx.region_ids;
73 let mut idents = Vec::with_capacity(region_ids.len());
74
75 for region_id in region_ids {
76 idents.push(RegionIdent {
77 datanode_id: downgrade_leader_datanode_id,
78 table_id: region_id.table_id(),
79 region_number: region_id.region_number(),
80 engine: String::new(),
82 });
83 }
84
85 Ok(Instruction::CloseRegions(idents))
86 }
87
88 async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
90 let close_instruction = self.build_close_region_instruction(ctx).await?;
91 let region_ids = &ctx.persistent_ctx.region_ids;
92 let pc = &ctx.persistent_ctx;
93 let downgrade_leader_datanode = &pc.from_peer;
94 let msg = MailboxMessage::json_message(
95 &format!("Close downgraded regions: {:?}", region_ids),
96 &format!("Metasrv@{}", ctx.server_addr()),
97 &format!(
98 "Datanode-{}@{}",
99 downgrade_leader_datanode.id, downgrade_leader_datanode.addr
100 ),
101 common_time::util::current_time_millis(),
102 &close_instruction,
103 )
104 .with_context(|_| error::SerializeToJsonSnafu {
105 input: close_instruction.to_string(),
106 })?;
107
108 let ch = Channel::Datanode(downgrade_leader_datanode.id);
109 let receiver = ctx
110 .mailbox
111 .send(&ch, msg, default_distributed_time_constants().region_lease)
112 .await?;
113
114 match receiver.await {
115 Ok(msg) => {
116 let reply = HeartbeatMailbox::json_reply(&msg)?;
117 info!(
118 "Received close downgraded leade region reply: {:?}, region: {:?}",
119 reply, region_ids
120 );
121 let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else {
122 return error::UnexpectedInstructionReplySnafu {
123 mailbox_message: msg.to_string(),
124 reason: "expect close region reply",
125 }
126 .fail();
127 };
128
129 if result {
130 Ok(())
131 } else {
132 error::UnexpectedSnafu {
133 violated: format!(
134 "Failed to close downgraded leader region: {region_ids:?} on datanode {:?}, error: {error:?}",
135 downgrade_leader_datanode,
136 ),
137 }
138 .fail()
139 }
140 }
141
142 Err(e) => Err(e),
143 }
144 }
145}