1use std::sync::Arc;
18use std::sync::atomic::Ordering;
19
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_request::RegionFlushRequest;
23use store_api::storage::RegionId;
24
25use crate::config::{IndexBuildMode, MitoConfig};
26use crate::error::{RegionNotFoundSnafu, Result};
27use crate::flush::{FlushReason, RegionFlushTask};
28use crate::region::MitoRegionRef;
29use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
30use crate::sst::index::IndexBuildType;
31use crate::worker::RegionWorkerLoop;
32
33impl<S: LogStore> RegionWorkerLoop<S> {
34 pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
36 self.flush_scheduler.on_flush_failed(region_id, request.err);
37 debug!(
38 "Flush failed for region {}, handling stalled requests",
39 region_id
40 );
41 self.maybe_flush_worker();
43
44 self.handle_stalled_requests().await;
46 }
47
48 pub(crate) fn maybe_flush_worker(&mut self) {
51 if !self.write_buffer_manager.should_flush_engine() {
52 debug!("No need to flush worker");
53 return;
55 }
56
57 if let Err(e) = self.flush_regions_on_engine_full() {
60 error!(e; "Failed to flush worker");
61 }
62 }
63
64 fn flush_regions_on_engine_full(&mut self) -> Result<()> {
66 let regions = self.regions.list_regions();
67 let now = self.time_provider.current_time_millis();
68 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
69 let mut pending_regions = vec![];
70
71 for region in ®ions {
72 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
73 continue;
75 }
76
77 let version = region.version();
78 let region_memtable_size =
79 version.memtables.mutable_usage() + version.memtables.immutables_usage();
80
81 if region.last_flush_millis() < min_last_flush_time {
82 let task = self.new_flush_task(
84 region,
85 FlushReason::EngineFull,
86 None,
87 self.config.clone(),
88 region.is_staging(),
89 );
90 self.flush_scheduler.schedule_flush(
91 region.region_id,
92 ®ion.version_control,
93 task,
94 )?;
95 } else if region_memtable_size > 0 {
96 pending_regions.push((region, region_memtable_size));
98 }
99 }
100 pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
101 let target_memory_usage = self.write_buffer_manager.flush_limit();
105 let mut memory_usage = self.write_buffer_manager.memory_usage();
106
107 #[cfg(test)]
108 {
109 debug!(
110 "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
111 target_memory_usage,
112 memory_usage,
113 pending_regions
114 .iter()
115 .map(|(region, mem_size)| (region.region_id, mem_size))
116 .collect::<Vec<_>>()
117 );
118 }
119 for (region, region_mem_size) in pending_regions.into_iter() {
122 if memory_usage < target_memory_usage {
124 break;
126 }
127 let task = self.new_flush_task(
128 region,
129 FlushReason::EngineFull,
130 None,
131 self.config.clone(),
132 region.is_staging(),
133 );
134 debug!("Scheduling flush task for region {}", region.region_id);
135 self.flush_scheduler
137 .schedule_flush(region.region_id, ®ion.version_control, task)?;
138 memory_usage = memory_usage.saturating_sub(region_mem_size);
140 }
141
142 Ok(())
143 }
144
145 pub(crate) fn new_flush_task(
147 &self,
148 region: &MitoRegionRef,
149 reason: FlushReason,
150 row_group_size: Option<usize>,
151 engine_config: Arc<MitoConfig>,
152 is_staging: bool,
153 ) -> RegionFlushTask {
154 RegionFlushTask {
155 region_id: region.region_id,
156 reason,
157 senders: Vec::new(),
158 request_sender: self.sender.clone(),
159 access_layer: region.access_layer.clone(),
160 listener: self.listener.clone(),
161 engine_config,
162 row_group_size,
163 cache_manager: self.cache_manager.clone(),
164 manifest_ctx: region.manifest_ctx.clone(),
165 index_options: region.version().options.index_options.clone(),
166 flush_semaphore: self.flush_semaphore.clone(),
167 is_staging,
168 }
169 }
170}
171
172impl<S: LogStore> RegionWorkerLoop<S> {
173 pub(crate) fn handle_flush_request(
175 &mut self,
176 region_id: RegionId,
177 request: RegionFlushRequest,
178 mut sender: OptionOutputTx,
179 ) {
180 let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
181 return;
182 };
183 self.update_topic_latest_entry_id(®ion);
187
188 let reason = if region.is_downgrading() {
189 FlushReason::Downgrading
190 } else {
191 FlushReason::Manual
192 };
193
194 let mut task = self.new_flush_task(
195 ®ion,
196 reason,
197 request.row_group_size,
198 self.config.clone(),
199 region.is_staging(),
200 );
201 task.push_sender(sender);
202 if let Err(e) =
203 self.flush_scheduler
204 .schedule_flush(region.region_id, ®ion.version_control, task)
205 {
206 error!(e; "Failed to schedule flush task for region {}", region.region_id);
207 }
208 }
209
210 pub(crate) fn flush_periodically(&mut self) -> Result<()> {
212 let regions = self.regions.list_regions();
213 let now = self.time_provider.current_time_millis();
214 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
215
216 for region in ®ions {
217 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
218 continue;
220 }
221 self.update_topic_latest_entry_id(region);
222
223 if region.last_flush_millis() < min_last_flush_time {
224 let task = self.new_flush_task(
226 region,
227 FlushReason::Periodically,
228 None,
229 self.config.clone(),
230 region.is_staging(),
231 );
232 self.flush_scheduler.schedule_flush(
233 region.region_id,
234 ®ion.version_control,
235 task,
236 )?;
237 }
238 }
239
240 Ok(())
241 }
242
243 pub(crate) async fn handle_flush_finished(
245 &mut self,
246 region_id: RegionId,
247 mut request: FlushFinished,
248 ) {
249 self.notify_group();
252
253 let region = match self.regions.get_region(region_id) {
254 Some(region) => region,
255 None => {
256 request.on_failure(RegionNotFoundSnafu { region_id }.build());
257 return;
258 }
259 };
260
261 if request.is_staging {
262 info!(
264 "Skipping region metadata update for region {} in staging mode",
265 region_id
266 );
267 region.version_control.apply_edit(
268 None,
269 &request.memtables_to_remove,
270 region.file_purger.clone(),
271 );
272 } else {
273 region.version_control.apply_edit(
274 Some(request.edit.clone()),
275 &request.memtables_to_remove,
276 region.file_purger.clone(),
277 );
278 }
279
280 region.update_flush_millis();
281
282 info!(
284 "Region {} flush finished, tries to bump wal to {}",
285 region_id, request.flushed_entry_id
286 );
287 if let Err(e) = self
288 .wal
289 .obsolete(region_id, request.flushed_entry_id, ®ion.provider)
290 .await
291 {
292 error!(e; "Failed to write wal, region: {}", region_id);
293 request.on_failure(e);
294 return;
295 }
296
297 let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
298
299 request.on_success();
301
302 if self.config.index.build_mode == IndexBuildMode::Async {
304 self.handle_rebuild_index(
305 BuildIndexRequest {
306 region_id,
307 build_type: IndexBuildType::Flush,
308 file_metas: index_build_file_metas,
309 },
310 OptionOutputTx::new(None),
311 )
312 .await;
313 }
314
315 if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
317 self.flush_scheduler.on_flush_success(region_id)
318 {
319 self.handle_ddl_requests(&mut ddl_requests).await;
321 self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
323 .await;
324 }
325
326 self.maybe_flush_worker();
328
329 self.handle_stalled_requests().await;
331
332 self.schedule_compaction(®ion).await;
334
335 self.listener.on_flush_success(region_id);
336 }
337
338 pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
341 if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
342 let latest_offset = self
343 .wal
344 .store()
345 .latest_entry_id(®ion.provider)
346 .unwrap_or(0);
347 let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
348
349 if latest_offset > topic_last_entry_id {
350 region
351 .topic_latest_entry_id
352 .store(latest_offset, Ordering::Relaxed);
353 debug!(
354 "Region {} latest entry id updated to {}",
355 region.region_id, latest_offset
356 );
357 }
358 }
359 }
360}