mito2/worker/handle_flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.

use std::sync::Arc;
use std::sync::atomic::Ordering;

use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;

use crate::config::{IndexBuildMode, MitoConfig};
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
    /// On region flush job failed.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
    }

    /// Checks whether the engine reaches the flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            // No need to flush worker.
            return;
        }

        // If the engine needs flush, each worker will find some regions to flush. We might
        // flush more memory than expected but it should be acceptable.
        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

    /// Finds some regions to flush to reduce write buffer usage.
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
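        // Flush every region whose last flush is older than the auto flush interval,
        // then flush the region with the largest mutable memtable if it is not
        // already scheduled.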
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut max_mutable_size = 0;
        // Region with max mutable memtable size.
        let mut max_mem_region = None;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }

            let version = region.version();
            let region_mutable_size = version.memtables.mutable_usage();
            // Tracks region with max mutable memtable size.
            if region_mutable_size > max_mutable_size {
                max_mem_region = Some(region);
                max_mutable_size = region_mutable_size;
            }

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        // Flush the region with the largest mutable memtable.
        // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
        if let Some(region) = max_mem_region
            && !self.flush_scheduler.is_flush_requested(region.region_id)
        {
            let task =
                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
        }

        Ok(())
    }

    /// Creates a flush task with the given `reason` for the `region`.
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
    ) -> RegionFlushTask {
        RegionFlushTask {
            region_id: region.region_id,
            reason,
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
            flush_semaphore: self.flush_semaphore.clone(),
        }
    }
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles manual flush request.
    pub(crate) async fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        mut sender: OptionOutputTx,
    ) {
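        // `flushable_region_or` replies to `sender` with an error if the region is
        // missing or not in a flushable state, so we can simply return here.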
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        // `update_topic_latest_entry_id` updates `topic_latest_entry_id` only when the memtables are empty,
        // but the flush is skipped entirely in that case. Thus we update `topic_latest_entry_id` here while
        // handling the flush request instead of in `schedule_flush` or `flush_finished`.
        self.update_topic_latest_entry_id(&region);

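        // Regions that are being downgraded use a dedicated flush reason; everything else
        // going through this handler is a manual flush.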
        let reason = if region.is_downgrading() {
            FlushReason::Downgrading
        } else {
            FlushReason::Manual
        };

        let mut task =
            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

    /// Flushes regions periodically.
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// On region flush job finished.
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        // Notifies other workers. Even if the remaining steps of this method fail, we still
        // wake up other workers as we have released some memory by flushing.
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

        // Check whether the region is currently in staging mode.
        let is_staging = region.manifest_ctx.current_state()
            == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);

        if is_staging {
            info!(
                "Skipping region metadata update for region {} in staging mode",
                region_id
            );
            region.version_control.apply_edit(
                None,
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        } else {
            region.version_control.apply_edit(
                Some(request.edit.clone()),
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        }

        // Update the last flush time so the time-based auto flush won't pick this region
        // again until the auto flush interval elapses.
        region.update_flush_millis();

        // Obsolete flushed WAL entries.
        info!(
            "Region {} flush finished, trying to bump wal to {}",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to obsolete wal entries, region: {}", region_id);
            request.on_failure(e);
            return;
        }

        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
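        // The file metas taken above are reused below to build indexes asynchronously
        // after waiters have been notified.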

        // Notifies waiters and observes the flush timer.
        request.on_success();

        // In async mode, create indexes after flush.
        if self.config.index.build_mode == IndexBuildMode::Async {
            self.handle_rebuild_index(
                BuildIndexRequest {
                    region_id,
                    build_type: IndexBuildType::Flush,
                    file_metas: index_build_file_metas,
                },
                OptionOutputTx::new(None),
            )
            .await;
        }

        // Handle pending requests for the region.
        if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
            self.flush_scheduler.on_flush_success(region_id)
        {
            // Perform DDLs first because they require empty memtables.
            self.handle_ddl_requests(&mut ddl_requests).await;
            // Handle pending write requests; we don't stall these requests.
            self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                .await;
        }

        // Handle stalled requests.
        self.handle_stalled_requests().await;

        // Schedules compaction.
        self.schedule_compaction(&region).await;

        self.listener.on_flush_success(region_id);
    }

    /// Updates the region's `topic_latest_entry_id` to the topic's latest entry id when its memtables are empty.
    /// **This is only used for remote WAL pruning.**
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
            let latest_offset = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

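            // Only move the recorded entry id forward; a stale offset must not overwrite
            // a newer one.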
            if latest_offset > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(latest_offset, Ordering::Relaxed);
                debug!(
                    "Region {} latest entry id updated to {}",
                    region.region_id, latest_offset
                );
            }
        }
    }
}