mito2/flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, trace};
24use datatypes::arrow::datatypes::SchemaRef;
25use either::Either;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30use strum::IntoStaticStr;
31use tokio::sync::{Semaphore, mpsc, watch};
32
33use crate::access_layer::{
34    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
35};
36use crate::cache::CacheManagerRef;
37use crate::config::MitoConfig;
38use crate::error::{
39    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
40    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
41};
42use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
43use crate::memtable::{
44    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
45};
46use crate::metrics::{
47    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
48    INFLIGHT_FLUSH_COUNT,
49};
50use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
51use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
52use crate::read::flat_merge::FlatMergeIterator;
53use crate::read::merge::MergeReaderBuilder;
54use crate::read::{FlatSource, Source};
55use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
56use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
57use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
58use crate::request::{
59    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
60    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::FileMeta;
64use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
65use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
66use crate::worker::WorkerListener;
67
68/// Global write buffer (memtable) manager.
69///
70/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
71pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
72    /// Returns whether to trigger a flush of the engine.
73    fn should_flush_engine(&self) -> bool;
74
75    /// Returns whether to stall write requests.
76    fn should_stall(&self) -> bool;
77
78    /// Reserves `mem` bytes.
79    fn reserve_mem(&self, mem: usize);
80
81    /// Tells the manager we are freeing `mem` bytes.
82    ///
83    /// We are in the process of freeing `mem` bytes, so it is not considered
84    /// when checking the soft limit.
85    fn schedule_free_mem(&self, mem: usize);
86
87    /// We have freed `mem` bytes.
88    fn free_mem(&self, mem: usize);
89
90    /// Returns the total memory used by memtables.
91    fn memory_usage(&self) -> usize;
92}
93
94pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
95
96/// Default [WriteBufferManager] implementation.
97///
98/// Inspired by RocksDB's WriteBufferManager.
99/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
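///
/// A minimal usage sketch (independent of the engine wiring): a write reserves memory,
/// freezing a memtable schedules that memory to be freed, and dropping the flushed
/// memtable finally frees it.
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1000);
/// manager.reserve_mem(400);         // a write buffers 400 bytes
/// manager.schedule_free_mem(400);   // the memtable is frozen for flushing
/// manager.free_mem(400);            // the flushed memtable is dropped
/// assert_eq!(0, manager.memory_usage());
/// ```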
100#[derive(Debug)]
101pub struct WriteBufferManagerImpl {
102    /// Write buffer size for the engine.
103    global_write_buffer_size: usize,
104    /// Mutable memtable memory size limit.
105    mutable_limit: usize,
106    /// Memory in use (e.g. by mutable and immutable memtables).
107    memory_used: AtomicUsize,
108    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
109    memory_active: AtomicUsize,
110    /// Optional notifier.
111    /// The manager can wake up the worker once we free the write buffer.
112    notifier: Option<watch::Sender<()>>,
113}
114
115impl WriteBufferManagerImpl {
116    /// Returns a new manager with the given `global_write_buffer_size`.
117    pub fn new(global_write_buffer_size: usize) -> Self {
118        Self {
119            global_write_buffer_size,
120            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
121            memory_used: AtomicUsize::new(0),
122            memory_active: AtomicUsize::new(0),
123            notifier: None,
124        }
125    }
126
127    /// Attaches a notifier to the manager.
128    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
129        self.notifier = Some(notifier);
130        self
131    }
132
133    /// Returns memory usage of mutable memtables.
134    pub fn mutable_usage(&self) -> usize {
135        self.memory_active.load(Ordering::Relaxed)
136    }
137
138    /// Returns the size limit for mutable memtables.
139    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
140        // Reserves half of the write buffer for mutable memtable.
141        global_write_buffer_size / 2
142    }
143}
144
145impl WriteBufferManager for WriteBufferManagerImpl {
146    fn should_flush_engine(&self) -> bool {
147        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
148        if mutable_memtable_memory_usage > self.mutable_limit {
149            debug!(
150                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
151                mutable_memtable_memory_usage,
152                self.memory_usage(),
153                self.mutable_limit,
154                self.global_write_buffer_size,
155            );
156            return true;
157        }
158
159        let memory_usage = self.memory_used.load(Ordering::Relaxed);
160        // If the total memory usage exceeds the buffer size, we trigger a more
161        // aggressive flush. But if more than half of the memory is already being
162        // flushed, triggering more flushes may not help, so we hold off instead.
163        if memory_usage >= self.global_write_buffer_size {
164            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
165                debug!(
166                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
167                 mutable_usage: {}.",
168                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
169                );
170                return true;
171            } else {
172                trace!(
173                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
174                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
175                );
176            }
177        }
178
179        false
180    }
181
182    fn should_stall(&self) -> bool {
183        self.memory_usage() >= self.global_write_buffer_size
184    }
185
186    fn reserve_mem(&self, mem: usize) {
187        self.memory_used.fetch_add(mem, Ordering::Relaxed);
188        self.memory_active.fetch_add(mem, Ordering::Relaxed);
189    }
190
191    fn schedule_free_mem(&self, mem: usize) {
192        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
193    }
194
195    fn free_mem(&self, mem: usize) {
196        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
197        if let Some(notifier) = &self.notifier {
198            // Notifies the worker after the memory usage is decreased. When we drop the memtable
199            // outside of the worker, the worker may still stall requests because the memory usage
200            // is not updated. So we need to notify the worker to handle stalled requests again.
201            let _ = notifier.send(());
202        }
203    }
204
205    fn memory_usage(&self) -> usize {
206        self.memory_used.load(Ordering::Relaxed)
207    }
208}
209
210/// Reason of a flush task.
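///
/// The variant name doubles as a metric label: `as_str` goes through the
/// `IntoStaticStr` derive, so e.g. `FlushReason::EngineFull` yields `"EngineFull"`.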
211#[derive(Debug, IntoStaticStr)]
212pub enum FlushReason {
213    /// Other reasons.
214    Others,
215    /// Engine reaches flush threshold.
216    EngineFull,
217    /// Manual flush.
218    Manual,
219    /// Flush to alter table.
220    Alter,
221    /// Flush periodically.
222    Periodically,
223    /// Flush memtable during downgrading state.
224    Downgrading,
225}
226
227impl FlushReason {
228    /// Gets the flush reason as a static str.
229    fn as_str(&self) -> &'static str {
230        self.into()
231    }
232}
233
234/// Task to flush a region.
235pub(crate) struct RegionFlushTask {
236    /// Region to flush.
237    pub(crate) region_id: RegionId,
238    /// Reason to flush.
239    pub(crate) reason: FlushReason,
240    /// Flush result senders.
241    pub(crate) senders: Vec<OutputTx>,
242    /// Request sender to notify the worker.
243    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
244
245    pub(crate) access_layer: AccessLayerRef,
246    pub(crate) listener: WorkerListener,
247    pub(crate) engine_config: Arc<MitoConfig>,
248    pub(crate) row_group_size: Option<usize>,
249    pub(crate) cache_manager: CacheManagerRef,
250    pub(crate) manifest_ctx: ManifestContextRef,
251
252    /// Index options for the region.
253    pub(crate) index_options: IndexOptions,
254    /// Semaphore to control flush concurrency.
255    pub(crate) flush_semaphore: Arc<Semaphore>,
256}
257
258impl RegionFlushTask {
259    /// Pushes the sender if it is not `None`.
260    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
261        if let Some(sender) = sender.take_inner() {
262            self.senders.push(sender);
263        }
264    }
265
266    /// Consumes the task and notifies the senders that the job succeeded.
267    fn on_success(self) {
268        for sender in self.senders {
269            sender.send(Ok(0));
270        }
271    }
272
273    /// Sends the flush error to the waiters.
274    fn on_failure(&mut self, err: Arc<Error>) {
275        for sender in self.senders.drain(..) {
276            sender.send(Err(err.clone()).context(FlushRegionSnafu {
277                region_id: self.region_id,
278            }));
279        }
280    }
281
282    /// Converts the flush task into a background job.
283    ///
284    /// We must call this in the region worker.
285    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
286        // Gets a version of this region before creating the job to capture the
287        // current WAL entry id, sequence, and immutable memtables.
288        let version_data = version_control.current();
289
290        Box::pin(async move {
291            INFLIGHT_FLUSH_COUNT.inc();
292            self.do_flush(version_data).await;
293            INFLIGHT_FLUSH_COUNT.dec();
294        })
295    }
296
297    /// Runs the flush task.
298    async fn do_flush(&mut self, version_data: VersionControlData) {
299        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
300        self.listener.on_flush_begin(self.region_id).await;
301
302        let worker_request = match self.flush_memtables(&version_data).await {
303            Ok(edit) => {
304                let memtables_to_remove = version_data
305                    .version
306                    .memtables
307                    .immutables()
308                    .iter()
309                    .map(|m| m.id())
310                    .collect();
311                let flush_finished = FlushFinished {
312                    region_id: self.region_id,
313                    // The last entry has been flushed.
314                    flushed_entry_id: version_data.last_entry_id,
315                    senders: std::mem::take(&mut self.senders),
316                    _timer: timer,
317                    edit,
318                    memtables_to_remove,
319                };
320                WorkerRequest::Background {
321                    region_id: self.region_id,
322                    notify: BackgroundNotify::FlushFinished(flush_finished),
323                }
324            }
325            Err(e) => {
326                error!(e; "Failed to flush region {}", self.region_id);
327                // Discard the timer.
328                timer.stop_and_discard();
329
330                let err = Arc::new(e);
331                self.on_failure(err.clone());
332                WorkerRequest::Background {
333                    region_id: self.region_id,
334                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
335                }
336            }
337        };
338        self.send_worker_request(worker_request).await;
339    }
340
341    /// Flushes memtables to level 0 SSTs and updates the manifest.
342    /// Returns the [RegionEdit] to apply.
343    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
344        // We must use the immutable memtable list and entry ids from the `version_data`
345        // for consistency, as others might have already modified the version in the `version_control`.
346        let version = &version_data.version;
347        let timer = FLUSH_ELAPSED
348            .with_label_values(&["flush_memtables"])
349            .start_timer();
350
351        let mut write_opts = WriteOptions {
352            write_buffer_size: self.engine_config.sst_write_buffer_size,
353            ..Default::default()
354        };
355        if let Some(row_group_size) = self.row_group_size {
356            write_opts.row_group_size = row_group_size;
357        }
358
359        let DoFlushMemtablesResult {
360            file_metas,
361            flushed_bytes,
362            series_count,
363            flush_metrics,
364        } = self.do_flush_memtables(version, write_opts).await?;
365
366        if !file_metas.is_empty() {
367            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
368        }
369
370        let mut file_ids = Vec::with_capacity(file_metas.len());
371        let mut total_rows = 0;
372        let mut total_bytes = 0;
373        for meta in &file_metas {
374            file_ids.push(meta.file_id);
375            total_rows += meta.num_rows;
376            total_bytes += meta.file_size;
377        }
378        info!(
379            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
380            self.region_id,
381            self.reason.as_str(),
382            file_ids,
383            series_count,
384            total_rows,
385            total_bytes,
386            timer.stop_and_record(),
387            flush_metrics,
388        );
389        flush_metrics.observe();
390
391        let edit = RegionEdit {
392            files_to_add: file_metas,
393            files_to_remove: Vec::new(),
394            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
395            compaction_time_window: None,
396            // The last entry has been flushed.
397            flushed_entry_id: Some(version_data.last_entry_id),
398            flushed_sequence: Some(version_data.committed_sequence),
399            committed_sequence: None,
400        };
401        info!("Applying {edit:?} to region {}", self.region_id);
402
403        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
404
405        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
406            RegionLeaderState::Downgrading
407        } else {
408            // Check if region is in staging mode
409            let current_state = self.manifest_ctx.current_state();
410            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
411                RegionLeaderState::Staging
412            } else {
413                RegionLeaderState::Writable
414            }
415        };
416        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
417        // add a cleanup job to remove them later.
418        let version = self
419            .manifest_ctx
420            .update_manifest(expected_state, action_list)
421            .await?;
422        info!(
423            "Successfully update manifest version to {version}, region: {}, reason: {}",
424            self.region_id,
425            self.reason.as_str()
426        );
427
428        Ok(edit)
429    }
430
431    async fn do_flush_memtables(
432        &self,
433        version: &VersionRef,
434        write_opts: WriteOptions,
435    ) -> Result<DoFlushMemtablesResult> {
436        let memtables = version.memtables.immutables();
437        let mut file_metas = Vec::with_capacity(memtables.len());
438        let mut flushed_bytes = 0;
439        let mut series_count = 0;
440        // Convert partition expression once outside the map
441        let partition_expr = match &version.metadata.partition_expr {
442            None => None,
443            Some(json_expr) if json_expr.is_empty() => None,
444            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
445                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
446        };
447        let mut flush_metrics = Metrics::new(WriteType::Flush);
448        for mem in memtables {
449            if mem.is_empty() {
450                // Skip empty memtables.
451                continue;
452            }
453
454            // Compacts the memtable first; this waits for the background compaction to finish.
455            let compact_start = std::time::Instant::now();
456            if let Err(e) = mem.compact(true) {
457                common_telemetry::error!(e; "Failed to compact memtable before flush");
458            }
459            let compact_cost = compact_start.elapsed();
460            flush_metrics.compact_memtable += compact_cost;
461
462            // Sets `for_flush` flag to true.
463            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
464            let num_mem_ranges = mem_ranges.ranges.len();
465            let num_mem_rows = mem_ranges.stats.num_rows();
466            let memtable_id = mem.id();
467            // Increases the series count for each mem range. We assume each mem range has different
468            // series, so the counter may report more series than the actual count.
469            series_count += mem_ranges.stats.series_count();
470
471            if mem_ranges.is_record_batch() {
472                let flush_start = Instant::now();
473                let FlushFlatMemResult {
474                    num_encoded,
475                    max_sequence,
476                    num_sources,
477                    results,
478                } = self
479                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
480                    .await?;
481                for (source_idx, result) in results.into_iter().enumerate() {
482                    let (ssts_written, metrics) = result?;
483                    if ssts_written.is_empty() {
484                        // No data written.
485                        continue;
486                    }
487
488                    common_telemetry::debug!(
489                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
490                        self.region_id,
491                        memtable_id,
492                        source_idx,
493                        num_sources,
494                        metrics
495                    );
496
497                    flush_metrics = flush_metrics.merge(metrics);
498
499                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
500                        flushed_bytes += sst_info.file_size;
501                        Self::new_file_meta(
502                            self.region_id,
503                            max_sequence,
504                            sst_info,
505                            partition_expr.clone(),
506                        )
507                    }));
508                }
509
510                common_telemetry::debug!(
511                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
512                    self.region_id,
513                    num_sources,
514                    memtable_id,
515                    num_mem_ranges,
516                    num_encoded,
517                    num_mem_rows,
518                    flush_start.elapsed(),
519                    compact_cost,
520                );
521            } else {
522                let max_sequence = mem_ranges.stats.max_sequence();
523                let source = memtable_source(mem_ranges, &version.options).await?;
524
525                // Flush to level 0.
526                let source = Either::Left(source);
527                let write_request = self.new_write_request(version, max_sequence, source);
528
529                let mut metrics = Metrics::new(WriteType::Flush);
530                let ssts_written = self
531                    .access_layer
532                    .write_sst(write_request, &write_opts, &mut metrics)
533                    .await?;
534                if ssts_written.is_empty() {
535                    // No data written.
536                    continue;
537                }
538
539                debug!(
540                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
541                    self.region_id, num_mem_ranges, num_mem_rows, metrics
542                );
543
544                flush_metrics = flush_metrics.merge(metrics);
545
546                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
547                    flushed_bytes += sst_info.file_size;
548                    Self::new_file_meta(
549                        self.region_id,
550                        max_sequence,
551                        sst_info,
552                        partition_expr.clone(),
553                    )
554                }));
555            };
556        }
557
558        Ok(DoFlushMemtablesResult {
559            file_metas,
560            flushed_bytes,
561            series_count,
562            flush_metrics,
563        })
564    }
565
566    async fn flush_flat_mem_ranges(
567        &self,
568        version: &VersionRef,
569        write_opts: &WriteOptions,
570        mem_ranges: MemtableRanges,
571    ) -> Result<FlushFlatMemResult> {
572        let batch_schema = to_flat_sst_arrow_schema(
573            &version.metadata,
574            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
575        );
576        let flat_sources = memtable_flat_sources(
577            batch_schema,
578            mem_ranges,
579            &version.options,
580            version.metadata.primary_key.len(),
581        )?;
582        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
583        let num_encoded = flat_sources.encoded.len();
584        let max_sequence = flat_sources.max_sequence;
585        for source in flat_sources.sources {
586            let source = Either::Right(source);
587            let write_request = self.new_write_request(version, max_sequence, source);
588            let access_layer = self.access_layer.clone();
589            let write_opts = write_opts.clone();
590            let semaphore = self.flush_semaphore.clone();
591            let task = common_runtime::spawn_global(async move {
592                let _permit = semaphore.acquire().await.unwrap();
593                let mut metrics = Metrics::new(WriteType::Flush);
594                let ssts = access_layer
595                    .write_sst(write_request, &write_opts, &mut metrics)
596                    .await?;
597                Ok((ssts, metrics))
598            });
599            tasks.push(task);
600        }
601        for encoded in flat_sources.encoded {
602            let access_layer = self.access_layer.clone();
603            let cache_manager = self.cache_manager.clone();
604            let region_id = version.metadata.region_id;
605            let semaphore = self.flush_semaphore.clone();
606            let task = common_runtime::spawn_global(async move {
607                let _permit = semaphore.acquire().await.unwrap();
608                let metrics = access_layer
609                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
610                    .await?;
611                Ok((smallvec![encoded.sst_info], metrics))
612            });
613            tasks.push(task);
614        }
615        let num_sources = tasks.len();
616        let results = futures::future::try_join_all(tasks)
617            .await
618            .context(JoinSnafu)?;
619        Ok(FlushFlatMemResult {
620            num_encoded,
621            max_sequence,
622            num_sources,
623            results,
624        })
625    }
626
627    fn new_file_meta(
628        region_id: RegionId,
629        max_sequence: u64,
630        sst_info: SstInfo,
631        partition_expr: Option<PartitionExpr>,
632    ) -> FileMeta {
633        FileMeta {
634            region_id,
635            file_id: sst_info.file_id,
636            time_range: sst_info.time_range,
637            level: 0,
638            file_size: sst_info.file_size,
639            available_indexes: sst_info.index_metadata.build_available_indexes(),
640            index_file_size: sst_info.index_metadata.file_size,
641            index_file_id: None,
642            num_rows: sst_info.num_rows as u64,
643            num_row_groups: sst_info.num_row_groups,
644            sequence: NonZeroU64::new(max_sequence),
645            partition_expr,
646            num_series: sst_info.num_series,
647        }
648    }
649
650    fn new_write_request(
651        &self,
652        version: &VersionRef,
653        max_sequence: u64,
654        source: Either<Source, FlatSource>,
655    ) -> SstWriteRequest {
656        SstWriteRequest {
657            op_type: OperationType::Flush,
658            metadata: version.metadata.clone(),
659            source,
660            cache_manager: self.cache_manager.clone(),
661            storage: version.options.storage.clone(),
662            max_sequence: Some(max_sequence),
663            index_options: self.index_options.clone(),
664            index_config: self.engine_config.index.clone(),
665            inverted_index_config: self.engine_config.inverted_index.clone(),
666            fulltext_index_config: self.engine_config.fulltext_index.clone(),
667            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
668        }
669    }
670
671    /// Notifies the worker of the flush job status.
672    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
673        if let Err(e) = self
674            .request_sender
675            .send(WorkerRequestWithTime::new(request))
676            .await
677        {
678            error!(
679                "Failed to notify flush job status for region {}, request: {:?}",
680                self.region_id, e.0
681            );
682        }
683    }
684
685    /// Merges another flush task into this one.
686    fn merge(&mut self, mut other: RegionFlushTask) {
687        assert_eq!(self.region_id, other.region_id);
688        // Now we only merge senders. They share the same flush reason.
689        self.senders.append(&mut other.senders);
690    }
691}
692
693struct FlushFlatMemResult {
694    num_encoded: usize,
695    max_sequence: u64,
696    num_sources: usize,
697    results: Vec<Result<(SstInfoArray, Metrics)>>,
698}
699
700struct DoFlushMemtablesResult {
701    file_metas: Vec<FileMeta>,
702    flushed_bytes: u64,
703    series_count: usize,
704    flush_metrics: Metrics,
705}
706
707/// Returns a [Source] for the given memtable.
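///
/// A single range is read directly through its iterator; multiple ranges are merged with a
/// [MergeReaderBuilder] and then deduplicated according to the region's merge mode, unless
/// `append_mode` is set.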
708async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
709    let source = if mem_ranges.ranges.len() == 1 {
710        let only_range = mem_ranges.ranges.into_values().next().unwrap();
711        let iter = only_range.build_iter()?;
712        Source::Iter(iter)
713    } else {
714        // todo(hl): a workaround since sync version of MergeReader is wip.
715        let sources = mem_ranges
716            .ranges
717            .into_values()
718            .map(|r| r.build_iter().map(Source::Iter))
719            .collect::<Result<Vec<_>>>()?;
720        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
721        let maybe_dedup = if options.append_mode {
722            // no dedup in append mode
723            Box::new(merge_reader) as _
724        } else {
725            // dedup according to merge mode
726            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
727                MergeMode::LastRow => {
728                    Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
729                }
730                MergeMode::LastNonNull => {
731                    Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
732                }
733            }
734        };
735        Source::Reader(maybe_dedup)
736    };
737    Ok(source)
738}
739
740struct FlatSources {
741    max_sequence: u64,
742    sources: SmallVec<[FlatSource; 4]>,
743    encoded: SmallVec<[EncodedRange; 4]>,
744}
745
746/// Returns the max sequence and [FlatSource] for the given memtable.
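///
/// Ranges that are already encoded as SSTs are passed through via `encoded`; the remaining
/// ranges are merged, deduplicated, and grouped into sources of at least `min_flush_rows`
/// rows each so that the flush does not produce too many small files.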
747fn memtable_flat_sources(
748    schema: SchemaRef,
749    mem_ranges: MemtableRanges,
750    options: &RegionOptions,
751    field_column_start: usize,
752) -> Result<FlatSources> {
753    let MemtableRanges { ranges, stats } = mem_ranges;
754    let max_sequence = stats.max_sequence();
755    let mut flat_sources = FlatSources {
756        max_sequence,
757        sources: SmallVec::new(),
758        encoded: SmallVec::new(),
759    };
760
761    if ranges.len() == 1 {
762        let only_range = ranges.into_values().next().unwrap();
763        if let Some(encoded) = only_range.encoded() {
764            flat_sources.encoded.push(encoded);
765        } else {
766            let iter = only_range.build_record_batch_iter(None)?;
767            // Dedups according to the append mode and merge mode.
768            // Even a single range may have duplicate rows.
769            let iter = maybe_dedup_one(options, field_column_start, iter);
770            flat_sources.sources.push(FlatSource::Iter(iter));
771        };
772    } else {
773        let min_flush_rows = stats.num_rows / 8;
774        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
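        // Example (illustrative): with 4,000,000 rows in the memtable, min_flush_rows is
        // 500,000 (but never below DEFAULT_ROW_GROUP_SIZE), so the ranges are grouped into
        // roughly 8 merged sources instead of one source per range.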
775        let mut last_iter_rows = 0;
776        let num_ranges = ranges.len();
777        let mut input_iters = Vec::with_capacity(num_ranges);
778        for (_range_id, range) in ranges {
779            if let Some(encoded) = range.encoded() {
780                flat_sources.encoded.push(encoded);
781                continue;
782            }
783
784            let iter = range.build_record_batch_iter(None)?;
785            input_iters.push(iter);
786            last_iter_rows += range.num_rows();
787
788            if last_iter_rows > min_flush_rows {
789                let maybe_dedup = merge_and_dedup(
790                    &schema,
791                    options,
792                    field_column_start,
793                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
794                )?;
795
796                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
797                last_iter_rows = 0;
798            }
799        }
800
801        // Handle remaining iters.
802        if !input_iters.is_empty() {
803            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;
804
805            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
806        }
807    }
808
809    Ok(flat_sources)
810}
811
812fn merge_and_dedup(
813    schema: &SchemaRef,
814    options: &RegionOptions,
815    field_column_start: usize,
816    input_iters: Vec<BoxedRecordBatchIterator>,
817) -> Result<BoxedRecordBatchIterator> {
818    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
819    let maybe_dedup = if options.append_mode {
820        // No dedup in append mode
821        Box::new(merge_iter) as _
822    } else {
823        // Dedup according to merge mode.
824        match options.merge_mode() {
825            MergeMode::LastRow => {
826                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
827            }
828            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
829                merge_iter,
830                FlatLastNonNull::new(field_column_start, false),
831            )) as _,
832        }
833    };
834    Ok(maybe_dedup)
835}
836
837fn maybe_dedup_one(
838    options: &RegionOptions,
839    field_column_start: usize,
840    input_iter: BoxedRecordBatchIterator,
841) -> BoxedRecordBatchIterator {
842    if options.append_mode {
843        // No dedup in append mode
844        input_iter
845    } else {
846        // Dedup according to merge mode.
847        match options.merge_mode() {
848            MergeMode::LastRow => {
849                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
850            }
851            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
852                input_iter,
853                FlatLastNonNull::new(field_column_start, false),
854            )),
855        }
856    }
857}
858
859/// Manages background flushes of a worker.
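///
/// Typical lifecycle (sketch): the worker calls `schedule_flush` when a region needs
/// flushing. While a flush is running, new tasks for the same region are merged into a
/// pending task and DDL/write requests can be parked in the region's [FlushStatus].
/// `on_flush_success` either schedules the pending task or hands back the parked requests,
/// while `on_flush_failed` fails them fast.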
860pub(crate) struct FlushScheduler {
861    /// Tracks regions that need to flush.
862    region_status: HashMap<RegionId, FlushStatus>,
863    /// Background job scheduler.
864    scheduler: SchedulerRef,
865}
866
867impl FlushScheduler {
868    /// Creates a new flush scheduler.
869    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
870        FlushScheduler {
871            region_status: HashMap::new(),
872            scheduler,
873        }
874    }
875
876    /// Returns true if the region has already requested a flush.
877    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
878        self.region_status.contains_key(&region_id)
879    }
880
881    /// Schedules a flush `task` for the given region.
882    pub(crate) fn schedule_flush(
883        &mut self,
884        region_id: RegionId,
885        version_control: &VersionControlRef,
886        task: RegionFlushTask,
887    ) -> Result<()> {
888        debug_assert_eq!(region_id, task.region_id);
889
890        let version = version_control.current().version;
891        if version.memtables.is_empty() {
892            debug_assert!(!self.region_status.contains_key(&region_id));
893            // The region has nothing to flush.
894            task.on_success();
895            return Ok(());
896        }
897
898        // Don't increase the counter if a region has nothing to flush.
899        FLUSH_REQUESTS_TOTAL
900            .with_label_values(&[task.reason.as_str()])
901            .inc();
902
903        // Add this region to status map.
904        let flush_status = self
905            .region_status
906            .entry(region_id)
907            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
908        // Checks whether we can flush the region now.
909        if flush_status.flushing {
910            // There is already a flush job running.
911            flush_status.merge_task(task);
912            return Ok(());
913        }
914
915        // TODO(yingwen): We can merge with the pending task and execute directly.
916        // If there is already a pending task, merge this task into it.
917        if flush_status.pending_task.is_some() {
918            flush_status.merge_task(task);
919            return Ok(());
920        }
921
922        // Now we can flush the region directly.
923        if let Err(e) = version_control.freeze_mutable() {
924            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
925
926            // Remove from region status if we can't freeze the mutable memtable.
927            self.region_status.remove(&region_id);
928            return Err(e);
929        }
930        // Submit a flush job.
931        let job = task.into_flush_job(version_control);
932        if let Err(e) = self.scheduler.schedule(job) {
933            // If the scheduler returns an error, the senders in the job will be dropped
934            // and waiters will get recv errors.
935            error!(e; "Failed to schedule flush job for region {}", region_id);
936
937            // Remove from region status if we can't submit the task.
938            self.region_status.remove(&region_id);
939            return Err(e);
940        }
941
942        flush_status.flushing = true;
943
944        Ok(())
945    }
946
947    /// Notifies the scheduler that the flush job is finished.
948    ///
949    /// Returns all pending requests if the region doesn't need to flush again.
950    pub(crate) fn on_flush_success(
951        &mut self,
952        region_id: RegionId,
953    ) -> Option<(
954        Vec<SenderDdlRequest>,
955        Vec<SenderWriteRequest>,
956        Vec<SenderBulkRequest>,
957    )> {
958        let flush_status = self.region_status.get_mut(&region_id)?;
959
960        // This region no longer has a running flush job.
961        flush_status.flushing = false;
962
963        let pending_requests = if flush_status.pending_task.is_none() {
964            // The region doesn't have any pending flush task.
965            // Safety: The flush status must exist.
966            let flush_status = self.region_status.remove(&region_id).unwrap();
967            Some((
968                flush_status.pending_ddls,
969                flush_status.pending_writes,
970                flush_status.pending_bulk_writes,
971            ))
972        } else {
973            let version_data = flush_status.version_control.current();
974            if version_data.version.memtables.is_empty() {
975                // The region has nothing to flush, so we also need to remove it from the status.
976                // Safety: The pending task is not None.
977                let task = flush_status.pending_task.take().unwrap();
978                // Notifies the pending task since there is nothing to flush.
979                task.on_success();
980                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
981                // it from the status to avoid leaking pending requests.
982                // Safety: The flush status must exist.
983                let flush_status = self.region_status.remove(&region_id).unwrap();
984                Some((
985                    flush_status.pending_ddls,
986                    flush_status.pending_writes,
987                    flush_status.pending_bulk_writes,
988                ))
989            } else {
990                // We can flush the region again, keep it in the region status.
991                None
992            }
993        };
994
995        // Schedule next flush job.
996        if let Err(e) = self.schedule_next_flush() {
997            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
998        }
999
1000        pending_requests
1001    }
1002
1003    /// Notifies the scheduler that the flush job has failed.
1004    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
1005        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
1006
1007        FLUSH_FAILURE_TOTAL.inc();
1008
1009        // Remove this region.
1010        let Some(flush_status) = self.region_status.remove(&region_id) else {
1011            return;
1012        };
1013
1014        // Fast fail: cancels all pending tasks and sends error to their waiters.
1015        flush_status.on_failure(err);
1016
1017        // Still tries to schedule a new flush.
1018        if let Err(e) = self.schedule_next_flush() {
1019            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
1020        }
1021    }
1022
1023    /// Notifies the scheduler that the region is dropped.
1024    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1025        self.remove_region_on_failure(
1026            region_id,
1027            Arc::new(RegionDroppedSnafu { region_id }.build()),
1028        );
1029    }
1030
1031    /// Notifies the scheduler that the region is closed.
1032    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1033        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1034    }
1035
1036    /// Notifies the scheduler that the region is truncated.
1037    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1038        self.remove_region_on_failure(
1039            region_id,
1040            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1041        );
1042    }
1043
1044    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1045        // Remove this region.
1046        let Some(flush_status) = self.region_status.remove(&region_id) else {
1047            return;
1048        };
1049
1050        // Notifies all pending tasks.
1051        flush_status.on_failure(err);
1052    }
1053
1054    /// Adds a DDL request to the pending queue.
1055    ///
1056    /// # Panics
1057    /// Panics if the region didn't request a flush.
1058    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1059        let status = self.region_status.get_mut(&request.region_id).unwrap();
1060        status.pending_ddls.push(request);
1061    }
1062
1063    /// Adds a write request to the pending queue.
1064    ///
1065    /// # Panics
1066    /// Panics if the region didn't request a flush.
1067    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1068        let status = self
1069            .region_status
1070            .get_mut(&request.request.region_id)
1071            .unwrap();
1072        status.pending_writes.push(request);
1073    }
1074
1075    /// Adds a bulk write request to the pending queue.
1076    ///
1077    /// # Panics
1078    /// Panics if the region didn't request a flush.
1079    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1080        let status = self.region_status.get_mut(&request.region_id).unwrap();
1081        status.pending_bulk_writes.push(request);
1082    }
1083
1084    /// Returns true if the region has pending DDLs.
1085    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1086        self.region_status
1087            .get(&region_id)
1088            .map(|status| !status.pending_ddls.is_empty())
1089            .unwrap_or(false)
1090    }
1091
1092    /// Schedules a new flush task when the scheduler can submit the next task.
1093    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
1094        debug_assert!(
1095            self.region_status
1096                .values()
1097                .all(|status| status.flushing || status.pending_task.is_some())
1098        );
1099
1100        // Gets the first region with a pending task from the status map.
1101        let Some(flush_status) = self
1102            .region_status
1103            .values_mut()
1104            .find(|status| status.pending_task.is_some())
1105        else {
1106            return Ok(());
1107        };
1108        debug_assert!(!flush_status.flushing);
1109        let task = flush_status.pending_task.take().unwrap();
1110        let region_id = flush_status.region_id;
1111        let version_control = flush_status.version_control.clone();
1112
1113        self.schedule_flush(region_id, &version_control, task)
1114    }
1115}
1116
1117impl Drop for FlushScheduler {
1118    fn drop(&mut self) {
1119        for (region_id, flush_status) in self.region_status.drain() {
1120            // We are shutting down so notify all pending tasks.
1121            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1122        }
1123    }
1124}
1125
1126/// Flush status of a region scheduled by the [FlushScheduler].
1127///
1128/// Tracks running and pending flush tasks and all pending requests of a region.
1129struct FlushStatus {
1130    /// Current region.
1131    region_id: RegionId,
1132    /// Version control of the region.
1133    version_control: VersionControlRef,
1134    /// There is a flush task running.
1135    ///
1136    /// It is possible that a region is not flushing but has a pending task if the scheduler
1137    /// hasn't scheduled this region yet.
1138    flushing: bool,
1139    /// Task waiting for next flush.
1140    pending_task: Option<RegionFlushTask>,
1141    /// Pending ddl requests.
1142    pending_ddls: Vec<SenderDdlRequest>,
1143    /// Requests waiting to write after altering the region.
1144    pending_writes: Vec<SenderWriteRequest>,
1145    /// Bulk requests waiting to write after altering the region.
1146    pending_bulk_writes: Vec<SenderBulkRequest>,
1147}
1148
1149impl FlushStatus {
1150    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1151        FlushStatus {
1152            region_id,
1153            version_control,
1154            flushing: false,
1155            pending_task: None,
1156            pending_ddls: Vec::new(),
1157            pending_writes: Vec::new(),
1158            pending_bulk_writes: Vec::new(),
1159        }
1160    }
1161
1162    /// Merges the task to pending task.
1163    fn merge_task(&mut self, task: RegionFlushTask) {
1164        if let Some(pending) = &mut self.pending_task {
1165            pending.merge(task);
1166        } else {
1167            self.pending_task = Some(task);
1168        }
1169    }
1170
1171    fn on_failure(self, err: Arc<Error>) {
1172        if let Some(mut task) = self.pending_task {
1173            task.on_failure(err.clone());
1174        }
1175        for ddl in self.pending_ddls {
1176            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1177                region_id: self.region_id,
1178            }));
1179        }
1180        for write_req in self.pending_writes {
1181            write_req
1182                .sender
1183                .send(Err(err.clone()).context(FlushRegionSnafu {
1184                    region_id: self.region_id,
1185                }));
1186        }
1187    }
1188}
1189
1190#[cfg(test)]
1191mod tests {
1192    use mito_codec::row_converter::build_primary_key_codec;
1193    use tokio::sync::oneshot;
1194
1195    use super::*;
1196    use crate::cache::CacheManager;
1197    use crate::memtable::bulk::part::BulkPartConverter;
1198    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1199    use crate::memtable::{Memtable, RangesOptions};
1200    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1201    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1202    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1203    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1204
1205    #[test]
1206    fn test_get_mutable_limit() {
1207        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1208        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1209        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1210        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1211    }
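
    // A small sanity check (sketch): strum's `IntoStaticStr` uses the variant name,
    // so `as_str` should return it verbatim.
    #[test]
    fn test_flush_reason_as_str() {
        assert_eq!("EngineFull", FlushReason::EngineFull.as_str());
        assert_eq!("Downgrading", FlushReason::Downgrading.as_str());
    }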
1212
1213    #[test]
1214    fn test_over_mutable_limit() {
1215        // Mutable limit is 500.
1216        let manager = WriteBufferManagerImpl::new(1000);
1217        manager.reserve_mem(400);
1218        assert!(!manager.should_flush_engine());
1219        assert!(!manager.should_stall());
1220
1221        // More than mutable limit.
1222        manager.reserve_mem(400);
1223        assert!(manager.should_flush_engine());
1224
1225        // Freezes mutable.
1226        manager.schedule_free_mem(400);
1227        assert!(!manager.should_flush_engine());
1228        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1229        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1230
1231        // Releases immutable.
1232        manager.free_mem(400);
1233        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1234        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1235    }
1236
1237    #[test]
1238    fn test_over_global() {
1239        // Mutable limit is 500.
1240        let manager = WriteBufferManagerImpl::new(1000);
1241        manager.reserve_mem(1100);
1242        assert!(manager.should_stall());
1243        // Global usage is still 1100.
1244        manager.schedule_free_mem(200);
1245        assert!(manager.should_flush_engine());
1246
1247        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
1248        manager.schedule_free_mem(450);
1249        assert!(!manager.should_flush_engine());
1250
1251        // Now mutable is enough.
1252        manager.reserve_mem(50);
1253        assert!(manager.should_flush_engine());
1254        manager.reserve_mem(100);
1255        assert!(manager.should_flush_engine());
1256    }
1257
1258    #[test]
1259    fn test_manager_notify() {
1260        let (sender, receiver) = watch::channel(());
1261        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
1262        manager.reserve_mem(500);
1263        assert!(!receiver.has_changed().unwrap());
1264        manager.schedule_free_mem(500);
1265        assert!(!receiver.has_changed().unwrap());
1266        manager.free_mem(500);
1267        assert!(receiver.has_changed().unwrap());
1268    }
1269
1270    #[tokio::test]
1271    async fn test_schedule_empty() {
1272        let env = SchedulerEnv::new().await;
1273        let (tx, _rx) = mpsc::channel(4);
1274        let mut scheduler = env.mock_flush_scheduler();
1275        let builder = VersionControlBuilder::new();
1276
1277        let version_control = Arc::new(builder.build());
1278        let (output_tx, output_rx) = oneshot::channel();
1279        let mut task = RegionFlushTask {
1280            region_id: builder.region_id(),
1281            reason: FlushReason::Others,
1282            senders: Vec::new(),
1283            request_sender: tx,
1284            access_layer: env.access_layer.clone(),
1285            listener: WorkerListener::default(),
1286            engine_config: Arc::new(MitoConfig::default()),
1287            row_group_size: None,
1288            cache_manager: Arc::new(CacheManager::default()),
1289            manifest_ctx: env
1290                .mock_manifest_context(version_control.current().version.metadata.clone())
1291                .await,
1292            index_options: IndexOptions::default(),
1293            flush_semaphore: Arc::new(Semaphore::new(2)),
1294        };
1295        task.push_sender(OptionOutputTx::from(output_tx));
1296        scheduler
1297            .schedule_flush(builder.region_id(), &version_control, task)
1298            .unwrap();
1299        assert!(scheduler.region_status.is_empty());
1300        let output = output_rx.await.unwrap().unwrap();
1301        assert_eq!(output, 0);
1302        assert!(scheduler.region_status.is_empty());
1303    }
1304
1305    #[tokio::test]
1306    async fn test_schedule_pending_request() {
1307        let job_scheduler = Arc::new(VecScheduler::default());
1308        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1309        let (tx, _rx) = mpsc::channel(4);
1310        let mut scheduler = env.mock_flush_scheduler();
1311        let mut builder = VersionControlBuilder::new();
1312        // Overwrites the empty memtable builder.
1313        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1314        let version_control = Arc::new(builder.build());
1315        // Writes data to the memtable so it is not empty.
1316        let version_data = version_control.current();
1317        write_rows_to_version(&version_data.version, "host0", 0, 10);
1318        let manifest_ctx = env
1319            .mock_manifest_context(version_data.version.metadata.clone())
1320            .await;
1321        // Creates 3 tasks.
1322        let mut tasks: Vec<_> = (0..3)
1323            .map(|_| RegionFlushTask {
1324                region_id: builder.region_id(),
1325                reason: FlushReason::Others,
1326                senders: Vec::new(),
1327                request_sender: tx.clone(),
1328                access_layer: env.access_layer.clone(),
1329                listener: WorkerListener::default(),
1330                engine_config: Arc::new(MitoConfig::default()),
1331                row_group_size: None,
1332                cache_manager: Arc::new(CacheManager::default()),
1333                manifest_ctx: manifest_ctx.clone(),
1334                index_options: IndexOptions::default(),
1335                flush_semaphore: Arc::new(Semaphore::new(2)),
1336            })
1337            .collect();
1338        // Schedule first task.
1339        let task = tasks.pop().unwrap();
1340        scheduler
1341            .schedule_flush(builder.region_id(), &version_control, task)
1342            .unwrap();
1343        // Should schedule 1 flush.
1344        assert_eq!(1, scheduler.region_status.len());
1345        assert_eq!(1, job_scheduler.num_jobs());
1346        // Check the new version.
1347        let version_data = version_control.current();
1348        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1349        // Schedule remaining tasks.
1350        let output_rxs: Vec<_> = tasks
1351            .into_iter()
1352            .map(|mut task| {
1353                let (output_tx, output_rx) = oneshot::channel();
1354                task.push_sender(OptionOutputTx::from(output_tx));
1355                scheduler
1356                    .schedule_flush(builder.region_id(), &version_control, task)
1357                    .unwrap();
1358                output_rx
1359            })
1360            .collect();
1361        // Assumes the flush job is finished.
1362        version_control.apply_edit(
1363            Some(RegionEdit {
1364                files_to_add: Vec::new(),
1365                files_to_remove: Vec::new(),
1366                timestamp_ms: None,
1367                compaction_time_window: None,
1368                flushed_entry_id: None,
1369                flushed_sequence: None,
1370                committed_sequence: None,
1371            }),
1372            &[0],
1373            builder.file_purger(),
1374        );
1375        scheduler.on_flush_success(builder.region_id());
1376        // No new flush task.
1377        assert_eq!(1, job_scheduler.num_jobs());
1378        // The flush status is cleared.
1379        assert!(scheduler.region_status.is_empty());
1380        for output_rx in output_rxs {
1381            let output = output_rx.await.unwrap().unwrap();
1382            assert_eq!(output, 0);
1383        }
1384    }
1385
1386    // Verifies that the single-range flat flush path skips dedup in append_mode and dedups otherwise.
1387    #[test]
1388    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
1389        // Build test metadata and flat schema
1390        let metadata = metadata_for_test();
1391        let schema = to_flat_sst_arrow_schema(
1392            &metadata,
1393            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1394        );
1395
1396        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
1397        // Two rows with identical keys and timestamps (ts = 1000), different field values
1398        let capacity = 16;
1399        let pk_codec = build_primary_key_codec(&metadata);
1400        let mut converter =
1401            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
1402        let kvs = build_key_values_with_ts_seq_values(
1403            &metadata,
1404            "dup_key".to_string(),
1405            1,
1406            vec![1000i64, 1000i64].into_iter(),
1407            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
1408            1,
1409        );
1410        converter.append_key_values(&kvs).unwrap();
1411        let part = converter.convert().unwrap();
1412
1413        // Helper to build MemtableRanges with a single range from one bulk part.
1414        // We use BulkMemtable directly because it produces record batch iterators.
1415        let build_ranges = |append_mode: bool| -> MemtableRanges {
1416            let memtable = crate::memtable::bulk::BulkMemtable::new(
1417                1,
1418                metadata.clone(),
1419                None,
1420                None,
1421                append_mode,
1422                MergeMode::LastRow,
1423            );
1424            memtable.write_bulk(part.clone()).unwrap();
1425            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
1426        };
1427
1428        // Case 1: append_mode = false => dedup happens, total rows should be 1
1429        {
1430            let mem_ranges = build_ranges(false);
1431            assert_eq!(1, mem_ranges.ranges.len());
1432
1433            let options = RegionOptions {
1434                append_mode: false,
1435                merge_mode: Some(MergeMode::LastRow),
1436                ..Default::default()
1437            };
1438
1439            let flat_sources = memtable_flat_sources(
1440                schema.clone(),
1441                mem_ranges,
1442                &options,
1443                metadata.primary_key.len(),
1444            )
1445            .unwrap();
1446            assert!(flat_sources.encoded.is_empty());
1447            assert_eq!(1, flat_sources.sources.len());
1448
1449            // Consume the iterator and count rows
1450            let mut total_rows = 0usize;
1451            for source in flat_sources.sources {
1452                match source {
1453                    crate::read::FlatSource::Iter(iter) => {
1454                        for rb in iter {
1455                            total_rows += rb.unwrap().num_rows();
1456                        }
1457                    }
1458                    crate::read::FlatSource::Stream(_) => unreachable!(),
1459                }
1460            }
1461            assert_eq!(1, total_rows, "dedup should keep a single row");
1462        }
1463
1464        // Case 2: append_mode = true => no dedup, total rows should be 2
1465        {
1466            let mem_ranges = build_ranges(true);
1467            assert_eq!(1, mem_ranges.ranges.len());
1468
1469            let options = RegionOptions {
1470                append_mode: true,
1471                ..Default::default()
1472            };
1473
1474            let flat_sources =
1475                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
1476                    .unwrap();
1477            assert!(flat_sources.encoded.is_empty());
1478            assert_eq!(1, flat_sources.sources.len());
1479
1480            let mut total_rows = 0usize;
1481            for source in flat_sources.sources {
1482                match source {
1483                    crate::read::FlatSource::Iter(iter) => {
1484                        for rb in iter {
1485                            total_rows += rb.unwrap().num_rows();
1486                        }
1487                    }
1488                    crate::read::FlatSource::Stream(_) => unreachable!(),
1489                }
1490            }
1491            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
1492        }
1493    }
1494}