mito2/worker/handle_drop.rs

use std::time::Duration;

use bytes::Bytes;
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, Result};
use crate::region::{RegionLeaderState, RegionMapRef};
use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};

/// Default interval of the background GC task that removes the region dir, in seconds.
const GC_TASK_INTERVAL_SEC: u64 = 5 * 60;
/// Max number of times the GC task retries removing the region dir.
const MAX_RETRY_TIMES: u64 = 12;

impl<S> RegionWorkerLoop<S>
where
    S: LogStore,
{
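    /// Handles a request to drop the region: marks the region as dropping,
    /// detaches it from the worker, and schedules a background task that
    /// removes the region directory.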
    pub(crate) async fn handle_drop_request(
        &mut self,
        region_id: RegionId,
    ) -> Result<AffectedRows> {
        let region = self.regions.writable_non_staging_region(region_id)?;

        info!("Try to drop region: {}, worker: {}", region_id, self.id);

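        // Marks the region state as dropping; new write requests to the region
        // will be rejected from now on.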
        region.set_dropping()?;
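        // Writes a marker file under the region dir to record that this region
        // is being dropped before its files are actually removed.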
        let region_dir = region.access_layer.build_region_dir(region_id);
        let marker_path = join_path(&region_dir, DROPPING_MARKER_FILE);
        region
            .access_layer
            .object_store()
            .write(&marker_path, Bytes::new())
            .await
            .context(OpenDalSnafu)
            .inspect_err(|e| {
                error!(e; "Failed to write the drop marker file for region {}", region_id);

                // Rolls the region state back to writable since the drop failed.
                region.switch_state_to_writable(RegionLeaderState::Dropping);
            })?;

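        // Stops the region and moves it from the worker's region map into the
        // dropping set until its files are removed.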
        region.stop().await;
        self.regions.remove_region(region_id);
        self.dropping_regions.insert_region(region.clone());

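        // Marks the WAL entries of this region up to the last entry id as
        // obsolete so the log store can purge them.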
        self.wal
            .obsolete(
                region_id,
                region.version_control.current().last_entry_id,
                &region.provider,
            )
            .await?;
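        // Notifies the schedulers so pending flush, compaction and index build
        // tasks for this region are dropped.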
        self.flush_scheduler.on_region_dropped(region_id);
        self.compaction_scheduler.on_region_dropped(region_id);
        self.index_build_scheduler
            .on_region_dropped(region_id)
            .await;

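        // Marks the region version as dropped before the region dir is removed.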
        region
            .version_control
            .mark_dropped(&region.memtable_builder);
        info!(
            "Region {} is dropped logically, but some files are not deleted yet",
            region_id
        );

        self.region_count.dec();

        // Detaches a background task that removes the region dir after a delay;
        // the drop request itself does not wait for the file removal.
        let object_store = region.access_layer.object_store().clone();
        let dropping_regions = self.dropping_regions.clone();
        let listener = self.listener.clone();
        let intm_manager = self.intermediate_manager.clone();
        common_runtime::spawn_global(async move {
            let gc_duration = listener
                .on_later_drop_begin(region_id)
                .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
            let removed = later_drop_task(
                region_id,
                region_dir,
                object_store,
                dropping_regions,
                gc_duration,
            )
            .await;
            // Also removes the region's intermediate directory.
            if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
                warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
            }
            listener.on_later_drop_end(region_id, removed);
        });

        Ok(0)
    }
}

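/// Background task that removes the whole region dir.
///
/// It retries up to `MAX_RETRY_TIMES`, sleeping `gc_duration` between attempts,
/// and removes the region from `dropping_regions` once the dir is gone.
/// Returns `true` if the dir was removed.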
async fn later_drop_task(
    region_id: RegionId,
    region_path: String,
    object_store: ObjectStore,
    dropping_regions: RegionMapRef,
    gc_duration: Duration,
) -> bool {
    let mut force = false;
    for _ in 0..MAX_RETRY_TIMES {
        let result = remove_region_dir_once(&region_path, &object_store, force).await;
        match result {
            Err(err) => {
                warn!(
                    "Failed to GC region dir {}: {}",
                    region_path, err
                );
            }
            Ok(true) => {
                dropping_regions.remove_region(region_id);
                info!("Region dir {} is removed, force: {}", region_path, force);
                return true;
            }
            Ok(false) => (),
        }
        sleep(gc_duration).await;
        // After the first attempt, remove the dir even if it still contains
        // parquet files.
        force = true;
    }

    warn!(
        "Failed to GC region dir {} after {} retries, giving up",
        region_path, MAX_RETRY_TIMES
    );

    false
}

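/// Removes the region dir once.
///
/// If `force` is `false` and the dir still contains parquet files, the dir is
/// kept and `Ok(false)` is returned. Otherwise all files except the dropping
/// marker are deleted first, then the whole dir is removed and `Ok(true)` is
/// returned.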
pub(crate) async fn remove_region_dir_once(
    region_path: &str,
    object_store: &ObjectStore,
    force: bool,
) -> Result<bool> {
    let mut has_parquet_file = false;
    // Non-parquet files that can be removed before the dir itself.
    let mut files_to_remove_first = vec![];
    let mut files = object_store
        .lister_with(region_path)
        .await
        .context(OpenDalSnafu)?;
    while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
        if !force && file.path().ends_with(".parquet") {
            // Don't remove the dir if it still contains parquet files and
            // `force` is not set.
            has_parquet_file = true;
            break;
        } else if !file.path().ends_with(DROPPING_MARKER_FILE) {
            let meta = file.metadata();
            if meta.mode() == EntryMode::FILE {
                files_to_remove_first.push(file.path().to_string());
            }
        }
    }

    if !has_parquet_file {
        // Deletes the listed files (everything except the marker) first, then
        // removes the whole dir, including the marker.
        object_store
            .delete_iter(files_to_remove_first)
            .await
            .context(OpenDalSnafu)?;
        object_store
            .remove_all(region_path)
            .await
            .context(OpenDalSnafu)?;
        Ok(true)
    } else {
        Ok(false)
    }
}