meta_srv/procedure/region_migration/
close_downgraded_region.rs1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::distributed_time_constants::REGION_LEASE_SECS;
20use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
21use common_meta::key::datanode_table::RegionInfo;
22use common_meta::RegionIdent;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27
28use crate::error::{self, Result};
29use crate::handler::HeartbeatMailbox;
30use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
31use crate::procedure::region_migration::{Context, State};
32use crate::service::mailbox::Channel;
33
34const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
36
37#[derive(Debug, Serialize, Deserialize)]
38pub struct CloseDowngradedRegion;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for CloseDowngradedRegion {
43 async fn next(
44 &mut self,
45 ctx: &mut Context,
46 _procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 if let Err(err) = self.close_downgraded_leader_region(ctx).await {
49 let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
50 let region_id = ctx.region_id();
51 warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode);
52 }
53 info!(
54 "Region migration is finished: region_id: {}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
55 ctx.region_id(),
56 ctx.persistent_ctx.from_peer,
57 ctx.persistent_ctx.to_peer,
58 ctx.persistent_ctx.trigger_reason,
59 ctx.volatile_ctx.metrics,
60 );
61 Ok((Box::new(RegionMigrationEnd), Status::done()))
62 }
63
64 fn as_any(&self) -> &dyn Any {
65 self
66 }
67}
68
69impl CloseDowngradedRegion {
70 async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
75 let pc = &ctx.persistent_ctx;
76 let downgrade_leader_datanode_id = pc.from_peer.id;
77 let table_id = pc.region_id.table_id();
78 let region_number = pc.region_id.region_number();
79 let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
80
81 let RegionInfo { engine, .. } = datanode_table_value.region_info.clone();
82
83 Ok(Instruction::CloseRegion(RegionIdent {
84 datanode_id: downgrade_leader_datanode_id,
85 table_id,
86 region_number,
87 engine,
88 }))
89 }
90
91 async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
93 let close_instruction = self.build_close_region_instruction(ctx).await?;
94 let region_id = ctx.region_id();
95 let pc = &ctx.persistent_ctx;
96 let downgrade_leader_datanode = &pc.from_peer;
97 let msg = MailboxMessage::json_message(
98 &format!("Close downgraded region: {}", region_id),
99 &format!("Metasrv@{}", ctx.server_addr()),
100 &format!(
101 "Datanode-{}@{}",
102 downgrade_leader_datanode.id, downgrade_leader_datanode.addr
103 ),
104 common_time::util::current_time_millis(),
105 &close_instruction,
106 )
107 .with_context(|_| error::SerializeToJsonSnafu {
108 input: close_instruction.to_string(),
109 })?;
110
111 let ch = Channel::Datanode(downgrade_leader_datanode.id);
112 let receiver = ctx
113 .mailbox
114 .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
115 .await?;
116
117 match receiver.await {
118 Ok(msg) => {
119 let reply = HeartbeatMailbox::json_reply(&msg)?;
120 info!(
121 "Received close downgraded leade region reply: {:?}, region: {}",
122 reply, region_id
123 );
124 let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
125 return error::UnexpectedInstructionReplySnafu {
126 mailbox_message: msg.to_string(),
127 reason: "expect close region reply",
128 }
129 .fail();
130 };
131
132 if result {
133 Ok(())
134 } else {
135 error::UnexpectedSnafu {
136 violated: format!(
137 "Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}",
138 downgrade_leader_datanode,
139 ),
140 }
141 .fail()
142 }
143 }
144
145 Err(e) => Err(e),
146 }
147 }
148}