// mito2/flush.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Flush related utilities and structs.
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use bytes::Bytes;
24use common_telemetry::{debug, error, info};
25use datatypes::arrow::datatypes::SchemaRef;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::region_request::RegionFlushReason;
30use store_api::storage::{RegionId, SequenceNumber};
31use strum::IntoStaticStr;
32use tokio::sync::{Semaphore, mpsc, watch};
33
34use crate::access_layer::{
35    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
36};
37use crate::cache::CacheManagerRef;
38use crate::config::MitoConfig;
39use crate::error::{
40    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
41    RegionTruncatedSnafu, Result,
42};
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
45use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
46use crate::metrics::{
47    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
48    INFLIGHT_FLUSH_COUNT,
49};
50use crate::read::FlatSource;
51use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
52use crate::read::flat_merge::FlatMergeIterator;
53use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
54use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
55use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
56use crate::request::{
57    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
58    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
59};
60use crate::schedule::scheduler::{Job, SchedulerRef};
61use crate::sst::file::FileMeta;
62use crate::sst::parquet::metadata::extract_primary_key_range;
63use crate::sst::parquet::{
64    DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
65};
66use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
67use crate::worker::WorkerListener;
68
/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usages and decides whether the engine needs to flush.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether to stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager we are freeing `mem` bytes.
    ///
    /// We are in the process of freeing `mem` bytes, so it is not considered
    /// when checking the soft limit.
    fn schedule_free_mem(&self, mem: usize);

    /// We have freed `mem` bytes.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memtable memory limit.
    ///
    /// The write buffer manager should flush memtables when the mutable memory usage
    /// exceeds this limit.
    fn flush_limit(&self) -> usize;
}
100
/// Reference-counted pointer to a shared [WriteBufferManager] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
102
/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit (half of `global_write_buffer_size`).
    mutable_limit: usize,
    /// Memory in used (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier.
    /// The manager can wake up the worker once we free the write buffer.
    notifier: Option<watch::Sender<()>>,
}
121
122impl WriteBufferManagerImpl {
123    /// Returns a new manager with specific `global_write_buffer_size`.
124    pub fn new(global_write_buffer_size: usize) -> Self {
125        Self {
126            global_write_buffer_size,
127            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
128            memory_used: AtomicUsize::new(0),
129            memory_active: AtomicUsize::new(0),
130            notifier: None,
131        }
132    }
133
134    /// Attaches a notifier to the manager.
135    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
136        self.notifier = Some(notifier);
137        self
138    }
139
140    /// Returns memory usage of mutable memtables.
141    pub fn mutable_usage(&self) -> usize {
142        self.memory_active.load(Ordering::Relaxed)
143    }
144
145    /// Returns the size limit for mutable memtables.
146    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
147        // Reserves half of the write buffer for mutable memtable.
148        global_write_buffer_size / 2
149    }
150}
151
152impl WriteBufferManager for WriteBufferManagerImpl {
153    fn should_flush_engine(&self) -> bool {
154        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
155        if mutable_memtable_memory_usage >= self.mutable_limit {
156            debug!(
157                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
158                mutable_memtable_memory_usage,
159                self.memory_usage(),
160                self.mutable_limit,
161                self.global_write_buffer_size,
162            );
163            return true;
164        }
165
166        let memory_usage = self.memory_used.load(Ordering::Relaxed);
167        if memory_usage >= self.global_write_buffer_size {
168            return true;
169        }
170
171        false
172    }
173
174    fn should_stall(&self) -> bool {
175        self.memory_usage() >= self.global_write_buffer_size
176    }
177
178    fn reserve_mem(&self, mem: usize) {
179        self.memory_used.fetch_add(mem, Ordering::Relaxed);
180        self.memory_active.fetch_add(mem, Ordering::Relaxed);
181    }
182
183    fn schedule_free_mem(&self, mem: usize) {
184        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
185    }
186
187    fn free_mem(&self, mem: usize) {
188        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
189        if let Some(notifier) = &self.notifier {
190            // Notifies the worker after the memory usage is decreased. When we drop the memtable
191            // outside of the worker, the worker may still stall requests because the memory usage
192            // is not updated. So we need to notify the worker to handle stalled requests again.
193            let _ = notifier.send(());
194        }
195    }
196
197    fn memory_usage(&self) -> usize {
198        self.memory_used.load(Ordering::Relaxed)
199    }
200
201    fn flush_limit(&self) -> usize {
202        self.mutable_limit
203    }
204}
205
/// Reason of a flush task.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush memtable during downgrading state.
    Downgrading,
    /// Enter staging mode.
    EnterStaging,
    /// Flush when region is closing.
    Closing,
    /// Flush triggered before region migration.
    RegionMigration,
    /// Flush triggered by repartition procedure.
    Repartition,
    /// Flush triggered by remote WAL pruning.
    RemoteWalPrune,
}
232
impl FlushReason {
    /// Get flush reason as static str.
    ///
    /// Conversion is provided by the derived `IntoStaticStr` impl.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}
239
240impl From<RegionFlushReason> for FlushReason {
241    fn from(reason: RegionFlushReason) -> Self {
242        match reason {
243            RegionFlushReason::RegionMigration => FlushReason::RegionMigration,
244            RegionFlushReason::Repartition => FlushReason::Repartition,
245            RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune,
246        }
247    }
248}
249
/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Access layer used to write SST files.
    pub(crate) access_layer: AccessLayerRef,
    /// Listener notified when the flush begins.
    pub(crate) listener: WorkerListener,
    /// Engine config (write buffer size, index configs, default SST format).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Overrides the default SST row group size when set.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager passed to SST write requests.
    pub(crate) cache_manager: CacheManagerRef,
    /// Context used to update the region manifest after the flush.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore to control flush concurrency.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Partition expression of the region.
    ///
    /// This is used to generate the file meta.
    pub(crate) partition_expr: Option<String>,
}
279
impl RegionFlushTask {
    /// Push the sender if it is not none.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notify the sender the job is success.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Send flush error to waiter.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    ///
    /// We must call this in the region worker.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Get a version of this region before creating a job to get current
        // wal entry id, sequence and immutable memtables.
        let version_data = version_control.current();

        Box::pin(async move {
            // Tracks the in-flight flush count for the whole job duration.
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the worker of the result
    /// (either `FlushFinished` or `FlushFailed`).
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry has been flushed.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                    flush_reason: self.reason,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        // Notify the worker regardless of success or failure.
        self.send_worker_request(worker_request).await;
    }

    /// Flushes memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] to apply.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the immutable memtables list and entry ids from the `version_data`
        // for consistency as others might already modify the version in the `version_control`.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        // A task-level row group size overrides the default.
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            encoded_part_count,
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry has been flushed.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // The manifest update must observe the state the region is expected to be in.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // Check if region is in staging mode
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
        // add a cleanup job to remove them later.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Flushes every non-empty immutable memtable of `version` to SSTs.
    ///
    /// Aggregates file metadata, flushed bytes, series counts and write
    /// metrics over all memtables into a [DoFlushMemtablesResult].
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut encoded_part_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            // Compact the memtable first, this waits the background compaction to finish.
            let compact_start = std::time::Instant::now();
            if let Err(e) = mem.compact(true) {
                // Best-effort: a failed compact is logged but does not abort the flush.
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            // Sets `for_flush` flag to true.
            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();

            // Aggregate stats from all ranges
            let num_mem_rows = mem_ranges.num_rows();
            let memtable_series_count = mem_ranges.series_count();
            let memtable_id = mem.id();
            // Increases series count for each mem range. We consider each mem range has different series so
            // the counter may have more series than the actual series count.
            series_count += memtable_series_count;

            let flush_start = Instant::now();
            let FlushFlatMemResult {
                num_encoded,
                num_sources,
                results,
            } = self
                .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                .await?;
            encoded_part_count += num_encoded;
            for (source_idx, result) in results.into_iter().enumerate() {
                let (max_sequence, ssts_written, metrics) = result?;
                if ssts_written.is_empty() {
                    // No data written.
                    continue;
                }

                common_telemetry::debug!(
                    "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                    self.region_id,
                    memtable_id,
                    source_idx,
                    num_sources,
                    metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                for sst_info in ssts_written {
                    flushed_bytes += sst_info.file_size;
                    // Extract the primary key range from the parquet metadata when present.
                    let pk_range = sst_info
                        .file_metadata
                        .as_ref()
                        .and_then(|meta| extract_primary_key_range(meta, &version.metadata));
                    file_metas.push(Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                        pk_range,
                    ));
                }
            }

            common_telemetry::debug!(
                "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                self.region_id,
                num_sources,
                memtable_id,
                num_mem_ranges,
                num_encoded,
                num_mem_rows,
                flush_start.elapsed(),
                compact_cost,
            );
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        })
    }

    /// Writes the ranges of one memtable concurrently.
    ///
    /// Iterator-backed sources go through `write_sst`; pre-encoded ranges go
    /// through `put_sst`. All tasks are spawned on the global runtime and
    /// bounded by `flush_semaphore`.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let field_column_start =
            flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            field_column_start,
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        for (source, max_sequence) in flat_sources.sources {
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
                Ok((max_sequence, ssts, metrics))
            });
            tasks.push(task);
        }
        for (encoded, max_sequence) in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                FLUSH_FILE_TOTAL.inc();
                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        // Fails fast if any spawned task panics or is cancelled.
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            num_sources,
            results,
        })
    }

    /// Builds a level-0 [FileMeta] from the SST write result.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
        primary_key_range: Option<(Bytes, Bytes)>,
    ) -> FileMeta {
        let (primary_key_min, primary_key_max) = match primary_key_range {
            Some((min, max)) => (Some(min), Some(max)),
            None => (None, None),
        };
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            // Flushed files always land in level 0.
            level: 0,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            indexes: sst_info.index_metadata.build_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_version: 0,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            // `sequence` becomes `None` when `max_sequence` is 0.
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
            primary_key_min,
            primary_key_max,
        }
    }

    /// Builds an [SstWriteRequest] for a flush `source`.
    ///
    /// The SST format comes from the region options when set, otherwise from
    /// the engine's `default_flat_format` config.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: FlatSource,
    ) -> SstWriteRequest {
        let flat_format = version
            .options
            .sst_format
            .map(|f| f == FormatType::Flat)
            .unwrap_or(self.engine_config.default_flat_format);
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            sst_write_format: if flat_format {
                FormatType::Flat
            } else {
                FormatType::PrimaryKey
            },
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            #[cfg(feature = "vector_index")]
            vector_index_config: self.engine_config.vector_index.clone(),
        }
    }

    /// Notify flush job status.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merge two flush tasks.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Now we only merge senders. They share the same flush reason.
        self.senders.append(&mut other.senders);
    }
}
712
/// Result of flushing the ranges of a single memtable concurrently.
struct FlushFlatMemResult {
    /// Number of pre-encoded ranges written via the `put_sst` path.
    num_encoded: usize,
    /// Total number of spawned write tasks (iterator sources + encoded ranges).
    num_sources: usize,
    /// Per-task results: max sequence, SSTs written and write metrics.
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
718
/// Aggregated output of flushing all immutable memtables of a region.
struct DoFlushMemtablesResult {
    /// Metadata of every SST file written by the flush.
    file_metas: Vec<FileMeta>,
    /// Total size in bytes of the flushed SST files.
    flushed_bytes: u64,
    /// Series count summed per mem range; may overcount the real series count.
    series_count: usize,
    /// Number of ranges flushed via the pre-encoded path.
    encoded_part_count: usize,
    /// Accumulated write metrics across all memtables.
    flush_metrics: Metrics,
}
726
/// Flush sources built from the ranges of one memtable.
struct FlatSources {
    /// Iterator-backed sources paired with their max sequence numbers.
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    /// Pre-encoded ranges paired with their max sequence numbers.
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
731
732/// Returns the max sequence and [FlatSource] for the given memtable.
733fn memtable_flat_sources(
734    schema: SchemaRef,
735    mem_ranges: MemtableRanges,
736    options: &RegionOptions,
737    field_column_start: usize,
738) -> Result<FlatSources> {
739    let MemtableRanges { ranges } = mem_ranges;
740    let mut flat_sources = FlatSources {
741        sources: SmallVec::new(),
742        encoded: SmallVec::new(),
743    };
744
745    if ranges.len() == 1 {
746        debug!("Flushing single flat range");
747
748        let only_range = ranges.into_values().next().unwrap();
749        let max_sequence = only_range.stats().max_sequence();
750        if let Some(encoded) = only_range.encoded() {
751            flat_sources.encoded.push((encoded, max_sequence));
752        } else {
753            let iter = only_range.build_record_batch_iter(None, None)?;
754            // Dedup according to append mode and merge mode.
755            // Even single range may have duplicate rows.
756            let iter = maybe_dedup_one(
757                options.append_mode,
758                options.merge_mode(),
759                field_column_start,
760                iter,
761            );
762            flat_sources
763                .sources
764                .push((FlatSource::Iter(iter), max_sequence));
765        };
766    } else {
767        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
768        // Calculate total rows from non-encoded ranges.
769        let total_rows: usize = ranges
770            .values()
771            .filter(|r| r.encoded().is_none())
772            .map(|r| r.num_rows())
773            .sum();
774        debug!(
775            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
776            total_rows,
777            min_flush_rows,
778            ranges.len()
779        );
780        let mut rows_remaining = total_rows;
781        let mut last_iter_rows = 0;
782        let num_ranges = ranges.len();
783        let mut input_iters = Vec::with_capacity(num_ranges);
784        let mut current_ranges = Vec::new();
785        for (_range_id, range) in ranges {
786            if let Some(encoded) = range.encoded() {
787                let max_sequence = range.stats().max_sequence();
788                flat_sources.encoded.push((encoded, max_sequence));
789                continue;
790            }
791
792            let iter = range.build_record_batch_iter(None, None)?;
793            input_iters.push(iter);
794            let range_rows = range.num_rows();
795            last_iter_rows += range_rows;
796            rows_remaining -= range_rows;
797            current_ranges.push(range);
798
799            // Flush if we have enough rows, but don't flush if the remaining rows
800            // would be less than DEFAULT_ROW_GROUP_SIZE (to avoid small last files).
801            if last_iter_rows >= min_flush_rows
802                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
803            {
804                debug!(
805                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
806                    last_iter_rows,
807                    min_flush_rows,
808                    input_iters.len(),
809                    rows_remaining
810                );
811
812                // Calculate max_sequence from all merged ranges
813                let max_sequence = current_ranges
814                    .iter()
815                    .map(|r| r.stats().max_sequence())
816                    .max()
817                    .unwrap_or(0);
818
819                let maybe_dedup = merge_and_dedup(
820                    &schema,
821                    options.append_mode,
822                    options.merge_mode(),
823                    field_column_start,
824                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
825                )?;
826
827                flat_sources
828                    .sources
829                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
830                last_iter_rows = 0;
831                current_ranges.clear();
832            }
833        }
834
835        // Handle remaining iters.
836        if !input_iters.is_empty() {
837            debug!(
838                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
839                last_iter_rows,
840                min_flush_rows,
841                input_iters.len(),
842                rows_remaining
843            );
844            let max_sequence = current_ranges
845                .iter()
846                .map(|r| r.stats().max_sequence())
847                .max()
848                .unwrap_or(0);
849
850            let maybe_dedup = merge_and_dedup(
851                &schema,
852                options.append_mode,
853                options.merge_mode(),
854                field_column_start,
855                input_iters,
856            )?;
857
858            flat_sources
859                .sources
860                .push((FlatSource::Iter(maybe_dedup), max_sequence));
861        }
862    }
863
864    Ok(flat_sources)
865}
866
867/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
868///
869/// This function is used during the flush process to combine data from multiple memtable ranges
870/// into a single stream while handling duplicate records according to the configured merge strategy.
871///
872/// # Arguments
873///
874/// * `schema` - The Arrow schema reference that defines the structure of the record batches
875/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
876///                  This is used for append-only workloads where duplicate handling is not required.
877/// * `merge_mode` - The strategy used for deduplication when not in append mode:
878///   - `MergeMode::LastRow`: Keeps the last record for each primary key
879///   - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
880/// * `field_column_start` - The starting column index for fields in the record batch.
881///                          Used when `MergeMode::LastNonNull` to identify which columns
882///                          contain field values versus primary key columns.
883/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
884///
885/// # Returns
886///
887/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
888/// record batches.
889///
890/// # Behavior
891///
892/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
893///    primary key and timestamp
894/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
895/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
896///    applies the specified merge mode:
897///    - `LastRow`: Removes duplicate rows, keeping only the last one
898///    - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
899///
900/// # Examples
901///
902/// ```ignore
903/// let merged_iter = merge_and_dedup(
904///     &schema,
905///     false,  // not append mode, apply dedup
906///     MergeMode::LastRow,
907///     2,  // fields start at column 2 after primary key columns
908///     vec![iter1, iter2, iter3],
909/// )?;
910/// ```
911pub fn merge_and_dedup(
912    schema: &SchemaRef,
913    append_mode: bool,
914    merge_mode: MergeMode,
915    field_column_start: usize,
916    input_iters: Vec<BoxedRecordBatchIterator>,
917) -> Result<BoxedRecordBatchIterator> {
918    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
919    let maybe_dedup = if append_mode {
920        // No dedup in append mode
921        Box::new(merge_iter) as _
922    } else {
923        // Dedup according to merge mode.
924        match merge_mode {
925            MergeMode::LastRow => {
926                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
927            }
928            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
929                merge_iter,
930                FlatLastNonNull::new(field_column_start, false),
931            )) as _,
932        }
933    };
934    Ok(maybe_dedup)
935}
936
937pub fn maybe_dedup_one(
938    append_mode: bool,
939    merge_mode: MergeMode,
940    field_column_start: usize,
941    input_iter: BoxedRecordBatchIterator,
942) -> BoxedRecordBatchIterator {
943    if append_mode {
944        // No dedup in append mode
945        input_iter
946    } else {
947        // Dedup according to merge mode.
948        match merge_mode {
949            MergeMode::LastRow => {
950                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
951            }
952            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
953                input_iter,
954                FlatLastNonNull::new(field_column_start, false),
955            )),
956        }
957    }
958}
959
/// Manages background flushes of a worker.
///
/// At most one flush job runs per region at a time; further requests for a region that
/// is already flushing are merged into a pending task (see [FlushStatus]).
pub(crate) struct FlushScheduler {
    /// Per-region flush state. A region is present in this map iff it has a running
    /// or pending flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler that executes the flush jobs.
    scheduler: SchedulerRef,
}
967
impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Freezes the region's mutable memtable and submits the flush job to the
    /// background scheduler.
    ///
    /// Returns an error if freezing fails or the job cannot be scheduled. In the
    /// latter case the job (and the senders it carries) is dropped, so waiters
    /// observe recv errors instead of an explicit result.
    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;

        // If current region doesn't have flush status, we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        // Submit a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            // If scheduler returns error, senders in the job will be dropped and waiters
            // can get recv errors.
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }

    /// Schedules a flush `task` for specific `region`.
    ///
    /// If the region has nothing to flush, the task succeeds immediately. If the
    /// region is already flushing, the task is merged into its pending task and
    /// runs after the current flush finishes.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        // Don't increase the counter if a region has nothing to flush.
        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // If current region has flush status, merge the task.
        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            // Checks whether we can flush the region now.
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;

        // Add this region to status map.
        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished.
    ///
    /// Returns all pending requests if the region doesn't need to flush again.
    /// Returns `None` when the region is untracked, or when another flush was
    /// scheduled for it (it then stays in the status map).
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        // If region doesn't have any pending flush task, we need to remove it from the status.
        if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: The flush status must exist.
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // If region has pending task, but has nothing to flush, we need to remove it from the status.
        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // The region has nothing to flush, we also need to remove it from the status.
            // Safety: The pending task is not None.
            let task = flush_status.pending_task.take().unwrap();
            // The region has nothing to flush. We can notify pending task.
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // If region has pending task and has something to flush, we need to schedule it.
        debug!("Scheduling pending flush task for region {}", region_id);
        // Safety: The flush status must exist.
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            // NOTE: scheduling failure is only logged here; the region stays in the
            // status map so later events can still clean it up.
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        // We can flush the region again, keep it in the region status.
        None
    }

    /// Notifies the scheduler that the flush job is failed.
    ///
    /// Removes the region from the status map and fails all of its pending tasks
    /// and queued requests with `err`.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends error to their waiters.
        flush_status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Removes the region's flush status (if any) and fails all its pending work with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        flush_status.on_failure(err);
    }

    /// Add ddl request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Add write request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Add bulk write request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDLs.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }
}
1196
1197impl Drop for FlushScheduler {
1198    fn drop(&mut self) {
1199        for (region_id, flush_status) in self.region_status.drain() {
1200            // We are shutting down so notify all pending tasks.
1201            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1202        }
1203    }
1204}
1205
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
/// A `FlushStatus` exists only while the region has a flush in progress; it is
/// removed (and its pending requests returned to the caller or failed) when the
/// flush finishes, fails, or the region goes away.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region, used to check memtables and freeze/schedule
    /// follow-up flushes.
    version_control: VersionControlRef,
    /// Task waiting for the next flush; later requests are merged into it.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests, replayed after the flush completes.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Requests waiting to write after altering the region.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk requests waiting to write after altering the region.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1223
impl FlushStatus {
    /// Creates an empty status for `region_id` with no pending task or requests.
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task to pending task.
    ///
    /// If there is no pending task yet, `task` becomes the pending task; otherwise
    /// the existing pending task absorbs it.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and all queued ddl/write requests with `err`.
    ///
    /// NOTE(review): `pending_bulk_writes` is not explicitly notified here — those
    /// entries are simply dropped, so their waiters see a closed channel rather than
    /// this `err`, unlike `pending_writes`. Confirm the asymmetry is intended.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}
1263
1264#[cfg(test)]
1265mod tests {
1266    use mito_codec::row_converter::build_primary_key_codec;
1267    use tokio::sync::oneshot;
1268
1269    use super::*;
1270    use crate::cache::CacheManager;
1271    use crate::memtable::bulk::part::BulkPartConverter;
1272    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1273    use crate::memtable::{Memtable, RangesOptions};
1274    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1275    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1276    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1277    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1278
1279    #[test]
1280    fn test_get_mutable_limit() {
1281        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1282        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1283        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1284        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1285    }
1286
    // Exercises the mutable-limit path: flush is requested once active (mutable) memory
    // exceeds half the global limit, and freezing/freeing moves memory between the
    // `memory_active` and `memory_used` counters.
    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // More than mutable limit (800 > 500), so a flush should be requested.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes mutable: moves 400 from active to scheduled-free, flush no longer needed.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases immutable: total used drops, active is unchanged.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }
1310
    // Exercises the global-limit path: usage above the global limit stalls writes and
    // keeps requesting flushes until enough memory is scheduled for free.
    #[test]
    fn test_over_global() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100 (schedule_free_mem doesn't reduce usage yet).
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Now mutable is enough.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
1333
    // Verifies the watch-channel notifier fires only when memory is actually freed
    // (`free_mem`), not on reservation or on scheduling memory for free.
    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        // Reserving memory does not notify.
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Scheduling memory for free does not notify either.
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Actually freeing memory triggers the notifier.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
1345
    // Scheduling a flush for a region whose memtables are empty should complete the
    // task immediately (waiter receives 0) without ever tracking the region.
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        // Default builder produces a version with empty memtables.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
            partition_expr: None,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Nothing to flush: the region is never inserted into the status map.
        assert!(scheduler.region_status.is_empty());
        // The waiter is notified of success with 0 flushed rows/bytes.
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1382
    // Verifies that concurrent flush requests for a busy region are merged into one
    // pending task (only one job is ever scheduled) and that all merged waiters are
    // notified once the flush succeeds with nothing left to flush.
    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version: the mutable memtable was frozen into immutable id 0.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks; they are merged into the pending task while the
        // first flush is still "running", so no extra job is submitted.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        // Every merged waiter receives the success result.
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
1465
    // Verifies single-range flat flush path respects append_mode (no dedup) vs dedup when disabled.
    // Two rows with the same primary key and timestamp are written; with dedup enabled
    // only one survives, while append mode keeps both.
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        // Build test metadata and flat schema
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
        // Two rows with identical keys and timestamps (ts = 1000), different field values
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Helper to build MemtableRanges with a single range from one bulk part.
        // We use BulkMemtable directly because it produces record batch iterators.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                crate::memtable::bulk::BulkMemtableConfig::default(),
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Case 1: append_mode = false => dedup happens, total rows should be 1
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            // The single range is not pre-encoded, so it surfaces as an iterator source.
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Case 2: append_mode = true => no dedup, total rows should be 2
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }
1575
1576    #[tokio::test]
1577    async fn test_schedule_pending_request_on_flush_success() {
1578        common_telemetry::init_default_ut_logging();
1579        let job_scheduler = Arc::new(VecScheduler::default());
1580        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1581        let (tx, _rx) = mpsc::channel(4);
1582        let mut scheduler = env.mock_flush_scheduler();
1583        let mut builder = VersionControlBuilder::new();
1584        // Overwrites the empty memtable builder.
1585        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1586        let version_control = Arc::new(builder.build());
1587        // Writes data to the memtable so it is not empty.
1588        let version_data = version_control.current();
1589        write_rows_to_version(&version_data.version, "host0", 0, 10);
1590        let manifest_ctx = env
1591            .mock_manifest_context(version_data.version.metadata.clone())
1592            .await;
1593        // Creates 2 tasks.
1594        let mut tasks: Vec<_> = (0..2)
1595            .map(|_| RegionFlushTask {
1596                region_id: builder.region_id(),
1597                reason: FlushReason::Others,
1598                senders: Vec::new(),
1599                request_sender: tx.clone(),
1600                access_layer: env.access_layer.clone(),
1601                listener: WorkerListener::default(),
1602                engine_config: Arc::new(MitoConfig::default()),
1603                row_group_size: None,
1604                cache_manager: Arc::new(CacheManager::default()),
1605                manifest_ctx: manifest_ctx.clone(),
1606                index_options: IndexOptions::default(),
1607                flush_semaphore: Arc::new(Semaphore::new(2)),
1608                is_staging: false,
1609                partition_expr: None,
1610            })
1611            .collect();
1612        // Schedule first task.
1613        let task = tasks.pop().unwrap();
1614        scheduler
1615            .schedule_flush(builder.region_id(), &version_control, task)
1616            .unwrap();
1617        // Should schedule 1 flush.
1618        assert_eq!(1, scheduler.region_status.len());
1619        assert_eq!(1, job_scheduler.num_jobs());
1620        // Schedule second task.
1621        let task = tasks.pop().unwrap();
1622        scheduler
1623            .schedule_flush(builder.region_id(), &version_control, task)
1624            .unwrap();
1625        assert!(
1626            scheduler
1627                .region_status
1628                .get(&builder.region_id())
1629                .unwrap()
1630                .pending_task
1631                .is_some()
1632        );
1633
1634        // Check the new version.
1635        let version_data = version_control.current();
1636        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1637        // Assumes the flush job is finished.
1638        version_control.apply_edit(
1639            Some(RegionEdit {
1640                files_to_add: Vec::new(),
1641                files_to_remove: Vec::new(),
1642                timestamp_ms: None,
1643                compaction_time_window: None,
1644                flushed_entry_id: None,
1645                flushed_sequence: None,
1646                committed_sequence: None,
1647            }),
1648            &[0],
1649            builder.file_purger(),
1650        );
1651        write_rows_to_version(&version_data.version, "host1", 0, 10);
1652        scheduler.on_flush_success(builder.region_id());
1653        assert_eq!(2, job_scheduler.num_jobs());
1654        // The pending task is cleared.
1655        assert!(
1656            scheduler
1657                .region_status
1658                .get(&builder.region_id())
1659                .unwrap()
1660                .pending_task
1661                .is_none()
1662        );
1663    }
1664}