meta_srv/procedure/region_migration/
close_downgraded_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::RegionIdent;
19use common_meta::distributed_time_constants::default_distributed_time_constants;
20use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
21use common_procedure::{Context as ProcedureContext, Status};
22use common_telemetry::{info, warn};
23use serde::{Deserialize, Serialize};
24use snafu::ResultExt;
25
26use crate::error::{self, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
29use crate::procedure::region_migration::{Context, State};
30use crate::service::mailbox::Channel;
31
/// Region migration state that asks the source datanode (`from_peer`) to close
/// the downgraded leader regions and then moves the procedure to
/// [`RegionMigrationEnd`].
///
/// The state carries no data of its own; everything it needs is read from the
/// migration [`Context`].
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;
34
35#[async_trait::async_trait]
36#[typetag::serde]
37impl State for CloseDowngradedRegion {
38    async fn next(
39        &mut self,
40        ctx: &mut Context,
41        _procedure_ctx: &ProcedureContext,
42    ) -> Result<(Box<dyn State>, Status)> {
43        if let Err(err) = self.close_downgraded_leader_region(ctx).await {
44            let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
45            let region_ids = &ctx.persistent_ctx.region_ids;
46            warn!(err; "Failed to close downgraded leader regions: {region_ids:?} on datanode {:?}", downgrade_leader_datanode);
47        }
48        info!(
49            "Region migration is finished: regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
50            ctx.persistent_ctx.region_ids,
51            ctx.persistent_ctx.from_peer,
52            ctx.persistent_ctx.to_peer,
53            ctx.persistent_ctx.trigger_reason,
54            ctx.volatile_ctx.metrics,
55        );
56        Ok((Box::new(RegionMigrationEnd), Status::done()))
57    }
58
59    fn as_any(&self) -> &dyn Any {
60        self
61    }
62}
63
64impl CloseDowngradedRegion {
65    /// Builds close region instruction.
66    ///
67    /// Abort(non-retry):
68    /// - Datanode Table is not found.
69    async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
70        let pc = &ctx.persistent_ctx;
71        let downgrade_leader_datanode_id = pc.from_peer.id;
72        let region_ids = &ctx.persistent_ctx.region_ids;
73        let mut idents = Vec::with_capacity(region_ids.len());
74
75        for region_id in region_ids {
76            idents.push(RegionIdent {
77                datanode_id: downgrade_leader_datanode_id,
78                table_id: region_id.table_id(),
79                region_number: region_id.region_number(),
80                // The `engine` field is not used for closing region.
81                engine: String::new(),
82            });
83        }
84
85        Ok(Instruction::CloseRegions(idents))
86    }
87
88    /// Closes the downgraded leader region.
89    async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
90        let close_instruction = self.build_close_region_instruction(ctx).await?;
91        let region_ids = &ctx.persistent_ctx.region_ids;
92        let pc = &ctx.persistent_ctx;
93        let downgrade_leader_datanode = &pc.from_peer;
94        let msg = MailboxMessage::json_message(
95            &format!("Close downgraded regions: {:?}", region_ids),
96            &format!("Metasrv@{}", ctx.server_addr()),
97            &format!(
98                "Datanode-{}@{}",
99                downgrade_leader_datanode.id, downgrade_leader_datanode.addr
100            ),
101            common_time::util::current_time_millis(),
102            &close_instruction,
103        )
104        .with_context(|_| error::SerializeToJsonSnafu {
105            input: close_instruction.to_string(),
106        })?;
107
108        let ch = Channel::Datanode(downgrade_leader_datanode.id);
109        let receiver = ctx
110            .mailbox
111            .send(&ch, msg, default_distributed_time_constants().region_lease)
112            .await?;
113
114        match receiver.await {
115            Ok(msg) => {
116                let reply = HeartbeatMailbox::json_reply(&msg)?;
117                info!(
118                    "Received close downgraded leade region reply: {:?}, region: {:?}",
119                    reply, region_ids
120                );
121                let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else {
122                    return error::UnexpectedInstructionReplySnafu {
123                        mailbox_message: msg.to_string(),
124                        reason: "expect close region reply",
125                    }
126                    .fail();
127                };
128
129                if result {
130                    Ok(())
131                } else {
132                    error::UnexpectedSnafu {
133                        violated: format!(
134                            "Failed to close downgraded leader region: {region_ids:?} on datanode {:?}, error: {error:?}",
135                            downgrade_leader_datanode,
136                        ),
137                    }
138                    .fail()
139                }
140            }
141
142            Err(e) => Err(e),
143        }
144    }
145}