mito2/worker/
handle_drop.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling drop request.
16
17use std::time::Duration;
18
19use bytes::Bytes;
20use common_telemetry::{error, info, warn};
21use futures::TryStreamExt;
22use object_store::util::join_path;
23use object_store::{EntryMode, ObjectStore};
24use snafu::ResultExt;
25use store_api::logstore::LogStore;
26use store_api::region_request::AffectedRows;
27use store_api::storage::RegionId;
28use tokio::time::sleep;
29
30use crate::error::{OpenDalSnafu, Result};
31use crate::region::{RegionLeaderState, RegionMapRef};
32use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};
33
34const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes
35const MAX_RETRY_TIMES: u64 = 12; // 1 hours (5m * 12)
36
37impl<S> RegionWorkerLoop<S>
38where
39    S: LogStore,
40{
41    pub(crate) async fn handle_drop_request(
42        &mut self,
43        region_id: RegionId,
44    ) -> Result<AffectedRows> {
45        let region = self.regions.writable_non_staging_region(region_id)?;
46
47        info!("Try to drop region: {}, worker: {}", region_id, self.id);
48
49        // Marks the region as dropping.
50        region.set_dropping()?;
51        // Writes dropping marker
52        // We rarely drop a region so we still operate in the worker loop.
53        let region_dir = region.access_layer.build_region_dir(region_id);
54        let marker_path = join_path(&region_dir, DROPPING_MARKER_FILE);
55        region
56            .access_layer
57            .object_store()
58            .write(&marker_path, Bytes::new())
59            .await
60            .context(OpenDalSnafu)
61            .inspect_err(|e| {
62                error!(e; "Failed to write the drop marker file for region {}", region_id);
63
64                // Sets the state back to writable. It's possible that the marker file has been written.
65                // We set the state back to writable so we can retry the drop operation.
66                region.switch_state_to_writable(RegionLeaderState::Dropping);
67            })?;
68
69        region.stop().await;
70        // Removes this region from region map to prevent other requests from accessing this region
71        self.regions.remove_region(region_id);
72        self.dropping_regions.insert_region(region.clone());
73
74        // Delete region data in WAL.
75        self.wal
76            .obsolete(
77                region_id,
78                region.version_control.current().last_entry_id,
79                &region.provider,
80            )
81            .await?;
82        // Notifies flush scheduler.
83        self.flush_scheduler.on_region_dropped(region_id);
84        // Notifies compaction scheduler.
85        self.compaction_scheduler.on_region_dropped(region_id);
86        // notifies index build scheduler.
87        self.index_build_scheduler
88            .on_region_dropped(region_id)
89            .await;
90
91        // Marks region version as dropped
92        region.version_control.mark_dropped();
93        info!(
94            "Region {} is dropped logically, but some files are not deleted yet",
95            region_id
96        );
97
98        self.region_count.dec();
99
100        // Detaches a background task to delete the region dir
101        let object_store = region.access_layer.object_store().clone();
102        let dropping_regions = self.dropping_regions.clone();
103        let listener = self.listener.clone();
104        let intm_manager = self.intermediate_manager.clone();
105        common_runtime::spawn_global(async move {
106            let gc_duration = listener
107                .on_later_drop_begin(region_id)
108                .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
109            let removed = later_drop_task(
110                region_id,
111                region_dir,
112                object_store,
113                dropping_regions,
114                gc_duration,
115            )
116            .await;
117            if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
118                warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
119            }
120            listener.on_later_drop_end(region_id, removed);
121        });
122
123        Ok(0)
124    }
125}
126
127/// Background GC task to remove the entire region path once one of the following
128/// conditions is true:
129/// - It finds there is no parquet file left.
130/// - After `gc_duration`.
131///
132/// Returns whether the path is removed.
133///
134/// This task will retry on failure and keep running until finished. Any resource
135/// captured by it will not be released before then. Be sure to only pass weak reference
136/// if something is depended on ref-count mechanism.
137async fn later_drop_task(
138    region_id: RegionId,
139    region_path: String,
140    object_store: ObjectStore,
141    dropping_regions: RegionMapRef,
142    gc_duration: Duration,
143) -> bool {
144    let mut force = false;
145    for _ in 0..MAX_RETRY_TIMES {
146        let result = remove_region_dir_once(&region_path, &object_store, force).await;
147        match result {
148            Err(err) => {
149                warn!(
150                    "Error occurs during trying to GC region dir {}: {}",
151                    region_path, err
152                );
153            }
154            Ok(true) => {
155                dropping_regions.remove_region(region_id);
156                info!("Region {} is dropped, force: {}", region_path, force);
157                return true;
158            }
159            Ok(false) => (),
160        }
161        sleep(gc_duration).await;
162        // Force recycle after gc duration.
163        force = true;
164    }
165
166    warn!(
167        "Failed to GC region dir {} after {} retries, giving up",
168        region_path, MAX_RETRY_TIMES
169    );
170
171    false
172}
173
174// TODO(ruihang): place the marker in a separate dir
175/// Removes region dir if there is no parquet files, returns whether the directory is removed.
176/// If `force = true`, always removes the dir.
177pub(crate) async fn remove_region_dir_once(
178    region_path: &str,
179    object_store: &ObjectStore,
180    force: bool,
181) -> Result<bool> {
182    // list all files under the given region path to check if there are un-deleted parquet files
183    let mut has_parquet_file = false;
184    // record all paths that neither ends with .parquet nor the marker file
185    let mut files_to_remove_first = vec![];
186    let mut files = object_store
187        .lister_with(region_path)
188        .await
189        .context(OpenDalSnafu)?;
190    while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
191        if !force && file.path().ends_with(".parquet") {
192            // If not in force mode, we only remove the region dir if there is no parquet file
193            has_parquet_file = true;
194            break;
195        } else if !file.path().ends_with(DROPPING_MARKER_FILE) {
196            let meta = file.metadata();
197            if meta.mode() == EntryMode::FILE {
198                files_to_remove_first.push(file.path().to_string());
199            }
200        }
201    }
202
203    if !has_parquet_file {
204        // no parquet file found, delete the region path
205        // first delete all files other than the marker
206        object_store
207            .delete_iter(files_to_remove_first)
208            .await
209            .context(OpenDalSnafu)?;
210        // then remove the marker with this dir
211        object_store
212            .remove_all(region_path)
213            .await
214            .context(OpenDalSnafu)?;
215        Ok(true)
216    } else {
217        Ok(false)
218    }
219}