1use std::sync::Arc;
18use std::sync::atomic::Ordering;
19
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_request::{RegionFlushReason, RegionFlushRequest};
23use store_api::storage::RegionId;
24
25use crate::config::{IndexBuildMode, MitoConfig};
26use crate::error::{RegionNotFoundSnafu, Result};
27use crate::flush::{FlushReason, RegionFlushTask};
28use crate::region::MitoRegionRef;
29use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
30use crate::sst::index::IndexBuildType;
31use crate::worker::RegionWorkerLoop;
32
33fn resolve_flush_reason(
34 request_reason: Option<RegionFlushReason>,
35 is_downgrading: bool,
36) -> FlushReason {
37 match request_reason {
38 Some(reason) => FlushReason::from(reason),
39 None if is_downgrading => FlushReason::Downgrading,
40 None => FlushReason::Manual,
41 }
42}
43
44impl<S: LogStore> RegionWorkerLoop<S> {
45 pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
47 self.flush_scheduler.on_flush_failed(region_id, request.err);
48 debug!(
49 "Flush failed for region {}, handling stalled requests",
50 region_id
51 );
52 self.maybe_flush_worker();
54
55 self.handle_stalled_requests().await;
57 }
58
59 pub(crate) fn maybe_flush_worker(&mut self) {
62 if !self.write_buffer_manager.should_flush_engine() {
63 debug!("No need to flush worker");
64 return;
66 }
67
68 if let Err(e) = self.flush_regions_on_engine_full() {
71 error!(e; "Failed to flush worker");
72 }
73 }
74
75 fn flush_regions_on_engine_full(&mut self) -> Result<()> {
77 let regions = self.regions.list_regions();
78 let now = self.time_provider.current_time_millis();
79 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
80 let mut pending_regions = vec![];
81
82 for region in ®ions {
83 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
84 continue;
86 }
87
88 let version = region.version();
89 let region_memtable_size =
90 version.memtables.mutable_usage() + version.memtables.immutables_usage();
91
92 if region.last_flush_millis() < min_last_flush_time {
93 let task =
95 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
96 self.flush_scheduler.schedule_flush(
97 region.region_id,
98 ®ion.version_control,
99 task,
100 )?;
101 } else if region_memtable_size > 0 {
102 pending_regions.push((region, region_memtable_size));
104 }
105 }
106 pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
107 let target_memory_usage = self.write_buffer_manager.flush_limit();
111 let mut memory_usage = self.write_buffer_manager.memory_usage();
112
113 #[cfg(test)]
114 {
115 debug!(
116 "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
117 target_memory_usage,
118 memory_usage,
119 pending_regions
120 .iter()
121 .map(|(region, mem_size)| (region.region_id, mem_size))
122 .collect::<Vec<_>>()
123 );
124 }
125 for (region, region_mem_size) in pending_regions.into_iter() {
128 if memory_usage < target_memory_usage {
130 break;
132 }
133 let task =
134 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
135 debug!("Scheduling flush task for region {}", region.region_id);
136 self.flush_scheduler
138 .schedule_flush(region.region_id, ®ion.version_control, task)?;
139 memory_usage = memory_usage.saturating_sub(region_mem_size);
141 }
142
143 Ok(())
144 }
145
146 pub(crate) fn new_flush_task(
148 &self,
149 region: &MitoRegionRef,
150 reason: FlushReason,
151 row_group_size: Option<usize>,
152 engine_config: Arc<MitoConfig>,
153 ) -> RegionFlushTask {
154 RegionFlushTask {
155 region_id: region.region_id,
156 reason,
157 senders: Vec::new(),
158 request_sender: self.sender.clone(),
159 access_layer: region.access_layer.clone(),
160 listener: self.listener.clone(),
161 engine_config,
162 row_group_size,
163 cache_manager: self.cache_manager.clone(),
164 manifest_ctx: region.manifest_ctx.clone(),
165 index_options: region.version().options.index_options.clone(),
166 flush_semaphore: self.flush_semaphore.clone(),
167 is_staging: region.is_staging(),
168 partition_expr: region.maybe_staging_partition_expr_str(),
169 }
170 }
171}
172
173impl<S: LogStore> RegionWorkerLoop<S> {
174 pub(crate) fn handle_flush_request(
176 &mut self,
177 region_id: RegionId,
178 request: RegionFlushRequest,
179 sender: OptionOutputTx,
180 ) {
181 let region = match self.regions.flushable_region(region_id) {
182 Ok(region) => region,
183 Err(e) => {
184 sender.send(Err(e));
185 return;
186 }
187 };
188
189 self.update_topic_latest_entry_id(®ion);
193
194 let reason = resolve_flush_reason(request.reason, region.is_downgrading());
195 let mut task =
196 self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone());
197 task.push_sender(sender);
198 if let Err(e) =
199 self.flush_scheduler
200 .schedule_flush(region.region_id, ®ion.version_control, task)
201 {
202 error!(e; "Failed to schedule flush task for region {}", region.region_id);
203 }
204 }
205
206 pub(crate) fn flush_periodically(&mut self) -> Result<()> {
208 let regions = self.regions.list_regions();
209 let now = self.time_provider.current_time_millis();
210 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
211
212 for region in ®ions {
213 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
214 continue;
216 }
217 self.update_topic_latest_entry_id(region);
218
219 if region.last_flush_millis() < min_last_flush_time {
220 let task = self.new_flush_task(
222 region,
223 FlushReason::Periodically,
224 None,
225 self.config.clone(),
226 );
227 self.flush_scheduler.schedule_flush(
228 region.region_id,
229 ®ion.version_control,
230 task,
231 )?;
232 }
233 }
234
235 Ok(())
236 }
237
238 pub(crate) async fn handle_flush_finished(
240 &mut self,
241 region_id: RegionId,
242 mut request: FlushFinished,
243 ) {
244 self.notify_group();
247
248 let region = match self.regions.get_region(region_id) {
249 Some(region) => region,
250 None => {
251 request.on_failure(RegionNotFoundSnafu { region_id }.build());
252 return;
253 }
254 };
255
256 if request.is_staging {
257 info!(
259 "Skipping region metadata update for region {} in staging mode",
260 region_id
261 );
262 region.version_control.apply_edit(
263 None,
264 &request.memtables_to_remove,
265 region.file_purger.clone(),
266 );
267 } else {
268 region.version_control.apply_edit(
269 Some(request.edit.clone()),
270 &request.memtables_to_remove,
271 region.file_purger.clone(),
272 );
273 }
274
275 region.update_flush_millis();
276
277 info!(
279 "Region {} flush finished, tries to bump wal to {}",
280 region_id, request.flushed_entry_id
281 );
282 if let Err(e) = self
283 .wal
284 .obsolete(region_id, request.flushed_entry_id, ®ion.provider)
285 .await
286 {
287 error!(e; "Failed to write wal, region: {}", region_id);
288 request.on_failure(e);
289 return;
290 }
291
292 let flush_on_close = request.flush_reason == FlushReason::Closing;
293 let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
294
295 if self.config.index.build_mode == IndexBuildMode::Async {
297 self.handle_rebuild_index(
298 BuildIndexRequest {
299 region_id,
300 build_type: IndexBuildType::Flush,
301 file_metas: index_build_file_metas,
302 },
303 OptionOutputTx::new(None),
304 )
305 .await;
306 }
307
308 if flush_on_close {
309 self.remove_region(region_id).await;
312 info!("Region {} closed after flush", region_id);
313 request.on_success();
314 } else {
315 request.on_success();
317 if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
319 self.flush_scheduler.on_flush_success(region_id)
320 {
321 self.handle_ddl_requests(&mut ddl_requests).await;
323 self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
325 .await;
326 }
327 self.maybe_flush_worker();
329 self.handle_stalled_requests().await;
331 self.schedule_compaction(®ion).await;
333 }
334
335 self.listener.on_flush_success(region_id);
336 }
337
338 pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
341 if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
342 let latest_offset = self
343 .wal
344 .store()
345 .latest_entry_id(®ion.provider)
346 .unwrap_or(0);
347 let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
348
349 if latest_offset > topic_last_entry_id {
350 region
351 .topic_latest_entry_id
352 .store(latest_offset, Ordering::Relaxed);
353 debug!(
354 "Region {} latest entry id updated to {}",
355 region.region_id, latest_offset
356 );
357 }
358 }
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// An explicit request reason wins, even when the region is downgrading.
    #[test]
    fn test_resolve_flush_reason_uses_request_reason() {
        let cases = [
            (RegionFlushReason::RegionMigration, true, FlushReason::RegionMigration),
            (RegionFlushReason::Repartition, false, FlushReason::Repartition),
            (RegionFlushReason::RemoteWalPrune, false, FlushReason::RemoteWalPrune),
            (RegionFlushReason::Closing, false, FlushReason::Closing),
            (RegionFlushReason::Downgrading, false, FlushReason::Downgrading),
        ];

        for (requested, downgrading, expected) in cases {
            assert_eq!(resolve_flush_reason(Some(requested), downgrading), expected);
        }
    }

    /// Without a request reason, the downgrading flag selects the fallback.
    #[test]
    fn test_resolve_flush_reason_fallback_unchanged() {
        for (downgrading, expected) in [
            (true, FlushReason::Downgrading),
            (false, FlushReason::Manual),
        ] {
            assert_eq!(resolve_flush_reason(None, downgrading), expected);
        }
    }
}