meta_srv/gc/
ctx.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::time::Duration;
17
18use common_meta::datanode::RegionStat;
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::key::table_repart::TableRepartValue;
21use common_meta::key::table_route::PhysicalTableRouteValue;
22use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
23use common_telemetry::debug;
24use snafu::{OptionExt as _, ResultExt as _};
25use store_api::storage::{GcReport, RegionId};
26use table::metadata::TableId;
27
28use crate::cluster::MetaPeerClientRef;
29use crate::error::{self, Result, TableMetadataManagerSnafu};
30use crate::gc::Region2Peers;
31use crate::gc::procedure::BatchGcProcedure;
32use crate::service::mailbox::MailboxRef;
33
34#[async_trait::async_trait]
35pub(crate) trait SchedulerCtx: Send + Sync {
36    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;
37
38    async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>>;
39
40    async fn get_table_route(
41        &self,
42        table_id: TableId,
43    ) -> Result<(TableId, PhysicalTableRouteValue)>;
44
45    async fn batch_get_table_route(
46        &self,
47        table_ids: &[TableId],
48    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>>;
49
50    async fn gc_regions(
51        &self,
52        region_ids: &[RegionId],
53        full_file_listing: bool,
54        timeout: Duration,
55        region_routes_override: Region2Peers,
56    ) -> Result<GcReport>;
57}
58
59pub(crate) struct DefaultGcSchedulerCtx {
60    /// The metadata manager.
61    pub(crate) table_metadata_manager: TableMetadataManagerRef,
62    /// Procedure manager.
63    pub(crate) procedure_manager: ProcedureManagerRef,
64    /// For getting `RegionStats`.
65    pub(crate) meta_peer_client: MetaPeerClientRef,
66    /// The mailbox to send messages.
67    pub(crate) mailbox: MailboxRef,
68    /// The server address.
69    pub(crate) server_addr: String,
70}
71
72impl DefaultGcSchedulerCtx {
73    pub fn try_new(
74        table_metadata_manager: TableMetadataManagerRef,
75        procedure_manager: ProcedureManagerRef,
76        meta_peer_client: MetaPeerClientRef,
77        mailbox: MailboxRef,
78        server_addr: String,
79    ) -> Result<Self> {
80        Ok(Self {
81            table_metadata_manager,
82            procedure_manager,
83            meta_peer_client,
84            mailbox,
85            server_addr,
86        })
87    }
88}
89
90#[async_trait::async_trait]
91impl SchedulerCtx for DefaultGcSchedulerCtx {
92    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
93        let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
94        let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
95        for (_dn_id, stats) in dn_stats {
96            let stats = stats.stats;
97
98            let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
99                continue;
100            };
101
102            for region_stat in latest_stat.region_stats {
103                table_to_region_stats
104                    .entry(region_stat.id.table_id())
105                    .or_default()
106                    .push(region_stat);
107            }
108        }
109        Ok(table_to_region_stats)
110    }
111
112    async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
113        self.table_metadata_manager
114            .table_repart_manager()
115            .table_reparts()
116            .await
117            .context(TableMetadataManagerSnafu)
118    }
119
120    async fn get_table_route(
121        &self,
122        table_id: TableId,
123    ) -> Result<(TableId, PhysicalTableRouteValue)> {
124        self.table_metadata_manager
125            .table_route_manager()
126            .get_physical_table_route(table_id)
127            .await
128            .context(TableMetadataManagerSnafu)
129    }
130
131    async fn batch_get_table_route(
132        &self,
133        table_ids: &[TableId],
134    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
135        self.table_metadata_manager
136            .table_route_manager()
137            .batch_get_physical_table_routes(table_ids)
138            .await
139            .context(TableMetadataManagerSnafu)
140    }
141
142    async fn gc_regions(
143        &self,
144        region_ids: &[RegionId],
145        full_file_listing: bool,
146        timeout: Duration,
147        region_routes_override: Region2Peers,
148    ) -> Result<GcReport> {
149        self.gc_regions_inner(
150            region_ids,
151            full_file_listing,
152            timeout,
153            region_routes_override,
154        )
155        .await
156    }
157}
158
159impl DefaultGcSchedulerCtx {
160    async fn gc_regions_inner(
161        &self,
162        region_ids: &[RegionId],
163        full_file_listing: bool,
164        timeout: Duration,
165        region_routes_override: Region2Peers,
166    ) -> Result<GcReport> {
167        debug!(
168            "Sending GC instruction for {} regions (full_file_listing: {})",
169            region_ids.len(),
170            full_file_listing
171        );
172
173        let procedure = BatchGcProcedure::new(
174            self.mailbox.clone(),
175            self.table_metadata_manager.clone(),
176            self.server_addr.clone(),
177            region_ids.to_vec(),
178            full_file_listing,
179            timeout,
180            region_routes_override,
181        );
182        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
183
184        let id = procedure_with_id.id;
185
186        let mut watcher = self
187            .procedure_manager
188            .submit(procedure_with_id)
189            .await
190            .context(error::SubmitProcedureSnafu)?;
191        let res = watcher::wait(&mut watcher)
192            .await
193            .context(error::WaitProcedureSnafu)?
194            .with_context(|| error::UnexpectedSnafu {
195                violated: format!(
196                    "GC procedure {id} successfully completed but no result returned"
197                ),
198            })?;
199
200        let gc_report = BatchGcProcedure::cast_result(res)?;
201
202        Ok(gc_report)
203    }
204}