use std::sync::Arc;
use std::sync::atomic::Ordering;

use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;

use crate::config::{IndexBuildMode, MitoConfig};
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
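    /// Handles a failed flush of the region, then re-checks whether the worker
    /// itself needs to flush and retries the requests stalled by the flush.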
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
        debug!(
            "Flush failed for region {}, handling stalled requests",
            region_id
        );
        self.maybe_flush_worker();

        self.handle_stalled_requests().await;
    }

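    /// Flushes regions in the worker if the engine-wide write buffer is full.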
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            debug!("No need to flush worker");
            return;
        }

        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

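    /// Schedules flush tasks to reduce memory usage when the engine's write
    /// buffer is full. Regions whose last flush is older than
    /// `auto_flush_interval` are flushed unconditionally; the remaining regions
    /// are flushed from the largest memtable to the smallest until the
    /// estimated memory usage drops below the flush limit.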
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut pending_regions = vec![];

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                continue;
            }

            let version = region.version();
            let region_memtable_size =
                version.memtables.mutable_usage() + version.memtables.immutables_usage();

            if region.last_flush_millis() < min_last_flush_time {
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            } else if region_memtable_size > 0 {
                pending_regions.push((region, region_memtable_size));
            }
        }
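        // Flush regions with the largest memtables first so that a few tasks
        // free as much memory as possible.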
        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
        let target_memory_usage = self.write_buffer_manager.flush_limit();
        let mut memory_usage = self.write_buffer_manager.memory_usage();

        #[cfg(test)]
        {
            debug!(
                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
                target_memory_usage,
                memory_usage,
                pending_regions
                    .iter()
                    .map(|(region, mem_size)| (region.region_id, mem_size))
                    .collect::<Vec<_>>()
            );
        }
        for (region, region_mem_size) in pending_regions.into_iter() {
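            // Stop scheduling flushes once the estimated usage falls below the
            // flush limit.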
            if memory_usage < target_memory_usage {
                break;
            }
            let task =
                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
            debug!("Scheduling flush task for region {}", region.region_id);
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
            memory_usage = memory_usage.saturating_sub(region_mem_size);
        }

        Ok(())
    }

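    /// Creates a new flush task with the given `reason` for the region.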
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
    ) -> RegionFlushTask {
        RegionFlushTask {
            region_id: region.region_id,
            reason,
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
            flush_semaphore: self.flush_semaphore.clone(),
            is_staging: region.is_staging(),
            partition_expr: region.maybe_staging_partition_expr_str(),
        }
    }
}

impl<S: LogStore> RegionWorkerLoop<S> {
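    /// Handles a flush request on the region. If no `reason` is given, it is
    /// derived from the region state: `Downgrading` for a downgrading region,
    /// `Manual` otherwise. The scheduled task notifies `sender` on completion.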
    pub(crate) fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        reason: Option<FlushReason>,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        self.update_topic_latest_entry_id(&region);

        let reason = reason.unwrap_or_else(|| {
            if region.is_downgrading() {
                FlushReason::Downgrading
            } else {
                FlushReason::Manual
            }
        });
        let mut task =
            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

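    /// Flushes regions periodically: schedules a flush for every writable
    /// region whose last flush is older than `auto_flush_interval`.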
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                continue;
            }
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

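    /// Handles the result of a finished flush: applies the region edit (a
    /// staging region only drops its flushed memtables), marks WAL entries up
    /// to the flushed entry id as obsolete, optionally schedules an async index
    /// build, and resumes requests that waited on the flush.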
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

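        // A staging region skips the region metadata update and only removes
        // the flushed memtables; other regions apply the full edit.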
        if request.is_staging {
            info!(
                "Skipping region metadata update for region {} in staging mode",
                region_id
            );
            region.version_control.apply_edit(
                None,
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        } else {
            region.version_control.apply_edit(
                Some(request.edit.clone()),
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        }

        region.update_flush_millis();

        info!(
            "Region {} flush finished, marking WAL entries up to {} as obsolete",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to obsolete WAL entries, region: {}", region_id);
            request.on_failure(e);
            return;
        }

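        // Record the flush reason and the files added by this flush before
        // completing the request.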
        let flush_on_close = request.flush_reason == FlushReason::Closing;
        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);

        request.on_success();

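        // In async build mode, index building is decoupled from the flush, so
        // schedule an index build task for the newly flushed files.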
        if self.config.index.build_mode == IndexBuildMode::Async {
            self.handle_rebuild_index(
                BuildIndexRequest {
                    region_id,
                    build_type: IndexBuildType::Flush,
                    file_metas: index_build_file_metas,
                },
                OptionOutputTx::new(None),
            )
            .await;
        }

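        // A flush triggered by closing means the region can be removed now;
        // otherwise, resume the requests that were waiting for this flush.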
        if flush_on_close {
            self.remove_region(region_id).await;
            info!("Region {} closed after flush", region_id);
        } else {
            if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
                self.flush_scheduler.on_flush_success(region_id)
            {
                self.handle_ddl_requests(&mut ddl_requests).await;
                self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                    .await;
            }
            self.maybe_flush_worker();
            self.handle_stalled_requests().await;
            self.schedule_compaction(&region).await;
        }

        self.listener.on_flush_success(region_id);
    }

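    /// Updates the region's `topic_latest_entry_id` to the latest entry id of
    /// its remote WAL topic. Only takes effect when the region uses a remote
    /// WAL and its memtables are empty.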
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
            let latest_offset = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

            if latest_offset > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(latest_offset, Ordering::Relaxed);
                debug!(
                    "Region {} latest entry id updated to {}",
                    region.region_id, latest_offset
                );
            }
        }
    }
}