meta_srv/gc/procedure.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
use common_meta::lock_key::RegionLock;
use common_meta::peer::Peer;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status,
};
use common_telemetry::{debug, error, info, warn};
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};

use crate::error::{self, Result, SerializeToJsonSnafu};
use crate::gc::Region2Peers;
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};

/// Helper function that sends a `GetFileRefs` instruction to a datanode and waits for the reply.
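/// Returns the parsed [`GetFileRefsReply`], or an error if the mailbox send fails
/// or the datanode answers with an unexpected reply type.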
async fn send_get_file_refs(
    mailbox: &MailboxRef,
    server_addr: &str,
    peer: &Peer,
    instruction: GetFileRefs,
    timeout: Duration,
) -> Result<GetFileRefsReply> {
    let instruction = instruction::Instruction::GetFileRefs(instruction);
    let msg = MailboxMessage::json_message(
        &format!("Get file references: {}", instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                "Failed to receive reply from datanode {} for GetFileRefs: {}",
                peer, e
            );
            return Err(e);
        }
    };

    let InstructionReply::GetFileRefs(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GetFileRefs instruction",
        }
        .fail();
    };

    Ok(reply)
}

/// Helper function that sends a `GcRegions` instruction to a datanode and waits for the reply.
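/// On success, returns the datanode's [`GcReport`]; a GC failure reported by the
/// datanode is surfaced as an `Unexpected` error.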
async fn send_gc_regions(
    mailbox: &MailboxRef,
    peer: &Peer,
    gc_regions: GcRegions,
    server_addr: &str,
    timeout: Duration,
    description: &str,
) -> Result<GcReport> {
    let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
    let msg = MailboxMessage::json_message(
        &format!("{}: {}", description, instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                "Failed to receive reply from datanode {} for {}: {}",
                peer, description, e
            );
            return Err(e);
        }
    };

    let InstructionReply::GcRegions(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GcRegions instruction",
        }
        .fail();
    };

    match reply.result {
        Ok(report) => Ok(report),
        Err(e) => {
            error!(
                "Datanode {} reported error during GC for regions {:?}: {}",
                peer, gc_regions, e
            );
            error::UnexpectedSnafu {
                violated: format!(
                    "Datanode {} reported error during GC for regions {:?}: {}",
                    peer, gc_regions, e
                ),
            }
            .fail()
        }
    }
}

/// TODO(discord9): another procedure that does both get-file-refs and gc-regions.
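///
/// Sends a single `GcRegions` instruction to one datanode and returns the
/// resulting [`GcReport`] as the procedure output.
///
/// A rough usage sketch (the executor that runs the procedure and the
/// `procedure_output` value below are illustrative, not part of this module):
///
/// ```ignore
/// let procedure = GcRegionProcedure::new(
///     mailbox.clone(),
///     server_addr.clone(),
///     peer,
///     gc_regions,
///     "Manual GC".to_string(),
///     Duration::from_secs(10),
/// );
/// // Submit `procedure` to the procedure framework; once it finishes with an
/// // output, downcast it back to a `GcReport`:
/// let report = GcRegionProcedure::cast_result(procedure_output)?;
/// ```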
pub struct GcRegionProcedure {
    mailbox: MailboxRef,
    data: GcRegionData,
}

#[derive(Serialize, Deserialize)]
pub struct GcRegionData {
    /// Address of this metasrv, used as the sender of mailbox messages.
    server_addr: String,
    /// The target datanode peer.
    peer: Peer,
    /// The `GcRegions` instruction payload to send.
    gc_regions: GcRegions,
    /// Human-readable description included in the mailbox message.
    description: String,
    /// Mailbox timeout for waiting for the datanode's reply.
    timeout: Duration,
}

impl GcRegionProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";

    pub fn new(
        mailbox: MailboxRef,
        server_addr: String,
        peer: Peer,
        gc_regions: GcRegions,
        description: String,
        timeout: Duration,
    ) -> Self {
        Self {
            mailbox,
            data: GcRegionData {
                peer,
                server_addr,
                gc_regions,
                description,
                timeout,
            },
        }
    }

    async fn send_gc_instr(&self) -> Result<GcReport> {
        send_gc_regions(
            &self.mailbox,
            &self.data.peer,
            self.data.gc_regions.clone(),
            &self.data.server_addr,
            self.data.timeout,
            &self.data.description,
        )
        .await
    }

    /// Downcasts the output of a finished `GcRegionProcedure` back to a [`GcReport`].
    pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
        res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
            error::UnexpectedSnafu {
                violated: format!(
                    "Failed to downcast procedure result to GcReport, got {:?}",
                    std::any::type_name_of_val(&res.as_ref())
                ),
            }
            .build()
        })
    }
}

#[async_trait::async_trait]
impl Procedure for GcRegionProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        // Send the GC instruction to the datanode. This procedure only handles
        // locking and sending; results or other kinds of errors are reported back
        // via the oneshot channel.
        let reply = self
            .send_gc_instr()
            .await
            .map_err(ProcedureError::external)?;

        Ok(Status::done_with_output(reply))
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Read-locks all regions involved in this GC procedure, so that e.g. region
    /// migration cannot happen during GC and cause race conditions.
    ///
    /// Only the regions are read-locked, not the catalog/schema, so this procedure
    /// can run concurrently with other procedures (e.g. drop database/table).
    /// TODO(discord9): integration test to verify this
    fn lock_key(&self) -> LockKey {
        let lock_key: Vec<_> = self
            .data
            .gc_regions
            .regions
            .iter()
            .sorted() // sort to have a deterministic lock order
            .map(|id| RegionLock::Read(*id).into())
            .collect();

        LockKey::new(lock_key)
    }
}

/// Procedure that fetches file references and then performs a batch GC for multiple regions.
/// It should only be used by the admin function that triggers manual GC, because it holds
/// locks on all involved regions for the whole run, which can be a long time.
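///
/// The procedure runs as a small state machine: `Start` -> `Acquiring` (collect
/// file references from every datanode hosting the regions) -> `Gcing` (send
/// `GcRegions` instructions to the leader datanodes), after which it is done.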
pub struct BatchGcProcedure {
    mailbox: MailboxRef,
    data: BatchGcData,
}

#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current state of the procedure's state machine.
    state: State,
    /// Address of this metasrv, used as the sender of mailbox messages.
    server_addr: String,
    /// The regions to be GC-ed.
    regions: Vec<RegionId>,
    /// Forwarded to datanodes as `GcRegions::full_file_listing`.
    full_file_listing: bool,
    /// Region routes: RegionId -> (leader peer, follower peers).
    region_routes: Region2Peers,
    /// Related regions (e.g., for shared files). Map: RegionId -> List of related RegionIds.
    related_regions: HashMap<RegionId, Vec<RegionId>>,
    /// Acquired file references (populated in the Acquiring state).
    file_refs: FileRefsManifest,
    /// Mailbox timeout duration.
    timeout: Duration,
}

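/// State machine of [`BatchGcProcedure`]: `Start` -> `Acquiring` -> `Gcing`.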
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state.
    Start,
    /// Fetching file references from datanodes.
    Acquiring,
    /// Sending GC instructions to the target datanodes.
    Gcing,
}

impl BatchGcProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";

    pub fn new(
        mailbox: MailboxRef,
        server_addr: String,
        regions: Vec<RegionId>,
        full_file_listing: bool,
        region_routes: Region2Peers,
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        timeout: Duration,
    ) -> Self {
        Self {
            mailbox,
            data: BatchGcData {
                state: State::Start,
                server_addr,
                regions,
                full_file_listing,
                region_routes,
                related_regions,
                file_refs: FileRefsManifest::default(),
                timeout,
            },
        }
    }

    /// Gets file references from all datanodes that host the regions.
    async fn get_file_references(&self) -> Result<FileRefsManifest> {
        let query_regions = &self.data.regions;
        let related_regions = &self.data.related_regions;
        let region_routes = &self.data.region_routes;
        let timeout = self.data.timeout;

        // Group regions by datanode to minimize RPC calls.
        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

        for region_id in query_regions {
            if let Some((leader, followers)) = region_routes.get(region_id) {
                datanode2query_regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
                // Also query followers for file refs, in case a query is running on a follower.
                for follower in followers {
                    datanode2query_regions
                        .entry(follower.clone())
                        .or_default()
                        .push(*region_id);
                }
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
            HashMap::new();
        for (related_region, queries) in related_regions {
            if let Some((leader, _followers)) = region_routes.get(related_region) {
                datanode2related_regions
                    .entry(leader.clone())
                    .or_default()
                    .insert(*related_region, queries.clone());
            } // Related regions are read from the manifest, so there is no need to query followers.
        }

        // Send GetFileRefs instructions to each datanode.
        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
        let mut all_manifest_versions = HashMap::new();

        for (peer, regions) in datanode2query_regions {
            let related_regions_for_peer =
                datanode2related_regions.remove(&peer).unwrap_or_default();

            let instruction = GetFileRefs {
                query_regions: regions.clone(),
                related_regions: related_regions_for_peer,
            };

            let reply = send_get_file_refs(
                &self.mailbox,
                &self.data.server_addr,
                &peer,
                instruction,
                timeout,
            )
            .await?;

            if !reply.success {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "Failed to get file references from datanode {}: {:?}",
                        peer, reply.error
                    ),
                }
                .fail();
            }

            // Merge the file references from this datanode.
            for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
                all_file_refs
                    .entry(region_id)
                    .or_default()
                    .extend(file_refs);
            }

            // Keep the smallest manifest version among all peers for each region,
            // so outdated regions can be detected.
            for (region_id, version) in reply.file_refs_manifest.manifest_version {
                let entry = all_manifest_versions.entry(region_id).or_insert(version);
                *entry = (*entry).min(version);
            }
        }

        Ok(FileRefsManifest {
            file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
        })
    }

    /// Sends GC instructions to all datanodes that host the regions and
    /// returns the regions that need a retry.
    async fn send_gc_instructions(&self) -> Result<Vec<RegionId>> {
        let regions = &self.data.regions;
        let region_routes = &self.data.region_routes;
        let file_refs = &self.data.file_refs;
        let timeout = self.data.timeout;

        // Group regions by datanode.
        let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

        for region_id in regions {
            if let Some((leader, _followers)) = region_routes.get(region_id) {
                datanode2regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut all_need_retry = HashSet::new();
        // Send GC instructions to each datanode.
        for (peer, regions_for_peer) in datanode2regions {
            let gc_regions = GcRegions {
                regions: regions_for_peer.clone(),
                // file_refs_manifest can be large; cloning for each datanode is acceptable here
                // since this is an admin-only operation.
                file_refs_manifest: file_refs.clone(),
                full_file_listing: self.data.full_file_listing,
            };

            let report = send_gc_regions(
                &self.mailbox,
                &peer,
                gc_regions,
                self.data.server_addr.as_str(),
                timeout,
                "Batch GC",
            )
            .await?;

            let success = report.deleted_files.keys().collect_vec();
            let need_retry = report.need_retry_regions.iter().cloned().collect_vec();

            if need_retry.is_empty() {
                info!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}",
                    peer, success
                );
            } else {
                warn!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
                    peer, success, need_retry
                );
            }
            all_need_retry.extend(report.need_retry_regions);
        }

        Ok(all_need_retry.into_iter().collect())
    }
}

#[async_trait::async_trait]
impl Procedure for BatchGcProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            State::Start => {
                // Transition to the Acquiring state.
                self.data.state = State::Acquiring;
                Ok(Status::executing(false))
            }
            State::Acquiring => {
                // Get file references from all datanodes.
                match self.get_file_references().await {
                    Ok(file_refs) => {
                        self.data.file_refs = file_refs;
                        self.data.state = State::Gcing;
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!("Failed to get file references: {}", e);
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::Gcing => {
                // Send GC instructions to all datanodes.
                // TODO(discord9): handle need-retry regions
                match self.send_gc_instructions().await {
                    Ok(_) => {
                        info!(
                            "Batch GC completed successfully for regions {:?}",
                            self.data.regions
                        );
                        Ok(Status::done())
                    }
                    Err(e) => {
                        error!("Failed to send GC instructions: {}", e);
                        Err(ProcedureError::external(e))
                    }
                }
            }
        }
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Read-locks all regions involved in this GC procedure, so that e.g. region
    /// migration cannot happen during GC and cause race conditions.
    fn lock_key(&self) -> LockKey {
        let lock_key: Vec<_> = self
            .data
            .regions
            .iter()
            .sorted() // sort to have a deterministic lock order
            .map(|id| RegionLock::Read(*id).into())
            .collect();

        LockKey::new(lock_key)
    }
}