mito2/worker/handle_flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.
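//!
//! A flush can be triggered by engine-wide memory pressure (`maybe_flush_worker`),
//! the periodic check (`flush_periodically`), or an explicit region flush request
//! (`handle_flush_request`). When a flush job finishes, `handle_flush_finished`
//! applies the region edit (skipped for staging regions), obsoletes the flushed WAL
//! entries, and resumes requests that were waiting for the flush.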

use std::sync::Arc;
use std::sync::atomic::Ordering;

use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
    /// On region flush job failed.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
    }

    /// Checks whether the engine has reached the flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            // No need to flush this worker.
            return;
        }

        // If the engine needs a flush, each worker finds some regions to flush. We might
        // flush more memory than expected, but it should be acceptable.
        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

    /// Finds some regions to flush to reduce write buffer usage.
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
        let mut max_mutable_size = 0;
        // Region with the largest mutable memtable.
        let mut max_mem_region = None;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }

            let version = region.version();
            let region_mutable_size = version.memtables.mutable_usage();
            // Track the region with the largest mutable memtable.
            if region_mutable_size > max_mutable_size {
                max_mem_region = Some(region);
                max_mutable_size = region_mutable_size;
            }

            if region.last_flush_millis() < min_last_flush_time {
                // If the region's last flush is earlier than `min_last_flush_time`, flush it.
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        // Flush the region with the largest mutable memtable.
        // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
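        // The max-memory region may already have been scheduled by the loop above (if its
        // last flush was stale), so check again before scheduling another flush for it.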
        if let Some(region) = max_mem_region
            && !self.flush_scheduler.is_flush_requested(region.region_id)
        {
            let task =
                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)?;
        }

        Ok(())
    }

    /// Creates a flush task with specific `reason` for the `region`.
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
    ) -> RegionFlushTask {
        RegionFlushTask {
            region_id: region.region_id,
            reason,
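            // Waiters are attached later via `push_sender`.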
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
            flush_semaphore: self.flush_semaphore.clone(),
        }
    }
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles a manual flush request.
    pub(crate) async fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        // `update_topic_latest_entry_id` only updates `topic_latest_entry_id` when the memtables
        // are empty, but the flush itself is skipped if the memtables are empty. Thus we update
        // `topic_latest_entry_id` here while handling the flush request instead of in
        // `schedule_flush` or `flush_finished`.
        self.update_topic_latest_entry_id(&region);

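        // Use a dedicated reason for a downgrading leader so its flush can be told apart
        // from a manual flush.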
        let reason = if region.is_downgrading() {
            FlushReason::Downgrading
        } else {
            FlushReason::Manual
        };

        let mut task =
            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

    /// Flushes regions periodically.
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }
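            // Keep `topic_latest_entry_id` fresh here as well: the flush below is skipped when
            // the memtables are empty, so it would never be updated during the flush itself.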
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                // If the region's last flush is earlier than `min_last_flush_time`, flush it.
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// On region flush job finished.
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        // Notifies other workers. Even if the remaining steps of this method fail, we still
        // wake up other workers because the flush has already released some memory.
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

        // Check if the region is currently in staging mode.
        let is_staging = region.manifest_ctx.current_state()
            == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);

        if is_staging {
            info!(
                "Skipping region metadata update for region {} in staging mode",
                region_id
            );
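            // Only remove the flushed memtables to release memory; don't apply the file edit so
            // the staging region's metadata stays untouched.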
            region.version_control.apply_edit(
                None,
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        } else {
            region.version_control.apply_edit(
                Some(request.edit.clone()),
                &request.memtables_to_remove,
                region.file_purger.clone(),
            );
        }

        region.update_flush_millis();

        // Delete wal entries up to the flushed entry id.
        info!(
            "Region {} flush finished, trying to bump wal to {}",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to obsolete wal entries, region: {}", region_id);
            request.on_failure(e);
            return;
        }

        // Notifies waiters and observes the flush timer.
        request.on_success();

        // Handle pending requests for the region.
        if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
            self.flush_scheduler.on_flush_success(region_id)
        {
            // Perform DDLs first because they require empty memtables.
            self.handle_ddl_requests(&mut ddl_requests).await;
            // Handle pending write requests; we don't stall these requests.
            self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                .await;
        }

        // Handle stalled requests.
        self.handle_stalled_requests().await;

        // Schedules compaction.
        self.schedule_compaction(&region).await;

        self.listener.on_flush_success(region_id);
    }

    /// Updates the region's latest entry id since the last flush.
    /// **This is only used for remote WAL pruning.**
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
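            // Default to 0 when the log store reports no latest entry id for this provider.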
            let latest_offset = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

            if latest_offset > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(latest_offset, Ordering::Relaxed);
                debug!(
                    "Region {} latest entry id updated to {}",
                    region.region_id, latest_offset
                );
            }
        }
    }
}