mito2/flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, trace};
24use datatypes::arrow::datatypes::SchemaRef;
25use either::Either;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30use strum::IntoStaticStr;
31use tokio::sync::{Semaphore, mpsc, watch};
32
33use crate::access_layer::{
34    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
35};
36use crate::cache::CacheManagerRef;
37use crate::config::MitoConfig;
38use crate::error::{
39    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
40    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
41};
42use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
43use crate::memtable::{
44    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
45};
46use crate::metrics::{
47    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
48    INFLIGHT_FLUSH_COUNT,
49};
50use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
51use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
52use crate::read::flat_merge::FlatMergeIterator;
53use crate::read::merge::MergeReaderBuilder;
54use crate::read::{FlatSource, Source};
55use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
56use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
57use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
58use crate::request::{
59    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
60    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::FileMeta;
64use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
65use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
66use crate::worker::WorkerListener;
67
68/// Global write buffer (memtable) manager.
69///
70/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
71pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
72    /// Returns whether the engine should trigger a flush.
73    fn should_flush_engine(&self) -> bool;
74
75    /// Returns whether to stall write requests.
76    fn should_stall(&self) -> bool;
77
78    /// Reserves `mem` bytes.
79    fn reserve_mem(&self, mem: usize);
80
81    /// Tells the manager we are freeing `mem` bytes.
82    ///
83    /// These bytes are in the process of being freed, so they are no longer counted
84    /// when checking the soft (mutable) limit.
85    fn schedule_free_mem(&self, mem: usize);
86
87    /// We have freed `mem` bytes.
88    fn free_mem(&self, mem: usize);
89
90    /// Returns the total memory used by memtables.
91    fn memory_usage(&self) -> usize;
92}
93
94pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
95
96/// Default [WriteBufferManager] implementation.
97///
98/// Inspired by RocksDB's WriteBufferManager.
99/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
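///
/// A minimal, illustrative usage sketch (not part of the original docs; the numbers
/// are arbitrary):
///
/// ```ignore
/// // Global limit of 1000 bytes; the mutable (soft) limit is half of it.
/// let manager = WriteBufferManagerImpl::new(1000);
/// // A write reserves memory for the mutable memtable.
/// manager.reserve_mem(600);
/// // Over the mutable limit (500), so the engine should pick a region to flush.
/// assert!(manager.should_flush_engine());
/// // Freezing the memtable schedules its memory to be freed.
/// manager.schedule_free_mem(600);
/// // Once the flush finishes and the memtable is dropped, the memory is freed.
/// manager.free_mem(600);
/// assert_eq!(0, manager.memory_usage());
/// ```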
100#[derive(Debug)]
101pub struct WriteBufferManagerImpl {
102    /// Write buffer size for the engine.
103    global_write_buffer_size: usize,
104    /// Mutable memtable memory size limit.
105    mutable_limit: usize,
106    /// Memory in use (e.g. used by mutable and immutable memtables).
107    memory_used: AtomicUsize,
108    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
109    memory_active: AtomicUsize,
110    /// Optional notifier.
111    /// The manager can wake up the worker once we free the write buffer.
112    notifier: Option<watch::Sender<()>>,
113}
114
115impl WriteBufferManagerImpl {
116    /// Returns a new manager with the specified `global_write_buffer_size`.
117    pub fn new(global_write_buffer_size: usize) -> Self {
118        Self {
119            global_write_buffer_size,
120            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
121            memory_used: AtomicUsize::new(0),
122            memory_active: AtomicUsize::new(0),
123            notifier: None,
124        }
125    }
126
127    /// Attaches a notifier to the manager.
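    ///
    /// Illustrative sketch (mirrors the notifier behavior exercised by the tests in
    /// this module; not part of the original docs):
    ///
    /// ```ignore
    /// let (sender, receiver) = tokio::sync::watch::channel(());
    /// let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
    /// manager.reserve_mem(500);
    /// manager.schedule_free_mem(500);
    /// // Only `free_mem` notifies the worker so it can retry stalled requests.
    /// manager.free_mem(500);
    /// assert!(receiver.has_changed().unwrap());
    /// ```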
128    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
129        self.notifier = Some(notifier);
130        self
131    }
132
133    /// Returns memory usage of mutable memtables.
134    pub fn mutable_usage(&self) -> usize {
135        self.memory_active.load(Ordering::Relaxed)
136    }
137
138    /// Returns the size limit for mutable memtables.
139    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
140        // Reserves half of the write buffer for mutable memtable.
141        global_write_buffer_size / 2
142    }
143}
144
145impl WriteBufferManager for WriteBufferManagerImpl {
146    fn should_flush_engine(&self) -> bool {
147        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
148        if mutable_memtable_memory_usage > self.mutable_limit {
149            debug!(
150                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
151                mutable_memtable_memory_usage,
152                self.memory_usage(),
153                self.mutable_limit,
154                self.global_write_buffer_size,
155            );
156            return true;
157        }
158
159        let memory_usage = self.memory_used.load(Ordering::Relaxed);
160        // If the memory usage exceeds the global buffer size, we trigger a more aggressive
161        // flush. But if more than half of the memory is already being flushed,
162        // triggering more flushes may not help, so we hold off instead.
163        if memory_usage >= self.global_write_buffer_size {
164            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
165                debug!(
166                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
167                 mutable_usage: {}.",
168                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
169                );
170                return true;
171            } else {
172                trace!(
173                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
174                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
175                );
176            }
177        }
178
179        false
180    }
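    // Worked example for `should_flush_engine` (illustrative numbers): with
    // `global_write_buffer_size = 1000` the mutable limit is 500. If
    // `memory_active = 600`, the first branch triggers a flush. If instead
    // `memory_used = 1100` but `memory_active = 450`, more than half of the memory
    // is already being flushed, so the method holds off and returns false.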
181
182    fn should_stall(&self) -> bool {
183        self.memory_usage() >= self.global_write_buffer_size
184    }
185
186    fn reserve_mem(&self, mem: usize) {
187        self.memory_used.fetch_add(mem, Ordering::Relaxed);
188        self.memory_active.fetch_add(mem, Ordering::Relaxed);
189    }
190
191    fn schedule_free_mem(&self, mem: usize) {
192        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
193    }
194
195    fn free_mem(&self, mem: usize) {
196        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
197        if let Some(notifier) = &self.notifier {
198            // Notifies the worker after the memory usage is decreased. When we drop the memtable
199            // outside of the worker, the worker may still stall requests because the memory usage
200            // is not updated. So we need to notify the worker to handle stalled requests again.
201            let _ = notifier.send(());
202        }
203    }
204
205    fn memory_usage(&self) -> usize {
206        self.memory_used.load(Ordering::Relaxed)
207    }
208}
209
210/// Reason for a flush task.
211#[derive(Debug, IntoStaticStr)]
212pub enum FlushReason {
213    /// Other reasons.
214    Others,
215    /// Engine reaches flush threshold.
216    EngineFull,
217    /// Manual flush.
218    Manual,
219    /// Flush to alter table.
220    Alter,
221    /// Flush periodically.
222    Periodically,
223    /// Flush memtable during downgrading state.
224    Downgrading,
225}
226
227impl FlushReason {
228    /// Gets the flush reason as a static str.
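    /// The returned string is the variant name derived by [strum] (e.g. "EngineFull").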
229    fn as_str(&self) -> &'static str {
230        self.into()
231    }
232}
233
234/// Task to flush a region.
235pub(crate) struct RegionFlushTask {
236    /// Region to flush.
237    pub(crate) region_id: RegionId,
238    /// Reason to flush.
239    pub(crate) reason: FlushReason,
240    /// Flush result senders.
241    pub(crate) senders: Vec<OutputTx>,
242    /// Request sender to notify the worker.
243    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
244
245    pub(crate) access_layer: AccessLayerRef,
246    pub(crate) listener: WorkerListener,
247    pub(crate) engine_config: Arc<MitoConfig>,
248    pub(crate) row_group_size: Option<usize>,
249    pub(crate) cache_manager: CacheManagerRef,
250    pub(crate) manifest_ctx: ManifestContextRef,
251
252    /// Index options for the region.
253    pub(crate) index_options: IndexOptions,
254    /// Semaphore to control flush concurrency.
255    pub(crate) flush_semaphore: Arc<Semaphore>,
256}
257
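// Overview of the flush pipeline implemented below (descriptive summary only):
// 1. `into_flush_job` snapshots the current version and wraps `do_flush` into a
//    background job.
// 2. `do_flush` calls `flush_memtables` and then notifies the worker with either
//    `FlushFinished` or `FlushFailed`.
// 3. `flush_memtables` writes the immutable memtables to level 0 SSTs via
//    `do_flush_memtables` and commits the resulting `RegionEdit` to the manifest.
// 4. `do_flush_memtables` builds sources from the memtable ranges and writes them
//    through the access layer, in parallel for flat (record batch) ranges.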
258impl RegionFlushTask {
259    /// Pushes the sender if it is not `None`.
260    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
261        if let Some(sender) = sender.take_inner() {
262            self.senders.push(sender);
263        }
264    }
265
266    /// Consumes the task and notifies the senders that the job succeeded.
267    fn on_success(self) {
268        for sender in self.senders {
269            sender.send(Ok(0));
270        }
271    }
272
273    /// Sends the flush error to the waiters.
274    fn on_failure(&mut self, err: Arc<Error>) {
275        for sender in self.senders.drain(..) {
276            sender.send(Err(err.clone()).context(FlushRegionSnafu {
277                region_id: self.region_id,
278            }));
279        }
280    }
281
282    /// Converts the flush task into a background job.
283    ///
284    /// We must call this in the region worker.
285    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
286        // Gets a version of this region before creating the job to capture the current
287        // WAL entry id, sequence and immutable memtables.
288        let version_data = version_control.current();
289
290        Box::pin(async move {
291            INFLIGHT_FLUSH_COUNT.inc();
292            self.do_flush(version_data).await;
293            INFLIGHT_FLUSH_COUNT.dec();
294        })
295    }
296
297    /// Runs the flush task.
298    async fn do_flush(&mut self, version_data: VersionControlData) {
299        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
300        self.listener.on_flush_begin(self.region_id).await;
301
302        let worker_request = match self.flush_memtables(&version_data).await {
303            Ok(edit) => {
304                let memtables_to_remove = version_data
305                    .version
306                    .memtables
307                    .immutables()
308                    .iter()
309                    .map(|m| m.id())
310                    .collect();
311                let flush_finished = FlushFinished {
312                    region_id: self.region_id,
313                    // The last entry has been flushed.
314                    flushed_entry_id: version_data.last_entry_id,
315                    senders: std::mem::take(&mut self.senders),
316                    _timer: timer,
317                    edit,
318                    memtables_to_remove,
319                };
320                WorkerRequest::Background {
321                    region_id: self.region_id,
322                    notify: BackgroundNotify::FlushFinished(flush_finished),
323                }
324            }
325            Err(e) => {
326                error!(e; "Failed to flush region {}", self.region_id);
327                // Discard the timer.
328                timer.stop_and_discard();
329
330                let err = Arc::new(e);
331                self.on_failure(err.clone());
332                WorkerRequest::Background {
333                    region_id: self.region_id,
334                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
335                }
336            }
337        };
338        self.send_worker_request(worker_request).await;
339    }
340
341    /// Flushes memtables to level 0 SSTs and updates the manifest.
342    /// Returns the [RegionEdit] to apply.
343    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
344        // We must use the immutable memtable list and entry ids from the `version_data`
345        // for consistency, as others might have already modified the version in the `version_control`.
346        let version = &version_data.version;
347        let timer = FLUSH_ELAPSED
348            .with_label_values(&["flush_memtables"])
349            .start_timer();
350
351        let mut write_opts = WriteOptions {
352            write_buffer_size: self.engine_config.sst_write_buffer_size,
353            ..Default::default()
354        };
355        if let Some(row_group_size) = self.row_group_size {
356            write_opts.row_group_size = row_group_size;
357        }
358
359        let DoFlushMemtablesResult {
360            file_metas,
361            flushed_bytes,
362            series_count,
363            flush_metrics,
364        } = self.do_flush_memtables(version, write_opts).await?;
365
366        if !file_metas.is_empty() {
367            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
368        }
369
370        let mut file_ids = Vec::with_capacity(file_metas.len());
371        let mut total_rows = 0;
372        let mut total_bytes = 0;
373        for meta in &file_metas {
374            file_ids.push(meta.file_id);
375            total_rows += meta.num_rows;
376            total_bytes += meta.file_size;
377        }
378        info!(
379            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
380            self.region_id,
381            self.reason.as_str(),
382            file_ids,
383            series_count,
384            total_rows,
385            total_bytes,
386            timer.stop_and_record(),
387            flush_metrics,
388        );
389        flush_metrics.observe();
390
391        let edit = RegionEdit {
392            files_to_add: file_metas,
393            files_to_remove: Vec::new(),
394            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
395            compaction_time_window: None,
396            // The last entry has been flushed.
397            flushed_entry_id: Some(version_data.last_entry_id),
398            flushed_sequence: Some(version_data.committed_sequence),
399            committed_sequence: None,
400        };
401        info!("Applying {edit:?} to region {}", self.region_id);
402
403        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
404
405        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
406            RegionLeaderState::Downgrading
407        } else {
408            // Check if region is in staging mode
409            let current_state = self.manifest_ctx.current_state();
410            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
411                RegionLeaderState::Staging
412            } else {
413                RegionLeaderState::Writable
414            }
415        };
416        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
417        // add a cleanup job to remove them later.
418        let version = self
419            .manifest_ctx
420            .update_manifest(expected_state, action_list)
421            .await?;
422        info!(
423            "Successfully update manifest version to {version}, region: {}, reason: {}",
424            self.region_id,
425            self.reason.as_str()
426        );
427
428        Ok(edit)
429    }
430
431    async fn do_flush_memtables(
432        &self,
433        version: &VersionRef,
434        write_opts: WriteOptions,
435    ) -> Result<DoFlushMemtablesResult> {
436        let memtables = version.memtables.immutables();
437        let mut file_metas = Vec::with_capacity(memtables.len());
438        let mut flushed_bytes = 0;
439        let mut series_count = 0;
440        // Converts the partition expression once, outside the loop below.
441        let partition_expr = match &version.metadata.partition_expr {
442            None => None,
443            Some(json_expr) if json_expr.is_empty() => None,
444            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
445                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
446        };
447        let mut flush_metrics = Metrics::new(WriteType::Flush);
448        for mem in memtables {
449            if mem.is_empty() {
450                // Skip empty memtables.
451                continue;
452            }
453
454            // Compacts the memtable first; this waits for the background compaction to finish.
455            let compact_start = std::time::Instant::now();
456            if let Err(e) = mem.compact(true) {
457                common_telemetry::error!(e; "Failed to compact memtable before flush");
458            }
459            let compact_cost = compact_start.elapsed();
460            flush_metrics.compact_memtable += compact_cost;
461
462            // Sets `for_flush` flag to true.
463            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
464            let num_mem_ranges = mem_ranges.ranges.len();
465            let num_mem_rows = mem_ranges.stats.num_rows();
466            let memtable_id = mem.id();
467            // Increases the series count for each mem range. We assume each mem range contains different series, so
468            // the counter may report more series than the actual series count.
469            series_count += mem_ranges.stats.series_count();
470
471            if mem_ranges.is_record_batch() {
472                let flush_start = Instant::now();
473                let FlushFlatMemResult {
474                    num_encoded,
475                    max_sequence,
476                    num_sources,
477                    results,
478                } = self
479                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
480                    .await?;
481                for (source_idx, result) in results.into_iter().enumerate() {
482                    let (ssts_written, metrics) = result?;
483                    if ssts_written.is_empty() {
484                        // No data written.
485                        continue;
486                    }
487
488                    common_telemetry::debug!(
489                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
490                        self.region_id,
491                        memtable_id,
492                        source_idx,
493                        num_sources,
494                        metrics
495                    );
496
497                    flush_metrics = flush_metrics.merge(metrics);
498
499                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
500                        flushed_bytes += sst_info.file_size;
501                        Self::new_file_meta(
502                            self.region_id,
503                            max_sequence,
504                            sst_info,
505                            partition_expr.clone(),
506                        )
507                    }));
508                }
509
510                common_telemetry::debug!(
511                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
512                    self.region_id,
513                    num_sources,
514                    memtable_id,
515                    num_mem_ranges,
516                    num_encoded,
517                    num_mem_rows,
518                    flush_start.elapsed(),
519                    compact_cost,
520                );
521            } else {
522                let max_sequence = mem_ranges.stats.max_sequence();
523                let source = memtable_source(mem_ranges, &version.options).await?;
524
525                // Flush to level 0.
526                let source = Either::Left(source);
527                let write_request = self.new_write_request(version, max_sequence, source);
528
529                let mut metrics = Metrics::new(WriteType::Flush);
530                let ssts_written = self
531                    .access_layer
532                    .write_sst(write_request, &write_opts, &mut metrics)
533                    .await?;
534                if ssts_written.is_empty() {
535                    // No data written.
536                    continue;
537                }
538
539                debug!(
540                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
541                    self.region_id, num_mem_ranges, num_mem_rows, metrics
542                );
543
544                flush_metrics = flush_metrics.merge(metrics);
545
546                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
547                    flushed_bytes += sst_info.file_size;
548                    Self::new_file_meta(
549                        self.region_id,
550                        max_sequence,
551                        sst_info,
552                        partition_expr.clone(),
553                    )
554                }));
555            };
556        }
557
558        Ok(DoFlushMemtablesResult {
559            file_metas,
560            flushed_bytes,
561            series_count,
562            flush_metrics,
563        })
564    }
565
566    async fn flush_flat_mem_ranges(
567        &self,
568        version: &VersionRef,
569        write_opts: &WriteOptions,
570        mem_ranges: MemtableRanges,
571    ) -> Result<FlushFlatMemResult> {
572        let batch_schema = to_flat_sst_arrow_schema(
573            &version.metadata,
574            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
575        );
576        let flat_sources = memtable_flat_sources(
577            batch_schema,
578            mem_ranges,
579            &version.options,
580            version.metadata.primary_key.len(),
581        )?;
582        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
583        let num_encoded = flat_sources.encoded.len();
584        let max_sequence = flat_sources.max_sequence;
585        for source in flat_sources.sources {
586            let source = Either::Right(source);
587            let write_request = self.new_write_request(version, max_sequence, source);
588            let access_layer = self.access_layer.clone();
589            let write_opts = write_opts.clone();
590            let semaphore = self.flush_semaphore.clone();
591            let task = common_runtime::spawn_global(async move {
592                let _permit = semaphore.acquire().await.unwrap();
593                let mut metrics = Metrics::new(WriteType::Flush);
594                let ssts = access_layer
595                    .write_sst(write_request, &write_opts, &mut metrics)
596                    .await?;
597                Ok((ssts, metrics))
598            });
599            tasks.push(task);
600        }
601        for encoded in flat_sources.encoded {
602            let access_layer = self.access_layer.clone();
603            let cache_manager = self.cache_manager.clone();
604            let region_id = version.metadata.region_id;
605            let semaphore = self.flush_semaphore.clone();
606            let task = common_runtime::spawn_global(async move {
607                let _permit = semaphore.acquire().await.unwrap();
608                let metrics = access_layer
609                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
610                    .await?;
611                Ok((smallvec![encoded.sst_info], metrics))
612            });
613            tasks.push(task);
614        }
615        let num_sources = tasks.len();
616        let results = futures::future::try_join_all(tasks)
617            .await
618            .context(JoinSnafu)?;
619        Ok(FlushFlatMemResult {
620            num_encoded,
621            max_sequence,
622            num_sources,
623            results,
624        })
625    }
626
627    fn new_file_meta(
628        region_id: RegionId,
629        max_sequence: u64,
630        sst_info: SstInfo,
631        partition_expr: Option<PartitionExpr>,
632    ) -> FileMeta {
633        FileMeta {
634            region_id,
635            file_id: sst_info.file_id,
636            time_range: sst_info.time_range,
637            level: 0,
638            file_size: sst_info.file_size,
639            available_indexes: sst_info.index_metadata.build_available_indexes(),
640            index_file_size: sst_info.index_metadata.file_size,
641            index_file_id: None,
642            num_rows: sst_info.num_rows as u64,
643            num_row_groups: sst_info.num_row_groups,
644            sequence: NonZeroU64::new(max_sequence),
645            partition_expr,
646            num_series: sst_info.num_series,
647        }
648    }
649
650    fn new_write_request(
651        &self,
652        version: &VersionRef,
653        max_sequence: u64,
654        source: Either<Source, FlatSource>,
655    ) -> SstWriteRequest {
656        SstWriteRequest {
657            op_type: OperationType::Flush,
658            metadata: version.metadata.clone(),
659            source,
660            cache_manager: self.cache_manager.clone(),
661            storage: version.options.storage.clone(),
662            max_sequence: Some(max_sequence),
663            index_options: self.index_options.clone(),
664            index_config: self.engine_config.index.clone(),
665            inverted_index_config: self.engine_config.inverted_index.clone(),
666            fulltext_index_config: self.engine_config.fulltext_index.clone(),
667            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
668        }
669    }
670
671    /// Notifies the worker of the flush job status.
672    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
673        if let Err(e) = self
674            .request_sender
675            .send(WorkerRequestWithTime::new(request))
676            .await
677        {
678            error!(
679                "Failed to notify flush job status for region {}, request: {:?}",
680                self.region_id, e.0
681            );
682        }
683    }
684
685    /// Merges two flush tasks.
686    fn merge(&mut self, mut other: RegionFlushTask) {
687        assert_eq!(self.region_id, other.region_id);
688        // Now we only merge senders. They share the same flush reason.
689        self.senders.append(&mut other.senders);
690    }
691}
692
693struct FlushFlatMemResult {
694    num_encoded: usize,
695    max_sequence: u64,
696    num_sources: usize,
697    results: Vec<Result<(SstInfoArray, Metrics)>>,
698}
699
700struct DoFlushMemtablesResult {
701    file_metas: Vec<FileMeta>,
702    flushed_bytes: u64,
703    series_count: usize,
704    flush_metrics: Metrics,
705}
706
707/// Returns a [Source] for the given memtable ranges.
708async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
709    let source = if mem_ranges.ranges.len() == 1 {
710        let only_range = mem_ranges.ranges.into_values().next().unwrap();
711        let iter = only_range.build_iter()?;
712        Source::Iter(iter)
713    } else {
714        // todo(hl): a workaround since sync version of MergeReader is wip.
715        let sources = mem_ranges
716            .ranges
717            .into_values()
718            .map(|r| r.build_iter().map(Source::Iter))
719            .collect::<Result<Vec<_>>>()?;
720        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
721        let maybe_dedup = if options.append_mode {
722            // no dedup in append mode
723            Box::new(merge_reader) as _
724        } else {
725            // dedup according to merge mode
726            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
727                MergeMode::LastRow => {
728                    Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
729                }
730                MergeMode::LastNonNull => {
731                    Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
732                }
733            }
734        };
735        Source::Reader(maybe_dedup)
736    };
737    Ok(source)
738}
739
740struct FlatSources {
741    max_sequence: u64,
742    sources: SmallVec<[FlatSource; 4]>,
743    encoded: SmallVec<[EncodedRange; 4]>,
744}
745
746// TODO(yingwen): Flushes into multiple files in parallel.
747/// Returns the max sequence and the [FlatSource]s (including any pre-encoded ranges) for the given memtable ranges.
748fn memtable_flat_sources(
749    schema: SchemaRef,
750    mem_ranges: MemtableRanges,
751    options: &RegionOptions,
752    field_column_start: usize,
753) -> Result<FlatSources> {
754    let MemtableRanges { ranges, stats } = mem_ranges;
755    let max_sequence = stats.max_sequence();
756    let mut flat_sources = FlatSources {
757        max_sequence,
758        sources: SmallVec::new(),
759        encoded: SmallVec::new(),
760    };
761
762    if ranges.len() == 1 {
763        let only_range = ranges.into_values().next().unwrap();
764        if let Some(encoded) = only_range.encoded() {
765            flat_sources.encoded.push(encoded);
766        } else {
767            let iter = only_range.build_record_batch_iter(None)?;
768            flat_sources.sources.push(FlatSource::Iter(iter));
769        };
770    } else {
771        let min_flush_rows = stats.num_rows / 8;
772        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
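        // Illustrative example (assumed numbers): if the memtable has 1_600_000 rows
        // and DEFAULT_ROW_GROUP_SIZE were 100_000, each merged source would contain
        // roughly max(1_600_000 / 8, 100_000) = 200_000 rows, so at most about 8
        // sources (and thus files) are produced for this memtable.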
773        let mut last_iter_rows = 0;
774        let num_ranges = ranges.len();
775        let mut input_iters = Vec::with_capacity(num_ranges);
776        for (_range_id, range) in ranges {
777            if let Some(encoded) = range.encoded() {
778                flat_sources.encoded.push(encoded);
779                continue;
780            }
781
782            let iter = range.build_record_batch_iter(None)?;
783            input_iters.push(iter);
784            last_iter_rows += range.num_rows();
785
786            if last_iter_rows > min_flush_rows {
787                let maybe_dedup = merge_and_dedup(
788                    &schema,
789                    options,
790                    field_column_start,
791                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
792                )?;
793
794                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
795                last_iter_rows = 0;
796            }
797        }
798
799        // Handle remaining iters.
800        if !input_iters.is_empty() {
801            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;
802
803            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
804        }
805    }
806
807    Ok(flat_sources)
808}
809
810fn merge_and_dedup(
811    schema: &SchemaRef,
812    options: &RegionOptions,
813    field_column_start: usize,
814    input_iters: Vec<BoxedRecordBatchIterator>,
815) -> Result<BoxedRecordBatchIterator> {
816    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
817    let maybe_dedup = if options.append_mode {
818        // No dedup in append mode
819        Box::new(merge_iter) as _
820    } else {
821        // Dedup according to merge mode.
822        match options.merge_mode() {
823            MergeMode::LastRow => {
824                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
825            }
826            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
827                merge_iter,
828                FlatLastNonNull::new(field_column_start, false),
829            )) as _,
830        }
831    };
832    Ok(maybe_dedup)
833}
834
835/// Manages background flushes of a worker.
836pub(crate) struct FlushScheduler {
837    /// Tracks regions that need to flush.
838    region_status: HashMap<RegionId, FlushStatus>,
839    /// Background job scheduler.
840    scheduler: SchedulerRef,
841}
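// Typical lifecycle (descriptive summary of the methods below): the worker calls
// `schedule_flush`, which either runs the task immediately (freezing the mutable
// memtable and submitting a job to the scheduler) or merges it into a pending task
// when a flush is already running. When the job completes, `on_flush_success` /
// `on_flush_failed` update the status, hand back any pending requests, and call
// `schedule_next_flush` to pick up a pending task from a region that still has one.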
842
843impl FlushScheduler {
844    /// Creates a new flush scheduler.
845    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
846        FlushScheduler {
847            region_status: HashMap::new(),
848            scheduler,
849        }
850    }
851
852    /// Returns true if the region has already requested a flush.
853    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
854        self.region_status.contains_key(&region_id)
855    }
856
857    /// Schedules a flush `task` for the specific `region`.
858    pub(crate) fn schedule_flush(
859        &mut self,
860        region_id: RegionId,
861        version_control: &VersionControlRef,
862        task: RegionFlushTask,
863    ) -> Result<()> {
864        debug_assert_eq!(region_id, task.region_id);
865
866        let version = version_control.current().version;
867        if version.memtables.is_empty() {
868            debug_assert!(!self.region_status.contains_key(&region_id));
869            // The region has nothing to flush.
870            task.on_success();
871            return Ok(());
872        }
873
874        // Only counts requests for regions that actually have something to flush (the empty case returned above).
875        FLUSH_REQUESTS_TOTAL
876            .with_label_values(&[task.reason.as_str()])
877            .inc();
878
879        // Add this region to status map.
880        let flush_status = self
881            .region_status
882            .entry(region_id)
883            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
884        // Checks whether we can flush the region now.
885        if flush_status.flushing {
886            // There is already a flush job running.
887            flush_status.merge_task(task);
888            return Ok(());
889        }
890
891        // TODO(yingwen): We can merge with pending and execute directly.
892        // If there is already a pending task, merge this task into it.
893        if flush_status.pending_task.is_some() {
894            flush_status.merge_task(task);
895            return Ok(());
896        }
897
898        // Now we can flush the region directly.
899        if let Err(e) = version_control.freeze_mutable() {
900            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
901
902            // Remove from region status if we can't freeze the mutable memtable.
903            self.region_status.remove(&region_id);
904            return Err(e);
905        }
906        // Submit a flush job.
907        let job = task.into_flush_job(version_control);
908        if let Err(e) = self.scheduler.schedule(job) {
909            // If scheduler returns error, senders in the job will be dropped and waiters
910            // can get recv errors.
911            error!(e; "Failed to schedule flush job for region {}", region_id);
912
913            // Remove from region status if we can't submit the task.
914            self.region_status.remove(&region_id);
915            return Err(e);
916        }
917
918        flush_status.flushing = true;
919
920        Ok(())
921    }
922
923    /// Notifies the scheduler that the flush job is finished.
924    ///
925    /// Returns all pending requests if the region doesn't need to flush again.
926    pub(crate) fn on_flush_success(
927        &mut self,
928        region_id: RegionId,
929    ) -> Option<(
930        Vec<SenderDdlRequest>,
931        Vec<SenderWriteRequest>,
932        Vec<SenderBulkRequest>,
933    )> {
934        let flush_status = self.region_status.get_mut(&region_id)?;
935
936        // The running flush job of this region has finished.
937        flush_status.flushing = false;
938
939        let pending_requests = if flush_status.pending_task.is_none() {
940            // The region doesn't have any pending flush task.
941            // Safety: The flush status must exist.
942            let flush_status = self.region_status.remove(&region_id).unwrap();
943            Some((
944                flush_status.pending_ddls,
945                flush_status.pending_writes,
946                flush_status.pending_bulk_writes,
947            ))
948        } else {
949            let version_data = flush_status.version_control.current();
950            if version_data.version.memtables.is_empty() {
951                // The region has nothing to flush, we also need to remove it from the status.
952                // Safety: The pending task is not None.
953                let task = flush_status.pending_task.take().unwrap();
954                // The region has nothing to flush. We can notify pending task.
955                task.on_success();
956                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
957                // it from the status to avoid leaking pending requests.
958                // Safety: The flush status must exist.
959                let flush_status = self.region_status.remove(&region_id).unwrap();
960                Some((
961                    flush_status.pending_ddls,
962                    flush_status.pending_writes,
963                    flush_status.pending_bulk_writes,
964                ))
965            } else {
966                // We can flush the region again, keep it in the region status.
967                None
968            }
969        };
970
971        // Schedule next flush job.
972        if let Err(e) = self.schedule_next_flush() {
973            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
974        }
975
976        pending_requests
977    }
978
979    /// Notifies the scheduler that the flush job has failed.
980    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
981        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
982
983        FLUSH_FAILURE_TOTAL.inc();
984
985        // Remove this region.
986        let Some(flush_status) = self.region_status.remove(&region_id) else {
987            return;
988        };
989
990        // Fast fail: cancels all pending tasks and sends error to their waiters.
991        flush_status.on_failure(err);
992
993        // Still tries to schedule a new flush.
994        if let Err(e) = self.schedule_next_flush() {
995            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
996        }
997    }
998
999    /// Notifies the scheduler that the region is dropped.
1000    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1001        self.remove_region_on_failure(
1002            region_id,
1003            Arc::new(RegionDroppedSnafu { region_id }.build()),
1004        );
1005    }
1006
1007    /// Notifies the scheduler that the region is closed.
1008    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1009        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1010    }
1011
1012    /// Notifies the scheduler that the region is truncated.
1013    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1014        self.remove_region_on_failure(
1015            region_id,
1016            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1017        );
1018    }
1019
1020    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1021        // Remove this region.
1022        let Some(flush_status) = self.region_status.remove(&region_id) else {
1023            return;
1024        };
1025
1026        // Notifies all pending tasks.
1027        flush_status.on_failure(err);
1028    }
1029
1030    /// Adds a DDL request to the pending queue.
1031    ///
1032    /// # Panics
1033    /// Panics if the region didn't request a flush.
1034    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1035        let status = self.region_status.get_mut(&request.region_id).unwrap();
1036        status.pending_ddls.push(request);
1037    }
1038
1039    /// Adds a write request to the pending queue.
1040    ///
1041    /// # Panics
1042    /// Panics if the region didn't request a flush.
1043    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1044        let status = self
1045            .region_status
1046            .get_mut(&request.request.region_id)
1047            .unwrap();
1048        status.pending_writes.push(request);
1049    }
1050
1051    /// Adds a bulk write request to the pending queue.
1052    ///
1053    /// # Panics
1054    /// Panics if the region didn't request a flush.
1055    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1056        let status = self.region_status.get_mut(&request.region_id).unwrap();
1057        status.pending_bulk_writes.push(request);
1058    }
1059
1060    /// Returns true if the region has pending DDLs.
1061    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1062        self.region_status
1063            .get(&region_id)
1064            .map(|status| !status.pending_ddls.is_empty())
1065            .unwrap_or(false)
1066    }
1067
1068    /// Schedules a new flush task when the scheduler can submit the next task.
1069    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
1070        debug_assert!(
1071            self.region_status
1072                .values()
1073                .all(|status| status.flushing || status.pending_task.is_some())
1074        );
1075
1076        // Gets the first region with a pending task from the status map.
1077        let Some(flush_status) = self
1078            .region_status
1079            .values_mut()
1080            .find(|status| status.pending_task.is_some())
1081        else {
1082            return Ok(());
1083        };
1084        debug_assert!(!flush_status.flushing);
1085        let task = flush_status.pending_task.take().unwrap();
1086        let region_id = flush_status.region_id;
1087        let version_control = flush_status.version_control.clone();
1088
1089        self.schedule_flush(region_id, &version_control, task)
1090    }
1091}
1092
1093impl Drop for FlushScheduler {
1094    fn drop(&mut self) {
1095        for (region_id, flush_status) in self.region_status.drain() {
1096            // We are shutting down so notify all pending tasks.
1097            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1098        }
1099    }
1100}
1101
1102/// Flush status of a region scheduled by the [FlushScheduler].
1103///
1104/// Tracks running and pending flush tasks and all pending requests of a region.
1105struct FlushStatus {
1106    /// Current region.
1107    region_id: RegionId,
1108    /// Version control of the region.
1109    version_control: VersionControlRef,
1110    /// There is a flush task running.
1111    ///
1112    /// It is possible that a region is not flushing but has a pending task if the scheduler
1113    /// hasn't scheduled this region yet.
1114    flushing: bool,
1115    /// Task waiting for next flush.
1116    pending_task: Option<RegionFlushTask>,
1117    /// Pending ddl requests.
1118    pending_ddls: Vec<SenderDdlRequest>,
1119    /// Requests waiting to write after altering the region.
1120    pending_writes: Vec<SenderWriteRequest>,
1121    /// Bulk requests waiting to write after altering the region.
1122    pending_bulk_writes: Vec<SenderBulkRequest>,
1123}
1124
1125impl FlushStatus {
1126    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1127        FlushStatus {
1128            region_id,
1129            version_control,
1130            flushing: false,
1131            pending_task: None,
1132            pending_ddls: Vec::new(),
1133            pending_writes: Vec::new(),
1134            pending_bulk_writes: Vec::new(),
1135        }
1136    }
1137
1138    /// Merges the task into the pending task.
1139    fn merge_task(&mut self, task: RegionFlushTask) {
1140        if let Some(pending) = &mut self.pending_task {
1141            pending.merge(task);
1142        } else {
1143            self.pending_task = Some(task);
1144        }
1145    }
1146
1147    fn on_failure(self, err: Arc<Error>) {
1148        if let Some(mut task) = self.pending_task {
1149            task.on_failure(err.clone());
1150        }
1151        for ddl in self.pending_ddls {
1152            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1153                region_id: self.region_id,
1154            }));
1155        }
1156        for write_req in self.pending_writes {
1157            write_req
1158                .sender
1159                .send(Err(err.clone()).context(FlushRegionSnafu {
1160                    region_id: self.region_id,
1161                }));
1162        }
1163    }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use tokio::sync::oneshot;
1169
1170    use super::*;
1171    use crate::cache::CacheManager;
1172    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1173    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1174    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1175
1176    #[test]
1177    fn test_get_mutable_limit() {
1178        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1179        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1180        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1181        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1182    }
1183
1184    #[test]
1185    fn test_over_mutable_limit() {
1186        // Mutable limit is 500.
1187        let manager = WriteBufferManagerImpl::new(1000);
1188        manager.reserve_mem(400);
1189        assert!(!manager.should_flush_engine());
1190        assert!(!manager.should_stall());
1191
1192        // More than mutable limit.
1193        manager.reserve_mem(400);
1194        assert!(manager.should_flush_engine());
1195
1196        // Freezes mutable.
1197        manager.schedule_free_mem(400);
1198        assert!(!manager.should_flush_engine());
1199        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1200        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1201
1202        // Releases immutable.
1203        manager.free_mem(400);
1204        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1205        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1206    }
1207
1208    #[test]
1209    fn test_over_global() {
1210        // Mutable limit is 500.
1211        let manager = WriteBufferManagerImpl::new(1000);
1212        manager.reserve_mem(1100);
1213        assert!(manager.should_stall());
1214        // Global usage is still 1100.
1215        manager.schedule_free_mem(200);
1216        assert!(manager.should_flush_engine());
1217
1218        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
1219        manager.schedule_free_mem(450);
1220        assert!(!manager.should_flush_engine());
1221
1222        // Now mutable is enough.
1223        manager.reserve_mem(50);
1224        assert!(manager.should_flush_engine());
1225        manager.reserve_mem(100);
1226        assert!(manager.should_flush_engine());
1227    }
1228
1229    #[test]
1230    fn test_manager_notify() {
1231        let (sender, receiver) = watch::channel(());
1232        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
1233        manager.reserve_mem(500);
1234        assert!(!receiver.has_changed().unwrap());
1235        manager.schedule_free_mem(500);
1236        assert!(!receiver.has_changed().unwrap());
1237        manager.free_mem(500);
1238        assert!(receiver.has_changed().unwrap());
1239    }
1240
1241    #[tokio::test]
1242    async fn test_schedule_empty() {
1243        let env = SchedulerEnv::new().await;
1244        let (tx, _rx) = mpsc::channel(4);
1245        let mut scheduler = env.mock_flush_scheduler();
1246        let builder = VersionControlBuilder::new();
1247
1248        let version_control = Arc::new(builder.build());
1249        let (output_tx, output_rx) = oneshot::channel();
1250        let mut task = RegionFlushTask {
1251            region_id: builder.region_id(),
1252            reason: FlushReason::Others,
1253            senders: Vec::new(),
1254            request_sender: tx,
1255            access_layer: env.access_layer.clone(),
1256            listener: WorkerListener::default(),
1257            engine_config: Arc::new(MitoConfig::default()),
1258            row_group_size: None,
1259            cache_manager: Arc::new(CacheManager::default()),
1260            manifest_ctx: env
1261                .mock_manifest_context(version_control.current().version.metadata.clone())
1262                .await,
1263            index_options: IndexOptions::default(),
1264            flush_semaphore: Arc::new(Semaphore::new(2)),
1265        };
1266        task.push_sender(OptionOutputTx::from(output_tx));
1267        scheduler
1268            .schedule_flush(builder.region_id(), &version_control, task)
1269            .unwrap();
1270        assert!(scheduler.region_status.is_empty());
1271        let output = output_rx.await.unwrap().unwrap();
1272        assert_eq!(output, 0);
1273        assert!(scheduler.region_status.is_empty());
1274    }
1275
1276    #[tokio::test]
1277    async fn test_schedule_pending_request() {
1278        let job_scheduler = Arc::new(VecScheduler::default());
1279        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1280        let (tx, _rx) = mpsc::channel(4);
1281        let mut scheduler = env.mock_flush_scheduler();
1282        let mut builder = VersionControlBuilder::new();
1283        // Overwrites the empty memtable builder.
1284        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1285        let version_control = Arc::new(builder.build());
1286        // Writes data to the memtable so it is not empty.
1287        let version_data = version_control.current();
1288        write_rows_to_version(&version_data.version, "host0", 0, 10);
1289        let manifest_ctx = env
1290            .mock_manifest_context(version_data.version.metadata.clone())
1291            .await;
1292        // Creates 3 tasks.
1293        let mut tasks: Vec<_> = (0..3)
1294            .map(|_| RegionFlushTask {
1295                region_id: builder.region_id(),
1296                reason: FlushReason::Others,
1297                senders: Vec::new(),
1298                request_sender: tx.clone(),
1299                access_layer: env.access_layer.clone(),
1300                listener: WorkerListener::default(),
1301                engine_config: Arc::new(MitoConfig::default()),
1302                row_group_size: None,
1303                cache_manager: Arc::new(CacheManager::default()),
1304                manifest_ctx: manifest_ctx.clone(),
1305                index_options: IndexOptions::default(),
1306                flush_semaphore: Arc::new(Semaphore::new(2)),
1307            })
1308            .collect();
1309        // Schedule first task.
1310        let task = tasks.pop().unwrap();
1311        scheduler
1312            .schedule_flush(builder.region_id(), &version_control, task)
1313            .unwrap();
1314        // Should schedule 1 flush.
1315        assert_eq!(1, scheduler.region_status.len());
1316        assert_eq!(1, job_scheduler.num_jobs());
1317        // Check the new version.
1318        let version_data = version_control.current();
1319        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1320        // Schedule remaining tasks.
1321        let output_rxs: Vec<_> = tasks
1322            .into_iter()
1323            .map(|mut task| {
1324                let (output_tx, output_rx) = oneshot::channel();
1325                task.push_sender(OptionOutputTx::from(output_tx));
1326                scheduler
1327                    .schedule_flush(builder.region_id(), &version_control, task)
1328                    .unwrap();
1329                output_rx
1330            })
1331            .collect();
1332        // Assumes the flush job is finished.
1333        version_control.apply_edit(
1334            Some(RegionEdit {
1335                files_to_add: Vec::new(),
1336                files_to_remove: Vec::new(),
1337                timestamp_ms: None,
1338                compaction_time_window: None,
1339                flushed_entry_id: None,
1340                flushed_sequence: None,
1341                committed_sequence: None,
1342            }),
1343            &[0],
1344            builder.file_purger(),
1345        );
1346        scheduler.on_flush_success(builder.region_id());
1347        // No new flush task.
1348        assert_eq!(1, job_scheduler.num_jobs());
1349        // The flush status is cleared.
1350        assert!(scheduler.region_status.is_empty());
1351        for output_rx in output_rxs {
1352            let output = output_rx.await.unwrap().unwrap();
1353            assert_eq!(output, 0);
1354        }
1355    }
1356}