mito2/worker/handle_flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.

use std::sync::Arc;
use std::sync::atomic::Ordering;

use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
    /// On region flush job failed.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
    }

    /// Checks whether the engine reaches the flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            // No need to flush worker.
            return;
        }

        // If the engine needs to flush, each worker will find some regions to flush. We might
        // flush more memory than expected, but it should be acceptable.
        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

    /// Finds some regions to flush to reduce write buffer usage.
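    ///
    /// Schedules a flush for every writable region whose last flush is older than
    /// `auto_flush_interval`, and additionally schedules a flush for the region with
    /// the largest mutable memtable.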
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut max_mutable_size = 0;
        // Region with max mutable memtable size.
        let mut max_mem_region = None;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }

            let version = region.version();
            let region_mutable_size = version.memtables.mutable_usage();
            // Tracks region with max mutable memtable size.
            if region_mutable_size > max_mutable_size {
                max_mem_region = Some(region);
                max_mutable_size = region_mutable_size;
            }

            if region.last_flush_millis() < min_last_flush_time {
                // The last flush of this region is earlier than `min_last_flush_time`, so we can flush it.
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        // Flush the region with the largest mutable memtable.
        // TODO(yingwen): Maybe flush more regions to reduce write buffer size.
        if let Some(region) = max_mem_region
            && !self.flush_scheduler.is_flush_requested(region.region_id)
        {
            let task =
                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
        }

        Ok(())
    }

    /// Creates a flush task with the given `reason` for the `region`.
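    ///
    /// The returned task has no result senders attached; callers can add them via
    /// `push_sender` before scheduling the task.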
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
    ) -> RegionFlushTask {
        RegionFlushTask {
            region_id: region.region_id,
            reason,
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
        }
    }
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles a manual flush request.
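    ///
    /// Attaches `sender` to the flush task so the caller is notified when the flush
    /// completes, and uses `FlushReason::Downgrading` if the region is downgrading.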
    pub(crate) async fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        // `update_topic_latest_entry_id` only updates `topic_latest_entry_id` when the memtables
        // are empty, but the flush is skipped if the memtables are empty. Thus we update
        // `topic_latest_entry_id` here while handling the flush request instead of in
        // `schedule_flush` or `flush_finished`.
        self.update_topic_latest_entry_id(&region);

        let reason = if region.is_downgrading() {
            FlushReason::Downgrading
        } else {
            FlushReason::Manual
        };

        let mut task =
            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

    /// Flushes regions periodically.
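    ///
    /// Schedules a flush for every writable region whose last flush is older than
    /// `auto_flush_interval`.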
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                // The last flush of this region is earlier than `min_last_flush_time`, so we can flush it.
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// On region flush job finished.
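    ///
    /// Applies the flush result to the region version (skipping the edit for staging
    /// regions), deletes obsolete WAL entries, notifies waiters, handles requests that
    /// were pending or stalled on the flush, and schedules a compaction.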
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        // Notifies other workers. Even if the remaining steps of this method fail, we still
        // wake up other workers as we have released some memory by flushing.
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

        // Checks whether the region is currently in staging mode.
        let is_staging = region.manifest_ctx.current_state()
            == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);

        if is_staging {
            info!(
                "Skipping region metadata update for region {} in staging mode",
                region_id
            );
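            // Only removes the flushed memtables; passing `None` skips applying the file
            // edit to the region version while the region is staging.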
            region.version_control.apply_edit(
                None,
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        } else {
            region.version_control.apply_edit(
                Some(request.edit.clone()),
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        }

        region.update_flush_millis();

        // Deletes WAL entries that are obsolete after the flush.
        info!(
            "Region {} flush finished, tries to bump wal to {}",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to write wal, region: {}", region_id);
            request.on_failure(e);
            return;
        }

        // Notifies waiters and observes the flush timer.
        request.on_success();

        // Handle pending requests for the region.
        if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
            self.flush_scheduler.on_flush_success(region_id)
        {
            // Perform DDLs first because they require empty memtables.
            self.handle_ddl_requests(&mut ddl_requests).await;
            // Handle pending write requests; we don't stall these requests.
            self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                .await;
        }

        // Handle stalled requests.
        self.handle_stalled_requests().await;

        // Schedules compaction.
        self.schedule_compaction(&region).await;

        self.listener.on_flush_success(region_id);
    }

    /// Updates the region's `topic_latest_entry_id` to the latest entry id of its WAL topic.
    /// **This is only used for remote WAL pruning.**
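    ///
    /// Only takes effect when the region uses a remote WAL and its memtables are empty.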
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
            let latest_offset = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

            if latest_offset > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(latest_offset, Ordering::Relaxed);
                debug!(
                    "Region {} latest entry id updated to {}",
                    region.region_id, latest_offset
                );
            }
        }
    }
}