// mito2/worker/handle_flush.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.

use std::sync::Arc;
use std::sync::atomic::Ordering;

use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::{RegionFlushReason, RegionFlushRequest};
use store_api::storage::RegionId;

use crate::config::{IndexBuildMode, MitoConfig};
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;
32
33fn resolve_flush_reason(
34    explicit_reason: Option<FlushReason>,
35    request_reason: Option<RegionFlushReason>,
36    is_downgrading: bool,
37) -> FlushReason {
38    explicit_reason
39        .or_else(|| request_reason.map(FlushReason::from))
40        .unwrap_or({
41            if is_downgrading {
42                FlushReason::Downgrading
43            } else {
44                FlushReason::Manual
45            }
46        })
47}
48
impl<S: LogStore> RegionWorkerLoop<S> {
    /// On region flush job failed.
    ///
    /// Reports the error to the flush scheduler so it can fail pending
    /// waiters, then re-checks whether the worker itself needs a flush and
    /// re-processes stalled requests so they do not wait forever on a flush
    /// that will never finish.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
        debug!(
            "Flush failed for region {}, handling stalled requests",
            region_id
        );
        // Maybe flush worker again.
        self.maybe_flush_worker();

        // Handle stalled requests.
        self.handle_stalled_requests().await;
    }

    /// Checks whether the engine reaches flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            debug!("No need to flush worker");
            // No need to flush worker.
            return;
        }

        // If the engine needs flush, each worker will find some regions to flush. We might
        // flush more memory than expect but it should be acceptable.
        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

    /// Finds some regions to flush to reduce write buffer usage.
    ///
    /// Two-phase strategy: first, regions whose last flush is older than the
    /// auto-flush interval are flushed unconditionally; the remaining regions
    /// with non-empty memtables are collected, sorted by memtable size
    /// (largest first), and flushed one by one until the estimated memory
    /// usage drops below the flush target.
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut pending_regions = vec![];

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }

            let version = region.version();
            // Total memtable footprint of the region: mutable + immutable parts.
            let region_memtable_size =
                version.memtables.mutable_usage() + version.memtables.immutables_usage();

            if region.last_flush_millis() < min_last_flush_time {
                // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            } else if region_memtable_size > 0 {
                // We should only consider regions with memtable size > 0 to flush.
                pending_regions.push((region, region_memtable_size));
            }
        }
        // Largest memtables first so each scheduled flush frees the most memory.
        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
        // The flush target is the mutable memtable limit (half of the global buffer).
        // When memory is full, we aggressively flush regions until usage drops below this target,
        // not just below the full limit.
        let target_memory_usage = self.write_buffer_manager.flush_limit();
        let mut memory_usage = self.write_buffer_manager.memory_usage();

        #[cfg(test)]
        {
            debug!(
                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
                target_memory_usage,
                memory_usage,
                pending_regions
                    .iter()
                    .map(|(region, mem_size)| (region.region_id, mem_size))
                    .collect::<Vec<_>>()
            );
        }
        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
        // for each region until the overall memory usage drops below the flush limit.
        for (region, region_mem_size) in pending_regions.into_iter() {
            // NOTE(review): this break precedes any scheduling, so "the first
            // region is always flushed" only holds when usage starts at or
            // above the target — which is the expected state on this path,
            // since `should_flush_engine()` returned true.
            if memory_usage < target_memory_usage {
                // Stop flushing regions if memory usage is already below the flush limit
                break;
            }
            let task =
                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
            debug!("Scheduling flush task for region {}", region.region_id);
            // Schedule a flush task for the current region
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
            // Reduce memory usage by the region's size, ensuring it doesn't go negative
            memory_usage = memory_usage.saturating_sub(region_mem_size);
        }

        Ok(())
    }

    /// Creates a flush task with specific `reason` for the `region`.
    ///
    /// The task starts with no waiters (`senders` is empty); callers that need
    /// completion notification push their sender onto the task afterwards.
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
    ) -> RegionFlushTask {
        RegionFlushTask {
            region_id: region.region_id,
            reason,
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
            flush_semaphore: self.flush_semaphore.clone(),
            is_staging: region.is_staging(),
            partition_expr: region.maybe_staging_partition_expr_str(),
        }
    }
}
177
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles manual flush request.
    ///
    /// Resolves the effective flush reason (explicit `reason` beats the
    /// request's reason, with a downgrading-aware fallback), builds a flush
    /// task that carries `sender` so the caller is notified on completion,
    /// and submits it to the flush scheduler. Scheduling errors are logged
    /// rather than propagated.
    pub(crate) fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        reason: Option<FlushReason>,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty.
        // But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id`
        // when handling flush request instead of in `schedule_flush` or `flush_finished`.
        self.update_topic_latest_entry_id(&region);

        let reason = resolve_flush_reason(reason, request.reason, region.is_downgrading());
        let mut task =
            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

    /// Flushes regions periodically.
    ///
    /// Walks every region in this worker and schedules a flush for each
    /// writable region whose last flush is older than the configured
    /// auto-flush interval. Regions already scheduled are skipped.
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// On region flush job finished.
    ///
    /// Applies the flush edit to the region version (skipping the metadata
    /// edit in staging mode), obsoletes WAL entries up to the flushed entry
    /// id, optionally triggers async index builds, then either removes the
    /// region (flush-on-close) or resumes pending DDL/write/stalled requests
    /// and schedules a compaction.
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        // Notifies other workers. Even the remaining steps of this method fail we still
        // wake up other workers as we have released some memory by flush.
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

        if request.is_staging {
            // Skip the region metadata update.
            info!(
                "Skipping region metadata update for region {} in staging mode",
                region_id
            );
            // Still drop the flushed memtables, but apply no edit.
            region.version_control.apply_edit(
                None,
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        } else {
            region.version_control.apply_edit(
                Some(request.edit.clone()),
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        }

        region.update_flush_millis();

        // Delete wal.
        info!(
            "Region {} flush finished, tries to bump wal to {}",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to write wal, region: {}", region_id);
            request.on_failure(e);
            return;
        }

        let flush_on_close = request.flush_reason == FlushReason::Closing;
        // Take ownership of the newly flushed files for index building below.
        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);

        // In async mode, create indexes after flush.
        if self.config.index.build_mode == IndexBuildMode::Async {
            self.handle_rebuild_index(
                BuildIndexRequest {
                    region_id,
                    build_type: IndexBuildType::Flush,
                    file_metas: index_build_file_metas,
                },
                OptionOutputTx::new(None),
            )
            .await;
        }

        if flush_on_close {
            // Remove region from server for flush on closing,
            // no need to handle requests and schedule compactions.
            self.remove_region(region_id).await;
            info!("Region {} closed after flush", region_id);
            request.on_success();
        } else {
            // Notifies waiters and observes the flush timer.
            request.on_success();
            // Handle pending requests for the region.
            if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
                self.flush_scheduler.on_flush_success(region_id)
            {
                // Perform DDLs first because they require empty memtables.
                self.handle_ddl_requests(&mut ddl_requests).await;
                // Handle pending write requests, we don't stall these requests.
                self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                    .await;
            }
            // Maybe flush worker again.
            self.maybe_flush_worker();
            // Handle stalled requests.
            self.handle_stalled_requests().await;
            // Schedules compaction.
            self.schedule_compaction(&region).await;
        }

        self.listener.on_flush_success(region_id);
    }

    /// Updates the latest entry id since flush of the region.
    /// **This is only used for remote WAL pruning.**
    ///
    /// Only acts when the region uses a remote WAL and its memtables are
    /// empty; advances `topic_latest_entry_id` monotonically (never moves it
    /// backwards) using relaxed atomics.
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
            let latest_offset = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

            // Only move forward; a stale/smaller offset is ignored.
            if latest_offset > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(latest_offset, Ordering::Relaxed);
                debug!(
                    "Region {} latest entry id updated to {}",
                    region.region_id, latest_offset
                );
            }
        }
    }
}
362
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_resolve_flush_reason_prefers_explicit_reason() {
        // An explicit reason must beat both the request reason and the
        // downgrading fallback.
        let resolved = resolve_flush_reason(
            Some(FlushReason::Closing),
            Some(RegionFlushReason::RemoteWalPrune),
            true,
        );
        assert_eq!(resolved, FlushReason::Closing);
    }

    #[test]
    fn test_resolve_flush_reason_uses_request_reason() {
        // Without an explicit reason, the request's reason is converted and
        // used regardless of the downgrading flag.
        let cases = [
            (
                RegionFlushReason::RegionMigration,
                true,
                FlushReason::RegionMigration,
            ),
            (
                RegionFlushReason::Repartition,
                false,
                FlushReason::Repartition,
            ),
            (
                RegionFlushReason::RemoteWalPrune,
                false,
                FlushReason::RemoteWalPrune,
            ),
        ];
        for (request_reason, is_downgrading, expected) in cases {
            assert_eq!(
                resolve_flush_reason(None, Some(request_reason), is_downgrading),
                expected
            );
        }
    }

    #[test]
    fn test_resolve_flush_reason_fallback_unchanged() {
        // With no reason from anywhere, the downgrading flag selects the
        // fallback reason.
        assert_eq!(
            resolve_flush_reason(None, None, true),
            FlushReason::Downgrading
        );
        assert_eq!(resolve_flush_reason(None, None, false), FlushReason::Manual);
    }
}
401}