mito2/worker/handle_flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.

use std::sync::Arc;
use std::sync::atomic::Ordering;

use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;

use crate::config::{IndexBuildMode, MitoConfig};
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
    /// On region flush job failed.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
        debug!(
            "Flush failed for region {}, handling stalled requests",
            region_id
        );
        // Maybe flush worker again.
        self.maybe_flush_worker();

        // Handle stalled requests.
        self.handle_stalled_requests().await;
    }

    /// Checks whether the engine reaches the flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            debug!("No need to flush worker");
            // No need to flush worker.
            return;
        }

        // If the engine needs flush, each worker will find some regions to flush. We might
        // flush more memory than expected, but it should be acceptable.
        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

    /// Finds some regions to flush to reduce write buffer usage.
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut pending_regions = vec![];

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }

            let version = region.version();
            let region_memtable_size =
                version.memtables.mutable_usage() + version.memtables.immutables_usage();

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            } else if region_memtable_size > 0 {
                // Only consider regions with memtable size > 0 for flushing.
                pending_regions.push((region, region_memtable_size));
            }
        }
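        // Sort pending regions by memtable size, largest first, so the biggest consumers are flushed first.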
        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
        // The flush target is the mutable memtable limit (half of the global buffer).
        // When memory is full, we aggressively flush regions until usage drops below this target,
        // not just below the full limit.
        let target_memory_usage = self.write_buffer_manager.flush_limit();
        let mut memory_usage = self.write_buffer_manager.memory_usage();

        #[cfg(test)]
        {
            debug!(
                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
                target_memory_usage,
                memory_usage,
                pending_regions
                    .iter()
                    .map(|(region, mem_size)| (region.region_id, mem_size))
                    .collect::<Vec<_>>()
            );
        }
        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
        // for each region until the overall memory usage drops below the flush limit.
        for (region, region_mem_size) in pending_regions.into_iter() {
            // Stop scheduling flushes once the estimated memory usage drops below the flush target.
            if memory_usage < target_memory_usage {
                break;
            }
            let task =
                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
            debug!("Scheduling flush task for region {}", region.region_id);
            // Schedule a flush task for the current region
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
            // Reduce memory usage by the region's size, ensuring it doesn't go negative
            memory_usage = memory_usage.saturating_sub(region_mem_size);
        }

        Ok(())
    }

    /// Creates a flush task with the specific `reason` for the `region`.
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
    ) -> RegionFlushTask {
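        // Build the task with no waiters; callers attach response senders via `push_sender` when needed.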
        RegionFlushTask {
            region_id: region.region_id,
            reason,
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
            flush_semaphore: self.flush_semaphore.clone(),
            is_staging: region.is_staging(),
            partition_expr: region.maybe_staging_partition_expr_str(),
        }
    }
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles a manual flush request.
    pub(crate) fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        reason: Option<FlushReason>,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty.
        // But the flush is skipped if memtables are empty. Thus we should update `topic_latest_entry_id`
        // when handling the flush request instead of in `schedule_flush` or `flush_finished`.
        self.update_topic_latest_entry_id(&region);

        let reason = reason.unwrap_or_else(|| {
            if region.is_downgrading() {
                FlushReason::Downgrading
            } else {
                FlushReason::Manual
            }
        });
        let mut task =
            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

    /// Flushes regions periodically.
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }
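            // Advance `topic_latest_entry_id` here as well; the flush below is skipped when a
            // region's memtables are empty (see the note in `handle_flush_request`).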
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// On region flush job finished.
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        // Notifies other workers. Even if the remaining steps of this method fail, we still
        // wake up other workers as we have released some memory by the flush.
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

        if request.is_staging {
            // Skip the region metadata update.
            info!(
                "Skipping region metadata update for region {} in staging mode",
                region_id
            );
            region.version_control.apply_edit(
                None,
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        } else {
            region.version_control.apply_edit(
                Some(request.edit.clone()),
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        }

        region.update_flush_millis();

        // Delete obsolete wal entries.
        info!(
            "Region {} flush finished, trying to bump wal to {}",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to obsolete wal entries, region: {}", region_id);
            request.on_failure(e);
            return;
        }

        let flush_on_close = request.flush_reason == FlushReason::Closing;
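        // Take the flushed file metas out of the edit so they can drive async index building below.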
        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);

        // Notifies waiters and observes the flush timer.
        request.on_success();

        // In async mode, create indexes after flush.
        if self.config.index.build_mode == IndexBuildMode::Async {
            self.handle_rebuild_index(
                BuildIndexRequest {
                    region_id,
                    build_type: IndexBuildType::Flush,
                    file_metas: index_build_file_metas,
                },
                OptionOutputTx::new(None),
            )
            .await;
        }

        if flush_on_close {
            // Remove the region from the server for flush on closing;
            // no need to handle requests or schedule compactions.
            self.remove_region(region_id).await;
            info!("Region {} closed after flush", region_id);
        } else {
            // Handle pending requests for the region.
            if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
                self.flush_scheduler.on_flush_success(region_id)
            {
                // Perform DDLs first because they require empty memtables.
                self.handle_ddl_requests(&mut ddl_requests).await;
                // Handle pending write requests; we don't stall these requests.
                self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                    .await;
            }
            // Maybe flush worker again.
            self.maybe_flush_worker();
            // Handle stalled requests.
            self.handle_stalled_requests().await;
            // Schedules compaction.
            self.schedule_compaction(&region).await;
        }

        self.listener.on_flush_success(region_id);
    }

    /// Updates the region's latest entry id since the last flush.
    /// **This is only used for remote WAL pruning.**
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
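        // Only regions backed by a remote WAL with empty memtables are considered, since an empty
        // memtable set means the region has no unflushed data buffered.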
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
            let latest_offset = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

            if latest_offset > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(latest_offset, Ordering::Relaxed);
                debug!(
                    "Region {} latest entry id updated to {}",
                    region.region_id, latest_offset
                );
            }
        }
    }
}