// mito2/flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use common_telemetry::{debug, error, info, trace};
23use snafu::ResultExt;
24use store_api::storage::RegionId;
25use strum::IntoStaticStr;
26use tokio::sync::{mpsc, watch};
27
28use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType};
29use crate::cache::CacheManagerRef;
30use crate::config::MitoConfig;
31use crate::error::{
32    Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
33};
34use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
35use crate::memtable::MemtableRanges;
36use crate::metrics::{
37    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
38    INFLIGHT_FLUSH_COUNT,
39};
40use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
41use crate::read::merge::MergeReaderBuilder;
42use crate::read::scan_region::PredicateGroup;
43use crate::read::Source;
44use crate::region::options::{IndexOptions, MergeMode};
45use crate::region::version::{VersionControlData, VersionControlRef};
46use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
47use crate::request::{
48    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
49    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
50};
51use crate::schedule::scheduler::{Job, SchedulerRef};
52use crate::sst::file::FileMeta;
53use crate::sst::parquet::WriteOptions;
54use crate::worker::WorkerListener;
55
/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether to trigger a flush of the engine.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether to stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager we are freeing `mem` bytes.
    ///
    /// We are in the process of freeing `mem` bytes, so it is not considered
    /// when checking the soft limit.
    fn schedule_free_mem(&self, mem: usize);

    /// We have freed `mem` bytes.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;
}
81
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit (half of `global_write_buffer_size`,
    /// see [Self::get_mutable_limit]).
    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier.
    /// The manager can wake up the worker once we free the write buffer.
    notifier: Option<watch::Sender<()>>,
}
102
103impl WriteBufferManagerImpl {
104    /// Returns a new manager with specific `global_write_buffer_size`.
105    pub fn new(global_write_buffer_size: usize) -> Self {
106        Self {
107            global_write_buffer_size,
108            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
109            memory_used: AtomicUsize::new(0),
110            memory_active: AtomicUsize::new(0),
111            notifier: None,
112        }
113    }
114
115    /// Attaches a notifier to the manager.
116    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
117        self.notifier = Some(notifier);
118        self
119    }
120
121    /// Returns memory usage of mutable memtables.
122    pub fn mutable_usage(&self) -> usize {
123        self.memory_active.load(Ordering::Relaxed)
124    }
125
126    /// Returns the size limit for mutable memtables.
127    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
128        // Reserves half of the write buffer for mutable memtable.
129        global_write_buffer_size / 2
130    }
131}
132
133impl WriteBufferManager for WriteBufferManagerImpl {
134    fn should_flush_engine(&self) -> bool {
135        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
136        if mutable_memtable_memory_usage > self.mutable_limit {
137            debug!(
138                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
139                mutable_memtable_memory_usage, self.memory_usage(), self.mutable_limit, self.global_write_buffer_size,
140            );
141            return true;
142        }
143
144        let memory_usage = self.memory_used.load(Ordering::Relaxed);
145        // If the memory exceeds the buffer size, we trigger more aggressive
146        // flush. But if already more than half memory is being flushed,
147        // triggering more flush may not help. We will hold it instead.
148        if memory_usage >= self.global_write_buffer_size {
149            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
150                debug!(
151                "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
152                 mutable_usage: {}.",
153                memory_usage,
154                self.global_write_buffer_size,
155                mutable_memtable_memory_usage);
156                return true;
157            } else {
158                trace!(
159                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
160                    memory_usage,
161                    self.global_write_buffer_size,
162                    mutable_memtable_memory_usage);
163            }
164        }
165
166        false
167    }
168
169    fn should_stall(&self) -> bool {
170        self.memory_usage() >= self.global_write_buffer_size
171    }
172
173    fn reserve_mem(&self, mem: usize) {
174        self.memory_used.fetch_add(mem, Ordering::Relaxed);
175        self.memory_active.fetch_add(mem, Ordering::Relaxed);
176    }
177
178    fn schedule_free_mem(&self, mem: usize) {
179        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
180    }
181
182    fn free_mem(&self, mem: usize) {
183        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
184        if let Some(notifier) = &self.notifier {
185            // Notifies the worker after the memory usage is decreased. When we drop the memtable
186            // outside of the worker, the worker may still stall requests because the memory usage
187            // is not updated. So we need to notify the worker to handle stalled requests again.
188            let _ = notifier.send(());
189        }
190    }
191
192    fn memory_usage(&self) -> usize {
193        self.memory_used.load(Ordering::Relaxed)
194    }
195}
196
/// Reason of a flush task.
///
/// The `IntoStaticStr` derive turns each variant name into a static string,
/// used as a metric label and in logs (see [FlushReason::as_str]).
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush memtable during downgrading state.
    Downgrading,
}
213
214impl FlushReason {
215    /// Get flush reason as static str.
216    fn as_str(&self) -> &'static str {
217        self.into()
218    }
219}
220
/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders, notified on success or failure.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker when the background job finishes.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Layer used to write SSTs to storage.
    pub(crate) access_layer: AccessLayerRef,
    /// Listener notified on flush events (e.g. when a flush begins).
    pub(crate) listener: WorkerListener,
    /// Engine config, provides write buffer and index configurations.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Overrides the SST row group size when set.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager attached to SST write requests.
    pub(crate) cache_manager: CacheManagerRef,
    /// Context used to update the region manifest after SSTs are written.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
}
242
impl RegionFlushTask {
    /// Pushes the sender if it is not none, so the caller can wait for the flush result.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all waiters that the job succeeded.
    fn on_success(self) {
        for sender in self.senders {
            // Flush has no affected rows to report, so send 0.
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiters and clears the sender list.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    ///
    /// We must call this in the region worker.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Get a version of this region before creating a job to get current
        // wal entry id, sequence and immutable memtables.
        let version_data = version_control.current();

        Box::pin(async move {
            // Track the number of in-flight flush jobs via the gauge.
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the worker with the result.
    ///
    /// On success the worker receives a `FlushFinished` notification carrying the
    /// region edit; on failure the waiters are failed immediately and the worker
    /// receives a `FlushFailed` notification.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                // Ids of the immutable memtables that have been flushed and can be removed.
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry has been flushed.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer so a failed flush doesn't pollute the histogram.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] to apply.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the immutable memtables list and entry ids from the `version_data`
        // for consistency as others might already modify the version in the `version_control`.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        // The task-level row group size overrides the default when present.
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        // Each immutable memtable is flushed into its own level-0 SST(s).
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            let MemtableRanges { ranges, stats } =
                mem.ranges(None, PredicateGroup::default(), None)?;

            let max_sequence = stats.max_sequence();
            series_count += stats.series_count();

            // Single range: iterate it directly. Multiple ranges: merge (and
            // possibly dedup) them into one sorted stream first.
            let source = if ranges.len() == 1 {
                let only_range = ranges.into_values().next().unwrap();
                let iter = only_range.build_iter()?;
                Source::Iter(iter)
            } else {
                // todo(hl): a workaround since sync version of MergeReader is wip.
                let sources = ranges
                    .into_values()
                    .map(|r| r.build_iter().map(Source::Iter))
                    .collect::<Result<Vec<_>>>()?;
                let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
                let maybe_dedup = if version.options.append_mode {
                    // no dedup in append mode
                    Box::new(merge_reader) as _
                } else {
                    // dedup according to merge mode
                    match version.options.merge_mode.unwrap_or(MergeMode::LastRow) {
                        MergeMode::LastRow => {
                            Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
                        }
                        MergeMode::LastNonNull => {
                            Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
                        }
                    }
                };
                Source::Reader(maybe_dedup)
            };

            // Flush to level 0.
            let write_request = SstWriteRequest {
                op_type: OperationType::Flush,
                metadata: version.metadata.clone(),
                source,
                cache_manager: self.cache_manager.clone(),
                storage: version.options.storage.clone(),
                max_sequence: Some(max_sequence),
                index_options: self.index_options.clone(),
                inverted_index_config: self.engine_config.inverted_index.clone(),
                fulltext_index_config: self.engine_config.fulltext_index.clone(),
                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            };

            let (ssts_written, metrics) = self
                .access_layer
                .write_sst(write_request, &write_opts, WriteType::Flush)
                .await?;
            if ssts_written.is_empty() {
                // No data written.
                continue;
            }
            flush_metrics = flush_metrics.merge(metrics);

            // Record a FileMeta for every SST produced and accumulate flushed bytes.
            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                flushed_bytes += sst_info.file_size;
                FileMeta {
                    region_id: self.region_id,
                    file_id: sst_info.file_id,
                    time_range: sst_info.time_range,
                    level: 0,
                    file_size: sst_info.file_size,
                    available_indexes: sst_info.index_metadata.build_available_indexes(),
                    index_file_size: sst_info.index_metadata.file_size,
                    num_rows: sst_info.num_rows as u64,
                    num_row_groups: sst_info.num_row_groups,
                    sequence: NonZeroU64::new(max_sequence),
                }
            }));
        }

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry has been flushed.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // A downgrading region flushes in the Downgrading state; otherwise the
        // region is expected to be Staging (if currently staging) or Writable.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // Check if region is in staging mode
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
        // add a cleanup job to remove them later.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Notifies the worker of the flush job status; failures to deliver are only logged.
    async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges two flush tasks for the same region.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Now we only merge senders. They share the same flush reason.
        self.senders.append(&mut other.senders);
    }
}
506
/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
    /// Tracks regions that need to flush, keyed by region id.
    /// A region is present iff it has a running or pending flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler that runs the flush jobs.
    scheduler: SchedulerRef,
}
514
impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a flush `task` for specific `region`.
    ///
    /// If a flush is already running or pending for the region, the task is merged
    /// into the pending task instead of being scheduled immediately.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        // Don't increase the counter if a region has nothing to flush.
        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Add this region to status map.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        // Checks whether we can flush the region now.
        if flush_status.flushing {
            // There is already a flush job running.
            flush_status.merge_task(task);
            return Ok(());
        }

        // TODO(yingwen): We can merge with pending and execute directly.
        // If there are pending tasks, then we should push it to pending list.
        if flush_status.pending_task.is_some() {
            flush_status.merge_task(task);
            return Ok(());
        }

        // Now we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            // Remove from region status if we can't freeze the mutable memtable.
            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            // If scheduler returns error, senders in the job will be dropped and waiters
            // can get recv errors.
            error!(e; "Failed to schedule flush job for region {}", region_id);

            // Remove from region status if we can't submit the task.
            self.region_status.remove(&region_id);
            return Err(e);
        }

        // The job is submitted; mark the region as flushing.
        flush_status.flushing = true;

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished.
    ///
    /// Returns all pending requests if the region doesn't need to flush again.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // This region doesn't have running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing to flush, we also need to remove it from the status.
                // Safety: The pending task is not None.
                let task = flush_status.pending_task.take().unwrap();
                // The region has nothing to flush. We can notify pending task.
                task.on_success();
                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
                // it from the status to avoid leaking pending requests.
                // Safety: The flush status must exist.
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // We can flush the region again, keep it in the region status.
                None
            }
        };

        // Schedule next flush job.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }

    /// Notifies the scheduler that the flush job is failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends error to their waiters.
        flush_status.on_failure(err);

        // Still tries to schedule a new flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Removes the region from the status map and fails its pending tasks with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        flush_status.on_failure(err);
    }

    /// Add ddl request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Add write request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Add bulk write request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDLs.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules a new flush task when the scheduler can submit next task.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        // Invariant: every tracked region is either flushing or has a pending task.
        debug_assert!(self
            .region_status
            .values()
            .all(|status| status.flushing || status.pending_task.is_some()));

        // Get the first region from status map.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}
763
764impl Drop for FlushScheduler {
765    fn drop(&mut self) {
766        for (region_id, flush_status) in self.region_status.drain() {
767            // We are shutting down so notify all pending tasks.
768            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
769        }
770    }
771}
772
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// There is a flush task running.
    ///
    /// It is possible that a region is not flushing but has pending task if the scheduler
    /// doesn't schedules this region.
    flushing: bool,
    /// Task waiting for next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Requests waiting to write after altering the region.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk requests waiting to write after altering the region.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
795
796impl FlushStatus {
797    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
798        FlushStatus {
799            region_id,
800            version_control,
801            flushing: false,
802            pending_task: None,
803            pending_ddls: Vec::new(),
804            pending_writes: Vec::new(),
805            pending_bulk_writes: Vec::new(),
806        }
807    }
808
809    /// Merges the task to pending task.
810    fn merge_task(&mut self, task: RegionFlushTask) {
811        if let Some(pending) = &mut self.pending_task {
812            pending.merge(task);
813        } else {
814            self.pending_task = Some(task);
815        }
816    }
817
818    fn on_failure(self, err: Arc<Error>) {
819        if let Some(mut task) = self.pending_task {
820            task.on_failure(err.clone());
821        }
822        for ddl in self.pending_ddls {
823            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
824                region_id: self.region_id,
825            }));
826        }
827        for write_req in self.pending_writes {
828            write_req
829                .sender
830                .send(Err(err.clone()).context(FlushRegionSnafu {
831                    region_id: self.region_id,
832                }));
833        }
834    }
835}
836
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

    #[test]
    fn test_get_mutable_limit() {
        // The mutable limit is half of the global write buffer size,
        // rounding down; zero stays zero.
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // More than mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes mutable.
        // schedule_free_mem only moves memory out of the active (mutable)
        // counter; total usage is unchanged until free_mem is called.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases immutable.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        // Over the global (1000) limit: writes must stall.
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Now mutable is enough.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        // The watch notifier only fires when memory is actually released via
        // free_mem, not on reserve_mem or schedule_free_mem.
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        // Scheduling a flush for a region with nothing to flush should
        // complete immediately (output 0) without tracking the region.
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // The region is never registered in the status map because the
        // memtable is empty.
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        // Scheduling multiple flushes for the same region should run one job
        // and merge the rest into a single pending task.
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version: the mutable memtable was frozen into
        // immutable memtable 0 by scheduling the flush.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks.
        // These should be merged into the pending task rather than spawning
        // new jobs while a flush is running.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished.
        version_control.apply_edit(
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        // All merged senders receive the (empty) flush result.
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}