// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Flush-related utilities and structs.

use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::{debug, error, info, trace};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};

use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;

/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
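///
/// Implementations are driven through a reserve/free lifecycle: `reserve_mem`
/// when a write buffers data, `schedule_free_mem` when a memtable is frozen
/// and queued for flush, and `free_mem` once the flushed memtable is dropped.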
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether to trigger a flush of the engine.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether to stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager we are freeing `mem` bytes.
    ///
    /// We are in the process of freeing `mem` bytes, so it is not considered
    /// when checking the soft limit.
    fn schedule_free_mem(&self, mem: usize);

    /// We have freed `mem` bytes.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;
}

pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
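///
/// A minimal usage sketch of the reserve/free lifecycle (the byte counts are
/// illustrative):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1000);
/// manager.reserve_mem(400); // a write buffers 400 bytes
/// assert!(!manager.should_stall());
/// manager.schedule_free_mem(400); // the memtable is frozen for flush
/// manager.free_mem(400); // the flushed memtable is dropped
/// assert_eq!(0, manager.memory_usage());
/// ```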
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit.
    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier.
    /// The manager can wake up the worker once we free the write buffer.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the given `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier to the manager.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the size limit for mutable memtables.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        // Reserves half of the write buffer for mutable memtables.
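        // e.g. a 64 MiB global buffer yields a 32 MiB mutable limit.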
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage, self.memory_usage(), self.mutable_limit, self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // If the memory exceeds the buffer size, we trigger a more aggressive
        // flush. But if more than half of the memory is already being flushed,
        // triggering more flushes may not help. We hold writes instead.
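        // e.g. with a global limit of 1000 (mutable limit 500): usage 1100 with
        // mutable usage 450 holds (the in-flight flushes should reclaim enough),
        // while mutable usage 500 triggers a flush here.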
        if memory_usage >= self.global_write_buffer_size {
            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
                debug!(
                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                     mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage
                );
                return true;
            } else {
                trace!(
                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage
                );
            }
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Notifies the worker after the memory usage is decreased. When we drop the memtable
            // outside of the worker, the worker may still stall requests because the memory usage
            // is not updated. So we need to notify the worker to handle stalled requests again.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}

/// Reason for a flush task.
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches the flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush memtable during downgrading state.
    Downgrading,
}

impl FlushReason {
    /// Gets the flush reason as a static string.
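    ///
    /// With `IntoStaticStr`, this yields the variant name, e.g.
    /// `FlushReason::EngineFull.as_str() == "EngineFull"`.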
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
}

impl RegionFlushTask {
    /// Pushes the sender if it is not `None`.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all senders that the job succeeded.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiters.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    ///
    /// We must call this in the region worker.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Get a version of this region before creating a job to get the current
        // wal entry id, sequence and immutable memtables.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry has been flushed.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] to apply.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the immutable memtables list and entry ids from the `version_data`
        // for consistency as others might already modify the version in the `version_control`.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            let stats = mem.stats();
            let max_sequence = stats.max_sequence();
            series_count += stats.series_count();
            let iter = mem.iter(None, None, None)?;
            let source = Source::Iter(iter);

            // Flush to level 0.
            let write_request = SstWriteRequest {
                op_type: OperationType::Flush,
                metadata: version.metadata.clone(),
                source,
                cache_manager: self.cache_manager.clone(),
                storage: version.options.storage.clone(),
                max_sequence: Some(max_sequence),
                index_options: self.index_options.clone(),
                inverted_index_config: self.engine_config.inverted_index.clone(),
                fulltext_index_config: self.engine_config.fulltext_index.clone(),
                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            };

            let ssts_written = self
                .access_layer
                .write_sst(write_request, &write_opts)
                .await?;
            if ssts_written.is_empty() {
                // No data written.
                continue;
            }

            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                flushed_bytes += sst_info.file_size;
                FileMeta {
                    region_id: self.region_id,
                    file_id: sst_info.file_id,
                    time_range: sst_info.time_range,
                    level: 0,
                    file_size: sst_info.file_size,
                    available_indexes: sst_info.index_metadata.build_available_indexes(),
                    index_file_size: sst_info.index_metadata.file_size,
                    num_rows: sst_info.num_rows as u64,
                    num_row_groups: sst_info.num_row_groups,
                    sequence: NonZeroU64::new(max_sequence),
                }
            }));
        }

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}s",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            timer.stop_and_record(),
        );

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            compaction_time_window: None,
            // The last entry has been flushed.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            RegionLeaderState::Writable
        };
        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
        // add a cleanup job to remove them later.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Notifies the worker of the flush job status.
    async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self.request_sender.send(request).await {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges two flush tasks.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Now we only merge senders. They share the same flush reason.
        self.senders.append(&mut other.senders);
    }
}

/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
    /// Tracks regions that need to flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested a flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a flush `task` for the given region.
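    ///
    /// If the region is already flushing or has a pending task, the new task is
    /// merged into the pending task; otherwise the mutable memtable is frozen
    /// and a flush job is submitted to the background scheduler.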
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        // Don't increase the counter if a region has nothing to flush.
        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Add this region to the status map.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        // Checks whether we can flush the region now.
        if flush_status.flushing {
            // There is already a flush job running.
            flush_status.merge_task(task);
            return Ok(());
        }

        // TODO(yingwen): We can merge with pending and execute directly.
        // If there is already a pending task, merge this task into it.
        if flush_status.pending_task.is_some() {
            flush_status.merge_task(task);
            return Ok(());
        }

        // Now we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            // Remove from region status if we can't freeze the mutable memtable.
            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            // If the scheduler returns an error, senders in the job will be dropped and waiters
            // can get recv errors.
            error!(e; "Failed to schedule flush job for region {}", region_id);

            // Remove from region status if we can't submit the task.
            self.region_status.remove(&region_id);
            return Err(e);
        }

        flush_status.flushing = true;

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished.
    ///
    /// Returns all pending requests if the region doesn't need to flush again.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // This region no longer has a running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing to flush, so we can notify the pending task
                // and remove the region from the status.
                // Safety: The pending task is not None.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
                // it from the status to avoid leaking pending requests.
                // Safety: The flush status must exist.
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // We can flush the region again, keep it in the region status.
                None
            }
        };

        // Schedule the next flush job.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }

    /// Notifies the scheduler that the flush job failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends the error to their waiters.
        flush_status.on_failure(err);

        // Still tries to schedule a new flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after flush of region {} failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDLs.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules a new flush task when the scheduler can submit the next task.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(self
            .region_status
            .values()
            .all(|status| status.flushing || status.pending_task.is_some()));

        // Gets the first region with a pending task from the status map.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// There is a flush task running.
    ///
    /// It is possible that a region is not flushing but has a pending task if the
    /// scheduler doesn't schedule this region.
    flushing: bool,
    /// Task waiting for the next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Requests waiting to write after altering the region.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk requests waiting to write after altering the region.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task into the pending task.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // More than mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes mutable.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases immutable.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // More than the global limit, but mutable (1100 - 200 - 450 = 450) is not enough (< 500).
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Now mutable is enough.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
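
    #[test]
    fn test_flush_reason_as_str() {
        // A small sanity check: `IntoStaticStr` derives the variant name as
        // the static string.
        assert_eq!("Others", FlushReason::Others.as_str());
        assert_eq!("EngineFull", FlushReason::EngineFull.as_str());
        assert_eq!("Downgrading", FlushReason::Downgrading.as_str());
    }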

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished.
        version_control.apply_edit(
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}