mito2/worker/handle_flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.

use std::sync::Arc;
use std::sync::atomic::Ordering;

use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;

use crate::config::{IndexBuildMode, MitoConfig};
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
    /// On region flush job failed.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
        debug!(
            "Flush failed for region {}, handling stalled requests",
            region_id
        );
        // Maybe flush worker again.
        self.maybe_flush_worker();

        // Handle stalled requests.
        self.handle_stalled_requests().await;
    }

    /// Checks whether the engine reaches the flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            debug!("No need to flush worker");
            // No need to flush worker.
            return;
        }

        // If the engine needs flush, each worker will find some regions to flush. We might
        // flush more memory than expected but it should be acceptable.
        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

    /// Finds some regions to flush to reduce write buffer usage.
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut pending_regions = vec![];

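        // First pass: schedule flushes for regions that haven't been flushed within the auto
        // flush interval, and collect the rest with non-empty memtables as flush candidates.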
        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }

            let version = region.version();
            let region_memtable_size =
                version.memtables.mutable_usage() + version.memtables.immutables_usage();

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush time of this region is earlier than `min_last_flush_time`, flush it now.
                let task = self.new_flush_task(
                    region,
                    FlushReason::EngineFull,
                    None,
                    self.config.clone(),
                    region.is_staging(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            } else if region_memtable_size > 0 {
                // Only consider regions with a non-empty memtable as flush candidates.
                pending_regions.push((region, region_memtable_size));
            }
        }
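        // Sort candidates by memtable size in descending order so the largest regions are flushed first.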
        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
        // The flush target is the mutable memtable limit (half of the global buffer).
        // When memory is full, we aggressively flush regions until usage drops below this target,
        // not just below the full limit.
        let target_memory_usage = self.write_buffer_manager.flush_limit();
        let mut memory_usage = self.write_buffer_manager.memory_usage();

        #[cfg(test)]
        {
            debug!(
                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
                target_memory_usage,
                memory_usage,
                pending_regions
                    .iter()
                    .map(|(region, mem_size)| (region.region_id, mem_size))
                    .collect::<Vec<_>>()
            );
        }
        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
        // for each region until the overall memory usage drops below the flush limit.
        for (region, region_mem_size) in pending_regions.into_iter() {
            if memory_usage < target_memory_usage {
                // Stop flushing regions once memory usage drops below the flush target.
                break;
            }
            let task = self.new_flush_task(
                region,
                FlushReason::EngineFull,
                None,
                self.config.clone(),
                region.is_staging(),
            );
            debug!("Scheduling flush task for region {}", region.region_id);
            // Schedule a flush task for the current region.
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
            // Reduce memory usage by the region's size, ensuring it doesn't go negative.
            memory_usage = memory_usage.saturating_sub(region_mem_size);
        }

        Ok(())
    }

    /// Creates a flush task with the given `reason` for the `region`.
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
        is_staging: bool,
    ) -> RegionFlushTask {
        RegionFlushTask {
            region_id: region.region_id,
            reason,
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
            flush_semaphore: self.flush_semaphore.clone(),
            is_staging,
        }
    }
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles a manual flush request.
    pub(crate) fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty,
        // but the flush is skipped in that case, so we update it here when handling the flush
        // request instead of in `schedule_flush` or `flush_finished`.
        self.update_topic_latest_entry_id(&region);

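        // A downgrading region flushes with a dedicated reason; otherwise this is a manual flush.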
        let reason = if region.is_downgrading() {
            FlushReason::Downgrading
        } else {
            FlushReason::Manual
        };

        let mut task = self.new_flush_task(
            &region,
            reason,
            request.row_group_size,
            self.config.clone(),
            region.is_staging(),
        );
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

    /// Flushes regions periodically.
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush time of this region is earlier than `min_last_flush_time`, flush it now.
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                    region.is_staging(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// On region flush job finished.
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        // Notifies other workers. Even if the remaining steps of this method fail, we still
        // wake up other workers as we have released some memory by the flush.
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

        if request.is_staging {
            // Skip the region metadata update.
            info!(
                "Skipping region metadata update for region {} in staging mode",
                region_id
            );
            region.version_control.apply_edit(
                None,
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        } else {
            region.version_control.apply_edit(
                Some(request.edit.clone()),
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        }

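        // Record the flush time so periodic and engine-full flushes can skip this region for a while.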
        region.update_flush_millis();

        // Delete wal.
        info!(
            "Region {} flush finished, tries to bump wal to {}",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to write wal, region: {}", region_id);
            request.on_failure(e);
            return;
        }

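        // Take the newly flushed files; in async index build mode they are used to build indexes below.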
        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);

        // Notifies waiters and observes the flush timer.
        request.on_success();

        // In async mode, create indexes after flush.
        if self.config.index.build_mode == IndexBuildMode::Async {
            self.handle_rebuild_index(
                BuildIndexRequest {
                    region_id,
                    build_type: IndexBuildType::Flush,
                    file_metas: index_build_file_metas,
                },
                OptionOutputTx::new(None),
            )
            .await;
        }

        // Handle pending requests for the region.
        if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
            self.flush_scheduler.on_flush_success(region_id)
        {
            // Perform DDLs first because they require empty memtables.
            self.handle_ddl_requests(&mut ddl_requests).await;
            // Handle pending write requests; we don't stall these requests.
            self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                .await;
        }

        // Maybe flush worker again.
        self.maybe_flush_worker();

        // Handle stalled requests.
        self.handle_stalled_requests().await;

        // Schedules compaction.
        self.schedule_compaction(&region).await;

        self.listener.on_flush_success(region_id);
    }

    /// Updates the latest entry id of the region since the last flush.
    /// **This is only used for remote WAL pruning.**
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
            let latest_offset = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

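            // Only advance the recorded entry id; never move it backwards.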
            if latest_offset > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(latest_offset, Ordering::Relaxed);
                debug!(
                    "Region {} latest entry id updated to {}",
                    region.region_id, latest_offset
                );
            }
        }
    }
}