meta_srv/gc/ctx.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::datanode::RegionStat;
use common_meta::instruction::{
    GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
use common_telemetry::{debug, error, warn};
use snafu::{OptionExt as _, ResultExt as _};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;

use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::GcRegionProcedure;
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};

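/// Operations the GC scheduler needs from the metasrv: fetching region stats and
/// table routes, collecting file references from datanodes, and running GC on regions.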
#[async_trait::async_trait]
pub(crate) trait SchedulerCtx: Send + Sync {
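    /// Returns the latest region stats reported by each datanode, grouped by table id.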
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;

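    /// Resolves the physical table route for `table_id`, returning the physical table id
    /// and its route value.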
    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)>;

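    /// Collects file references for `query_regions` (and any `related_regions`) from the
    /// datanodes in `region_routes`, merging the per-datanode replies into one manifest.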
    async fn get_file_references(
        &self,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        region_routes: &Region2Peers,
        timeout: Duration,
    ) -> Result<FileRefsManifest>;

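    /// Runs GC for `region_ids` on `peer` using the given file reference manifest and
    /// returns the resulting [`GcReport`].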
    async fn gc_regions(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport>;
}

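/// Default [`SchedulerCtx`] implementation backed by the metasrv's metadata manager,
/// procedure manager, meta peer client and mailbox.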
pub(crate) struct DefaultGcSchedulerCtx {
    /// The metadata manager.
    pub(crate) table_metadata_manager: TableMetadataManagerRef,
    /// Procedure manager.
    pub(crate) procedure_manager: ProcedureManagerRef,
    /// For getting `RegionStats`.
    pub(crate) meta_peer_client: MetaPeerClientRef,
    /// The mailbox to send messages.
    pub(crate) mailbox: MailboxRef,
    /// The server address.
    pub(crate) server_addr: String,
}

impl DefaultGcSchedulerCtx {
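    /// Creates the default scheduler context, registering a placeholder procedure loader
    /// for [`GcRegionProcedure`].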
    pub fn try_new(
        table_metadata_manager: TableMetadataManagerRef,
        procedure_manager: ProcedureManagerRef,
        meta_peer_client: MetaPeerClientRef,
        mailbox: MailboxRef,
        server_addr: String,
    ) -> Result<Self> {
        // Register a placeholder loader for `GcRegionProcedure` to avoid an unknown-loader error
        // when deserializing persisted procedures on reboot; GC procedures are retried by the
        // scheduler instead of being reloaded from storage.
        procedure_manager
            .register_loader(
                GcRegionProcedure::TYPE_NAME,
                Box::new(move |_json| {
                    common_procedure::error::ProcedureLoaderNotImplementedSnafu {
                        type_name: GcRegionProcedure::TYPE_NAME.to_string(),
                        reason:
                            "GC procedure should be retried by scheduler, not reloaded from storage"
                                .to_string(),
                    }
                    .fail()
                }),
            )
            .context(error::RegisterProcedureLoaderSnafu {
                type_name: GcRegionProcedure::TYPE_NAME,
            })?;

        Ok(Self {
            table_metadata_manager,
            procedure_manager,
            meta_peer_client,
            mailbox,
            server_addr,
        })
    }
}

#[async_trait::async_trait]
impl SchedulerCtx for DefaultGcSchedulerCtx {
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
        let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
        let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
        for (_dn_id, stats) in dn_stats {
            let stats = stats.stats;

            let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
                continue;
            };

            for region_stat in latest_stat.region_stats {
                table_to_region_stats
                    .entry(region_stat.id.table_id())
                    .or_default()
                    .push(region_stat);
            }
        }
        Ok(table_to_region_stats)
    }

    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)> {
        self.table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn gc_regions(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport> {
        self.gc_regions_inner(
            peer,
            region_ids,
            file_refs_manifest,
            full_file_listing,
            timeout,
        )
        .await
    }

    async fn get_file_references(
        &self,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        region_routes: &Region2Peers,
        timeout: Duration,
    ) -> Result<FileRefsManifest> {
        debug!(
            "Getting file references for {} regions",
            query_regions.len()
        );

        // Group regions by datanode to minimize RPC calls.
        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

        for region_id in query_regions {
            if let Some((leader, followers)) = region_routes.get(region_id) {
                datanode2query_regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
                // Also send to follower regions, since a query running on a follower may hold file refs.
                for follower in followers {
                    datanode2query_regions
                        .entry(follower.clone())
                        .or_default()
                        .push(*region_id);
                }
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }
        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
            HashMap::new();
        for (related_region, queries) in related_regions {
            if let Some((leader, _followers)) = region_routes.get(&related_region) {
                datanode2related_regions
                    .entry(leader.clone())
                    .or_default()
                    .insert(related_region, queries);
            } // Related regions are read from the manifest, so there is no need to send to followers.
        }

        // Send GetFileRefs instructions to each datanode.
        let mut all_file_refs: HashMap<RegionId, HashSet<FileId>> = HashMap::new();
        let mut all_manifest_versions = HashMap::new();

        for (peer, regions) in datanode2query_regions {
            let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
            match self
                .send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
                .await
            {
                Ok(manifest) => {
                    // TODO(discord9): when regions on another datanode provide file refs for a region
                    // but report no manifest version, is it correct to merge `manifest_version` directly?
                    // FIXME: how should manifest versions from follower regions be merged?

                    for (region_id, file_refs) in manifest.file_refs {
                        all_file_refs
                            .entry(region_id)
                            .or_default()
                            .extend(file_refs);
                    }
                    // Keep the smallest manifest version reported among all peers for each region,
                    // so outdated regions can be detected.
                    for (region_id, version) in manifest.manifest_version {
                        let entry = all_manifest_versions.entry(region_id).or_insert(version);
                        *entry = (*entry).min(version);
                    }
                }
                Err(e) => {
                    warn!(
                        "Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
                        peer, e
                    );
                    // Continue processing other datanodes instead of failing the entire operation.
                    continue;
                }
            }
        }

        Ok(FileRefsManifest {
            file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
        })
    }
}

impl DefaultGcSchedulerCtx {
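    /// Submits a [`GcRegionProcedure`] for `region_ids` on `peer` and waits for it to
    /// finish, returning the resulting [`GcReport`].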
    async fn gc_regions_inner(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport> {
        debug!(
            "Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
            peer,
            region_ids.len(),
            full_file_listing
        );

        let gc_regions = GcRegions {
            regions: region_ids.to_vec(),
            file_refs_manifest: file_refs_manifest.clone(),
            full_file_listing,
        };
        let procedure = GcRegionProcedure::new(
            self.mailbox.clone(),
            self.server_addr.clone(),
            peer,
            gc_regions,
            format!("GC for {} regions", region_ids.len()),
            timeout,
        );
        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

        let id = procedure_with_id.id;

        let mut watcher = self
            .procedure_manager
            .submit(procedure_with_id)
            .await
            .context(error::SubmitProcedureSnafu)?;
        let res = watcher::wait(&mut watcher)
            .await
            .context(error::WaitProcedureSnafu)?
            .with_context(|| error::UnexpectedSnafu {
                violated: format!(
                    "GC procedure {id} successfully completed but no result returned"
                ),
            })?;

        let gc_report = GcRegionProcedure::cast_result(res)?;

        Ok(gc_report)
    }

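    /// Sends a [`GetFileRefs`] instruction to `peer` and returns the reported
    /// [`FileRefsManifest`], failing if the datanode reports an error.
    ///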
    /// TODO(discord9): add support for reading the manifests of related regions for file refs too
    /// (currently it only reads active FileHandles).
    async fn send_get_file_refs_instruction(
        &self,
        peer: &Peer,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        timeout: Duration,
    ) -> Result<FileRefsManifest> {
        debug!(
            "Sending GetFileRefs instruction to datanode {} for {} regions",
            peer,
            query_regions.len()
        );

        let instruction = Instruction::GetFileRefs(GetFileRefs {
            query_regions: query_regions.to_vec(),
            related_regions,
        });

        let reply = self
            .send_instruction(peer, instruction, "Get file references", timeout)
            .await?;

        let InstructionReply::GetFileRefs(GetFileRefsReply {
            file_refs_manifest,
            success,
            error,
        }) = reply
        else {
            return error::UnexpectedInstructionReplySnafu {
                mailbox_message: format!("{:?}", reply),
                reason: "Unexpected reply of the GetFileRefs instruction",
            }
            .fail();
        };

        if !success {
            return error::UnexpectedSnafu {
                violated: format!(
                    "Failed to get file references from datanode {}: {:?}",
                    peer, error
                ),
            }
            .fail();
        }

        Ok(file_refs_manifest)
    }

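    /// Sends `instruction` to the datanode `peer` via the mailbox and waits up to
    /// `timeout` for the reply.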
    async fn send_instruction(
        &self,
        peer: &Peer,
        instruction: Instruction,
        description: &str,
        timeout: Duration,
    ) -> Result<InstructionReply> {
        let msg = MailboxMessage::json_message(
            &format!("{}: {}", description, instruction),
            &format!("Metasrv@{}", self.server_addr),
            &format!("Datanode-{}@{}", peer.id, peer.addr),
            common_time::util::current_time_millis(),
            &instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: instruction.to_string(),
        })?;

        let mailbox_rx = self
            .mailbox
            .send(&Channel::Datanode(peer.id), msg, timeout)
            .await?;

        match mailbox_rx.await {
            Ok(reply_msg) => {
                let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
                Ok(reply)
            }
            Err(e) => {
                error!(
                    "Failed to receive reply from datanode {} for {}: {}",
                    peer, description, e
                );
                Err(e)
            }
        }
    }
}