mito2/worker/handle_flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling flush related requests.

use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
    /// On region flush job failed.
    pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
        self.flush_scheduler.on_flush_failed(region_id, request.err);
    }

    /// Checks whether the engine reaches the flush threshold. If so, finds regions in this
    /// worker to flush.
    pub(crate) fn maybe_flush_worker(&mut self) {
        if !self.write_buffer_manager.should_flush_engine() {
            // No need to flush worker.
            return;
        }

        // If the engine needs a flush, each worker finds some regions to flush. We might
        // flush more memory than expected, but that should be acceptable.
        if let Err(e) = self.flush_regions_on_engine_full() {
            error!(e; "Failed to flush worker");
        }
    }

    /// Finds some regions to flush to reduce write buffer usage.
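    ///
    /// The strategy: flush every writable region whose last flush is older than
    /// `auto_flush_interval`, and additionally the region with the largest mutable
    /// memtable, skipping regions that already have a flush requested.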
    fn flush_regions_on_engine_full(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
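        // Regions whose last flush is older than this cutoff are flushed below
        // regardless of their mutable memtable size.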
        let mut max_mutable_size = 0;
        // Region with the largest mutable memtable size.
        let mut max_mem_region = None;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }

            let version = region.version();
            let region_mutable_size = version.memtables.mutable_usage();
            // Tracks the region with the largest mutable memtable size.
            if region_mutable_size > max_mutable_size {
                max_mem_region = Some(region);
                max_mutable_size = region_mutable_size;
            }

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush of this region is earlier than `min_last_flush_time`, flush it.
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        // Flush the region with the largest mutable memtable.
        // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
        if let Some(region) = max_mem_region {
            if !self.flush_scheduler.is_flush_requested(region.region_id) {
                let task =
                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// Creates a flush task with the given `reason` for the `region`.
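    ///
    /// A minimal sketch of how the worker uses this together with the flush
    /// scheduler (it mirrors the manual flush path below; not a compiling doctest):
    ///
    /// ```ignore
    /// let task = self.new_flush_task(&region, FlushReason::Manual, None, self.config.clone());
    /// self.flush_scheduler
    ///     .schedule_flush(region.region_id, &region.version_control, task)?;
    /// ```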
    pub(crate) fn new_flush_task(
        &self,
        region: &MitoRegionRef,
        reason: FlushReason,
        row_group_size: Option<usize>,
        engine_config: Arc<MitoConfig>,
    ) -> RegionFlushTask {
        RegionFlushTask {
            region_id: region.region_id,
            reason,
            senders: Vec::new(),
            request_sender: self.sender.clone(),
            access_layer: region.access_layer.clone(),
            listener: self.listener.clone(),
            engine_config,
            row_group_size,
            cache_manager: self.cache_manager.clone(),
            manifest_ctx: region.manifest_ctx.clone(),
            index_options: region.version().options.index_options.clone(),
        }
    }
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles a manual flush request.
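    ///
    /// A rough sketch of how a worker loop might dispatch to this handler;
    /// `worker_loop`, `request`, and `sender` are illustrative names rather than
    /// the actual call site:
    ///
    /// ```ignore
    /// // `request` is a `RegionFlushRequest` and `sender` an `OptionOutputTx` for the reply.
    /// worker_loop.handle_flush_request(region_id, request, sender).await;
    /// ```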
    pub(crate) async fn handle_flush_request(
        &mut self,
        region_id: RegionId,
        request: RegionFlushRequest,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
            return;
        };
        // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when the memtables are empty.
        // But the flush is skipped if the memtables are empty, so we update `topic_latest_entry_id`
        // here while handling the flush request instead of in `schedule_flush` or `flush_finished`.
        self.update_topic_latest_entry_id(&region);

        let reason = if region.is_downgrading() {
            FlushReason::Downgrading
        } else {
            FlushReason::Manual
        };

        let mut task =
            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
        task.push_sender(sender);
        if let Err(e) =
            self.flush_scheduler
                .schedule_flush(region.region_id, &region.version_control, task)
        {
            error!(e; "Failed to schedule flush task for region {}", region.region_id);
        }
    }

    /// Flushes regions periodically.
    pub(crate) fn flush_periodically(&mut self) -> Result<()> {
        let regions = self.regions.list_regions();
        let now = self.time_provider.current_time_millis();
        let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;

        for region in &regions {
            if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
                // Already flushing or not writable.
                continue;
            }
            self.update_topic_latest_entry_id(region);

            if region.last_flush_millis() < min_last_flush_time {
                // If the last flush of this region is earlier than `min_last_flush_time`, flush it.
                let task = self.new_flush_task(
                    region,
                    FlushReason::Periodically,
                    None,
                    self.config.clone(),
                );
                self.flush_scheduler.schedule_flush(
                    region.region_id,
                    &region.version_control,
                    task,
                )?;
            }
        }

        Ok(())
    }

    /// On region flush job finished.
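    ///
    /// On success this roughly: notifies sibling workers that memory was freed,
    /// applies the region edit and removes flushed memtables, records the flush
    /// time, obsoletes WAL entries up to the flushed entry id, wakes up waiters,
    /// replays pending DDL, write and stalled requests, and finally schedules
    /// compaction for the region.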
    pub(crate) async fn handle_flush_finished(
        &mut self,
        region_id: RegionId,
        mut request: FlushFinished,
    ) {
        // Notifies other workers. Even if the remaining steps of this method fail, we still
        // wake up other workers because the flush has released some memory.
        self.notify_group();

        let region = match self.regions.get_region(region_id) {
            Some(region) => region,
            None => {
                request.on_failure(RegionNotFoundSnafu { region_id }.build());
                return;
            }
        };

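        // Apply the region edit produced by the flush and drop the memtables that
        // have been persisted.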
        region.version_control.apply_edit(
            request.edit.clone(),
            &request.memtables_to_remove,
            region.file_purger.clone(),
        );

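        // Record the flush time so the periodic and engine-full flush paths
        // skip this region until `auto_flush_interval` elapses again.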
        region.update_flush_millis();

        // Obsolete WAL entries up to the flushed entry id.
        info!(
            "Region {} flush finished, obsoleting WAL entries up to {}",
            region_id, request.flushed_entry_id
        );
        if let Err(e) = self
            .wal
            .obsolete(region_id, request.flushed_entry_id, &region.provider)
            .await
        {
            error!(e; "Failed to obsolete WAL entries, region: {}", region_id);
            request.on_failure(e);
            return;
        }

        // Notifies waiters and observes the flush timer.
        request.on_success();

        // Handle pending requests for the region.
        if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
            self.flush_scheduler.on_flush_success(region_id)
        {
            // Perform DDLs first because they require empty memtables.
            self.handle_ddl_requests(&mut ddl_requests).await;
            // Handle pending write requests; we don't stall these requests.
            self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
                .await;
        }

        // Handle stalled requests.
        self.handle_stalled_requests().await;

        // Schedules compaction.
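        // A flush writes new SST files, which may make the region eligible for compaction.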
        self.schedule_compaction(&region).await;

        self.listener.on_flush_success(region_id);
    }

    /// Updates the latest entry id of the region since the last flush.
    /// **This is only used for remote WAL pruning.**
    pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
        if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
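            // The high watermark is the latest entry id known for the region's WAL topic.
            // It falls back to 0 when unavailable, in which case the update below is skipped.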
            let high_watermark = self
                .wal
                .store()
                .high_watermark(&region.provider)
                .unwrap_or(0);
            let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);

            if high_watermark != 0 && high_watermark > topic_last_entry_id {
                region
                    .topic_latest_entry_id
                    .store(high_watermark, Ordering::Relaxed);
                info!(
                    "Region {} topic latest entry id updated to {}",
                    region.region_id, high_watermark
                );
            }
        }
    }
}