mito2/worker/
handle_drop.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling drop request.
16
17use std::time::Duration;
18
19use bytes::Bytes;
20use common_telemetry::{error, info, warn};
21use futures::TryStreamExt;
22use object_store::util::join_path;
23use object_store::{EntryMode, ObjectStore};
24use snafu::ResultExt;
25use store_api::logstore::LogStore;
26use store_api::region_request::{AffectedRows, PathType};
27use store_api::storage::RegionId;
28use tokio::time::sleep;
29
30use crate::error::{OpenDalSnafu, Result};
31use crate::region::{RegionLeaderState, RegionMapRef};
32use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};
33
/// Default interval, in seconds, between cleanup attempts of a dropped region directory.
///
/// Used when the global GC is disabled and the listener does not supply its own duration.
const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes
/// Maximum number of attempts to remove a dropped region directory before giving up.
///
/// With the default interval this spans roughly 1 hour (5m * 12). When no interval is
/// used (forced removal while the global GC is enabled), the attempts run back to back.
const MAX_RETRY_TIMES: u64 = 12;
36
impl<S> RegionWorkerLoop<S>
where
    S: LogStore,
{
    /// Handles a request to drop the region `region_id`.
    ///
    /// The work done on the worker loop, in order:
    /// 1. Fetches the region via `writable_region`, switches it to the dropping state
    ///    (expecting `Staging` or `Writable`, whichever it currently is) and writes
    ///    `DROPPING_MARKER_FILE` into the region directory.
    /// 2. Stops the region, moves it from `regions` into `dropping_regions`, obsoletes its
    ///    WAL entries up to the current `last_entry_id` and notifies the flush, compaction
    ///    and index build schedulers.
    /// 3. Marks the region version as dropped and decrements `region_count`.
    /// 4. Spawns a global background task that removes the region directory, prunes the
    ///    intermediate region directory, cleans cached manifests under the table directory
    ///    (when a write cache with a manifest cache exists) and finally reports to the
    ///    listener via `on_later_drop_end`.
    ///
    /// `partial_drop` is only forwarded to the global-GC cleanup path; it is ignored when
    /// the global GC is disabled.
    ///
    /// Returns `Ok(0)` once the region is dropped logically; physical file removal happens
    /// asynchronously in the spawned task.
    ///
    /// # Errors
    /// Returns an error if `writable_region` or `set_dropping` fails, if writing the marker
    /// file fails (the region state is rolled back first), or if obsoleting the WAL fails.
    pub(crate) async fn handle_drop_request(
        &mut self,
        region_id: RegionId,
        partial_drop: bool,
    ) -> Result<AffectedRows> {
        let region = self.regions.writable_region(region_id)?;

        info!("Try to drop region: {}, worker: {}", region_id, self.id);

        // The dropping transition must start from the state the region is currently in.
        let is_staging = region.is_staging();
        let expect_state = if is_staging {
            RegionLeaderState::Staging
        } else {
            RegionLeaderState::Writable
        };
        // Marks the region as dropping.
        region.set_dropping(expect_state)?;
        // Writes dropping marker
        // We rarely drop a region so we still operate in the worker loop.
        let region_dir = region.access_layer.build_region_dir(region_id);
        let path_type = region.access_layer.path_type();
        let table_dir = region.access_layer.table_dir().to_string();
        let marker_path = join_path(&region_dir, DROPPING_MARKER_FILE);
        region
            .access_layer
            .object_store()
            .write(&marker_path, Bytes::new())
            .await
            .context(OpenDalSnafu)
            .inspect_err(|e| {
                error!(e; "Failed to write the drop marker file for region {}", region_id);

                // Sets the state back to writable. It's possible that the marker file has been written.
                // We set the state back to writable so we can retry the drop operation.
                // NOTE(review): a region that started in `Staging` is also switched to
                // `Writable` here rather than back to `Staging` — confirm this is intended.
                region.switch_state_to_writable(RegionLeaderState::Dropping);
            })?;

        region.stop().await;
        // Removes this region from region map to prevent other requests from accessing this region
        self.regions.remove_region(region_id);
        // Keep the region tracked until the background cleanup removes it from this map.
        self.dropping_regions.insert_region(region.clone());

        // Delete region data in WAL.
        // NOTE(review): if this fails, the region is already stopped, removed from
        // `regions` and kept in `dropping_regions`, but `region_count` is not decremented
        // and no cleanup task is spawned — confirm how the caller recovers.
        self.wal
            .obsolete(
                region_id,
                region.version_control.current().last_entry_id,
                &region.provider,
            )
            .await?;
        // Notifies flush scheduler.
        self.flush_scheduler.on_region_dropped(region_id);
        // Notifies compaction scheduler.
        self.compaction_scheduler.on_region_dropped(region_id);
        // notifies index build scheduler.
        self.index_build_scheduler
            .on_region_dropped(region_id)
            .await;

        // Marks region version as dropped
        region.version_control.mark_dropped();
        info!(
            "Region {} is dropped logically, but some files are not deleted yet",
            region_id
        );

        self.region_count.dec();

        // Clone everything the background task needs so it does not borrow `self`.
        let object_store = region.access_layer.object_store().clone();
        let dropping_regions = self.dropping_regions.clone();
        let listener = self.listener.clone();
        let intm_manager = self.intermediate_manager.clone();
        let cache_manager = self.cache_manager.clone();
        let gc_enabled = self.file_ref_manager.is_gc_enabled();

        common_runtime::spawn_global(async move {
            // `removed` is what gets reported to `on_later_drop_end`. In the global-GC
            // partial-drop path it is `true` even though the files are left for the GC.
            let removed = if gc_enabled {
                later_drop_task_with_global_gc(
                    region_id,
                    region_dir.clone(),
                    path_type,
                    object_store,
                    dropping_regions,
                    partial_drop,
                )
                .await
            } else {
                // The listener may override the wait interval; otherwise use the default.
                let gc_duration = listener
                    .on_later_drop_begin(region_id)
                    .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));

                later_drop_task_without_global_gc(
                    region_id,
                    region_dir.clone(),
                    object_store,
                    dropping_regions,
                    gc_duration,
                )
                .await
            };

            // Best-effort cleanup: failures are only logged.
            if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
                warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
            }

            if let Some(write_cache) = cache_manager.write_cache()
                && let Some(manifest_cache) = write_cache.manifest_cache()
            {
                manifest_cache.clean_manifests(&table_dir).await;
            }

            listener.on_later_drop_end(region_id, removed);
        });

        Ok(0)
    }
}
158
159/// Background GC task to remove the entire region path once one of the following
160/// conditions is true:
161/// - It finds there is no parquet file left.
162/// - After `gc_duration`.
163///
164/// Returns whether the path is removed.
165///
166/// This task will retry on failure and keep running until finished. Any resource
167/// captured by it will not be released before then. Be sure to only pass weak reference
168/// if something is depended on ref-count mechanism.
169async fn later_drop_task_without_global_gc(
170    region_id: RegionId,
171    region_path: String,
172    object_store: ObjectStore,
173    dropping_regions: RegionMapRef,
174    gc_duration: Duration,
175) -> bool {
176    remove_region_with_retry(
177        region_id,
178        region_path,
179        object_store,
180        dropping_regions,
181        Some(gc_duration),
182        false,
183    )
184    .await
185}
186
187async fn remove_region_with_retry(
188    region_id: RegionId,
189    region_path: String,
190    object_store: ObjectStore,
191    dropping_regions: std::sync::Arc<crate::region::RegionMap>,
192    gc_duration: Option<Duration>,
193    mut force: bool,
194) -> bool {
195    for _ in 0..MAX_RETRY_TIMES {
196        let result = remove_region_dir_once(&region_path, &object_store, force).await;
197        match result {
198            Err(err) => {
199                warn!(
200                    "Error occurs during trying to GC region dir {}: {}",
201                    region_path, err
202                );
203            }
204            Ok(true) => {
205                dropping_regions.remove_region(region_id);
206                info!("Region {} is dropped, force: {}", region_path, force);
207                return true;
208            }
209            Ok(false) => (),
210        }
211        if let Some(duration) = gc_duration {
212            sleep(duration).await;
213        }
214        // Force recycle after gc duration.
215        force = true;
216    }
217
218    warn!(
219        "Failed to GC region dir {} after {} retries, giving up",
220        region_path, MAX_RETRY_TIMES
221    );
222
223    false
224}
225
226async fn later_drop_task_with_global_gc(
227    region_id: RegionId,
228    region_path: String,
229    path_type: PathType,
230    object_store: ObjectStore,
231    dropping_regions: RegionMapRef,
232    partial_drop: bool,
233) -> bool {
234    // For metadata regions or regions marked for full deletion (such as when dropping a table)
235    // the region directory is forcefully removed immediately.
236    //
237    // TODO(discord9): Evaluate removing files instantly rather than waiting for the GC period.
238    if path_type == PathType::Metadata || !partial_drop {
239        remove_region_with_retry(
240            region_id,
241            region_path,
242            object_store,
243            dropping_regions,
244            None,
245            true,
246        )
247        .await
248    } else {
249        // left for global gc
250        dropping_regions.remove_region(region_id);
251        true
252    }
253}
254
255// TODO(ruihang): place the marker in a separate dir
256/// Removes region dir if there is no parquet files, returns whether the directory is removed.
257/// If `force = true`, always removes the dir.
258pub(crate) async fn remove_region_dir_once(
259    region_path: &str,
260    object_store: &ObjectStore,
261    force: bool,
262) -> Result<bool> {
263    // list all files under the given region path to check if there are un-deleted parquet files
264    let mut has_parquet_file = false;
265    // record all paths that neither ends with .parquet nor the marker file
266    let mut files_to_remove_first = vec![];
267    let mut files = object_store
268        .lister_with(region_path)
269        .await
270        .context(OpenDalSnafu)?;
271    while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
272        if !force && file.path().ends_with(".parquet") {
273            // If not in force mode, we only remove the region dir if there is no parquet file
274            has_parquet_file = true;
275            break;
276        } else if !file.path().ends_with(DROPPING_MARKER_FILE) {
277            let meta = file.metadata();
278            if meta.mode() == EntryMode::FILE {
279                files_to_remove_first.push(file.path().to_string());
280            }
281        }
282    }
283
284    if !has_parquet_file {
285        // no parquet file found, delete the region path
286        // first delete all files other than the marker
287        object_store
288            .delete_iter(files_to_remove_first)
289            .await
290            .context(OpenDalSnafu)?;
291        // then remove the marker with this dir
292        object_store
293            .remove_all(region_path)
294            .await
295            .context(OpenDalSnafu)?;
296        Ok(true)
297    } else {
298        Ok(false)
299    }
300}