1use std::sync::Arc;
18use std::sync::atomic::Ordering;
19
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_request::{RegionFlushReason, RegionFlushRequest};
23use store_api::storage::RegionId;
24
25use crate::config::{IndexBuildMode, MitoConfig};
26use crate::error::{RegionNotFoundSnafu, Result};
27use crate::flush::{FlushReason, RegionFlushTask};
28use crate::region::MitoRegionRef;
29use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
30use crate::sst::index::IndexBuildType;
31use crate::worker::RegionWorkerLoop;
32
33fn resolve_flush_reason(
34 explicit_reason: Option<FlushReason>,
35 request_reason: Option<RegionFlushReason>,
36 is_downgrading: bool,
37) -> FlushReason {
38 explicit_reason
39 .or_else(|| request_reason.map(FlushReason::from))
40 .unwrap_or({
41 if is_downgrading {
42 FlushReason::Downgrading
43 } else {
44 FlushReason::Manual
45 }
46 })
47}
48
49impl<S: LogStore> RegionWorkerLoop<S> {
50 pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
52 self.flush_scheduler.on_flush_failed(region_id, request.err);
53 debug!(
54 "Flush failed for region {}, handling stalled requests",
55 region_id
56 );
57 self.maybe_flush_worker();
59
60 self.handle_stalled_requests().await;
62 }
63
64 pub(crate) fn maybe_flush_worker(&mut self) {
67 if !self.write_buffer_manager.should_flush_engine() {
68 debug!("No need to flush worker");
69 return;
71 }
72
73 if let Err(e) = self.flush_regions_on_engine_full() {
76 error!(e; "Failed to flush worker");
77 }
78 }
79
80 fn flush_regions_on_engine_full(&mut self) -> Result<()> {
82 let regions = self.regions.list_regions();
83 let now = self.time_provider.current_time_millis();
84 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
85 let mut pending_regions = vec![];
86
87 for region in ®ions {
88 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
89 continue;
91 }
92
93 let version = region.version();
94 let region_memtable_size =
95 version.memtables.mutable_usage() + version.memtables.immutables_usage();
96
97 if region.last_flush_millis() < min_last_flush_time {
98 let task =
100 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
101 self.flush_scheduler.schedule_flush(
102 region.region_id,
103 ®ion.version_control,
104 task,
105 )?;
106 } else if region_memtable_size > 0 {
107 pending_regions.push((region, region_memtable_size));
109 }
110 }
111 pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
112 let target_memory_usage = self.write_buffer_manager.flush_limit();
116 let mut memory_usage = self.write_buffer_manager.memory_usage();
117
118 #[cfg(test)]
119 {
120 debug!(
121 "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
122 target_memory_usage,
123 memory_usage,
124 pending_regions
125 .iter()
126 .map(|(region, mem_size)| (region.region_id, mem_size))
127 .collect::<Vec<_>>()
128 );
129 }
130 for (region, region_mem_size) in pending_regions.into_iter() {
133 if memory_usage < target_memory_usage {
135 break;
137 }
138 let task =
139 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
140 debug!("Scheduling flush task for region {}", region.region_id);
141 self.flush_scheduler
143 .schedule_flush(region.region_id, ®ion.version_control, task)?;
144 memory_usage = memory_usage.saturating_sub(region_mem_size);
146 }
147
148 Ok(())
149 }
150
151 pub(crate) fn new_flush_task(
153 &self,
154 region: &MitoRegionRef,
155 reason: FlushReason,
156 row_group_size: Option<usize>,
157 engine_config: Arc<MitoConfig>,
158 ) -> RegionFlushTask {
159 RegionFlushTask {
160 region_id: region.region_id,
161 reason,
162 senders: Vec::new(),
163 request_sender: self.sender.clone(),
164 access_layer: region.access_layer.clone(),
165 listener: self.listener.clone(),
166 engine_config,
167 row_group_size,
168 cache_manager: self.cache_manager.clone(),
169 manifest_ctx: region.manifest_ctx.clone(),
170 index_options: region.version().options.index_options.clone(),
171 flush_semaphore: self.flush_semaphore.clone(),
172 is_staging: region.is_staging(),
173 partition_expr: region.maybe_staging_partition_expr_str(),
174 }
175 }
176}
177
178impl<S: LogStore> RegionWorkerLoop<S> {
179 pub(crate) fn handle_flush_request(
181 &mut self,
182 region_id: RegionId,
183 request: RegionFlushRequest,
184 reason: Option<FlushReason>,
185 mut sender: OptionOutputTx,
186 ) {
187 let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
188 return;
189 };
190 self.update_topic_latest_entry_id(®ion);
194
195 let reason = resolve_flush_reason(reason, request.reason, region.is_downgrading());
196 let mut task =
197 self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone());
198 task.push_sender(sender);
199 if let Err(e) =
200 self.flush_scheduler
201 .schedule_flush(region.region_id, ®ion.version_control, task)
202 {
203 error!(e; "Failed to schedule flush task for region {}", region.region_id);
204 }
205 }
206
207 pub(crate) fn flush_periodically(&mut self) -> Result<()> {
209 let regions = self.regions.list_regions();
210 let now = self.time_provider.current_time_millis();
211 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
212
213 for region in ®ions {
214 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
215 continue;
217 }
218 self.update_topic_latest_entry_id(region);
219
220 if region.last_flush_millis() < min_last_flush_time {
221 let task = self.new_flush_task(
223 region,
224 FlushReason::Periodically,
225 None,
226 self.config.clone(),
227 );
228 self.flush_scheduler.schedule_flush(
229 region.region_id,
230 ®ion.version_control,
231 task,
232 )?;
233 }
234 }
235
236 Ok(())
237 }
238
239 pub(crate) async fn handle_flush_finished(
241 &mut self,
242 region_id: RegionId,
243 mut request: FlushFinished,
244 ) {
245 self.notify_group();
248
249 let region = match self.regions.get_region(region_id) {
250 Some(region) => region,
251 None => {
252 request.on_failure(RegionNotFoundSnafu { region_id }.build());
253 return;
254 }
255 };
256
257 if request.is_staging {
258 info!(
260 "Skipping region metadata update for region {} in staging mode",
261 region_id
262 );
263 region.version_control.apply_edit(
264 None,
265 &request.memtables_to_remove,
266 region.file_purger.clone(),
267 );
268 } else {
269 region.version_control.apply_edit(
270 Some(request.edit.clone()),
271 &request.memtables_to_remove,
272 region.file_purger.clone(),
273 );
274 }
275
276 region.update_flush_millis();
277
278 info!(
280 "Region {} flush finished, tries to bump wal to {}",
281 region_id, request.flushed_entry_id
282 );
283 if let Err(e) = self
284 .wal
285 .obsolete(region_id, request.flushed_entry_id, ®ion.provider)
286 .await
287 {
288 error!(e; "Failed to write wal, region: {}", region_id);
289 request.on_failure(e);
290 return;
291 }
292
293 let flush_on_close = request.flush_reason == FlushReason::Closing;
294 let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
295
296 if self.config.index.build_mode == IndexBuildMode::Async {
298 self.handle_rebuild_index(
299 BuildIndexRequest {
300 region_id,
301 build_type: IndexBuildType::Flush,
302 file_metas: index_build_file_metas,
303 },
304 OptionOutputTx::new(None),
305 )
306 .await;
307 }
308
309 if flush_on_close {
310 self.remove_region(region_id).await;
313 info!("Region {} closed after flush", region_id);
314 request.on_success();
315 } else {
316 request.on_success();
318 if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
320 self.flush_scheduler.on_flush_success(region_id)
321 {
322 self.handle_ddl_requests(&mut ddl_requests).await;
324 self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
326 .await;
327 }
328 self.maybe_flush_worker();
330 self.handle_stalled_requests().await;
332 self.schedule_compaction(®ion).await;
334 }
335
336 self.listener.on_flush_success(region_id);
337 }
338
339 pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
342 if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
343 let latest_offset = self
344 .wal
345 .store()
346 .latest_entry_id(®ion.provider)
347 .unwrap_or(0);
348 let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
349
350 if latest_offset > topic_last_entry_id {
351 region
352 .topic_latest_entry_id
353 .store(latest_offset, Ordering::Relaxed);
354 debug!(
355 "Region {} latest entry id updated to {}",
356 region.region_id, latest_offset
357 );
358 }
359 }
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_resolve_flush_reason_prefers_explicit_reason() {
        // An explicit reason overrides both the request reason and the
        // downgrading flag.
        let resolved = resolve_flush_reason(
            Some(FlushReason::Closing),
            Some(RegionFlushReason::RemoteWalPrune),
            true,
        );
        assert_eq!(FlushReason::Closing, resolved);
    }

    #[test]
    fn test_resolve_flush_reason_uses_request_reason() {
        // Without an explicit reason, the request's reason wins regardless
        // of the downgrading flag.
        let cases = [
            (
                RegionFlushReason::RegionMigration,
                true,
                FlushReason::RegionMigration,
            ),
            (
                RegionFlushReason::Repartition,
                false,
                FlushReason::Repartition,
            ),
            (
                RegionFlushReason::RemoteWalPrune,
                false,
                FlushReason::RemoteWalPrune,
            ),
        ];
        for (request_reason, is_downgrading, expected) in cases {
            assert_eq!(
                resolve_flush_reason(None, Some(request_reason), is_downgrading),
                expected
            );
        }
    }

    #[test]
    fn test_resolve_flush_reason_fallback_unchanged() {
        // With no reason at all, fall back on the downgrading state.
        assert_eq!(
            FlushReason::Downgrading,
            resolve_flush_reason(None, None, true)
        );
        assert_eq!(FlushReason::Manual, resolve_flush_reason(None, None, false));
    }
}