// mito2/worker/handle_drop.rs
use std::time::Duration;

use bytes::Bytes;
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, Result};
use crate::region::{RegionLeaderState, RegionMapRef};
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};

34const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; const MAX_RETRY_TIMES: u64 = 12; impl<S> RegionWorkerLoop<S>
38where
39 S: LogStore,
40{
41 pub(crate) async fn handle_drop_request(
42 &mut self,
43 region_id: RegionId,
44 ) -> Result<AffectedRows> {
45 let region = self.regions.writable_region(region_id)?;
46
47 info!("Try to drop region: {}, worker: {}", region_id, self.id);
48
49 region.set_dropping()?;
51 let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE);
54 region
55 .access_layer
56 .object_store()
57 .write(&marker_path, Bytes::new())
58 .await
59 .context(OpenDalSnafu)
60 .inspect_err(|e| {
61 error!(e; "Failed to write the drop marker file for region {}", region_id);
62
63 region.switch_state_to_writable(RegionLeaderState::Dropping);
66 })?;
67
68 region.stop().await;
69 self.regions.remove_region(region_id);
71 self.dropping_regions.insert_region(region.clone());
72
73 self.wal
75 .obsolete(
76 region_id,
77 region.version_control.current().last_entry_id,
78 ®ion.provider,
79 )
80 .await?;
81 self.flush_scheduler.on_region_dropped(region_id);
83 self.compaction_scheduler.on_region_dropped(region_id);
85
86 region
88 .version_control
89 .mark_dropped(®ion.memtable_builder);
90 info!(
91 "Region {} is dropped logically, but some files are not deleted yet",
92 region_id
93 );
94
95 self.region_count.dec();
96
97 let region_dir = region.access_layer.region_dir().to_owned();
99 let object_store = region.access_layer.object_store().clone();
100 let dropping_regions = self.dropping_regions.clone();
101 let listener = self.listener.clone();
102 common_runtime::spawn_global(async move {
103 let gc_duration = listener
104 .on_later_drop_begin(region_id)
105 .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
106 let removed = later_drop_task(
107 region_id,
108 region_dir,
109 object_store,
110 dropping_regions,
111 gc_duration,
112 )
113 .await;
114 listener.on_later_drop_end(region_id, removed);
115 });
116
117 Ok(0)
118 }
119}
120
121async fn later_drop_task(
132 region_id: RegionId,
133 region_path: String,
134 object_store: ObjectStore,
135 dropping_regions: RegionMapRef,
136 gc_duration: Duration,
137) -> bool {
138 let mut force = false;
139 for _ in 0..MAX_RETRY_TIMES {
140 let result = remove_region_dir_once(®ion_path, &object_store, force).await;
141 match result {
142 Err(err) => {
143 warn!(
144 "Error occurs during trying to GC region dir {}: {}",
145 region_path, err
146 );
147 }
148 Ok(true) => {
149 dropping_regions.remove_region(region_id);
150 info!("Region {} is dropped, force: {}", region_path, force);
151 return true;
152 }
153 Ok(false) => (),
154 }
155 sleep(gc_duration).await;
156 force = true;
158 }
159
160 warn!(
161 "Failed to GC region dir {} after {} retries, giving up",
162 region_path, MAX_RETRY_TIMES
163 );
164
165 false
166}
167
168pub(crate) async fn remove_region_dir_once(
172 region_path: &str,
173 object_store: &ObjectStore,
174 force: bool,
175) -> Result<bool> {
176 let mut has_parquet_file = false;
178 let mut files_to_remove_first = vec![];
180 let mut files = object_store
181 .lister_with(region_path)
182 .await
183 .context(OpenDalSnafu)?;
184 while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
185 if !force && file.path().ends_with(".parquet") {
186 has_parquet_file = true;
188 break;
189 } else if !file.path().ends_with(DROPPING_MARKER_FILE) {
190 let meta = file.metadata();
191 if meta.mode() == EntryMode::FILE {
192 files_to_remove_first.push(file.path().to_string());
193 }
194 }
195 }
196
197 if !has_parquet_file {
198 object_store
201 .delete_iter(files_to_remove_first)
202 .await
203 .context(OpenDalSnafu)?;
204 object_store
206 .remove_all(region_path)
207 .await
208 .context(OpenDalSnafu)?;
209 Ok(true)
210 } else {
211 Ok(false)
212 }
213}