Skip to main content

mito2/worker/
handle_flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling flush related requests.
16
17use std::sync::Arc;
18use std::sync::atomic::Ordering;
19
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_request::{RegionFlushReason, RegionFlushRequest};
23use store_api::storage::RegionId;
24
25use crate::config::{IndexBuildMode, MitoConfig};
26use crate::error::{RegionNotFoundSnafu, Result};
27use crate::flush::{FlushReason, RegionFlushTask};
28use crate::region::MitoRegionRef;
29use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
30use crate::sst::index::IndexBuildType;
31use crate::worker::RegionWorkerLoop;
32
33fn resolve_flush_reason(
34    request_reason: Option<RegionFlushReason>,
35    is_downgrading: bool,
36) -> FlushReason {
37    match request_reason {
38        Some(reason) => FlushReason::from(reason),
39        None if is_downgrading => FlushReason::Downgrading,
40        None => FlushReason::Manual,
41    }
42}
43
44impl<S: LogStore> RegionWorkerLoop<S> {
45    /// On region flush job failed.
46    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
47        self.flush_scheduler.on_flush_failed(region_id, request.err);
48        debug!(
49            "Flush failed for region {}, handling stalled requests",
50            region_id
51        );
52        // Maybe flush worker again.
53        self.maybe_flush_worker();
54
55        // Handle stalled requests.
56        self.handle_stalled_requests().await;
57    }
58
59    /// Checks whether the engine reaches flush threshold. If so, finds regions in this
60    /// worker to flush.
61    pub(crate) fn maybe_flush_worker(&mut self) {
62        if !self.write_buffer_manager.should_flush_engine() {
63            debug!("No need to flush worker");
64            // No need to flush worker.
65            return;
66        }
67
68        // If the engine needs flush, each worker will find some regions to flush. We might
69        // flush more memory than expect but it should be acceptable.
70        if let Err(e) = self.flush_regions_on_engine_full() {
71            error!(e; "Failed to flush worker");
72        }
73    }
74
75    /// Finds some regions to flush to reduce write buffer usage.
76    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
77        let regions = self.regions.list_regions();
78        let now = self.time_provider.current_time_millis();
79        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
80        let mut pending_regions = vec![];
81
82        for region in &regions {
83            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
84                // Already flushing or not writable.
85                continue;
86            }
87
88            let version = region.version();
89            let region_memtable_size =
90                version.memtables.mutable_usage() + version.memtables.immutables_usage();
91
92            if region.last_flush_millis() < min_last_flush_time {
93                // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
94                let task =
95                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
96                self.flush_scheduler.schedule_flush(
97                    region.region_id,
98                    &region.version_control,
99                    task,
100                )?;
101            } else if region_memtable_size > 0 {
102                // We should only consider regions with memtable size > 0 to flush.
103                pending_regions.push((region, region_memtable_size));
104            }
105        }
106        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
107        // The flush target is the mutable memtable limit (half of the global buffer).
108        // When memory is full, we aggressively flush regions until usage drops below this target,
109        // not just below the full limit.
110        let target_memory_usage = self.write_buffer_manager.flush_limit();
111        let mut memory_usage = self.write_buffer_manager.memory_usage();
112
113        #[cfg(test)]
114        {
115            debug!(
116                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
117                target_memory_usage,
118                memory_usage,
119                pending_regions
120                    .iter()
121                    .map(|(region, mem_size)| (region.region_id, mem_size))
122                    .collect::<Vec<_>>()
123            );
124        }
125        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
126        // for each region until the overall memory usage drops below the flush limit.
127        for (region, region_mem_size) in pending_regions.into_iter() {
128            // Make sure the first region is always flushed.
129            if memory_usage < target_memory_usage {
130                // Stop flushing regions if memory usage is already below the flush limit
131                break;
132            }
133            let task =
134                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
135            debug!("Scheduling flush task for region {}", region.region_id);
136            // Schedule a flush task for the current region
137            self.flush_scheduler
138                .schedule_flush(region.region_id, &region.version_control, task)?;
139            // Reduce memory usage by the region's size, ensuring it doesn't go negative
140            memory_usage = memory_usage.saturating_sub(region_mem_size);
141        }
142
143        Ok(())
144    }
145
146    /// Creates a flush task with specific `reason` for the `region`.
147    pub(crate) fn new_flush_task(
148        &self,
149        region: &MitoRegionRef,
150        reason: FlushReason,
151        row_group_size: Option<usize>,
152        engine_config: Arc<MitoConfig>,
153    ) -> RegionFlushTask {
154        RegionFlushTask {
155            region_id: region.region_id,
156            reason,
157            senders: Vec::new(),
158            request_sender: self.sender.clone(),
159            access_layer: region.access_layer.clone(),
160            listener: self.listener.clone(),
161            engine_config,
162            row_group_size,
163            cache_manager: self.cache_manager.clone(),
164            manifest_ctx: region.manifest_ctx.clone(),
165            index_options: region.version().options.index_options.clone(),
166            flush_semaphore: self.flush_semaphore.clone(),
167            is_staging: region.is_staging(),
168            partition_expr: region.maybe_staging_partition_expr_str(),
169        }
170    }
171}
172
173impl<S: LogStore> RegionWorkerLoop<S> {
174    /// Handles manual flush request.
175    pub(crate) fn handle_flush_request(
176        &mut self,
177        region_id: RegionId,
178        request: RegionFlushRequest,
179        sender: OptionOutputTx,
180    ) {
181        let region = match self.regions.flushable_region(region_id) {
182            Ok(region) => region,
183            Err(e) => {
184                sender.send(Err(e));
185                return;
186            }
187        };
188
189        // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty.
190        // But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id`
191        // when handling flush request instead of in `schedule_flush` or `flush_finished`.
192        self.update_topic_latest_entry_id(&region);
193
194        let reason = resolve_flush_reason(request.reason, region.is_downgrading());
195        let mut task =
196            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
197        task.push_sender(sender);
198        if let Err(e) =
199            self.flush_scheduler
200                .schedule_flush(region.region_id, &region.version_control, task)
201        {
202            error!(e; "Failed to schedule flush task for region {}", region.region_id);
203        }
204    }
205
206    /// Flushes regions periodically.
207    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
208        let regions = self.regions.list_regions();
209        let now = self.time_provider.current_time_millis();
210        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
211
212        for region in &regions {
213            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
214                // Already flushing or not writable.
215                continue;
216            }
217            self.update_topic_latest_entry_id(region);
218
219            if region.last_flush_millis() < min_last_flush_time {
220                // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
221                let task = self.new_flush_task(
222                    region,
223                    FlushReason::Periodically,
224                    None,
225                    self.config.clone(),
226                );
227                self.flush_scheduler.schedule_flush(
228                    region.region_id,
229                    &region.version_control,
230                    task,
231                )?;
232            }
233        }
234
235        Ok(())
236    }
237
238    /// On region flush job finished.
239    pub(crate) async fn handle_flush_finished(
240        &mut self,
241        region_id: RegionId,
242        mut request: FlushFinished,
243    ) {
244        // Notifies other workers. Even the remaining steps of this method fail we still
245        // wake up other workers as we have released some memory by flush.
246        self.notify_group();
247
248        let region = match self.regions.get_region(region_id) {
249            Some(region) => region,
250            None => {
251                request.on_failure(RegionNotFoundSnafu { region_id }.build());
252                return;
253            }
254        };
255
256        if request.is_staging {
257            // Skip the region metadata update.
258            info!(
259                "Skipping region metadata update for region {} in staging mode",
260                region_id
261            );
262            region.version_control.apply_edit(
263                None,
264                &request.memtables_to_remove,
265                region.file_purger.clone(),
266            );
267        } else {
268            region.version_control.apply_edit(
269                Some(request.edit.clone()),
270                &request.memtables_to_remove,
271                region.file_purger.clone(),
272            );
273        }
274
275        region.update_flush_millis();
276
277        // Delete wal.
278        info!(
279            "Region {} flush finished, tries to bump wal to {}",
280            region_id, request.flushed_entry_id
281        );
282        if let Err(e) = self
283            .wal
284            .obsolete(region_id, request.flushed_entry_id, &region.provider)
285            .await
286        {
287            error!(e; "Failed to write wal, region: {}", region_id);
288            request.on_failure(e);
289            return;
290        }
291
292        let flush_on_close = request.flush_reason == FlushReason::Closing;
293        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
294
295        // In async mode, create indexes after flush.
296        if self.config.index.build_mode == IndexBuildMode::Async {
297            self.handle_rebuild_index(
298                BuildIndexRequest {
299                    region_id,
300                    build_type: IndexBuildType::Flush,
301                    file_metas: index_build_file_metas,
302                },
303                OptionOutputTx::new(None),
304            )
305            .await;
306        }
307
308        if flush_on_close {
309            // Remove region from server for flush on closing,
310            // no need to handle requests and schedule compactions.
311            self.remove_region(region_id).await;
312            info!("Region {} closed after flush", region_id);
313            request.on_success();
314        } else {
315            // Notifies waiters and observes the flush timer.
316            request.on_success();
317            // Handle pending requests for the region.
318            if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
319                self.flush_scheduler.on_flush_success(region_id)
320            {
321                // Perform DDLs first because they require empty memtables.
322                self.handle_ddl_requests(&mut ddl_requests).await;
323                // Handle pending write requests, we don't stall these requests.
324                self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
325                    .await;
326            }
327            // Maybe flush worker again.
328            self.maybe_flush_worker();
329            // Handle stalled requests.
330            self.handle_stalled_requests().await;
331            // Schedules compaction.
332            self.schedule_compaction(&region).await;
333        }
334
335        self.listener.on_flush_success(region_id);
336    }
337
338    /// Updates the latest entry id since flush of the region.
339    /// **This is only used for remote WAL pruning.**
340    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
341        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
342            let latest_offset = self
343                .wal
344                .store()
345                .latest_entry_id(&region.provider)
346                .unwrap_or(0);
347            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
348
349            if latest_offset > topic_last_entry_id {
350                region
351                    .topic_latest_entry_id
352                    .store(latest_offset, Ordering::Relaxed);
353                debug!(
354                    "Region {} latest entry id updated to {}",
355                    region.region_id, latest_offset
356                );
357            }
358        }
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_resolve_flush_reason_uses_request_reason() {
        // An explicit reason on the request is returned as-is, whatever the downgrading flag.
        let cases = [
            (RegionFlushReason::RegionMigration, true, FlushReason::RegionMigration),
            (RegionFlushReason::Repartition, false, FlushReason::Repartition),
            (RegionFlushReason::RemoteWalPrune, false, FlushReason::RemoteWalPrune),
            (RegionFlushReason::Closing, false, FlushReason::Closing),
            (RegionFlushReason::Downgrading, false, FlushReason::Downgrading),
        ];
        for (idx, (requested, is_downgrading, expected)) in cases.into_iter().enumerate() {
            assert_eq!(
                resolve_flush_reason(Some(requested), is_downgrading),
                expected,
                "case #{idx}"
            );
        }
    }

    #[test]
    fn test_resolve_flush_reason_fallback_unchanged() {
        // With no explicit reason, only the downgrading flag decides the outcome.
        let cases = [(true, FlushReason::Downgrading), (false, FlushReason::Manual)];
        for (is_downgrading, expected) in cases {
            assert_eq!(resolve_flush_reason(None, is_downgrading), expected);
        }
    }
}