mito2/worker/
handle_flush.rs1use std::sync::Arc;
18use std::sync::atomic::Ordering;
19
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_request::RegionFlushRequest;
23use store_api::storage::RegionId;
24
25use crate::config::MitoConfig;
26use crate::error::{RegionNotFoundSnafu, Result};
27use crate::flush::{FlushReason, RegionFlushTask};
28use crate::region::MitoRegionRef;
29use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
30use crate::worker::RegionWorkerLoop;
31
32impl<S> RegionWorkerLoop<S> {
33 pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
35 self.flush_scheduler.on_flush_failed(region_id, request.err);
36 }
37
38 pub(crate) fn maybe_flush_worker(&mut self) {
41 if !self.write_buffer_manager.should_flush_engine() {
42 return;
44 }
45
46 if let Err(e) = self.flush_regions_on_engine_full() {
49 error!(e; "Failed to flush worker");
50 }
51 }
52
53 fn flush_regions_on_engine_full(&mut self) -> Result<()> {
55 let regions = self.regions.list_regions();
56 let now = self.time_provider.current_time_millis();
57 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
58 let mut max_mutable_size = 0;
59 let mut max_mem_region = None;
61
62 for region in ®ions {
63 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
64 continue;
66 }
67
68 let version = region.version();
69 let region_mutable_size = version.memtables.mutable_usage();
70 if region_mutable_size > max_mutable_size {
72 max_mem_region = Some(region);
73 max_mutable_size = region_mutable_size;
74 }
75
76 if region.last_flush_millis() < min_last_flush_time {
77 let task =
79 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
80 self.flush_scheduler.schedule_flush(
81 region.region_id,
82 ®ion.version_control,
83 task,
84 )?;
85 }
86 }
87
88 if let Some(region) = max_mem_region
91 && !self.flush_scheduler.is_flush_requested(region.region_id)
92 {
93 let task =
94 self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
95 self.flush_scheduler
96 .schedule_flush(region.region_id, ®ion.version_control, task)?;
97 }
98
99 Ok(())
100 }
101
102 pub(crate) fn new_flush_task(
104 &self,
105 region: &MitoRegionRef,
106 reason: FlushReason,
107 row_group_size: Option<usize>,
108 engine_config: Arc<MitoConfig>,
109 ) -> RegionFlushTask {
110 RegionFlushTask {
111 region_id: region.region_id,
112 reason,
113 senders: Vec::new(),
114 request_sender: self.sender.clone(),
115 access_layer: region.access_layer.clone(),
116 listener: self.listener.clone(),
117 engine_config,
118 row_group_size,
119 cache_manager: self.cache_manager.clone(),
120 manifest_ctx: region.manifest_ctx.clone(),
121 index_options: region.version().options.index_options.clone(),
122 flush_semaphore: self.flush_semaphore.clone(),
123 }
124 }
125}
126
127impl<S: LogStore> RegionWorkerLoop<S> {
128 pub(crate) async fn handle_flush_request(
130 &mut self,
131 region_id: RegionId,
132 request: RegionFlushRequest,
133 mut sender: OptionOutputTx,
134 ) {
135 let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
136 return;
137 };
138 self.update_topic_latest_entry_id(®ion);
142
143 let reason = if region.is_downgrading() {
144 FlushReason::Downgrading
145 } else {
146 FlushReason::Manual
147 };
148
149 let mut task =
150 self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone());
151 task.push_sender(sender);
152 if let Err(e) =
153 self.flush_scheduler
154 .schedule_flush(region.region_id, ®ion.version_control, task)
155 {
156 error!(e; "Failed to schedule flush task for region {}", region.region_id);
157 }
158 }
159
160 pub(crate) fn flush_periodically(&mut self) -> Result<()> {
162 let regions = self.regions.list_regions();
163 let now = self.time_provider.current_time_millis();
164 let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
165
166 for region in ®ions {
167 if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
168 continue;
170 }
171 self.update_topic_latest_entry_id(region);
172
173 if region.last_flush_millis() < min_last_flush_time {
174 let task = self.new_flush_task(
176 region,
177 FlushReason::Periodically,
178 None,
179 self.config.clone(),
180 );
181 self.flush_scheduler.schedule_flush(
182 region.region_id,
183 ®ion.version_control,
184 task,
185 )?;
186 }
187 }
188
189 Ok(())
190 }
191
192 pub(crate) async fn handle_flush_finished(
194 &mut self,
195 region_id: RegionId,
196 mut request: FlushFinished,
197 ) {
198 self.notify_group();
201
202 let region = match self.regions.get_region(region_id) {
203 Some(region) => region,
204 None => {
205 request.on_failure(RegionNotFoundSnafu { region_id }.build());
206 return;
207 }
208 };
209
210 let is_staging = region.manifest_ctx.current_state()
212 == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);
213
214 if is_staging {
215 info!(
216 "Skipping region metadata update for region {} in staging mode",
217 region_id
218 );
219 region.version_control.apply_edit(
220 None,
221 &request.memtables_to_remove,
222 region.file_purger.clone(),
223 );
224 } else {
225 region.version_control.apply_edit(
226 Some(request.edit.clone()),
227 &request.memtables_to_remove,
228 region.file_purger.clone(),
229 );
230 }
231
232 region.update_flush_millis();
233
234 info!(
236 "Region {} flush finished, tries to bump wal to {}",
237 region_id, request.flushed_entry_id
238 );
239 if let Err(e) = self
240 .wal
241 .obsolete(region_id, request.flushed_entry_id, ®ion.provider)
242 .await
243 {
244 error!(e; "Failed to write wal, region: {}", region_id);
245 request.on_failure(e);
246 return;
247 }
248
249 request.on_success();
251
252 if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
254 self.flush_scheduler.on_flush_success(region_id)
255 {
256 self.handle_ddl_requests(&mut ddl_requests).await;
258 self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
260 .await;
261 }
262
263 self.handle_stalled_requests().await;
265
266 self.schedule_compaction(®ion).await;
268
269 self.listener.on_flush_success(region_id);
270 }
271
272 pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
275 if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
276 let latest_offset = self
277 .wal
278 .store()
279 .latest_entry_id(®ion.provider)
280 .unwrap_or(0);
281 let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
282
283 if latest_offset > topic_last_entry_id {
284 region
285 .topic_latest_entry_id
286 .store(latest_offset, Ordering::Relaxed);
287 debug!(
288 "Region {} latest entry id updated to {}",
289 region.region_id, latest_offset
290 );
291 }
292 }
293 }
294}