mito2/worker/handle_drop.rs

use std::time::Duration;

use bytes::Bytes;
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, Result};
use crate::region::{RegionLeaderState, RegionMapRef};
use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};

/// Interval of the background task that removes the dropped region directory.
const GC_TASK_INTERVAL_SEC: u64 = 5 * 60;
/// Maximum number of removal attempts before the background task gives up.
const MAX_RETRY_TIMES: u64 = 12;

impl<S> RegionWorkerLoop<S>
where
    S: LogStore,
{
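    /// Handles a request to drop the region `region_id`.
    ///
    /// Marks the region as dropping, detaches it from the worker, and spawns a
    /// background task that removes the region's files.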
    pub(crate) async fn handle_drop_request(
        &mut self,
        region_id: RegionId,
    ) -> Result<AffectedRows> {
        let region = self.regions.writable_non_staging_region(region_id)?;

        info!("Try to drop region: {}, worker: {}", region_id, self.id);

        region.set_dropping()?;
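        // Writes a dropping marker file into the region directory so an unfinished
        // drop can still be detected after a restart.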
        let region_dir = region.access_layer.build_region_dir(region_id);
        let table_dir = region.access_layer.table_dir().to_string();
        let marker_path = join_path(&region_dir, DROPPING_MARKER_FILE);
        region
            .access_layer
            .object_store()
            .write(&marker_path, Bytes::new())
            .await
            .context(OpenDalSnafu)
            .inspect_err(|e| {
                error!(e; "Failed to write the drop marker file for region {}", region_id);

                // Sets the region back to writable since the drop has not happened yet.
                region.switch_state_to_writable(RegionLeaderState::Dropping);
            })?;

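        // Stops the region and moves it from the worker's region map into the
        // dropping region map.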
        region.stop().await;
        self.regions.remove_region(region_id);
        self.dropping_regions.insert_region(region.clone());

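        // Marks the WAL entries of this region as obsolete so the log store can
        // reclaim them.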
        self.wal
            .obsolete(
                region_id,
                region.version_control.current().last_entry_id,
                &region.provider,
            )
            .await?;
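        // Notifies the flush, compaction, and index build schedulers that the
        // region is dropped.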
        self.flush_scheduler.on_region_dropped(region_id);
        self.compaction_scheduler.on_region_dropped(region_id);
        self.index_build_scheduler
            .on_region_dropped(region_id)
            .await;

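        // Marks the region version as dropped; the files are removed later by the
        // background task below.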
        region.version_control.mark_dropped();
        info!(
            "Region {} is dropped logically, but some files are not deleted yet",
            region_id
        );

        self.region_count.dec();

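        // Clones everything the cleanup task needs and spawns it on the global
        // runtime so the drop request returns without waiting for file removal.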
        let object_store = region.access_layer.object_store().clone();
        let dropping_regions = self.dropping_regions.clone();
        let listener = self.listener.clone();
        let intm_manager = self.intermediate_manager.clone();
        let cache_manager = self.cache_manager.clone();
        common_runtime::spawn_global(async move {
            let gc_duration = listener
                .on_later_drop_begin(region_id)
                .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
            let removed = later_drop_task(
                region_id,
                region_dir.clone(),
                object_store,
                dropping_regions,
                gc_duration,
            )
            .await;
            if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
                warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
            }

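            // Cleans cached manifest files under the table directory from the
            // write cache, if one is configured.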
            if let Some(write_cache) = cache_manager.write_cache()
                && let Some(manifest_cache) = write_cache.manifest_cache()
            {
                manifest_cache.clean_manifests(&table_dir).await;
            }

            listener.on_later_drop_end(region_id, removed);
        });

        Ok(0)
    }
}

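/// Background task that repeatedly tries to remove the region directory.
///
/// Returns `true` if the directory is removed within `MAX_RETRY_TIMES` attempts,
/// or `false` if it gives up.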
async fn later_drop_task(
    region_id: RegionId,
    region_path: String,
    object_store: ObjectStore,
    dropping_regions: RegionMapRef,
    gc_duration: Duration,
) -> bool {
    let mut force = false;
    for _ in 0..MAX_RETRY_TIMES {
        let result = remove_region_dir_once(&region_path, &object_store, force).await;
        match result {
            Err(err) => {
                warn!(
                    "Error occurs during trying to GC region dir {}: {}",
                    region_path, err
                );
            }
            Ok(true) => {
                dropping_regions.remove_region(region_id);
                info!("Region {} is dropped, force: {}", region_path, force);
                return true;
            }
            Ok(false) => (),
        }
        sleep(gc_duration).await;
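        // Only the first attempt skips directories that still contain parquet files;
        // later attempts remove the directory unconditionally.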
        force = true;
    }

    warn!(
        "Failed to GC region dir {} after {} retries, giving up",
        region_path, MAX_RETRY_TIMES
    );

    false
}

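/// Tries to remove the region directory once.
///
/// If `force` is `false` and the directory still contains parquet files, the removal
/// is skipped and `Ok(false)` is returned; `Ok(true)` means the directory was removed.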
pub(crate) async fn remove_region_dir_once(
    region_path: &str,
    object_store: &ObjectStore,
    force: bool,
) -> Result<bool> {
    let mut has_parquet_file = false;
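    // Files other than the dropping marker are deleted first; the marker itself is
    // removed last, together with the rest of the directory.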
    let mut files_to_remove_first = vec![];
    let mut files = object_store
        .lister_with(region_path)
        .await
        .context(OpenDalSnafu)?;
    while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
        if !force && file.path().ends_with(".parquet") {
            has_parquet_file = true;
            break;
        } else if !file.path().ends_with(DROPPING_MARKER_FILE) {
            let meta = file.metadata();
            if meta.mode() == EntryMode::FILE {
                files_to_remove_first.push(file.path().to_string());
            }
        }
    }

    if !has_parquet_file {
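        // Deletes the collected files first, then removes everything left in the
        // directory, including the dropping marker.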
        object_store
            .delete_iter(files_to_remove_first)
            .await
            .context(OpenDalSnafu)?;
        object_store
            .remove_all(region_path)
            .await
            .context(OpenDalSnafu)?;
        Ok(true)
    } else {
        Ok(false)
    }
}