meta_srv/procedure/region_migration/
close_downgraded_region.rs1use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::RegionIdent;
19use common_meta::distributed_time_constants::default_distributed_time_constants;
20use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
21use common_procedure::{Context as ProcedureContext, Status};
22use common_telemetry::tracing_context::TracingContext;
23use common_telemetry::{info, warn};
24use serde::{Deserialize, Serialize};
25use snafu::ResultExt;
26
27use crate::error::{self, Result};
28use crate::handler::HeartbeatMailbox;
29use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
30use crate::procedure::region_migration::{Context, State};
31use crate::service::mailbox::Channel;
32
33#[derive(Debug, Serialize, Deserialize)]
34pub struct CloseDowngradedRegion;
35
36#[async_trait::async_trait]
37#[typetag::serde]
38impl State for CloseDowngradedRegion {
39 async fn next(
40 &mut self,
41 ctx: &mut Context,
42 _procedure_ctx: &ProcedureContext,
43 ) -> Result<(Box<dyn State>, Status)> {
44 if let Err(err) = self.close_downgraded_leader_region(ctx).await {
45 let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
46 let region_ids = &ctx.persistent_ctx.region_ids;
47 warn!(err; "Failed to close downgraded leader regions: {region_ids:?} on datanode {:?}", downgrade_leader_datanode);
48 }
49 info!(
50 "Region migration is finished: regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
51 ctx.persistent_ctx.region_ids,
52 ctx.persistent_ctx.from_peer,
53 ctx.persistent_ctx.to_peer,
54 ctx.persistent_ctx.trigger_reason,
55 ctx.volatile_ctx.metrics,
56 );
57 Ok((Box::new(RegionMigrationEnd), Status::done()))
58 }
59
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63}
64
65impl CloseDowngradedRegion {
66 async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
71 let pc = &ctx.persistent_ctx;
72 let downgrade_leader_datanode_id = pc.from_peer.id;
73 let region_ids = &ctx.persistent_ctx.region_ids;
74 let mut idents = Vec::with_capacity(region_ids.len());
75
76 for region_id in region_ids {
77 idents.push(RegionIdent {
78 datanode_id: downgrade_leader_datanode_id,
79 table_id: region_id.table_id(),
80 region_number: region_id.region_number(),
81 engine: String::new(),
83 });
84 }
85
86 Ok(Instruction::CloseRegions(idents))
87 }
88
89 async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
91 let close_instruction = self.build_close_region_instruction(ctx).await?;
92 let region_ids = &ctx.persistent_ctx.region_ids;
93 let pc = &ctx.persistent_ctx;
94 let downgrade_leader_datanode = &pc.from_peer;
95 let tracing_ctx = TracingContext::from_current_span();
96 let msg = MailboxMessage::json_message(
97 &format!("Close downgraded regions: {:?}", region_ids),
98 &format!("Metasrv@{}", ctx.server_addr()),
99 &format!(
100 "Datanode-{}@{}",
101 downgrade_leader_datanode.id, downgrade_leader_datanode.addr
102 ),
103 common_time::util::current_time_millis(),
104 &close_instruction,
105 Some(tracing_ctx.to_w3c()),
106 )
107 .with_context(|_| error::SerializeToJsonSnafu {
108 input: close_instruction.to_string(),
109 })?;
110
111 let ch = Channel::Datanode(downgrade_leader_datanode.id);
112 let receiver = ctx
113 .mailbox
114 .send(&ch, msg, default_distributed_time_constants().region_lease)
115 .await?;
116
117 match receiver.await {
118 Ok(msg) => {
119 let reply = HeartbeatMailbox::json_reply(&msg)?;
120 info!(
121 "Received close downgraded leade region reply: {:?}, region: {:?}",
122 reply, region_ids
123 );
124 let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else {
125 return error::UnexpectedInstructionReplySnafu {
126 mailbox_message: msg.to_string(),
127 reason: "expect close region reply",
128 }
129 .fail();
130 };
131
132 if result {
133 Ok(())
134 } else {
135 error::UnexpectedSnafu {
136 violated: format!(
137 "Failed to close downgraded leader region: {region_ids:?} on datanode {:?}, error: {error:?}",
138 downgrade_leader_datanode,
139 ),
140 }
141 .fail()
142 }
143 }
144
145 Err(e) => Err(e),
146 }
147 }
148}