meta_srv/procedure/region_migration/
close_downgraded_region.rs1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::REGION_LEASE_SECS;
21use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::{info, warn};
24use serde::{Deserialize, Serialize};
25use snafu::ResultExt;
26
27use crate::error::{self, Result};
28use crate::handler::HeartbeatMailbox;
29use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
30use crate::procedure::region_migration::{Context, State};
31use crate::service::mailbox::Channel;
32
33const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
35
36#[derive(Debug, Serialize, Deserialize)]
37pub struct CloseDowngradedRegion;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for CloseDowngradedRegion {
42 async fn next(
43 &mut self,
44 ctx: &mut Context,
45 _procedure_ctx: &ProcedureContext,
46 ) -> Result<(Box<dyn State>, Status)> {
47 if let Err(err) = self.close_downgraded_leader_region(ctx).await {
48 let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
49 let region_ids = &ctx.persistent_ctx.region_ids;
50 warn!(err; "Failed to close downgraded leader regions: {region_ids:?} on datanode {:?}", downgrade_leader_datanode);
51 }
52 info!(
53 "Region migration is finished: regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
54 ctx.persistent_ctx.region_ids,
55 ctx.persistent_ctx.from_peer,
56 ctx.persistent_ctx.to_peer,
57 ctx.persistent_ctx.trigger_reason,
58 ctx.volatile_ctx.metrics,
59 );
60 Ok((Box::new(RegionMigrationEnd), Status::done()))
61 }
62
63 fn as_any(&self) -> &dyn Any {
64 self
65 }
66}
67
68impl CloseDowngradedRegion {
69 async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
74 let pc = &ctx.persistent_ctx;
75 let downgrade_leader_datanode_id = pc.from_peer.id;
76 let region_ids = &ctx.persistent_ctx.region_ids;
77 let mut idents = Vec::with_capacity(region_ids.len());
78
79 for region_id in region_ids {
80 idents.push(RegionIdent {
81 datanode_id: downgrade_leader_datanode_id,
82 table_id: region_id.table_id(),
83 region_number: region_id.region_number(),
84 engine: String::new(),
86 });
87 }
88
89 Ok(Instruction::CloseRegions(idents))
90 }
91
92 async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
94 let close_instruction = self.build_close_region_instruction(ctx).await?;
95 let region_ids = &ctx.persistent_ctx.region_ids;
96 let pc = &ctx.persistent_ctx;
97 let downgrade_leader_datanode = &pc.from_peer;
98 let msg = MailboxMessage::json_message(
99 &format!("Close downgraded regions: {:?}", region_ids),
100 &format!("Metasrv@{}", ctx.server_addr()),
101 &format!(
102 "Datanode-{}@{}",
103 downgrade_leader_datanode.id, downgrade_leader_datanode.addr
104 ),
105 common_time::util::current_time_millis(),
106 &close_instruction,
107 )
108 .with_context(|_| error::SerializeToJsonSnafu {
109 input: close_instruction.to_string(),
110 })?;
111
112 let ch = Channel::Datanode(downgrade_leader_datanode.id);
113 let receiver = ctx
114 .mailbox
115 .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
116 .await?;
117
118 match receiver.await {
119 Ok(msg) => {
120 let reply = HeartbeatMailbox::json_reply(&msg)?;
121 info!(
122 "Received close downgraded leade region reply: {:?}, region: {:?}",
123 reply, region_ids
124 );
125 let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else {
126 return error::UnexpectedInstructionReplySnafu {
127 mailbox_message: msg.to_string(),
128 reason: "expect close region reply",
129 }
130 .fail();
131 };
132
133 if result {
134 Ok(())
135 } else {
136 error::UnexpectedSnafu {
137 violated: format!(
138 "Failed to close downgraded leader region: {region_ids:?} on datanode {:?}, error: {error:?}",
139 downgrade_leader_datanode,
140 ),
141 }
142 .fail()
143 }
144 }
145
146 Err(e) => Err(e),
147 }
148 }
149}