mito2/worker/
handle_drop.rs1use std::time::Duration;
18
19use bytes::Bytes;
20use common_telemetry::{error, info, warn};
21use futures::TryStreamExt;
22use object_store::util::join_path;
23use object_store::{EntryMode, ObjectStore};
24use snafu::ResultExt;
25use store_api::logstore::LogStore;
26use store_api::region_request::AffectedRows;
27use store_api::storage::RegionId;
28use tokio::time::sleep;
29
30use crate::error::{OpenDalSnafu, Result};
31use crate::region::{RegionLeaderState, RegionMapRef};
32use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
33
34const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; const MAX_RETRY_TIMES: u64 = 12; impl<S> RegionWorkerLoop<S>
38where
39 S: LogStore,
40{
41 pub(crate) async fn handle_drop_request(
42 &mut self,
43 region_id: RegionId,
44 ) -> Result<AffectedRows> {
45 let region = self.regions.writable_non_staging_region(region_id)?;
46
47 info!("Try to drop region: {}, worker: {}", region_id, self.id);
48
49 region.set_dropping()?;
51 let region_dir = region.access_layer.build_region_dir(region_id);
54 let marker_path = join_path(®ion_dir, DROPPING_MARKER_FILE);
55 region
56 .access_layer
57 .object_store()
58 .write(&marker_path, Bytes::new())
59 .await
60 .context(OpenDalSnafu)
61 .inspect_err(|e| {
62 error!(e; "Failed to write the drop marker file for region {}", region_id);
63
64 region.switch_state_to_writable(RegionLeaderState::Dropping);
67 })?;
68
69 region.stop().await;
70 self.regions.remove_region(region_id);
72 self.dropping_regions.insert_region(region.clone());
73
74 self.wal
76 .obsolete(
77 region_id,
78 region.version_control.current().last_entry_id,
79 ®ion.provider,
80 )
81 .await?;
82 self.flush_scheduler.on_region_dropped(region_id);
84 self.compaction_scheduler.on_region_dropped(region_id);
86
87 region
89 .version_control
90 .mark_dropped(®ion.memtable_builder);
91 info!(
92 "Region {} is dropped logically, but some files are not deleted yet",
93 region_id
94 );
95
96 self.region_count.dec();
97
98 let object_store = region.access_layer.object_store().clone();
100 let dropping_regions = self.dropping_regions.clone();
101 let listener = self.listener.clone();
102 let intm_manager = self.intermediate_manager.clone();
103 common_runtime::spawn_global(async move {
104 let gc_duration = listener
105 .on_later_drop_begin(region_id)
106 .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
107 let removed = later_drop_task(
108 region_id,
109 region_dir,
110 object_store,
111 dropping_regions,
112 gc_duration,
113 )
114 .await;
115 if let Err(err) = intm_manager.prune_region_dir(®ion_id).await {
116 warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
117 }
118 listener.on_later_drop_end(region_id, removed);
119 });
120
121 Ok(0)
122 }
123}
124
125async fn later_drop_task(
136 region_id: RegionId,
137 region_path: String,
138 object_store: ObjectStore,
139 dropping_regions: RegionMapRef,
140 gc_duration: Duration,
141) -> bool {
142 let mut force = false;
143 for _ in 0..MAX_RETRY_TIMES {
144 let result = remove_region_dir_once(®ion_path, &object_store, force).await;
145 match result {
146 Err(err) => {
147 warn!(
148 "Error occurs during trying to GC region dir {}: {}",
149 region_path, err
150 );
151 }
152 Ok(true) => {
153 dropping_regions.remove_region(region_id);
154 info!("Region {} is dropped, force: {}", region_path, force);
155 return true;
156 }
157 Ok(false) => (),
158 }
159 sleep(gc_duration).await;
160 force = true;
162 }
163
164 warn!(
165 "Failed to GC region dir {} after {} retries, giving up",
166 region_path, MAX_RETRY_TIMES
167 );
168
169 false
170}
171
172pub(crate) async fn remove_region_dir_once(
176 region_path: &str,
177 object_store: &ObjectStore,
178 force: bool,
179) -> Result<bool> {
180 let mut has_parquet_file = false;
182 let mut files_to_remove_first = vec![];
184 let mut files = object_store
185 .lister_with(region_path)
186 .await
187 .context(OpenDalSnafu)?;
188 while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
189 if !force && file.path().ends_with(".parquet") {
190 has_parquet_file = true;
192 break;
193 } else if !file.path().ends_with(DROPPING_MARKER_FILE) {
194 let meta = file.metadata();
195 if meta.mode() == EntryMode::FILE {
196 files_to_remove_first.push(file.path().to_string());
197 }
198 }
199 }
200
201 if !has_parquet_file {
202 object_store
205 .delete_iter(files_to_remove_first)
206 .await
207 .context(OpenDalSnafu)?;
208 object_store
210 .remove_all(region_path)
211 .await
212 .context(OpenDalSnafu)?;
213 Ok(true)
214 } else {
215 Ok(false)
216 }
217}