meta_srv/gc/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{self, GcRegions, InstructionReply};
21use common_meta::lock_key::RegionLock;
22use common_meta::peer::Peer;
23use common_procedure::error::ToJsonSnafu;
24use common_procedure::{
25    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
26    Result as ProcedureResult, Status,
27};
28use common_telemetry::error;
29use itertools::Itertools as _;
30use serde::{Deserialize, Serialize};
31use snafu::ResultExt as _;
32use store_api::storage::GcReport;
33
34use crate::error::{self, Result, SerializeToJsonSnafu};
35use crate::handler::HeartbeatMailbox;
36use crate::service::mailbox::{Channel, MailboxRef};
37
38/// TODO(discord9): another procedure which do both get file refs and gc regions.
39pub struct GcRegionProcedure {
40    mailbox: MailboxRef,
41    data: GcRegionData,
42}
43
44#[derive(Serialize, Deserialize)]
45pub struct GcRegionData {
46    server_addr: String,
47    peer: Peer,
48    gc_regions: GcRegions,
49    description: String,
50    timeout: Duration,
51}
52
53impl GcRegionProcedure {
54    pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";
55
56    pub fn new(
57        mailbox: MailboxRef,
58        server_addr: String,
59        peer: Peer,
60        gc_regions: GcRegions,
61        description: String,
62        timeout: Duration,
63    ) -> Self {
64        Self {
65            mailbox,
66            data: GcRegionData {
67                peer,
68                server_addr,
69                gc_regions,
70                description,
71                timeout,
72            },
73        }
74    }
75
76    async fn send_gc_instr(&self) -> Result<GcReport> {
77        let peer = &self.data.peer;
78        let instruction = instruction::Instruction::GcRegions(self.data.gc_regions.clone());
79        let msg = MailboxMessage::json_message(
80            &format!("{}: {}", self.data.description, instruction),
81            &format!("Metasrv@{}", self.data.server_addr),
82            &format!("Datanode-{}@{}", peer.id, peer.addr),
83            common_time::util::current_time_millis(),
84            &instruction,
85        )
86        .with_context(|_| SerializeToJsonSnafu {
87            input: instruction.to_string(),
88        })?;
89
90        let mailbox_rx = self
91            .mailbox
92            .send(&Channel::Datanode(peer.id), msg, self.data.timeout)
93            .await?;
94
95        let reply = match mailbox_rx.await {
96            Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
97            Err(e) => {
98                error!(
99                    "Failed to receive reply from datanode {} for {}: {}",
100                    peer, self.data.description, e
101                );
102                return Err(e);
103            }
104        };
105
106        let InstructionReply::GcRegions(reply) = reply else {
107            return error::UnexpectedInstructionReplySnafu {
108                mailbox_message: format!("{:?}", reply),
109                reason: "Unexpected reply of the GcRegions instruction",
110            }
111            .fail();
112        };
113
114        let res = reply.result;
115        match res {
116            Ok(report) => Ok(report),
117            Err(e) => {
118                error!(
119                    "Datanode {} reported error during GC for regions {:?}: {}",
120                    peer, self.data.gc_regions, e
121                );
122                Err(error::UnexpectedSnafu {
123                    violated: format!(
124                        "Datanode {} reported error during GC for regions {:?}: {}",
125                        peer, self.data.gc_regions, e
126                    ),
127                }
128                .fail()?)
129            }
130        }
131    }
132
133    pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
134        res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
135            error::UnexpectedSnafu {
136                violated: format!(
137                    "Failed to downcast procedure result to GcReport, got {:?}",
138                    std::any::type_name_of_val(&res.as_ref())
139                ),
140            }
141            .build()
142        })
143    }
144}
145
146#[async_trait::async_trait]
147impl Procedure for GcRegionProcedure {
148    fn type_name(&self) -> &str {
149        Self::TYPE_NAME
150    }
151
152    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
153        // Send GC instruction to the datanode. This procedure only handle lock&send, results or other kind of
154        // errors will be reported back via the oneshot channel.
155        let reply = self
156            .send_gc_instr()
157            .await
158            .map_err(ProcedureError::external)?;
159
160        Ok(Status::done_with_output(reply))
161    }
162
163    fn dump(&self) -> ProcedureResult<String> {
164        serde_json::to_string(&self.data).context(ToJsonSnafu)
165    }
166
167    /// Write lock all regions involved in this GC procedure.
168    /// So i.e. region migration won't happen during GC and cause race conditions.
169    ///
170    /// only write lock the regions not catatlog/schema because it can run concurrently with other procedures(i.e. drop database/table)
171    /// TODO:(discord9): integration test to verify this
172    fn lock_key(&self) -> LockKey {
173        let lock_key: Vec<_> = self
174            .data
175            .gc_regions
176            .regions
177            .iter()
178            .sorted() // sort to have a deterministic lock order
179            .map(|id| RegionLock::Read(*id).into())
180            .collect();
181
182        LockKey::new(lock_key)
183    }
184}