// mito2/worker/handle_drop.rs

use std::time::Duration;
18
19use bytes::Bytes;
20use common_telemetry::{error, info, warn};
21use futures::TryStreamExt;
22use object_store::util::join_path;
23use object_store::{EntryMode, ObjectStore};
24use snafu::ResultExt;
25use store_api::logstore::LogStore;
26use store_api::region_request::{AffectedRows, PathType};
27use store_api::storage::RegionId;
28use tokio::time::sleep;
29
30use crate::error::{OpenDalSnafu, Result};
31use crate::region::{RegionLeaderState, RegionMapRef};
32use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};
33
34const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; const MAX_RETRY_TIMES: u64 = 12; impl<S> RegionWorkerLoop<S>
38where
39 S: LogStore,
40{
41 pub(crate) async fn handle_drop_request(
42 &mut self,
43 region_id: RegionId,
44 partial_drop: bool,
45 ) -> Result<AffectedRows> {
46 let region = self.regions.writable_region(region_id)?;
47
48 info!("Try to drop region: {}, worker: {}", region_id, self.id);
49
50 let is_staging = region.is_staging();
51 let expect_state = if is_staging {
52 RegionLeaderState::Staging
53 } else {
54 RegionLeaderState::Writable
55 };
56 region.set_dropping(expect_state)?;
58 let region_dir = region.access_layer.build_region_dir(region_id);
61 let path_type = region.access_layer.path_type();
62 let table_dir = region.access_layer.table_dir().to_string();
63 let marker_path = join_path(®ion_dir, DROPPING_MARKER_FILE);
64 region
65 .access_layer
66 .object_store()
67 .write(&marker_path, Bytes::new())
68 .await
69 .context(OpenDalSnafu)
70 .inspect_err(|e| {
71 error!(e; "Failed to write the drop marker file for region {}", region_id);
72
73 region.switch_state_to_writable(RegionLeaderState::Dropping);
76 })?;
77
78 region.stop().await;
79 self.regions.remove_region(region_id);
81 self.dropping_regions.insert_region(region.clone());
82
83 self.wal
85 .obsolete(
86 region_id,
87 region.version_control.current().last_entry_id,
88 ®ion.provider,
89 )
90 .await?;
91 self.flush_scheduler.on_region_dropped(region_id);
93 self.compaction_scheduler.on_region_dropped(region_id);
95 self.index_build_scheduler
97 .on_region_dropped(region_id)
98 .await;
99
100 region.version_control.mark_dropped();
102 info!(
103 "Region {} is dropped logically, but some files are not deleted yet",
104 region_id
105 );
106
107 self.region_count.dec();
108
109 let object_store = region.access_layer.object_store().clone();
110 let dropping_regions = self.dropping_regions.clone();
111 let listener = self.listener.clone();
112 let intm_manager = self.intermediate_manager.clone();
113 let cache_manager = self.cache_manager.clone();
114 let gc_enabled = self.file_ref_manager.is_gc_enabled();
115
116 common_runtime::spawn_global(async move {
117 let removed = if gc_enabled {
118 later_drop_task_with_global_gc(
119 region_id,
120 region_dir.clone(),
121 path_type,
122 object_store,
123 dropping_regions,
124 partial_drop,
125 )
126 .await
127 } else {
128 let gc_duration = listener
129 .on_later_drop_begin(region_id)
130 .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
131
132 later_drop_task_without_global_gc(
133 region_id,
134 region_dir.clone(),
135 object_store,
136 dropping_regions,
137 gc_duration,
138 )
139 .await
140 };
141
142 if let Err(err) = intm_manager.prune_region_dir(®ion_id).await {
143 warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
144 }
145
146 if let Some(write_cache) = cache_manager.write_cache()
147 && let Some(manifest_cache) = write_cache.manifest_cache()
148 {
149 manifest_cache.clean_manifests(&table_dir).await;
150 }
151
152 listener.on_later_drop_end(region_id, removed);
153 });
154
155 Ok(0)
156 }
157}
158
159async fn later_drop_task_without_global_gc(
170 region_id: RegionId,
171 region_path: String,
172 object_store: ObjectStore,
173 dropping_regions: RegionMapRef,
174 gc_duration: Duration,
175) -> bool {
176 remove_region_with_retry(
177 region_id,
178 region_path,
179 object_store,
180 dropping_regions,
181 Some(gc_duration),
182 false,
183 )
184 .await
185}
186
187async fn remove_region_with_retry(
188 region_id: RegionId,
189 region_path: String,
190 object_store: ObjectStore,
191 dropping_regions: std::sync::Arc<crate::region::RegionMap>,
192 gc_duration: Option<Duration>,
193 mut force: bool,
194) -> bool {
195 for _ in 0..MAX_RETRY_TIMES {
196 let result = remove_region_dir_once(®ion_path, &object_store, force).await;
197 match result {
198 Err(err) => {
199 warn!(
200 "Error occurs during trying to GC region dir {}: {}",
201 region_path, err
202 );
203 }
204 Ok(true) => {
205 dropping_regions.remove_region(region_id);
206 info!("Region {} is dropped, force: {}", region_path, force);
207 return true;
208 }
209 Ok(false) => (),
210 }
211 if let Some(duration) = gc_duration {
212 sleep(duration).await;
213 }
214 force = true;
216 }
217
218 warn!(
219 "Failed to GC region dir {} after {} retries, giving up",
220 region_path, MAX_RETRY_TIMES
221 );
222
223 false
224}
225
226async fn later_drop_task_with_global_gc(
227 region_id: RegionId,
228 region_path: String,
229 path_type: PathType,
230 object_store: ObjectStore,
231 dropping_regions: RegionMapRef,
232 partial_drop: bool,
233) -> bool {
234 if path_type == PathType::Metadata || !partial_drop {
239 remove_region_with_retry(
240 region_id,
241 region_path,
242 object_store,
243 dropping_regions,
244 None,
245 true,
246 )
247 .await
248 } else {
249 dropping_regions.remove_region(region_id);
251 true
252 }
253}
254
255pub(crate) async fn remove_region_dir_once(
259 region_path: &str,
260 object_store: &ObjectStore,
261 force: bool,
262) -> Result<bool> {
263 let mut has_parquet_file = false;
265 let mut files_to_remove_first = vec![];
267 let mut files = object_store
268 .lister_with(region_path)
269 .await
270 .context(OpenDalSnafu)?;
271 while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
272 if !force && file.path().ends_with(".parquet") {
273 has_parquet_file = true;
275 break;
276 } else if !file.path().ends_with(DROPPING_MARKER_FILE) {
277 let meta = file.metadata();
278 if meta.mode() == EntryMode::FILE {
279 files_to_remove_first.push(file.path().to_string());
280 }
281 }
282 }
283
284 if !has_parquet_file {
285 object_store
288 .delete_iter(files_to_remove_first)
289 .await
290 .context(OpenDalSnafu)?;
291 object_store
293 .remove_all(region_path)
294 .await
295 .context(OpenDalSnafu)?;
296 Ok(true)
297 } else {
298 Ok(false)
299 }
300}