1use std::sync::Arc;
18use std::sync::atomic::Ordering;
19
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_request::RegionFlushRequest;
23use store_api::storage::RegionId;
24
25use crate::config::{IndexBuildMode, MitoConfig};
26use crate::error::{RegionNotFoundSnafu, Result};
27use crate::flush::{FlushReason, RegionFlushTask};
28use crate::region::MitoRegionRef;
29use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
30use crate::sst::index::IndexBuildType;
31use crate::worker::RegionWorkerLoop;
32
33impl<S> RegionWorkerLoop<S> {
34 pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
36 self.flush_scheduler.on_flush_failed(region_id, request.err);
37 }
38
39 pub(crate) fn maybe_flush_worker(&mut self) {
42 if !self.write_buffer_manager.should_flush_engine() {
43 return;
45 }
46
47 if let Err(e) = self.flush_regions_on_engine_full() {
50 error!(e; "Failed to flush worker");
51 }
52 }
53
54 fn flush_regions_on_engine_full(&mut self) -> Result<()> {
56 let regions = self.regions.list_regions();
57 let now = self.time_provider.current_time_millis();
58 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
59 let mut max_mutable_size = 0;
60 let mut max_mem_region = None;
62
63 for region in ®ions {
64 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
65 continue;
67 }
68
69 let version = region.version();
70 let region_mutable_size = version.memtables.mutable_usage();
71 if region_mutable_size > max_mutable_size {
73 max_mem_region = Some(region);
74 max_mutable_size = region_mutable_size;
75 }
76
77 if region.last_flush_millis() < min_last_flush_time {
78 let task =
80 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
81 self.flush_scheduler.schedule_flush(
82 region.region_id,
83 ®ion.version_control,
84 task,
85 )?;
86 }
87 }
88
89 if let Some(region) = max_mem_region
92 && !self.flush_scheduler.is_flush_requested(region.region_id)
93 {
94 let task =
95 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
96 self.flush_scheduler
97 .schedule_flush(region.region_id, ®ion.version_control, task)?;
98 }
99
100 Ok(())
101 }
102
103 pub(crate) fn new_flush_task(
105 &self,
106 region: &MitoRegionRef,
107 reason: FlushReason,
108 row_group_size: Option<usize>,
109 engine_config: Arc<MitoConfig>,
110 ) -> RegionFlushTask {
111 RegionFlushTask {
112 region_id: region.region_id,
113 reason,
114 senders: Vec::new(),
115 request_sender: self.sender.clone(),
116 access_layer: region.access_layer.clone(),
117 listener: self.listener.clone(),
118 engine_config,
119 row_group_size,
120 cache_manager: self.cache_manager.clone(),
121 manifest_ctx: region.manifest_ctx.clone(),
122 index_options: region.version().options.index_options.clone(),
123 flush_semaphore: self.flush_semaphore.clone(),
124 }
125 }
126}
127
128impl<S: LogStore> RegionWorkerLoop<S> {
129 pub(crate) async fn handle_flush_request(
131 &mut self,
132 region_id: RegionId,
133 request: RegionFlushRequest,
134 mut sender: OptionOutputTx,
135 ) {
136 let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
137 return;
138 };
139 self.update_topic_latest_entry_id(®ion);
143
144 let reason = if region.is_downgrading() {
145 FlushReason::Downgrading
146 } else {
147 FlushReason::Manual
148 };
149
150 let mut task =
151 self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone());
152 task.push_sender(sender);
153 if let Err(e) =
154 self.flush_scheduler
155 .schedule_flush(region.region_id, ®ion.version_control, task)
156 {
157 error!(e; "Failed to schedule flush task for region {}", region.region_id);
158 }
159 }
160
161 pub(crate) fn flush_periodically(&mut self) -> Result<()> {
163 let regions = self.regions.list_regions();
164 let now = self.time_provider.current_time_millis();
165 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
166
167 for region in ®ions {
168 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
169 continue;
171 }
172 self.update_topic_latest_entry_id(region);
173
174 if region.last_flush_millis() < min_last_flush_time {
175 let task = self.new_flush_task(
177 region,
178 FlushReason::Periodically,
179 None,
180 self.config.clone(),
181 );
182 self.flush_scheduler.schedule_flush(
183 region.region_id,
184 ®ion.version_control,
185 task,
186 )?;
187 }
188 }
189
190 Ok(())
191 }
192
193 pub(crate) async fn handle_flush_finished(
195 &mut self,
196 region_id: RegionId,
197 mut request: FlushFinished,
198 ) {
199 self.notify_group();
202
203 let region = match self.regions.get_region(region_id) {
204 Some(region) => region,
205 None => {
206 request.on_failure(RegionNotFoundSnafu { region_id }.build());
207 return;
208 }
209 };
210
211 let is_staging = region.manifest_ctx.current_state()
213 == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);
214
215 if is_staging {
216 info!(
217 "Skipping region metadata update for region {} in staging mode",
218 region_id
219 );
220 region.version_control.apply_edit(
221 None,
222 &request.memtables_to_remove,
223 region.file_purger.clone(),
224 );
225 } else {
226 region.version_control.apply_edit(
227 Some(request.edit.clone()),
228 &request.memtables_to_remove,
229 region.file_purger.clone(),
230 );
231 }
232
233 region.update_flush_millis();
234
235 info!(
237 "Region {} flush finished, tries to bump wal to {}",
238 region_id, request.flushed_entry_id
239 );
240 if let Err(e) = self
241 .wal
242 .obsolete(region_id, request.flushed_entry_id, ®ion.provider)
243 .await
244 {
245 error!(e; "Failed to write wal, region: {}", region_id);
246 request.on_failure(e);
247 return;
248 }
249
250 let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
251
252 request.on_success();
254
255 if self.config.index.build_mode == IndexBuildMode::Async {
257 self.handle_rebuild_index(
258 BuildIndexRequest {
259 region_id,
260 build_type: IndexBuildType::Flush,
261 file_metas: index_build_file_metas,
262 },
263 OptionOutputTx::new(None),
264 )
265 .await;
266 }
267
268 if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
270 self.flush_scheduler.on_flush_success(region_id)
271 {
272 self.handle_ddl_requests(&mut ddl_requests).await;
274 self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
276 .await;
277 }
278
279 self.handle_stalled_requests().await;
281
282 self.schedule_compaction(®ion).await;
284
285 self.listener.on_flush_success(region_id);
286 }
287
288 pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
291 if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
292 let latest_offset = self
293 .wal
294 .store()
295 .latest_entry_id(®ion.provider)
296 .unwrap_or(0);
297 let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
298
299 if latest_offset > topic_last_entry_id {
300 region
301 .topic_latest_entry_id
302 .store(latest_offset, Ordering::Relaxed);
303 debug!(
304 "Region {} latest entry id updated to {}",
305 region.region_id, latest_offset
306 );
307 }
308 }
309 }
310}