meta_srv/procedure/region_migration/
close_downgraded_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::REGION_LEASE_SECS;
21use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::{info, warn};
24use serde::{Deserialize, Serialize};
25use snafu::ResultExt;
26
27use crate::error::{self, Result};
28use crate::handler::HeartbeatMailbox;
29use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
30use crate::procedure::region_migration::{Context, State};
31use crate::service::mailbox::Channel;
32
/// Timeout handed to the mailbox when sending the close-region instruction
/// to the downgraded leader datanode.
///
/// Uses the lease time of a region (`REGION_LEASE_SECS`, in seconds) as the
/// timeout of closing a downgraded region.
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
35
/// Region-migration state that asks the `from_peer` datanode to close the
/// downgraded leader regions.
///
/// Closing is best-effort: a failure is only logged as a warning, and the
/// state always transitions to [`RegionMigrationEnd`].
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for CloseDowngradedRegion {
42    async fn next(
43        &mut self,
44        ctx: &mut Context,
45        _procedure_ctx: &ProcedureContext,
46    ) -> Result<(Box<dyn State>, Status)> {
47        if let Err(err) = self.close_downgraded_leader_region(ctx).await {
48            let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
49            let region_ids = &ctx.persistent_ctx.region_ids;
50            warn!(err; "Failed to close downgraded leader regions: {region_ids:?} on datanode {:?}", downgrade_leader_datanode);
51        }
52        info!(
53            "Region migration is finished: regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
54            ctx.persistent_ctx.region_ids,
55            ctx.persistent_ctx.from_peer,
56            ctx.persistent_ctx.to_peer,
57            ctx.persistent_ctx.trigger_reason,
58            ctx.volatile_ctx.metrics,
59        );
60        Ok((Box::new(RegionMigrationEnd), Status::done()))
61    }
62
63    fn as_any(&self) -> &dyn Any {
64        self
65    }
66}
67
68impl CloseDowngradedRegion {
69    /// Builds close region instruction.
70    ///
71    /// Abort(non-retry):
72    /// - Datanode Table is not found.
73    async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
74        let pc = &ctx.persistent_ctx;
75        let downgrade_leader_datanode_id = pc.from_peer.id;
76        let region_ids = &ctx.persistent_ctx.region_ids;
77        let mut idents = Vec::with_capacity(region_ids.len());
78
79        for region_id in region_ids {
80            idents.push(RegionIdent {
81                datanode_id: downgrade_leader_datanode_id,
82                table_id: region_id.table_id(),
83                region_number: region_id.region_number(),
84                // The `engine` field is not used for closing region.
85                engine: String::new(),
86            });
87        }
88
89        Ok(Instruction::CloseRegions(idents))
90    }
91
92    /// Closes the downgraded leader region.
93    async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
94        let close_instruction = self.build_close_region_instruction(ctx).await?;
95        let region_ids = &ctx.persistent_ctx.region_ids;
96        let pc = &ctx.persistent_ctx;
97        let downgrade_leader_datanode = &pc.from_peer;
98        let msg = MailboxMessage::json_message(
99            &format!("Close downgraded regions: {:?}", region_ids),
100            &format!("Metasrv@{}", ctx.server_addr()),
101            &format!(
102                "Datanode-{}@{}",
103                downgrade_leader_datanode.id, downgrade_leader_datanode.addr
104            ),
105            common_time::util::current_time_millis(),
106            &close_instruction,
107        )
108        .with_context(|_| error::SerializeToJsonSnafu {
109            input: close_instruction.to_string(),
110        })?;
111
112        let ch = Channel::Datanode(downgrade_leader_datanode.id);
113        let receiver = ctx
114            .mailbox
115            .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
116            .await?;
117
118        match receiver.await {
119            Ok(msg) => {
120                let reply = HeartbeatMailbox::json_reply(&msg)?;
121                info!(
122                    "Received close downgraded leade region reply: {:?}, region: {:?}",
123                    reply, region_ids
124                );
125                let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else {
126                    return error::UnexpectedInstructionReplySnafu {
127                        mailbox_message: msg.to_string(),
128                        reason: "expect close region reply",
129                    }
130                    .fail();
131                };
132
133                if result {
134                    Ok(())
135                } else {
136                    error::UnexpectedSnafu {
137                        violated: format!(
138                            "Failed to close downgraded leader region: {region_ids:?} on datanode {:?}, error: {error:?}",
139                            downgrade_leader_datanode,
140                        ),
141                    }
142                    .fail()
143                }
144            }
145
146            Err(e) => Err(e),
147        }
148    }
149}