meta_srv/gc/
ctx.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::datanode::RegionStat;
20use common_meta::instruction::{
21    GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
22};
23use common_meta::key::TableMetadataManagerRef;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::peer::Peer;
26use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
27use common_telemetry::{debug, error, warn};
28use snafu::{OptionExt as _, ResultExt as _};
29use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
30use table::metadata::TableId;
31
32use crate::cluster::MetaPeerClientRef;
33use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
34use crate::gc::Region2Peers;
35use crate::gc::procedure::{BatchGcProcedure, GcRegionProcedure};
36use crate::handler::HeartbeatMailbox;
37use crate::service::mailbox::{Channel, MailboxRef};
38
39#[async_trait::async_trait]
40pub(crate) trait SchedulerCtx: Send + Sync {
41    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;
42
43    async fn get_table_route(
44        &self,
45        table_id: TableId,
46    ) -> Result<(TableId, PhysicalTableRouteValue)>;
47
48    async fn get_file_references(
49        &self,
50        query_regions: &[RegionId],
51        related_regions: HashMap<RegionId, Vec<RegionId>>,
52        region_routes: &Region2Peers,
53        timeout: Duration,
54    ) -> Result<FileRefsManifest>;
55
56    async fn gc_regions(
57        &self,
58        peer: Peer,
59        region_ids: &[RegionId],
60        file_refs_manifest: &FileRefsManifest,
61        full_file_listing: bool,
62        timeout: Duration,
63    ) -> Result<GcReport>;
64}
65
66pub(crate) struct DefaultGcSchedulerCtx {
67    /// The metadata manager.
68    pub(crate) table_metadata_manager: TableMetadataManagerRef,
69    /// Procedure manager.
70    pub(crate) procedure_manager: ProcedureManagerRef,
71    /// For getting `RegionStats`.
72    pub(crate) meta_peer_client: MetaPeerClientRef,
73    /// The mailbox to send messages.
74    pub(crate) mailbox: MailboxRef,
75    /// The server address.
76    pub(crate) server_addr: String,
77}
78
79impl DefaultGcSchedulerCtx {
80    pub fn try_new(
81        table_metadata_manager: TableMetadataManagerRef,
82        procedure_manager: ProcedureManagerRef,
83        meta_peer_client: MetaPeerClientRef,
84        mailbox: MailboxRef,
85        server_addr: String,
86    ) -> Result<Self> {
87        Ok(Self {
88            table_metadata_manager,
89            procedure_manager,
90            meta_peer_client,
91            mailbox,
92            server_addr,
93        })
94    }
95}
96
97#[async_trait::async_trait]
98impl SchedulerCtx for DefaultGcSchedulerCtx {
99    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
100        let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
101        let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
102        for (_dn_id, stats) in dn_stats {
103            let mut stats = stats.stats;
104
105            let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
106                continue;
107            };
108
109            for region_stat in latest_stat.region_stats {
110                table_to_region_stats
111                    .entry(region_stat.id.table_id())
112                    .or_default()
113                    .push(region_stat);
114            }
115        }
116        Ok(table_to_region_stats)
117    }
118
119    async fn get_table_route(
120        &self,
121        table_id: TableId,
122    ) -> Result<(TableId, PhysicalTableRouteValue)> {
123        self.table_metadata_manager
124            .table_route_manager()
125            .get_physical_table_route(table_id)
126            .await
127            .context(TableMetadataManagerSnafu)
128    }
129
130    async fn gc_regions(
131        &self,
132        peer: Peer,
133        region_ids: &[RegionId],
134        file_refs_manifest: &FileRefsManifest,
135        full_file_listing: bool,
136        timeout: Duration,
137    ) -> Result<GcReport> {
138        self.gc_regions_inner(
139            peer,
140            region_ids,
141            file_refs_manifest,
142            full_file_listing,
143            timeout,
144        )
145        .await
146    }
147
148    async fn get_file_references(
149        &self,
150        query_regions: &[RegionId],
151        related_regions: HashMap<RegionId, Vec<RegionId>>,
152        region_routes: &Region2Peers,
153        timeout: Duration,
154    ) -> Result<FileRefsManifest> {
155        debug!(
156            "Getting file references for {} regions",
157            query_regions.len()
158        );
159
160        // Group regions by datanode to minimize RPC calls
161        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
162
163        for region_id in query_regions {
164            if let Some((leader, followers)) = region_routes.get(region_id) {
165                datanode2query_regions
166                    .entry(leader.clone())
167                    .or_default()
168                    .push(*region_id);
169                // also need to send for follower regions for file refs in case query is running on follower
170                for follower in followers {
171                    datanode2query_regions
172                        .entry(follower.clone())
173                        .or_default()
174                        .push(*region_id);
175                }
176            } else {
177                return error::UnexpectedSnafu {
178                    violated: format!(
179                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
180                    ),
181                }
182                .fail();
183            }
184        }
185        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
186            HashMap::new();
187        for (related_region, queries) in related_regions {
188            if let Some((leader, followers)) = region_routes.get(&related_region) {
189                datanode2related_regions
190                    .entry(leader.clone())
191                    .or_default()
192                    .insert(related_region, queries.clone());
193            } // since read from manifest, no need to send to followers
194        }
195
196        // Send GetFileRefs instructions to each datanode
197        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
198        let mut all_manifest_versions = HashMap::new();
199
200        for (peer, regions) in datanode2query_regions {
201            let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
202            match self
203                .send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
204                .await
205            {
206                Ok(manifest) => {
207                    // TODO(discord9): if other regions provide file refs for one region on other datanode, and no version,
208                    // is it correct to merge manifest_version directly?
209                    // FIXME: follower region how to merge version???
210
211                    for (region_id, file_refs) in manifest.file_refs {
212                        all_file_refs
213                            .entry(region_id)
214                            .or_default()
215                            .extend(file_refs);
216                    }
217                    // region manifest version should be the smallest one among all peers, so outdated region can be detected
218                    for (region_id, version) in manifest.manifest_version {
219                        let entry = all_manifest_versions.entry(region_id).or_insert(version);
220                        *entry = (*entry).min(version);
221                    }
222                }
223                Err(e) => {
224                    warn!(
225                        "Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
226                        peer, e
227                    );
228                    // Continue processing other datanodes instead of failing the entire operation
229                    continue;
230                }
231            }
232        }
233
234        Ok(FileRefsManifest {
235            file_refs: all_file_refs,
236            manifest_version: all_manifest_versions,
237        })
238    }
239}
240
241impl DefaultGcSchedulerCtx {
242    async fn gc_regions_inner(
243        &self,
244        peer: Peer,
245        region_ids: &[RegionId],
246        file_refs_manifest: &FileRefsManifest,
247        full_file_listing: bool,
248        timeout: Duration,
249    ) -> Result<GcReport> {
250        debug!(
251            "Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
252            peer,
253            region_ids.len(),
254            full_file_listing
255        );
256
257        let gc_regions = GcRegions {
258            regions: region_ids.to_vec(),
259            file_refs_manifest: file_refs_manifest.clone(),
260            full_file_listing,
261        };
262        let procedure = GcRegionProcedure::new(
263            self.mailbox.clone(),
264            self.server_addr.clone(),
265            peer,
266            gc_regions,
267            format!("GC for {} regions", region_ids.len()),
268            timeout,
269        );
270        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
271
272        let id = procedure_with_id.id;
273
274        let mut watcher = self
275            .procedure_manager
276            .submit(procedure_with_id)
277            .await
278            .context(error::SubmitProcedureSnafu)?;
279        let res = watcher::wait(&mut watcher)
280            .await
281            .context(error::WaitProcedureSnafu)?
282            .with_context(|| error::UnexpectedSnafu {
283                violated: format!(
284                    "GC procedure {id} successfully completed but no result returned"
285                ),
286            })?;
287
288        let gc_report = GcRegionProcedure::cast_result(res)?;
289
290        Ok(gc_report)
291    }
292
293    /// TODO(discord9): add support to read manifest of related regions for file refs too
294    /// (now it's only reading  active FileHandles)
295    async fn send_get_file_refs_instruction(
296        &self,
297        peer: &Peer,
298        query_regions: &[RegionId],
299        related_regions: HashMap<RegionId, Vec<RegionId>>,
300        timeout: Duration,
301    ) -> Result<FileRefsManifest> {
302        debug!(
303            "Sending GetFileRefs instruction to datanode {} for {} regions",
304            peer,
305            query_regions.len()
306        );
307
308        let instruction = Instruction::GetFileRefs(GetFileRefs {
309            query_regions: query_regions.to_vec(),
310            related_regions,
311        });
312
313        let reply = self
314            .send_instruction(peer, instruction, "Get file references", timeout)
315            .await?;
316
317        let InstructionReply::GetFileRefs(GetFileRefsReply {
318            file_refs_manifest,
319            success,
320            error,
321        }) = reply
322        else {
323            return error::UnexpectedInstructionReplySnafu {
324                mailbox_message: format!("{:?}", reply),
325                reason: "Unexpected reply of the GetFileRefs instruction",
326            }
327            .fail();
328        };
329
330        if !success {
331            return error::UnexpectedSnafu {
332                violated: format!(
333                    "Failed to get file references from datanode {}: {:?}",
334                    peer, error
335                ),
336            }
337            .fail();
338        }
339
340        Ok(file_refs_manifest)
341    }
342
343    async fn send_instruction(
344        &self,
345        peer: &Peer,
346        instruction: Instruction,
347        description: &str,
348        timeout: Duration,
349    ) -> Result<InstructionReply> {
350        let msg = MailboxMessage::json_message(
351            &format!("{}: {}", description, instruction),
352            &format!("Metasrv@{}", self.server_addr),
353            &format!("Datanode-{}@{}", peer.id, peer.addr),
354            common_time::util::current_time_millis(),
355            &instruction,
356        )
357        .with_context(|_| error::SerializeToJsonSnafu {
358            input: instruction.to_string(),
359        })?;
360
361        let mailbox_rx = self
362            .mailbox
363            .send(&Channel::Datanode(peer.id), msg, timeout)
364            .await?;
365
366        match mailbox_rx.await {
367            Ok(reply_msg) => {
368                let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
369                Ok(reply)
370            }
371            Err(e) => {
372                error!(
373                    "Failed to receive reply from datanode {} for {}: {}",
374                    peer, description, e
375                );
376                Err(e)
377            }
378        }
379    }
380}