mito2/worker/
handle_flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.
16
17use std::sync::Arc;
18use std::sync::atomic::Ordering;
19
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_request::RegionFlushRequest;
23use store_api::storage::RegionId;
24
25use crate::config::{IndexBuildMode, MitoConfig};
26use crate::error::{RegionNotFoundSnafu, Result};
27use crate::flush::{FlushReason, RegionFlushTask};
28use crate::region::MitoRegionRef;
29use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
30use crate::sst::index::IndexBuildType;
31use crate::worker::RegionWorkerLoop;
32
33impl<S: LogStore> RegionWorkerLoop<S> {
34    /// On region flush job failed.
35    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
36        self.flush_scheduler.on_flush_failed(region_id, request.err);
37        debug!(
38            "Flush failed for region {}, handling stalled requests",
39            region_id
40        );
41        // Maybe flush worker again.
42        self.maybe_flush_worker();
43
44        // Handle stalled requests.
45        self.handle_stalled_requests().await;
46    }
47
48    /// Checks whether the engine reaches flush threshold. If so, finds regions in this
49    /// worker to flush.
50    pub(crate) fn maybe_flush_worker(&mut self) {
51        if !self.write_buffer_manager.should_flush_engine() {
52            debug!("No need to flush worker");
53            // No need to flush worker.
54            return;
55        }
56
57        // If the engine needs flush, each worker will find some regions to flush. We might
58        // flush more memory than expect but it should be acceptable.
59        if let Err(e) = self.flush_regions_on_engine_full() {
60            error!(e; "Failed to flush worker");
61        }
62    }
63
64    /// Finds some regions to flush to reduce write buffer usage.
65    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
66        let regions = self.regions.list_regions();
67        let now = self.time_provider.current_time_millis();
68        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
69        let mut pending_regions = vec![];
70
71        for region in &regions {
72            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
73                // Already flushing or not writable.
74                continue;
75            }
76
77            let version = region.version();
78            let region_memtable_size =
79                version.memtables.mutable_usage() + version.memtables.immutables_usage();
80
81            if region.last_flush_millis() < min_last_flush_time {
82                // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
83                let task =
84                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
85                self.flush_scheduler.schedule_flush(
86                    region.region_id,
87                    &region.version_control,
88                    task,
89                )?;
90            } else if region_memtable_size > 0 {
91                // We should only consider regions with memtable size > 0 to flush.
92                pending_regions.push((region, region_memtable_size));
93            }
94        }
95        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
96        // The flush target is the mutable memtable limit (half of the global buffer).
97        // When memory is full, we aggressively flush regions until usage drops below this target,
98        // not just below the full limit.
99        let target_memory_usage = self.write_buffer_manager.flush_limit();
100        let mut memory_usage = self.write_buffer_manager.memory_usage();
101
102        #[cfg(test)]
103        {
104            debug!(
105                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
106                target_memory_usage,
107                memory_usage,
108                pending_regions
109                    .iter()
110                    .map(|(region, mem_size)| (region.region_id, mem_size))
111                    .collect::<Vec<_>>()
112            );
113        }
114        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
115        // for each region until the overall memory usage drops below the flush limit.
116        for (region, region_mem_size) in pending_regions.into_iter() {
117            // Make sure the first region is always flushed.
118            if memory_usage < target_memory_usage {
119                // Stop flushing regions if memory usage is already below the flush limit
120                break;
121            }
122            let task =
123                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
124            debug!("Scheduling flush task for region {}", region.region_id);
125            // Schedule a flush task for the current region
126            self.flush_scheduler
127                .schedule_flush(region.region_id, &region.version_control, task)?;
128            // Reduce memory usage by the region's size, ensuring it doesn't go negative
129            memory_usage = memory_usage.saturating_sub(region_mem_size);
130        }
131
132        Ok(())
133    }
134
135    /// Creates a flush task with specific `reason` for the `region`.
136    pub(crate) fn new_flush_task(
137        &self,
138        region: &MitoRegionRef,
139        reason: FlushReason,
140        row_group_size: Option<usize>,
141        engine_config: Arc<MitoConfig>,
142    ) -> RegionFlushTask {
143        RegionFlushTask {
144            region_id: region.region_id,
145            reason,
146            senders: Vec::new(),
147            request_sender: self.sender.clone(),
148            access_layer: region.access_layer.clone(),
149            listener: self.listener.clone(),
150            engine_config,
151            row_group_size,
152            cache_manager: self.cache_manager.clone(),
153            manifest_ctx: region.manifest_ctx.clone(),
154            index_options: region.version().options.index_options.clone(),
155            flush_semaphore: self.flush_semaphore.clone(),
156            is_staging: region.is_staging(),
157            partition_expr: region.maybe_staging_partition_expr_str(),
158        }
159    }
160}
161
162impl<S: LogStore> RegionWorkerLoop<S> {
163    /// Handles manual flush request.
164    pub(crate) fn handle_flush_request(
165        &mut self,
166        region_id: RegionId,
167        request: RegionFlushRequest,
168        mut sender: OptionOutputTx,
169    ) {
170        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
171            return;
172        };
173        // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty.
174        // But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id`
175        // when handling flush request instead of in `schedule_flush` or `flush_finished`.
176        self.update_topic_latest_entry_id(&region);
177
178        let reason = if region.is_downgrading() {
179            FlushReason::Downgrading
180        } else {
181            FlushReason::Manual
182        };
183        let mut task =
184            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
185        task.push_sender(sender);
186        if let Err(e) =
187            self.flush_scheduler
188                .schedule_flush(region.region_id, &region.version_control, task)
189        {
190            error!(e; "Failed to schedule flush task for region {}", region.region_id);
191        }
192    }
193
194    /// Flushes regions periodically.
195    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
196        let regions = self.regions.list_regions();
197        let now = self.time_provider.current_time_millis();
198        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
199
200        for region in &regions {
201            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
202                // Already flushing or not writable.
203                continue;
204            }
205            self.update_topic_latest_entry_id(region);
206
207            if region.last_flush_millis() < min_last_flush_time {
208                // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
209                let task = self.new_flush_task(
210                    region,
211                    FlushReason::Periodically,
212                    None,
213                    self.config.clone(),
214                );
215                self.flush_scheduler.schedule_flush(
216                    region.region_id,
217                    &region.version_control,
218                    task,
219                )?;
220            }
221        }
222
223        Ok(())
224    }
225
226    /// On region flush job finished.
227    pub(crate) async fn handle_flush_finished(
228        &mut self,
229        region_id: RegionId,
230        mut request: FlushFinished,
231    ) {
232        // Notifies other workers. Even the remaining steps of this method fail we still
233        // wake up other workers as we have released some memory by flush.
234        self.notify_group();
235
236        let region = match self.regions.get_region(region_id) {
237            Some(region) => region,
238            None => {
239                request.on_failure(RegionNotFoundSnafu { region_id }.build());
240                return;
241            }
242        };
243
244        if request.is_staging {
245            // Skip the region metadata update.
246            info!(
247                "Skipping region metadata update for region {} in staging mode",
248                region_id
249            );
250            region.version_control.apply_edit(
251                None,
252                &request.memtables_to_remove,
253                region.file_purger.clone(),
254            );
255        } else {
256            region.version_control.apply_edit(
257                Some(request.edit.clone()),
258                &request.memtables_to_remove,
259                region.file_purger.clone(),
260            );
261        }
262
263        region.update_flush_millis();
264
265        // Delete wal.
266        info!(
267            "Region {} flush finished, tries to bump wal to {}",
268            region_id, request.flushed_entry_id
269        );
270        if let Err(e) = self
271            .wal
272            .obsolete(region_id, request.flushed_entry_id, &region.provider)
273            .await
274        {
275            error!(e; "Failed to write wal, region: {}", region_id);
276            request.on_failure(e);
277            return;
278        }
279
280        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
281
282        // Notifies waiters and observes the flush timer.
283        request.on_success();
284
285        // In async mode, create indexes after flush.
286        if self.config.index.build_mode == IndexBuildMode::Async {
287            self.handle_rebuild_index(
288                BuildIndexRequest {
289                    region_id,
290                    build_type: IndexBuildType::Flush,
291                    file_metas: index_build_file_metas,
292                },
293                OptionOutputTx::new(None),
294            )
295            .await;
296        }
297
298        // Handle pending requests for the region.
299        if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
300            self.flush_scheduler.on_flush_success(region_id)
301        {
302            // Perform DDLs first because they require empty memtables.
303            self.handle_ddl_requests(&mut ddl_requests).await;
304            // Handle pending write requests, we don't stall these requests.
305            self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
306                .await;
307        }
308
309        // Maybe flush worker again.
310        self.maybe_flush_worker();
311
312        // Handle stalled requests.
313        self.handle_stalled_requests().await;
314
315        // Schedules compaction.
316        self.schedule_compaction(&region).await;
317
318        self.listener.on_flush_success(region_id);
319    }
320
321    /// Updates the latest entry id since flush of the region.
322    /// **This is only used for remote WAL pruning.**
323    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
324        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
325            let latest_offset = self
326                .wal
327                .store()
328                .latest_entry_id(&region.provider)
329                .unwrap_or(0);
330            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
331
332            if latest_offset > topic_last_entry_id {
333                region
334                    .topic_latest_entry_id
335                    .store(latest_offset, Ordering::Relaxed);
336                debug!(
337                    "Region {} latest entry id updated to {}",
338                    region.region_id, latest_offset
339                );
340            }
341        }
342    }
343}